RxSockets
A minimal reactive / async streams socket implementation.
Install / Use
/learn @dshe/RxSocketsREADME
RxSockets

Minimal Reactive / Async Streams Socket Implementation
- .NET 8.0 library
- connect: asynchronous
- send: synchronous
- receive: observable or async enumerable
- accept: observable or async enumerable
- simple and intuitive API
- major dependencies: System.Reactive
installation
PM> Install-Package RxSockets
example
using System.Reactive.Linq;
using System.Linq;
using Xunit;
using RxSockets;
Server
interface IRxSocketServer : IAsyncDisposable
{
EndPoint LocalEndPoint { get; }
IObservable<IRxSocketClient> AcceptObservable { get; }
IAsyncEnumerable<IRxSocketClient> AcceptAllAsync { get; }
}
// Create a server using an available port on the local machine.
IRxSocketServer server = RxSocketServer.Create();
// Prepare to start accepting connections from clients.
server
.AcceptObservable
.Subscribe(onNext: acceptClient =>
{
// After the server accepts a client connection,
// start receiving messages from the client and ...
acceptClient
.ReceiveObservable
.ToStrings()
.Subscribe(onNext: message =>
{
// Echo each message received back to the client.
acceptClient.Send(message.ToByteArray());
});
});
Client
interface IRxSocketClient : IAsyncDisposable
{
EndPoint RemoteEndPoint { get; }
bool Connected { get; }
int Send(ReadOnlySpan<byte> buffer);
IObservable<byte> ReceiveObservable { get; }
IAsyncEnumerable<byte> ReceiveAllAsync { get; }
}
// Create a client connected to EndPoint of the server.
IRxSocketClient client = await server.LocalEndPoint.CreateRxSocketClientAsync();
// Send the message "Hello!" to the server,
// which the server will then echo back to the client.
client.Send("Hello!".ToByteArray());
// Receive the message from the server.
string message = await client.ReceiveAllAsync.ToStrings().FirstAsync();
Assert.Equal("Hello!", message);
await client.DisposeAsync();
await server.DisposeAsync();
notes
To communicate using strings (see example above), the following extension methods are provided:
byte[] ToByteArray(this string source);
byte[] ToByteArray(this IEnumerable<string> source)
IEnumerable<string> ToStrings(this IEnumerable<byte> source)
IObservable<string> ToStrings(this IObservable<byte> source)
IAsyncEnumerable<string> ToStrings(this IAsyncEnumerable<byte> source)
To communicate using byte arrays with a 4 byte BigEndian integer length prefix, the following extension methods are provided:
byte[] ToByteArrayWithLengthPrefix(this byte[] source)
IEnumerable<byte[]> ToArraysFromBytesWithLengthPrefix(this IEnumerable<byte> source)
IObservable<byte[]> ToArraysFromBytesWithLengthPrefix(this IObservable<byte> source)
IAsyncEnumerable<byte[]> ToArraysFromBytesWithLengthPrefix(this IAsyncEnumerable<byte> source)
To support multiple simultaneous observers, use:
Observable.Publish().[RefCount() | AutoConnect()]
Related Skills
node-connect
349.0kDiagnose OpenClaw node connection and pairing failures for Android, iOS, and macOS companion apps
frontend-design
109.4kCreate distinctive, production-grade frontend interfaces with high design quality. Use this skill when the user asks to build web components, pages, or applications. Generates creative, polished code that avoids generic AI aesthetics.
openai-whisper-api
349.0kTranscribe audio via OpenAI Audio Transcriptions API (Whisper).
qqbot-media
349.0kQQBot 富媒体收发能力。使用 <qqmedia> 标签,系统根据文件扩展名自动识别类型(图片/语音/视频/文件)。
