JetBlack.Network
An experiment in using Rx (reactive extensions) with network sockets for TCP in C#.
Four versions are implemented:
- Use TcpListener/TcpClient classes with asynchronous listen, connect, and stream methods.
- Use Socket as the driving class providing asynchronous listen and connect methods, but using NetworkStream with asynchronous stream methods.
- Use Socket as the driving class providing asynchronous methods for listen, connect, send and receive.
- Use Socket as the driving class with non-blocking sockets and a select loop.
News
2015-09-04
I have folded in the changes I made while using these classes. The major change
is that the ByteBuffer has been replaced by System.ArraySegment<byte>
which has effectively the same functionality. This means I can use the methods
on Socket which take IList<ArraySegment<byte>>, and delete a class!
The DisposableBuffer has been replaced by a generic wrapper class
DisposableValue.
I hope these changes don't screw things up for people. I think this solution is neater, and more sympathetic to the underlying classes. I do like to delete code whenever possible!
Description
Listening
The natural approach for a listener would be to subscribe to an endpoint, and
receive clients as they connect. This is achieved by an extension method
ToListenerObservable which produces an observable of the form:
IObservable<TcpClient> or IObservable<Socket>. So you might do the
following:
    new IPEndPoint(IPAddress.Parse("127.0.0.1"), 9211)
        .ToListenerObservable(10)
        .Subscribe(socket => DoSomething(socket));
The 10 is the backlog.
Clients
Clients read with an IObservable<ArraySegment<byte>> and write with an IObserver<ArraySegment<byte>> and are created by
extension methods which take a Socket
or TcpClient.
There is also an ISubject<ArraySegment<byte>, ArraySegment<byte>> for
reading and writing with the same object. So you might do the following:
    socket.ToClientObservable(1024)
        .Subscribe(buffer => DoSomething(buffer));
The ArraySegment<byte> struct holds a buffer, an offset, and a count (the buffer may not be full). The 1024 argument is the size
of the buffer to create. Typically the extension method will also take a CancellationToken as an argument.
Frame Clients
Frame Clients follow the same pattern as the clients, but use a DisposableValue<ArraySegment<byte>>
and send/receive the length of the buffer. This ensures the full message is
received. They also take a BufferManager
to reduce garbage collection.
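To illustrate why sending the length ensures the full message is received, here is a minimal sketch of length-prefixed framing over a plain Stream. The 4-byte big-endian header and the names FrameSketch, WriteFrame, ReadFrame and ReadExactly are assumptions made for this example; they are not taken from the library, whose wire format may differ.

```csharp
using System;
using System.IO;
using System.Net;

// Hypothetical helper, not part of JetBlack.Network.
public static class FrameSketch
{
    // Write a 4-byte length header (assumed big-endian) followed by the payload.
    public static void WriteFrame(Stream stream, ArraySegment<byte> payload)
    {
        var header = BitConverter.GetBytes(IPAddress.HostToNetworkOrder(payload.Count));
        stream.Write(header, 0, header.Length);
        stream.Write(payload.Array, payload.Offset, payload.Count);
    }

    // Read one frame: the header first, then exactly that many payload bytes.
    public static byte[] ReadFrame(Stream stream)
    {
        var header = ReadExactly(stream, 4);
        var length = IPAddress.NetworkToHostOrder(BitConverter.ToInt32(header, 0));
        return ReadExactly(stream, length);
    }

    // A single Read may return fewer bytes than requested, so loop until done.
    private static byte[] ReadExactly(Stream stream, int count)
    {
        var buffer = new byte[count];
        var offset = 0;
        while (offset < count)
        {
            var read = stream.Read(buffer, offset, count - offset);
            if (read == 0)
                throw new EndOfStreamException("Stream closed in the middle of a frame.");
            offset += read;
        }
        return buffer;
    }
}
```

Because the reader knows the payload length up front, it can keep reading until the whole message has arrived, however the bytes were fragmented in transit.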
Connectors
The client connection can be performed asynchronously. ClientConnectors are IObservable<Socket> or IObservable<TcpClient> and
are created by extension methods which take IPEndPoint. So you might do the following:
    new IPEndPoint(IPAddress.Parse("127.0.0.1"), 9211)
        .ToConnectObservable()
        .Subscribe(socket => DoSomething(socket));
Examples
For each implementation there is an example echo client and server. The following shows the RxSocket implementation.
Echo Server
    var endpoint = ProgramArgs.Parse(args, new[] { "127.0.0.1:9211" }).EndPoint;

    var cts = new CancellationTokenSource();

    endpoint.ToListenerObservable(10)
        .ObserveOn(TaskPoolScheduler.Default)
        .Subscribe(
            client =>
                client.ToClientObservable(1024, SocketFlags.None)
                    .Subscribe(client.ToClientObserver(1024, SocketFlags.None), cts.Token),
            error => Console.WriteLine("Error: " + error.Message),
            () => Console.WriteLine("OnCompleted"),
            cts.Token);

    Console.WriteLine("Press <ENTER> to quit");
    Console.ReadLine();

    cts.Cancel();
Echo Client
    var endpoint = ProgramArgs.Parse(args, new[] { "127.0.0.1:9211" }).EndPoint;

    var cts = new CancellationTokenSource();
    var bufferManager = BufferManager.CreateBufferManager(2 << 16, 2 << 8);

    var frameClientSubject = endpoint.ToFrameClientSubject(SocketFlags.None, bufferManager, cts.Token);

    var observerDisposable =
        frameClientSubject
            .ObserveOn(TaskPoolScheduler.Default)
            .Subscribe(
                disposableBuffer =>
                {
                    Console.WriteLine("Read: " + Encoding.UTF8.GetString(disposableBuffer.Value.Array, 0, disposableBuffer.Value.Count));
                    disposableBuffer.Dispose();
                },
                error => Console.WriteLine("Error: " + error.Message),
                () => Console.WriteLine("OnCompleted: FrameReceiver"));

    Console.In.ToLineObservable()
        .Subscribe(
            line =>
            {
                var writeBuffer = Encoding.UTF8.GetBytes(line);
                frameClientSubject.OnNext(DisposableValue.Create(new ArraySegment<byte>(writeBuffer, 0, writeBuffer.Length), Disposable.Empty));
            },
            error => Console.WriteLine("Error: " + error.Message),
            () => Console.WriteLine("OnCompleted: LineReader"));

    observerDisposable.Dispose();

    cts.Cancel();
Implementation
RxTcp
Listening
This implementation is the most straightforward. The TcpListener
and TcpClient
classes have asynchronous methods which can be used with await when
connecting and listening. They provide a
NetworkStream
which implements the asynchronous methods declared by Stream.
The listen is implemented in the following manner:
    public static IObservable<TcpClient> ToListenerObservable(this IPEndPoint endpoint, int backlog)
    {
        return new TcpListener(endpoint).ToListenerObservable(backlog);
    }

    public static IObservable<TcpClient> ToListenerObservable(this TcpListener listener, int backlog)
    {
        return Observable.Create<TcpClient>(async (observer, token) =>
        {
            listener.Start(backlog);

            try
            {
                while (!token.IsCancellationRequested)
                    observer.OnNext(await listener.AcceptTcpClientAsync());

                observer.OnCompleted();

                listener.Stop();
            }
            catch (Exception error)
            {
                observer.OnError(error);
            }
        });
    }
Note that the observable factory method used is the asynchronous version which
provides a cancellation token. We can use this to control exit from the listen
loop and produce the OnCompleted action.
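The same mechanism can be seen in isolation, without sockets, in the sketch below. It assumes the System.Reactive package is referenced; the names CancellableLoopSketch and Ticks are hypothetical and exist only for this example. Disposing the subscription cancels the token handed to the factory, which ends the loop.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical example, not part of JetBlack.Network.
public static class CancellableLoopSketch
{
    // Emits 0, 1, 2, ... with a pause between values until the token is cancelled.
    public static IObservable<int> Ticks(TimeSpan interval)
    {
        return Observable.Create<int>(async (observer, token) =>
        {
            try
            {
                var i = 0;
                while (!token.IsCancellationRequested)
                {
                    observer.OnNext(i++);
                    await Task.Delay(interval, token);
                }

                observer.OnCompleted();
            }
            catch (OperationCanceledException)
            {
                // The subscriber has gone away while we were waiting; nothing to report.
            }
            catch (Exception error)
            {
                observer.OnError(error);
            }
        });
    }
}
```

For instance, `CancellableLoopSketch.Ticks(TimeSpan.FromMilliseconds(10)).Take(3)` yields three values; when `Take` unsubscribes from the source, the token is cancelled and the loop exits.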
Connecting
Connecting works in a similar manner to listening. We observe on an endpoint and receive a client.
    public static IObservable<TcpClient> ToConnectObservable(this IPEndPoint endpoint)
    {
        return Observable.Create<TcpClient>(async (observer, token) =>
        {
            try
            {
                var client = new TcpClient();
                await client.ConnectAsync(endpoint.Address, endpoint.Port);
                token.ThrowIfCancellationRequested();
                observer.OnNext(client);
                observer.OnCompleted();
            }
            catch (Exception error)
            {
                observer.OnError(error);
            }
        });
    }
As with the listener we use the asynchronous factory method. As the connect may take some time I have added a cancellation token check after the connection returns.
Reading and writing
I have implemented two readers and writers: one for bytes, and another for "frames", which are discussed below. Note that when byte arrays are sent and received they may be fragmented (split into separate blocks).
It is often more efficient to manage the byte arrays in a pool. When we do
this the buffers may be larger than the payload, so I use ArraySegment<byte>
to hold the byte array and payload length.
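As a rough sketch of this idea, the example below rents a buffer from a pool and wraps the part that was actually filled in an ArraySegment<byte>. It uses ArrayPool<byte> from System.Buffers as a stand-in for the BufferManager the library takes, and the names PooledReadSketch and ReadText are hypothetical.

```csharp
using System;
using System.Buffers;
using System.IO;
using System.Text;

// Hypothetical example, not part of JetBlack.Network.
public static class PooledReadSketch
{
    // Reads once from the stream into a pooled buffer and decodes the bytes that were read.
    public static string ReadText(Stream stream, int minimumSize)
    {
        // Rent may hand back an array that is larger than minimumSize.
        var buffer = ArrayPool<byte>.Shared.Rent(minimumSize);
        try
        {
            var count = stream.Read(buffer, 0, minimumSize);

            // The segment records which part of the pooled array holds the payload.
            var segment = new ArraySegment<byte>(buffer, 0, count);
            return Encoding.UTF8.GetString(segment.Array, segment.Offset, segment.Count);
        }
        finally
        {
            // Hand the buffer back so it can be reused instead of garbage collected.
            ArrayPool<byte>.Shared.Return(buffer);
        }
    }
}
```

The consumer only ever looks at Offset and Count, so it does not matter how large the underlying array is.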
The clients are thin wrappers around the streams:
    public static ISubject<ArraySegment<byte>, ArraySegment<byte>> ToClientSubject(this TcpClient client, int size, CancellationToken token)
    {
        return Subject.Create(client.ToClientObserver(token), client.ToClientObservable(size));
    }

    public static IObservable<ArraySegment<byte>> ToClientObservable(this TcpClient client, int size)
    {
        return client.GetStream().ToStreamObservable(size);
    }

    public static IObserver<ArraySegment<byte>> ToClientObserver(this TcpClient client, CancellationToken token)
    {
        return client.GetStream().ToStreamObserver(token);
    }
The stream observer (writer) is the most straightforward as the write method guarantees to send the entire buffer.
    public static IObserver<ArraySegment<byte>> ToStreamObserver(this Stream stream, CancellationToken token)
    {
        return Observer.Create<ArraySegment<byte>>(async buffer =>
        {
            await stream.WriteAsync(buffer.Array, buffer.Offset, buffer.Count, token);
        });
    }
The stream observable follows a similar pattern to the previous observables.
    public static IObservable<ArraySegment<byte>> ToStreamObservable(this Stream stream, int size)
    {
        return Observable.Create<ArraySegment<byte>>(async (observer, token) =>
        {
            var buffer = new byte[size];

            try
            {
                while (!token.IsCancellationReques
