From 6a112a2a40b921dcc3066e58af48dfec8b4153da Mon Sep 17 00:00:00 2001 From: FernTheDev <15272073+Fernthedev@users.noreply.github.com> Date: Wed, 30 Jun 2021 22:15:26 -0400 Subject: [PATCH 1/4] Add initial socket work, no testing or packet logic yet --- .../Interfaces/IKittenRawSocketProvider.cs | 11 + CatCore/Services/KittenRawSocketProvider.cs | 206 ++++++++++++++++++ CatCore/Services/Sockets/ClientSocket.cs | 123 +++++++++++ CatCore/Services/Sockets/Packet.cs | 20 ++ CatCore/Services/Sockets/ReceivedData.cs | 35 +++ 5 files changed, 395 insertions(+) create mode 100644 CatCore/Services/Interfaces/IKittenRawSocketProvider.cs create mode 100644 CatCore/Services/KittenRawSocketProvider.cs create mode 100644 CatCore/Services/Sockets/ClientSocket.cs create mode 100644 CatCore/Services/Sockets/Packet.cs create mode 100644 CatCore/Services/Sockets/ReceivedData.cs diff --git a/CatCore/Services/Interfaces/IKittenRawSocketProvider.cs b/CatCore/Services/Interfaces/IKittenRawSocketProvider.cs new file mode 100644 index 00000000..b33956e9 --- /dev/null +++ b/CatCore/Services/Interfaces/IKittenRawSocketProvider.cs @@ -0,0 +1,11 @@ +using System; +using System.Threading; +using CatCore.Models.Config; + +namespace CatCore.Services.Interfaces +{ + internal interface IKittenRawSocketProvider : INeedInitialization, IDisposable + { + bool isServerRunning(); + } +} \ No newline at end of file diff --git a/CatCore/Services/KittenRawSocketProvider.cs b/CatCore/Services/KittenRawSocketProvider.cs new file mode 100644 index 00000000..4ccf6d36 --- /dev/null +++ b/CatCore/Services/KittenRawSocketProvider.cs @@ -0,0 +1,206 @@ +using System; +using System.Collections.Concurrent; +using System.Net; +using System.Net.Sockets; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using CatCore.Services.Interfaces; +using CatCore.Services.Sockets; +using Serilog; + +namespace CatCore.Services +{ + // Just copied sample code for socket server + internal class KittenRawSocketProvider : IKittenRawSocketProvider + { + private const int SOCKET_PORT = 8338; + + // Not sure if concurrent is needed here + private readonly ConcurrentDictionary _connectedClients = new ConcurrentDictionary(); + + // Thread signal. + private readonly SemaphoreSlim _allDone = new SemaphoreSlim(1, 1); + private readonly ILogger _logger; + + private bool _isServerRunning; + + internal CancellationTokenSource? ServerCts { get; private set; } + +#pragma warning disable 649 + public Action? OnConnect; + public Action? OnReceive; + public Action? OnDisconnect; +#pragma warning restore 649 + + public KittenRawSocketProvider(ILogger logger) + { + _logger = logger; + } + + private void ValidateServerNotRunning() + { + if (_allDone.CurrentCount > 0 || _isServerRunning || !ServerCts?.IsCancellationRequested != null) + { + throw new InvalidOperationException("The server is still running, what is wrong with you? The poor kitty can't handle two socket servers! ;-;"); + } + } + + private async void StartListening(CancellationTokenSource cts) + { + ServerCts = cts; + + // Establish the local endpoint for the socket. + // The DNS name of the computer + // running the listener is "host.contoso.com". + IPHostEntry ipHostInfo = await Dns.GetHostEntryAsync(Dns.GetHostName()); + IPAddress ipAddress = ipHostInfo.AddressList[0]; + IPEndPoint localEndPoint = new IPEndPoint(ipAddress, SOCKET_PORT); + + // Create a TCP/IP socket. + Socket listener = new Socket(ipAddress.AddressFamily, SocketType.Stream, ProtocolType.Tcp); + + // Bind the socket to the local endpoint and listen for incoming connections. + try + { + listener.Bind(localEndPoint); + listener.Listen(20); //back log is amount of clients allowed to wait + + _isServerRunning = true; + while (!ServerCts.IsCancellationRequested) + { + // Set the event to nonsignaled state. + _allDone.Release(); + + // Start an asynchronous socket to listen for connections. + _logger.Information("Waiting for a connection..."); + listener.BeginAccept( + AcceptCallback, + listener); + + + // Wait until a connection is made before continuing. + // this avoids eating CPU cycles + await _allDone.WaitAsync(ServerCts.Token).ConfigureAwait(false); + } + + } + catch (Exception e) + { + _logger.Fatal(e.Message, e); + } + + _isServerRunning = false; + } + + private void AcceptCallback(IAsyncResult ar) + { + // Signal the main thread to continue. + _allDone.Release(); + + if (ServerCts is null || ServerCts.IsCancellationRequested) + { + return; + } + + // Get the socket that handles the client request. + Socket listener = (Socket) ar.AsyncState; + Socket handler = listener.EndAccept(ar); + + var guid = Guid.NewGuid(); + + // Never have a duplicate + while (_connectedClients.ContainsKey(guid)) + { + guid = Guid.NewGuid(); + } + + ClientSocket clientSocket = new ClientSocket(handler, guid, ServerCts!, HandleDisconnect); + _connectedClients[guid] = clientSocket; + + + + OnConnect?.Invoke(clientSocket); + + // Create the state object. + ReceivedData state = new ReceivedData(clientSocket); + handler.BeginReceive(state.Buffer, 0, ReceivedData.BUFFER_SIZE, 0, ReadCallback, state); + } + + private void ReadCallback(IAsyncResult ar) + { + + // Retrieve the state object and the handler socket + // from the asynchronous state object. + ReceivedData state = (ReceivedData) ar.AsyncState; + ClientSocket handler = state.ClientSocket; + + if (!handler.WorkSocket.Connected) + { + HandleDisconnect(handler); + return; + } + + // Read data from the client socket. + var bytesRead = handler.WorkSocket.EndReceive(ar); + + // If 0, no more data is coming + if (bytesRead <= 0) + { + return; + } + + // There might be more data, so store the data received so far. + state.ReceivedDataStr.Append(Encoding.UTF8.GetString(state.Buffer, 0, bytesRead)); + + // Check for end-of-file tag. If it is not there, read + // more data. + string content = state.ReceivedDataStr.ToString(); + if (content.IndexOf("\n", StringComparison.Ordinal) > -1) + { + OnReceive?.Invoke(handler, state); + } + else + { + // Not all data received. Get more. + // Not all data gets received at once, so keep checking for more + handler.WorkSocket.BeginReceive(state.Buffer, 0, ReceivedData.BUFFER_SIZE, 0, ReadCallback, state); + } + } + + private void HandleDisconnect(ClientSocket clientSocket) + { + var socket = clientSocket.WorkSocket; + + if (socket.Connected) + { + socket.Shutdown(SocketShutdown.Both); + socket.Close(); + } + + _connectedClients.TryRemove(clientSocket.Uuid, out _); + } + + public void Initialize() + { + ValidateServerNotRunning(); + + ServerCts = new CancellationTokenSource(); + Task.Run(() => + { + StartListening(ServerCts); + }); + } + + public void Dispose() + { + + ServerCts!.Dispose(); + } + + public bool isServerRunning() + { + return _isServerRunning; + } + } +} \ No newline at end of file diff --git a/CatCore/Services/Sockets/ClientSocket.cs b/CatCore/Services/Sockets/ClientSocket.cs new file mode 100644 index 00000000..58215ac9 --- /dev/null +++ b/CatCore/Services/Sockets/ClientSocket.cs @@ -0,0 +1,123 @@ +using System; +using System.Collections.Concurrent; +using System.IO; +using System.Net.Sockets; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace CatCore.Services.Sockets +{ + + + // Not much, just represents a client + public class ClientSocket + { + // Client socket. + public readonly Socket WorkSocket; + public readonly Guid Uuid; + + private readonly BlockingCollection _packetsToSend = new BlockingCollection(); + private readonly SemaphoreSlim _sendSemaphore = new SemaphoreSlim(1,1); + + private readonly Action _onClose; + + public ClientSocket(Socket workSocket, Guid uuid, CancellationTokenSource cts, Action onClose) + { + Uuid = uuid; + WorkSocket = workSocket; + _onClose = onClose; + + Task.Run(async () => { + await SendTaskLoop(cts); + }, cts.Token); + } + + private async Task SendTaskLoop(CancellationTokenSource cts) + { + while (!cts.IsCancellationRequested) + { + // Stop trying to send data + if (!WorkSocket.Connected) + { + return; + } + + if (!_packetsToSend.TryTake(out var packet)) + { + await Task.Yield(); + } + + var bytesToSend = Encoding.UTF8.GetBytes(packet.ToJson()); + SendData(bytesToSend); + + await _sendSemaphore.WaitAsync(cts.Token); + } + } + + private void SendCallback(IAsyncResult ar) + { + try + { + // Retrieve the socket from the state object. + Socket handler = (Socket) ar.AsyncState; + + // Complete sending the data to the remote device. + handler.EndSend(ar); + + _sendSemaphore.Release(); + } + catch (Exception e) + { + // todo: use logger? + Console.WriteLine(e.ToString()); + } + } + + private void SendData(byte[] data) + { + // synchronous sending + // var bytesSent = WorkSocket.Send(data.ToArray()); + // + // if (bytesSent < data.Length) + // { + // SendData(data.Slice(bytesSent, data.Length).ToArray()); + // } + + // async sending + // Begin sending the data to the remote device. + WorkSocket.BeginSend(data, 0, data.Length, 0, SendCallback, WorkSocket); + } + + public async Task QueueSend(Packet packet) + { + if (!WorkSocket.Connected) + { + throw new IOException("Socket has been closed!"); + } + + // Avoid blocking, is this overkill? + await Task.Run(() => + { + _packetsToSend.Add(packet); + }); + } + + public void Close() + { + try + { + _sendSemaphore.Release(); + _sendSemaphore.Dispose(); + } + catch (Exception e) + { + // todo: use logger? + Console.WriteLine(e.ToString()); + } + + _onClose.Invoke(this); + } + } + +} \ No newline at end of file diff --git a/CatCore/Services/Sockets/Packet.cs b/CatCore/Services/Sockets/Packet.cs new file mode 100644 index 00000000..ab1ef963 --- /dev/null +++ b/CatCore/Services/Sockets/Packet.cs @@ -0,0 +1,20 @@ +using System.Text.Json; + +namespace CatCore.Services.Sockets +{ + /// + /// Abstract class to define packet types + /// + public abstract class Packet + { + public string ToJson() + { + return JsonSerializer.Serialize(this); + } + + public static T? FromJson(string json) where T : Packet + { + return JsonSerializer.Deserialize(json); + } + } +} \ No newline at end of file diff --git a/CatCore/Services/Sockets/ReceivedData.cs b/CatCore/Services/Sockets/ReceivedData.cs new file mode 100644 index 00000000..1aab039b --- /dev/null +++ b/CatCore/Services/Sockets/ReceivedData.cs @@ -0,0 +1,35 @@ +using System; +using System.Net.Sockets; +using System.Text; + +namespace CatCore.Services.Sockets +{ + // State object for reading client data asynchronously + public class ReceivedData + { + // Size of receive buffer. + public const int BUFFER_SIZE = 4096; + + // Receive buffer. + // the buffer is used to store the received bytes temporarily + // and cleared when they are later parsed into receivedData + public readonly byte[] Buffer = new byte[BUFFER_SIZE]; + + // Received data string. + public readonly StringBuilder ReceivedDataStr = new StringBuilder(); + + // Client socket. + public readonly ClientSocket ClientSocket; + + public readonly Guid ClientUuid; + + public readonly Socket WorkSocket; + + public ReceivedData(ClientSocket clientSocket) + { + ClientSocket = clientSocket; + ClientUuid = clientSocket.Uuid; + WorkSocket = clientSocket.WorkSocket; + } + } +} \ No newline at end of file From 3dd778f6eaeb772ae698101056cd182e057682b4 Mon Sep 17 00:00:00 2001 From: FernTheDev <15272073+Fernthedev@users.noreply.github.com> Date: Thu, 1 Jul 2021 20:59:27 -0400 Subject: [PATCH 2/4] Started testing and fixed bugs, not nearly finalized and lots of ugly code. Receiving and sending works --- CatCore/CatCoreInstance.cs | 29 +++++ .../Interfaces/IKittenRawSocketProvider.cs | 11 +- CatCore/Services/KittenRawSocketProvider.cs | 103 +++++++----------- CatCore/Services/Sockets/ClientSocket.cs | 68 +++++++++++- CatCore/Services/Sockets/Packet.cs | 81 +++++++++++++- 5 files changed, 216 insertions(+), 76 deletions(-) diff --git a/CatCore/CatCoreInstance.cs b/CatCore/CatCoreInstance.cs index ebdb6776..ecdf9f7c 100644 --- a/CatCore/CatCoreInstance.cs +++ b/CatCore/CatCoreInstance.cs @@ -12,11 +12,14 @@ using CatCore.Services; using CatCore.Services.Interfaces; using CatCore.Services.Multiplexer; +using CatCore.Services.Sockets; +using CatCore.Services.Sockets.Packets; using CatCore.Services.Twitch; using CatCore.Services.Twitch.Interfaces; using CatCore.Services.Twitch.Media; using DryIoc; using JetBrains.Annotations; +using ImTools; using Serilog; using Serilog.Events; using Serilog.Formatting.Display; @@ -153,6 +156,32 @@ private void CreateContainer() _container.Register(Reuse.Singleton); _container.Register(Reuse.Singleton); + // Register socket services + _container.Register(Reuse.Singleton); + _container.RegisterInitializer((service, context) => service.Initialize()); + + _ = Task.Run(() => + { + var socket = _container.Resolve(); + socket.Initialize(); + socket.OnConnect += clientSocket => + { + Log.Logger.Information($"Client connected from {clientSocket.WorkSocket.RemoteEndPoint} and {clientSocket.Uuid}"); + }; + + socket.OnReceive += (clientSocket, data) => + { + Log.Logger.Information($"Received from {clientSocket.Uuid}: {data.ReceivedDataStr}"); + + var _ = clientSocket.QueueSend(new RespondHello(data.ReceivedDataStr.ToString())); + }; + + socket.OnDisconnect += clientSocket => + { + Log.Logger.Information($"Client disconnected from {clientSocket.WorkSocket.RemoteEndPoint} and {clientSocket.Uuid}"); + }; + }); + // Spin up internal web api service _ = Task.Run(() => { diff --git a/CatCore/Services/Interfaces/IKittenRawSocketProvider.cs b/CatCore/Services/Interfaces/IKittenRawSocketProvider.cs index b33956e9..851a43ad 100644 --- a/CatCore/Services/Interfaces/IKittenRawSocketProvider.cs +++ b/CatCore/Services/Interfaces/IKittenRawSocketProvider.cs @@ -1,11 +1,18 @@ using System; -using System.Threading; -using CatCore.Models.Config; +using CatCore.Services.Sockets; namespace CatCore.Services.Interfaces { internal interface IKittenRawSocketProvider : INeedInitialization, IDisposable { bool isServerRunning(); + +#pragma warning disable 649 + event Action? OnConnect; + event Action? OnReceive; + event Action? OnDisconnect; +#pragma warning restore 649 + + } } \ No newline at end of file diff --git a/CatCore/Services/KittenRawSocketProvider.cs b/CatCore/Services/KittenRawSocketProvider.cs index 4ccf6d36..0563524c 100644 --- a/CatCore/Services/KittenRawSocketProvider.cs +++ b/CatCore/Services/KittenRawSocketProvider.cs @@ -2,7 +2,6 @@ using System.Collections.Concurrent; using System.Net; using System.Net.Sockets; -using System.Text; using System.Threading; using System.Threading.Tasks; using CatCore.Services.Interfaces; @@ -19,8 +18,13 @@ internal class KittenRawSocketProvider : IKittenRawSocketProvider // Not sure if concurrent is needed here private readonly ConcurrentDictionary _connectedClients = new ConcurrentDictionary(); + private static SemaphoreSlim NewSemaphore() + { + return new SemaphoreSlim(0, 1); + } + // Thread signal. - private readonly SemaphoreSlim _allDone = new SemaphoreSlim(1, 1); + private SemaphoreSlim _allDone = NewSemaphore(); private readonly ILogger _logger; private bool _isServerRunning; @@ -28,9 +32,9 @@ internal class KittenRawSocketProvider : IKittenRawSocketProvider internal CancellationTokenSource? ServerCts { get; private set; } #pragma warning disable 649 - public Action? OnConnect; - public Action? OnReceive; - public Action? OnDisconnect; + public event Action? OnConnect; + public event Action? OnReceive; + public event Action? OnDisconnect; #pragma warning restore 649 public KittenRawSocketProvider(ILogger logger) @@ -38,12 +42,9 @@ public KittenRawSocketProvider(ILogger logger) _logger = logger; } - private void ValidateServerNotRunning() + private bool ValidateServerNotRunning() { - if (_allDone.CurrentCount > 0 || _isServerRunning || !ServerCts?.IsCancellationRequested != null) - { - throw new InvalidOperationException("The server is still running, what is wrong with you? The poor kitty can't handle two socket servers! ;-;"); - } + return !_isServerRunning && !ServerCts?.IsCancellationRequested == null; } private async void StartListening(CancellationTokenSource cts) @@ -51,10 +52,7 @@ private async void StartListening(CancellationTokenSource cts) ServerCts = cts; // Establish the local endpoint for the socket. - // The DNS name of the computer - // running the listener is "host.contoso.com". - IPHostEntry ipHostInfo = await Dns.GetHostEntryAsync(Dns.GetHostName()); - IPAddress ipAddress = ipHostInfo.AddressList[0]; + IPAddress ipAddress = IPAddress.Any; IPEndPoint localEndPoint = new IPEndPoint(ipAddress, SOCKET_PORT); // Create a TCP/IP socket. @@ -63,15 +61,16 @@ private async void StartListening(CancellationTokenSource cts) // Bind the socket to the local endpoint and listen for incoming connections. try { + _logger.Information($"Binding to port {localEndPoint.Address.MapToIPv4()}:{localEndPoint.Port}"); listener.Bind(localEndPoint); listener.Listen(20); //back log is amount of clients allowed to wait + // Set the event to nonsignaled state. + _allDone = NewSemaphore(); + _isServerRunning = true; while (!ServerCts.IsCancellationRequested) { - // Set the event to nonsignaled state. - _allDone.Release(); - // Start an asynchronous socket to listen for connections. _logger.Information("Waiting for a connection..."); listener.BeginAccept( @@ -87,7 +86,7 @@ private async void StartListening(CancellationTokenSource cts) } catch (Exception e) { - _logger.Fatal(e.Message, e); + _logger.Fatal(e.Message, e,ToString()); } _isServerRunning = false; @@ -115,59 +114,18 @@ private void AcceptCallback(IAsyncResult ar) guid = Guid.NewGuid(); } - ClientSocket clientSocket = new ClientSocket(handler, guid, ServerCts!, HandleDisconnect); + ClientSocket clientSocket = new ClientSocket(handler, guid, ServerCts!, HandleDisconnect, HandleRead); _connectedClients[guid] = clientSocket; - - OnConnect?.Invoke(clientSocket); - - // Create the state object. - ReceivedData state = new ReceivedData(clientSocket); - handler.BeginReceive(state.Buffer, 0, ReceivedData.BUFFER_SIZE, 0, ReadCallback, state); } - private void ReadCallback(IAsyncResult ar) + private void HandleRead(ClientSocket clientSocket, ReceivedData receivedData) { - - // Retrieve the state object and the handler socket - // from the asynchronous state object. - ReceivedData state = (ReceivedData) ar.AsyncState; - ClientSocket handler = state.ClientSocket; - - if (!handler.WorkSocket.Connected) - { - HandleDisconnect(handler); - return; - } - - // Read data from the client socket. - var bytesRead = handler.WorkSocket.EndReceive(ar); - - // If 0, no more data is coming - if (bytesRead <= 0) - { - return; - } - - // There might be more data, so store the data received so far. - state.ReceivedDataStr.Append(Encoding.UTF8.GetString(state.Buffer, 0, bytesRead)); - - // Check for end-of-file tag. If it is not there, read - // more data. - string content = state.ReceivedDataStr.ToString(); - if (content.IndexOf("\n", StringComparison.Ordinal) > -1) - { - OnReceive?.Invoke(handler, state); - } - else - { - // Not all data received. Get more. - // Not all data gets received at once, so keep checking for more - handler.WorkSocket.BeginReceive(state.Buffer, 0, ReceivedData.BUFFER_SIZE, 0, ReadCallback, state); - } + OnReceive?.Invoke(clientSocket, receivedData); } + private void HandleDisconnect(ClientSocket clientSocket) { var socket = clientSocket.WorkSocket; @@ -179,16 +137,31 @@ private void HandleDisconnect(ClientSocket clientSocket) } _connectedClients.TryRemove(clientSocket.Uuid, out _); + + OnDisconnect?.Invoke(clientSocket); } public void Initialize() { - ValidateServerNotRunning(); + if (!ValidateServerNotRunning()) + { + _logger.Warning("(This can be ignored if intentional) The server is still running, what is wrong with you? The poor kitty can't handle two socket servers! ;-;"); + return; + } + + _logger.Information("Starting socket server"); ServerCts = new CancellationTokenSource(); Task.Run(() => { - StartListening(ServerCts); + try + { + StartListening(ServerCts); + } + catch (Exception e) + { + _logger.Error(e.Message, e.ToString()); + } }); } diff --git a/CatCore/Services/Sockets/ClientSocket.cs b/CatCore/Services/Sockets/ClientSocket.cs index 58215ac9..999da02d 100644 --- a/CatCore/Services/Sockets/ClientSocket.cs +++ b/CatCore/Services/Sockets/ClientSocket.cs @@ -3,8 +3,10 @@ using System.IO; using System.Net.Sockets; using System.Text; +using System.Text.Json; using System.Threading; using System.Threading.Tasks; +using CatCore.Services.Sockets.Packets; namespace CatCore.Services.Sockets { @@ -18,19 +20,28 @@ public class ClientSocket public readonly Guid Uuid; private readonly BlockingCollection _packetsToSend = new BlockingCollection(); - private readonly SemaphoreSlim _sendSemaphore = new SemaphoreSlim(1,1); + private readonly SemaphoreSlim _sendSemaphore = new SemaphoreSlim(0,1); + private readonly SemaphoreSlim _receiveSemaphore = new SemaphoreSlim(0,1); private readonly Action _onClose; + private readonly Action _onRead; - public ClientSocket(Socket workSocket, Guid uuid, CancellationTokenSource cts, Action onClose) + public ClientSocket(Socket workSocket, Guid uuid, CancellationTokenSource cts, Action onClose, Action onReceive) { Uuid = uuid; WorkSocket = workSocket; _onClose = onClose; + _onRead = onReceive; Task.Run(async () => { await SendTaskLoop(cts); }, cts.Token); + + Task.Run(async () => + { + await ReceiveTaskLoop(cts); + }, cts.Token); + } private async Task SendTaskLoop(CancellationTokenSource cts) @@ -46,15 +57,66 @@ private async Task SendTaskLoop(CancellationTokenSource cts) if (!_packetsToSend.TryTake(out var packet)) { await Task.Yield(); + continue; } - var bytesToSend = Encoding.UTF8.GetBytes(packet.ToJson()); + var bytesToSend = JsonSerializer.SerializeToUtf8Bytes(packet, packet.GetType()); SendData(bytesToSend); await _sendSemaphore.WaitAsync(cts.Token); } } + private async Task ReceiveTaskLoop(CancellationTokenSource cts) + { + while (!cts.IsCancellationRequested) + { + // Stop trying to receive data + if (!WorkSocket.Connected) + { + return; + } + + // Create the state object. + ReceivedData state = new ReceivedData(this); + WorkSocket.BeginReceive(state.Buffer, 0, ReceivedData.BUFFER_SIZE, 0, ReadCallback, state); + + await _receiveSemaphore.WaitAsync(cts.Token); + } + } + + + private void ReadCallback(IAsyncResult ar) + { + // Retrieve the state object and the handler socket + // from the asynchronous state object. + ReceivedData state = (ReceivedData) ar.AsyncState; + ClientSocket handler = state.ClientSocket; + + if (!handler.WorkSocket.Connected) + { + Close(); + return; + } + + // Read data from the client socket. + var bytesRead = handler.WorkSocket.EndReceive(ar); + + // If 0, no more data is coming + if (bytesRead <= 0) + { + _receiveSemaphore.Release(); + _onRead(this, state); + return; + } + + // There might be more data, so store the data received so far. + state.ReceivedDataStr.Append(Encoding.UTF8.GetString(state.Buffer, 0, bytesRead)); + + _receiveSemaphore.Release(); + _onRead(this, state); + } + private void SendCallback(IAsyncResult ar) { try diff --git a/CatCore/Services/Sockets/Packet.cs b/CatCore/Services/Sockets/Packet.cs index ab1ef963..9f185be8 100644 --- a/CatCore/Services/Sockets/Packet.cs +++ b/CatCore/Services/Sockets/Packet.cs @@ -1,20 +1,89 @@ -using System.Text.Json; +using System; +using System.Collections.Generic; +using System.Text.Json; +using System.Text.Json.Serialization; +using System.Threading.Tasks; -namespace CatCore.Services.Sockets +namespace CatCore.Services.Sockets.Packets { /// /// Abstract class to define packet types /// public abstract class Packet { - public string ToJson() + // Force packet name to be compile time constant + public string PacketName => GetType().Name; + + + /// This is probably overkill but WHO CARES? WHY WOULD ANYONE CARE BECAUSE IT'S ASYNC SO IT MUST BE FAST! + /// FAST LIKE FAST AND FURIOUS! IDK I NEVER WATCHED THE MOVIES + public static async Task GetPacketFromJson(string json) { - return JsonSerializer.Serialize(this); + if (string.IsNullOrEmpty(json)) + { + return null; + } + + return await Task.Run((() => + { + var jsonNode = JsonSerializer.Deserialize>(json); + + if (jsonNode == null) + { + return null; + } + + if (!jsonNode.TryGetValue(nameof(PacketName), out var packetNameElm)) + { + return null; + } + + var packetName = packetNameElm.GetString(); + + if (packetName == null) + { + return null; + } + + var type = GetPacketTypeByName(packetName!); + + return type == null ? null : JsonSerializer.Deserialize(json, type!) as Packet; + })); } - public static T? FromJson(string json) where T : Packet + private static Type? GetPacketTypeByName(string name) { - return JsonSerializer.Deserialize(json); + var type = Type.GetType($"{typeof(Packet).Namespace}.{name}"); + + if (type == null || type == typeof(Packet) || !typeof(Packet).IsAssignableFrom(type.BaseType)) + { + return null; + } + + return type; } } + + public class GetHello : Packet + { + [JsonConstructor] + private GetHello(string hello) + { + this.Hello = hello; + } + + public string Hello { get; } + } + + public class RespondHello : Packet + { + [JsonConstructor] + public RespondHello(string helloToSend) + { + HelloBack = helloToSend; + } + + [JsonInclude] + public string HelloBack { get; } + } } \ No newline at end of file From a00b35a0a014c66baa301f8c5f4dc56928b6f738 Mon Sep 17 00:00:00 2001 From: FernTheDev <15272073+Fernthedev@users.noreply.github.com> Date: Thu, 25 Nov 2021 15:03:24 -0400 Subject: [PATCH 3/4] Far better networking logic, lots of useless printing and yet to decide on JSON or Protobuf. Let's not use a delimiter hopefully --- CatCore/CatCoreInstance.cs | 6 +- .../Interfaces/IKittenRawSocketProvider.cs | 3 +- CatCore/Services/KittenRawSocketProvider.cs | 21 +- CatCore/Services/Sockets/ClientSocket.cs | 231 ++++++++++-------- CatCore/Services/Sockets/Packet.cs | 55 +++-- CatCore/Services/Sockets/ReceivedData.cs | 35 --- 6 files changed, 184 insertions(+), 167 deletions(-) delete mode 100644 CatCore/Services/Sockets/ReceivedData.cs diff --git a/CatCore/CatCoreInstance.cs b/CatCore/CatCoreInstance.cs index ecdf9f7c..32b07e94 100644 --- a/CatCore/CatCoreInstance.cs +++ b/CatCore/CatCoreInstance.cs @@ -169,11 +169,11 @@ private void CreateContainer() Log.Logger.Information($"Client connected from {clientSocket.WorkSocket.RemoteEndPoint} and {clientSocket.Uuid}"); }; - socket.OnReceive += (clientSocket, data) => + socket.OnReceive += (clientSocket, data, str) => { - Log.Logger.Information($"Received from {clientSocket.Uuid}: {data.ReceivedDataStr}"); + Log.Logger.Information($"Received from {clientSocket.Uuid}: {str}"); - var _ = clientSocket.QueueSend(new RespondHello(data.ReceivedDataStr.ToString())); + clientSocket.QueueSend(new RespondHello(str)); }; socket.OnDisconnect += clientSocket => diff --git a/CatCore/Services/Interfaces/IKittenRawSocketProvider.cs b/CatCore/Services/Interfaces/IKittenRawSocketProvider.cs index 851a43ad..6af6983a 100644 --- a/CatCore/Services/Interfaces/IKittenRawSocketProvider.cs +++ b/CatCore/Services/Interfaces/IKittenRawSocketProvider.cs @@ -1,5 +1,6 @@ using System; using CatCore.Services.Sockets; +using CatCore.Services.Sockets.Packets; namespace CatCore.Services.Interfaces { @@ -9,7 +10,7 @@ internal interface IKittenRawSocketProvider : INeedInitialization, IDisposable #pragma warning disable 649 event Action? OnConnect; - event Action? OnReceive; + event Action? OnReceive; event Action? OnDisconnect; #pragma warning restore 649 diff --git a/CatCore/Services/KittenRawSocketProvider.cs b/CatCore/Services/KittenRawSocketProvider.cs index 0563524c..63f7302b 100644 --- a/CatCore/Services/KittenRawSocketProvider.cs +++ b/CatCore/Services/KittenRawSocketProvider.cs @@ -6,6 +6,7 @@ using System.Threading.Tasks; using CatCore.Services.Interfaces; using CatCore.Services.Sockets; +using CatCore.Services.Sockets.Packets; using Serilog; namespace CatCore.Services @@ -33,7 +34,7 @@ private static SemaphoreSlim NewSemaphore() #pragma warning disable 649 public event Action? OnConnect; - public event Action? OnReceive; + public event Action? OnReceive; public event Action? OnDisconnect; #pragma warning restore 649 @@ -63,7 +64,7 @@ private async void StartListening(CancellationTokenSource cts) { _logger.Information($"Binding to port {localEndPoint.Address.MapToIPv4()}:{localEndPoint.Port}"); listener.Bind(localEndPoint); - listener.Listen(20); //back log is amount of clients allowed to wait + listener.Listen(2); //back log is amount of clients allowed to wait // Set the event to nonsignaled state. _allDone = NewSemaphore(); @@ -86,7 +87,7 @@ private async void StartListening(CancellationTokenSource cts) } catch (Exception e) { - _logger.Fatal(e.Message, e,ToString()); + _logger.Error(e, "Failed to start Kitty socket listening server"); } _isServerRunning = false; @@ -120,9 +121,17 @@ private void AcceptCallback(IAsyncResult ar) OnConnect?.Invoke(clientSocket); } - private void HandleRead(ClientSocket clientSocket, ReceivedData receivedData) + private void HandleRead(ClientSocket clientSocket, string receivedData) { - OnReceive?.Invoke(clientSocket, receivedData); + var p = Packet.TryGetPacketFromJson(receivedData, out _); + + if (p is null) + { + Log.Logger.Warning($"Unable to parse packet from {clientSocket.Uuid}, type is unknown"); + // return; + } + + OnReceive?.Invoke(clientSocket, p, receivedData); } @@ -160,7 +169,7 @@ public void Initialize() } catch (Exception e) { - _logger.Error(e.Message, e.ToString()); + _logger.Error(e, "Failed to start Kitty socket server"); } }); } diff --git a/CatCore/Services/Sockets/ClientSocket.cs b/CatCore/Services/Sockets/ClientSocket.cs index 999da02d..a64181ef 100644 --- a/CatCore/Services/Sockets/ClientSocket.cs +++ b/CatCore/Services/Sockets/ClientSocket.cs @@ -1,6 +1,7 @@ using System; using System.Collections.Concurrent; using System.IO; +using System.Linq; using System.Net.Sockets; using System.Text; using System.Text.Json; @@ -19,166 +20,200 @@ public class ClientSocket public readonly Socket WorkSocket; public readonly Guid Uuid; - private readonly BlockingCollection _packetsToSend = new BlockingCollection(); - private readonly SemaphoreSlim _sendSemaphore = new SemaphoreSlim(0,1); - private readonly SemaphoreSlim _receiveSemaphore = new SemaphoreSlim(0,1); - + private readonly BlockingCollection _packetsToSend = new(); private readonly Action _onClose; - private readonly Action _onRead; + private readonly Action _onRead; + private readonly NetworkStream _socketStream; + + private bool _closed; - public ClientSocket(Socket workSocket, Guid uuid, CancellationTokenSource cts, Action onClose, Action onReceive) + // Size of receive buffer. + private const int BUFFER_SIZE = 4096; + private const char DELIMETER = '\n'; // Environment.NewLine; + + public ClientSocket(Socket workSocket, Guid uuid, CancellationTokenSource cts, Action onClose, Action onReceive) { Uuid = uuid; WorkSocket = workSocket; + _socketStream = new NetworkStream(WorkSocket, false); + + // timeout in ms + // todo: configurable + _socketStream.ReadTimeout = 5000; + _socketStream.WriteTimeout = 5000; + _onClose = onClose; _onRead = onReceive; - Task.Run(async () => { + _ = Task.Run(async () => + { await SendTaskLoop(cts); }, cts.Token); - Task.Run(async () => + + _ = Task.Run(async () => { - await ReceiveTaskLoop(cts); + await ReceiveTaskLoopStart(cts); }, cts.Token); - } private async Task SendTaskLoop(CancellationTokenSource cts) { - while (!cts.IsCancellationRequested) + try { - // Stop trying to send data - if (!WorkSocket.Connected) - { - return; - } - - if (!_packetsToSend.TryTake(out var packet)) + while (!cts.IsCancellationRequested && !_closed) { - await Task.Yield(); - continue; + // Stop trying to send data + if (!WorkSocket.Connected) + { + Close(); + return; + } + + if (!_packetsToSend.TryTake(out var packet)) + { + await Task.Yield(); + continue; + } + + var bytesToSend = JsonSerializer.SerializeToUtf8Bytes(packet, packet.GetType()); + + await _socketStream.WriteAsync(bytesToSend, 0, bytesToSend.Length, cts.Token); + await _socketStream.FlushAsync(); } - - var bytesToSend = JsonSerializer.SerializeToUtf8Bytes(packet, packet.GetType()); - SendData(bytesToSend); - - await _sendSemaphore.WaitAsync(cts.Token); + } + catch (SocketException e) + { + Console.Error.WriteLine(e); + Close(); } } - private async Task ReceiveTaskLoop(CancellationTokenSource cts) + private async Task ReceiveTaskLoopStart(CancellationTokenSource cts) { - while (!cts.IsCancellationRequested) + try { - // Stop trying to receive data - if (!WorkSocket.Connected) + // Received data string. + var receivedDataStr = new StringBuilder(); + + void ReadFlush(StringBuilder data) { - return; + try + { + // All data has been finalized, invoke callback + _onRead(this, data.ToString()); + } + catch (Exception e) + { + Console.Error.WriteLine(e); + } + + // Clear string + data.Clear(); } - // Create the state object. - ReceivedData state = new ReceivedData(this); - WorkSocket.BeginReceive(state.Buffer, 0, ReceivedData.BUFFER_SIZE, 0, ReadCallback, state); + while (!cts.IsCancellationRequested && !_closed) + { + // Stop trying to receive data + if (!WorkSocket.Connected) + { + Close(); + return; + } - await _receiveSemaphore.WaitAsync(cts.Token); - } - } + // Receive buffer. + // the buffer is used to store the received bytes temporarily + // and cleared when they are later parsed into receivedData + var buffer = new byte[BUFFER_SIZE]; - private void ReadCallback(IAsyncResult ar) - { - // Retrieve the state object and the handler socket - // from the asynchronous state object. - ReceivedData state = (ReceivedData) ar.AsyncState; - ClientSocket handler = state.ClientSocket; - if (!handler.WorkSocket.Connected) - { - Close(); - return; - } + var bytesRead = await _socketStream.ReadAsync(buffer, 0, BUFFER_SIZE, cts.Token); - // Read data from the client socket. - var bytesRead = handler.WorkSocket.EndReceive(ar); + // If 0, no more data is coming + if (bytesRead <= 0) + { + ReadFlush(receivedDataStr); + } + else + { + var str = Encoding.UTF8.GetString(buffer, 0, bytesRead); - // If 0, no more data is coming - if (bytesRead <= 0) - { - _receiveSemaphore.Release(); - _onRead(this, state); - return; - } + // if the string already contains a delimiter, + // split it. This way, multiple strings sent at once can be parsed + if (str.Contains(DELIMETER)) + { + var strings = str.Split(DELIMETER); - // There might be more data, so store the data received so far. - state.ReceivedDataStr.Append(Encoding.UTF8.GetString(state.Buffer, 0, bytesRead)); + var index = 0; - _receiveSemaphore.Release(); - _onRead(this, state); - } + foreach (var s in strings) + { + if (index >= strings.Length - 1) + { + break; + } - private void SendCallback(IAsyncResult ar) - { - try - { - // Retrieve the socket from the state object. - Socket handler = (Socket) ar.AsyncState; + receivedDataStr.Append(s); + + ReadFlush(receivedDataStr); + index++; + } - // Complete sending the data to the remote device. - handler.EndSend(ar); + continue; + } - _sendSemaphore.Release(); + // There might be more data, so store the data received so far. + receivedDataStr.Append(str); + + Console.WriteLine(receivedDataStr); + } + } } - catch (Exception e) + catch (SocketException e) { - // todo: use logger? - Console.WriteLine(e.ToString()); + Console.Error.WriteLine(e); + Close(); } - } - private void SendData(byte[] data) - { - // synchronous sending - // var bytesSent = WorkSocket.Send(data.ToArray()); - // - // if (bytesSent < data.Length) - // { - // SendData(data.Slice(bytesSent, data.Length).ToArray()); - // } - - // async sending - // Begin sending the data to the remote device. - WorkSocket.BeginSend(data, 0, data.Length, 0, SendCallback, WorkSocket); + Console.WriteLine("Done listening"); } - public async Task QueueSend(Packet packet) + public void QueueSend(Packet packet) { if (!WorkSocket.Connected) { + Close(); throw new IOException("Socket has been closed!"); } - // Avoid blocking, is this overkill? - await Task.Run(() => - { - _packetsToSend.Add(packet); - }); + _packetsToSend.Add(packet); } - public void Close() + private void Close() { + if (_closed) + { + return; + } + try { - _sendSemaphore.Release(); - _sendSemaphore.Dispose(); + if (WorkSocket.Connected) + { + WorkSocket.Disconnect(true); + WorkSocket.Close(); + } } catch (Exception e) { - // todo: use logger? - Console.WriteLine(e.ToString()); + Console.Error.WriteLine(e); } + _socketStream.Dispose(); + _onClose.Invoke(this); + _closed = true; } } diff --git a/CatCore/Services/Sockets/Packet.cs b/CatCore/Services/Sockets/Packet.cs index 9f185be8..526ee3c6 100644 --- a/CatCore/Services/Sockets/Packet.cs +++ b/CatCore/Services/Sockets/Packet.cs @@ -11,44 +11,51 @@ namespace CatCore.Services.Sockets.Packets /// public abstract class Packet { - // Force packet name to be compile time constant + // Force packet name to be runtime constant public string PacketName => GetType().Name; - - /// This is probably overkill but WHO CARES? WHY WOULD ANYONE CARE BECAUSE IT'S ASYNC SO IT MUST BE FAST! - /// FAST LIKE FAST AND FURIOUS! IDK I NEVER WATCHED THE MOVIES - public static async Task GetPacketFromJson(string json) + public static Packet? GetPacketFromJson(string json) { if (string.IsNullOrEmpty(json)) { return null; } - return await Task.Run((() => - { - var jsonNode = JsonSerializer.Deserialize>(json); + var jsonNode = JsonSerializer.Deserialize>(json); - if (jsonNode == null) - { - return null; - } + if (jsonNode == null) + { + return null; + } - if (!jsonNode.TryGetValue(nameof(PacketName), out var packetNameElm)) - { - return null; - } + if (!jsonNode.TryGetValue(nameof(PacketName), out var packetNameElm)) + { + return null; + } - var packetName = packetNameElm.GetString(); + var packetName = packetNameElm.GetString(); - if (packetName == null) - { - return null; - } + if (packetName == null) + { + return null; + } - var type = GetPacketTypeByName(packetName!); + var type = GetPacketTypeByName(packetName); + return type == null ? null : JsonSerializer.Deserialize(json, type) as Packet; + } - return type == null ? null : JsonSerializer.Deserialize(json, type!) as Packet; - })); + public static Packet? TryGetPacketFromJson(string json, out Exception? exception) + { + exception = null; + try + { + return GetPacketFromJson(json); + } + catch (Exception e) + { + exception = e; + return null; + } } private static Type? GetPacketTypeByName(string name) diff --git a/CatCore/Services/Sockets/ReceivedData.cs b/CatCore/Services/Sockets/ReceivedData.cs deleted file mode 100644 index 1aab039b..00000000 --- a/CatCore/Services/Sockets/ReceivedData.cs +++ /dev/null @@ -1,35 +0,0 @@ -using System; -using System.Net.Sockets; -using System.Text; - -namespace CatCore.Services.Sockets -{ - // State object for reading client data asynchronously - public class ReceivedData - { - // Size of receive buffer. - public const int BUFFER_SIZE = 4096; - - // Receive buffer. - // the buffer is used to store the received bytes temporarily - // and cleared when they are later parsed into receivedData - public readonly byte[] Buffer = new byte[BUFFER_SIZE]; - - // Received data string. - public readonly StringBuilder ReceivedDataStr = new StringBuilder(); - - // Client socket. - public readonly ClientSocket ClientSocket; - - public readonly Guid ClientUuid; - - public readonly Socket WorkSocket; - - public ReceivedData(ClientSocket clientSocket) - { - ClientSocket = clientSocket; - ClientUuid = clientSocket.Uuid; - WorkSocket = clientSocket.WorkSocket; - } - } -} \ No newline at end of file From 54fa268cafd27a35abae9f7cfe55e38b83a97d92 Mon Sep 17 00:00:00 2001 From: FernTheDev <15272073+Fernthedev@users.noreply.github.com> Date: Thu, 25 Nov 2021 15:20:02 -0400 Subject: [PATCH 4/4] Unnecessary disconnect, handled by KittenRawSocketProvider --- CatCore/Services/Sockets/ClientSocket.cs | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/CatCore/Services/Sockets/ClientSocket.cs b/CatCore/Services/Sockets/ClientSocket.cs index a64181ef..eb8703b7 100644 --- a/CatCore/Services/Sockets/ClientSocket.cs +++ b/CatCore/Services/Sockets/ClientSocket.cs @@ -197,19 +197,6 @@ private void Close() return; } - try - { - if (WorkSocket.Connected) - { - WorkSocket.Disconnect(true); - WorkSocket.Close(); - } - } - catch (Exception e) - { - Console.Error.WriteLine(e); - } - _socketStream.Dispose(); _onClose.Invoke(this);