Design Critique: Proper use of C#'s ReadAsync() for reading socket asynchronously?

-1

I'm relatively new to .Net/UWP networking and have been trying to make sense of the APIs. I'd like to implement a TCP client that is capable of reading and sending messages independently (there is no protocol and all messages are independent).

Typically (e.g. in a POSIX C++ server), this would be done by spinning up a thread and reading continuously (sleeping periodically if no data) or using asynchronous I/O (epoll, select-poll, etc.)

I've implemented a UWP-compatible client that attempts to use ReadAsync as well as cancellation tokens to shut the client down.

This is a pretty rough cut and I'd like to flesh it out more, especially wrt error handling. Some questions:

  1. Is looping continuously as I've done in ReadMessages() the recommended/idiomatic way of using these functions?

  2. Am I using the cancellation tokens appropriately? My intent is to be able to kill a ReadAsync() request when the client object is destructed (or, eventually, when a Disconnect() method is added).

  3. Where should I be detecting disconnects? try-catch blocks around the ReadAsync methods?

Lastly, messages (their byte[] buffers, rather), would be output using a fast queue. I've commented that portion out. The intent is to use this as a component in a larger application (e.g., Unity).

Thank you. Code below.

namespace Messages
{
  [StructLayout(LayoutKind.Sequential, Pack = 1)]
  public struct Header
  {
    public UInt32 size;
    public UInt32 id;
    public UInt64 received_timestamp;
  }

  [StructLayout(LayoutKind.Sequential, Pack = 1)]
  public struct PingMessage
  {
    public Header header;
    public UInt64 timestamp;

    public void Init()
    {
      header.size = (UInt32)Marshal.SizeOf(typeof(PingMessage));
      header.id = 0xdeaddead;
      timestamp = (UInt64)System.Diagnostics.Stopwatch.GetTimestamp();
    }
  }

  [StructLayout(LayoutKind.Sequential, Pack = 1)]
  public struct PongMessage
  {
    public Header header;
    public UInt64 ping_timestamp;

    public void Init()
    {
      header.size = (UInt32)Marshal.SizeOf(typeof(PongMessage));
      header.id = 0xdeadbeef;
    }
  }
}
  public class Client
  {
    private Windows.Networking.Sockets.StreamSocket m_socket;
    private System.Threading.Tasks.Task m_connectionTask;
    private Windows.Storage.Streams.DataWriter m_writer;
    private System.IO.Stream m_inputStream;

    private Windows.Networking.HostName m_hostname;
    private string m_port;

    private bool m_connected = false;
    private System.Threading.CancellationTokenSource m_cancellationSource = new System.Threading.CancellationTokenSource();

    private byte[] m_header = new byte[Marshal.SizeOf(typeof(Messages.Header))];
    private byte[] m_messageBuffer = null;

    //private BlockingRingBuffer<byte[]> m_receiveQueue = new BlockingRingBuffer<byte[]>(1024 * 1024, true);

    public static byte[] Serialize<T>(ref T str)
    {
      int size = Marshal.SizeOf(str);
      byte[] arr = new byte[size];
      GCHandle h = default(GCHandle);
      try
      {
        h = GCHandle.Alloc(arr, GCHandleType.Pinned);
        Marshal.StructureToPtr<T>(str, h.AddrOfPinnedObject(), false);
      }
      finally
      {
        if (h.IsAllocated)
        {
          h.Free();
        }
      }
      return arr;
    }

    public static T Deserialize<T>(byte[] arr) where T : struct
    {
      T str = default(T);
      GCHandle h = default(GCHandle);
      try
      {
        h = GCHandle.Alloc(arr, GCHandleType.Pinned);
        str = (T)Marshal.PtrToStructure(h.AddrOfPinnedObject(), typeof(T));
      }
      finally
      {
        if (h.IsAllocated)
        {
          h.Free();
        }
      }
      return str;
    }

    public async void Send(byte[] bytes)
    {
      await m_connectionTask;

      if (!m_connected)
      {
        Console.WriteLine("NOT CONNECTED: dropping message");
        return;
      }

      try
      {
        m_writer.WriteBytes(bytes);
        await m_writer.StoreAsync();
        Console.WriteLine("Sent " + bytes.Length + " bytes");
      }
      catch (Exception exception)
      {
        // If this is an unknown status it means that the error if fatal and retry will likely fail.
        if (Windows.Networking.Sockets.SocketError.GetStatus(exception.HResult) == Windows.Networking.Sockets.SocketErrorStatus.Unknown)
        {
          throw;
        }
        Console.WriteLine("Error: Failed to send: " + exception.Message);
      }
    }

    /*
    public byte[] GetNextMessage()
    {
      byte[] buffer;
      m_receiveQueue.TryDequeue(out buffer);
      return buffer;
    }
    */

    private async void ReadMessages()
    {
      //TODO: handle disconnections and exceptions?
      while (true)
      {
        // Read header
        await m_inputStream.ReadAsync(m_header, 0, m_header.Length, m_cancellationSource.Token);
        if (m_cancellationSource.IsCancellationRequested)
        {
          return;
        }

        // Read remainder of message
        Messages.Header header = Deserialize<Messages.Header>(m_header);
        m_messageBuffer = new byte[header.size];
        Buffer.BlockCopy(m_header, 0, m_messageBuffer, 0, m_header.Length);
        await m_inputStream.ReadAsync(m_messageBuffer, m_header.Length, (int)header.size - m_header.Length, m_cancellationSource.Token);
        if (m_cancellationSource.IsCancellationRequested)
        {
          return;
        }

        // Enqueue
        //TODO: put on ring buffer
        Console.WriteLine("Read message: {0:x}, {1} bytes, thread={2}", header.id, header.size, System.Threading.Thread.CurrentThread.ManagedThreadId);
        m_messageBuffer = null;
      }
    }

    public async void Connect(bool blockUntilConnected, Action<bool, Exception> OnConnected = null)
    {
      m_connectionTask = System.Threading.Tasks.Task.Run(
        async () =>
        {
          Exception e = null;

          try
          {
            await m_socket.ConnectAsync(m_hostname, m_port);
            m_writer = new Windows.Storage.Streams.DataWriter(m_socket.OutputStream);
            m_inputStream = m_socket.InputStream.AsStreamForRead();
            m_connected = true;
            Console.WriteLine("Remote Address: " + m_socket.Information.RemoteAddress);
          }
          catch (Exception exception)
          {
            Console.WriteLine("Error: Failed to connect: " + exception.Message);
            m_connected = false;
            e = exception;
          }

          if (OnConnected != null)
          {
            OnConnected(m_connected, e);
          }
        }
      );

      if (blockUntilConnected)
      {
        m_connectionTask.Wait();
      }

      await m_connectionTask;

      if (m_connected)
      {
        ReadMessages();
      }
    }

    public Client(string hostname, int port)
    {
      m_socket = new Windows.Networking.Sockets.StreamSocket();
      m_hostname = new Windows.Networking.HostName(hostname);
      m_port = port.ToString();
    }

    ~Client()
    {
      m_cancellationSource.Cancel();
      //m_receiveQueue.StopWaiting(); // queue cannot be reused after this
      m_writer.Dispose();
      m_socket.Dispose();
    }
  }
c#
networking
tcp
uwp
async-await
asked on Stack Overflow Feb 14, 2019 by trzy

0 Answers

Nobody has answered this question yet.


User contributions licensed under CC BY-SA 3.0