I'm relatively new to .Net/UWP networking and have been trying to make sense of the APIs. I'd like to implement a TCP client that is capable of reading and sending messages independently (there is no protocol and all messages are independent).
Typically (e.g. in a POSIX C++ server), this would be done by spinning up a thread and reading continuously (sleeping periodically if no data) or using asynchronous I/O (epoll, select-poll, etc.)
I've implemented a UWP-compatible client that attempts to use ReadAsync as well as cancellation tokens to shut the client down.
This is a pretty rough cut and I'd like to flesh it out more, especially wrt error handling. Some questions:
Is looping continuously as I've done in ReadMessages() the recommended/idiomatic way of using these functions?
Am I using the cancellation tokens appropriately? My intent is to be able to kill a ReadAsync() request when the client object is destructed (or, eventually, when a Disconnect() method is added).
Where should I be detecting disconnects? try-catch blocks around the ReadAsync methods?
Lastly, messages (their byte[] buffers, rather), would be output using a fast queue. I've commented that portion out. The intent is to use this as a component in a larger application (e.g., Unity).
Thank you. Code below.
namespace Messages
{
[StructLayout(LayoutKind.Sequential, Pack = 1)]
public struct Header
{
public UInt32 size;
public UInt32 id;
public UInt64 received_timestamp;
}
[StructLayout(LayoutKind.Sequential, Pack = 1)]
public struct PingMessage
{
public Header header;
public UInt64 timestamp;
public void Init()
{
header.size = (UInt32)Marshal.SizeOf(typeof(PingMessage));
header.id = 0xdeaddead;
timestamp = (UInt64)System.Diagnostics.Stopwatch.GetTimestamp();
}
}
[StructLayout(LayoutKind.Sequential, Pack = 1)]
public struct PongMessage
{
public Header header;
public UInt64 ping_timestamp;
public void Init()
{
header.size = (UInt32)Marshal.SizeOf(typeof(PongMessage));
header.id = 0xdeadbeef;
}
}
}
public class Client
{
private Windows.Networking.Sockets.StreamSocket m_socket;
private System.Threading.Tasks.Task m_connectionTask;
private Windows.Storage.Streams.DataWriter m_writer;
private System.IO.Stream m_inputStream;
private Windows.Networking.HostName m_hostname;
private string m_port;
private bool m_connected = false;
private System.Threading.CancellationTokenSource m_cancellationSource = new System.Threading.CancellationTokenSource();
private byte[] m_header = new byte[Marshal.SizeOf(typeof(Messages.Header))];
private byte[] m_messageBuffer = null;
//private BlockingRingBuffer<byte[]> m_receiveQueue = new BlockingRingBuffer<byte[]>(1024 * 1024, true);
public static byte[] Serialize<T>(ref T str)
{
int size = Marshal.SizeOf(str);
byte[] arr = new byte[size];
GCHandle h = default(GCHandle);
try
{
h = GCHandle.Alloc(arr, GCHandleType.Pinned);
Marshal.StructureToPtr<T>(str, h.AddrOfPinnedObject(), false);
}
finally
{
if (h.IsAllocated)
{
h.Free();
}
}
return arr;
}
public static T Deserialize<T>(byte[] arr) where T : struct
{
T str = default(T);
GCHandle h = default(GCHandle);
try
{
h = GCHandle.Alloc(arr, GCHandleType.Pinned);
str = (T)Marshal.PtrToStructure(h.AddrOfPinnedObject(), typeof(T));
}
finally
{
if (h.IsAllocated)
{
h.Free();
}
}
return str;
}
public async void Send(byte[] bytes)
{
await m_connectionTask;
if (!m_connected)
{
Console.WriteLine("NOT CONNECTED: dropping message");
return;
}
try
{
m_writer.WriteBytes(bytes);
await m_writer.StoreAsync();
Console.WriteLine("Sent " + bytes.Length + " bytes");
}
catch (Exception exception)
{
// If this is an unknown status it means that the error if fatal and retry will likely fail.
if (Windows.Networking.Sockets.SocketError.GetStatus(exception.HResult) == Windows.Networking.Sockets.SocketErrorStatus.Unknown)
{
throw;
}
Console.WriteLine("Error: Failed to send: " + exception.Message);
}
}
/*
public byte[] GetNextMessage()
{
byte[] buffer;
m_receiveQueue.TryDequeue(out buffer);
return buffer;
}
*/
private async void ReadMessages()
{
//TODO: handle disconnections and exceptions?
while (true)
{
// Read header
await m_inputStream.ReadAsync(m_header, 0, m_header.Length, m_cancellationSource.Token);
if (m_cancellationSource.IsCancellationRequested)
{
return;
}
// Read remainder of message
Messages.Header header = Deserialize<Messages.Header>(m_header);
m_messageBuffer = new byte[header.size];
Buffer.BlockCopy(m_header, 0, m_messageBuffer, 0, m_header.Length);
await m_inputStream.ReadAsync(m_messageBuffer, m_header.Length, (int)header.size - m_header.Length, m_cancellationSource.Token);
if (m_cancellationSource.IsCancellationRequested)
{
return;
}
// Enqueue
//TODO: put on ring buffer
Console.WriteLine("Read message: {0:x}, {1} bytes, thread={2}", header.id, header.size, System.Threading.Thread.CurrentThread.ManagedThreadId);
m_messageBuffer = null;
}
}
public async void Connect(bool blockUntilConnected, Action<bool, Exception> OnConnected = null)
{
m_connectionTask = System.Threading.Tasks.Task.Run(
async () =>
{
Exception e = null;
try
{
await m_socket.ConnectAsync(m_hostname, m_port);
m_writer = new Windows.Storage.Streams.DataWriter(m_socket.OutputStream);
m_inputStream = m_socket.InputStream.AsStreamForRead();
m_connected = true;
Console.WriteLine("Remote Address: " + m_socket.Information.RemoteAddress);
}
catch (Exception exception)
{
Console.WriteLine("Error: Failed to connect: " + exception.Message);
m_connected = false;
e = exception;
}
if (OnConnected != null)
{
OnConnected(m_connected, e);
}
}
);
if (blockUntilConnected)
{
m_connectionTask.Wait();
}
await m_connectionTask;
if (m_connected)
{
ReadMessages();
}
}
public Client(string hostname, int port)
{
m_socket = new Windows.Networking.Sockets.StreamSocket();
m_hostname = new Windows.Networking.HostName(hostname);
m_port = port.ToString();
}
~Client()
{
m_cancellationSource.Cancel();
//m_receiveQueue.StopWaiting(); // queue cannot be reused after this
m_writer.Dispose();
m_socket.Dispose();
}
}
User contributions licensed under CC BY-SA 3.0