SerialPortRx
A Reactive Serial, TCP, and UDP I/O library that exposes incoming data as IObservable streams and accepts writes via simple methods. Ideal for event-driven, message-framed, and polling scenarios.
Install / Use
/learn @ChrisPulman/SerialPortRxREADME
SerialPortRx
A Reactive Serial, TCP, and UDP I/O library that exposes incoming data as IObservable streams and accepts writes via simple methods. Ideal for event-driven, message-framed, and polling scenarios.
Features
- SerialPortRx: Reactive wrapper for System.IO.Ports.SerialPort
- UdpClientRx and TcpClientRx: Reactive wrappers exposing a common IPortRx interface
- Observables:
- DataReceived: IObservable<char> for serial text flow
- DataReceivedBytes: IObservable<byte> for raw byte stream (auto-receive mode)
- Lines: IObservable<string> of complete lines split by NewLine
- BytesReceived: IObservable<int> for byte stream emitted when using ReadAsync
- IsOpenObservable: IObservable<bool> for connection state
- ErrorReceived: IObservable<Exception> for errors
- PinChanged: IObservable<SerialPinChangedEventArgs> for pin state changes (Windows only)
- Synchronous read methods for manual data consumption
- TCP/UDP batched reads:
- TcpClientRx.DataReceivedBatches: IObservable<byte[]> chunks per read loop
- UdpClientRx.DataReceivedBatches: IObservable<byte[]> per received datagram
- Helpers:
- PortNames(): reactive port enumeration with change notifications
- BufferUntil(): message framing between start and end delimiters with timeout
- WhileIsOpen(): periodic observable that fires only while a port is open
- Cross-targeted: netstandard2.0, net8.0, net9.0, net10.0, and Windows-specific TFMs
Installation
- dotnet add package SerialPortRx
Supported target frameworks
- netstandard2.0
- net8.0, net9.0, net10.0
- net8.0-windows10.0.19041.0, net9.0-windows10.0.19041.0, net10.0-windows10.0.19041.0 (adds Windows-only APIs guarded by HasWindows)
Quick start (Serial)
using System;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using CP.IO.Ports;
using ReactiveMarbles.Extensions;
var disposables = new CompositeDisposable();
var port = new SerialPortRx("COM3", 115200) { ReadTimeout = -1, WriteTimeout = -1 };
// Observe line/state/errors
port.IsOpenObservable.Subscribe(isOpen => Console.WriteLine($"Open: {isOpen}")).DisposeWith(disposables);
port.ErrorReceived.Subscribe(ex => Console.WriteLine($"Error: {ex.Message}")).DisposeWith(disposables);
// Raw character stream
port.DataReceived.Subscribe(ch => Console.Write(ch)).DisposeWith(disposables);
await port.Open();
port.WriteLine("AT");
// Close when done
port.Close();
disposables.Dispose();
Discovering serial ports
// Emits the list of available port names whenever it changes
SerialPortRx.PortNames(pollInterval: 500)
.Subscribe(names => Console.WriteLine(string.Join(", ", names)));
To auto-connect when a specific COM port appears:
var target = "COM3";
var comDisposables = new CompositeDisposable();
SerialPortRx.PortNames()
.Do(names =>
{
if (comDisposables.Count == 0 && Array.Exists(names, n => string.Equals(n, target, StringComparison.OrdinalIgnoreCase)))
{
var port = new SerialPortRx(target, 115200);
port.DisposeWith(comDisposables);
port.ErrorReceived.Subscribe(Console.WriteLine).DisposeWith(comDisposables);
port.IsOpenObservable.Subscribe(open => Console.WriteLine($"{target}: {(open ? "Open" : "Closed")}"))
.DisposeWith(comDisposables);
port.Open();
}
else if (!Array.Exists(names, n => string.Equals(n, target, StringComparison.OrdinalIgnoreCase)))
{
comDisposables.Dispose(); // auto-cleanup if device removed
}
})
.ForEach()
.Subscribe();
Message framing with BufferUntil
BufferUntil helps extract framed messages from the character stream between a start and end delimiter within a timeout.
// Example: messages start with '!' and end with '\n' and must complete within 100ms
var start = 0x21.AsObservable(); // '!'
var end = 0x0a.AsObservable(); // '\n'
port.DataReceived
.BufferUntil(start, end, timeOut: 100)
.Subscribe(msg => Console.WriteLine($"MSG: {msg}"));
A variant returns a default message on timeout:
port.DataReceived
.BufferUntil(start, end, defaultValue: Observable.Return("<timeout>"), timeOut: 100)
.Subscribe(msg => Console.WriteLine($"MSG: {msg}"));
Periodic work while the port is open
// Write a heartbeat every 500ms but only while the port remains open
port.WhileIsOpen(TimeSpan.FromMilliseconds(500))
.Subscribe(_ => port.Write("PING\n"));
Reading raw bytes with ReadAsync
Use ReadAsync for binary protocols or fixed-length reads. Each byte successfully read is also pushed to BytesReceived.
var buffer = new byte[64];
int read = await port.ReadAsync(buffer, 0, buffer.Length);
Console.WriteLine($"Read {read} bytes");
port.BytesReceived.Subscribe(b => Console.WriteLine($"Byte: {b:X2}"));
Notes:
- DataReceived is a char stream produced from SerialPort.ReadExisting() when EnableAutoDataReceive is true (default).
- DataReceivedBytes emits raw bytes alongside DataReceived in auto-receive mode.
- BytesReceived emits bytes read by your ReadAsync calls (not from ReadExisting()).
- Concurrent ReadAsync calls are serialized internally for safety.
Automatic vs Manual Data Reception
By default, EnableAutoDataReceive = true automatically feeds incoming data to DataReceived and DataReceivedBytes observables. Set this to false before calling Open() if you want to use synchronous read methods instead.
// Automatic mode (default) - data flows to observables
var port = new SerialPortRx("COM3", 115200);
port.DataReceived.Subscribe(ch => Console.Write(ch));
await port.Open();
// Manual mode - use synchronous reads
var port = new SerialPortRx("COM3", 115200) { EnableAutoDataReceive = false };
await port.Open();
string data = port.ReadExisting();
If you disable auto-receive but later want reactive streaming, call StartDataReception():
port.EnableAutoDataReceive = false;
await port.Open();
// Later, enable reactive streaming manually
var reception = port.StartDataReception(pollingIntervalMs: 10);
port.DataReceived.Subscribe(ch => Console.Write(ch));
// Stop when done
reception.Dispose();
Synchronous Read Methods
When EnableAutoDataReceive = false, use these synchronous methods for manual data consumption:
var port = new SerialPortRx("COM3", 115200) { EnableAutoDataReceive = false, ReadTimeout = 1000 };
await port.Open();
// Read all available data as string
string existing = port.ReadExisting();
// Read a single byte (-1 if none available)
int b = port.ReadByte();
// Read a single character (-1 if none available)
int ch = port.ReadChar();
// Read into a byte buffer
var buffer = new byte[64];
int bytesRead = port.Read(buffer, 0, buffer.Length);
// Read into a char buffer
var charBuffer = new char[64];
int charsRead = port.Read(charBuffer, 0, charBuffer.Length);
// Read until newline (respects NewLine property)
string line = port.ReadLine();
// Read until a specific delimiter
string data = port.ReadTo(">");
Reading lines
Use ReadLineAsync to await a single complete line split by the configured NewLine. Supports single- and multi-character newline sequences and respects ReadTimeout (> 0).
port.NewLine = "\r\n"; // optional: default is "\n"
var line = await port.ReadLineAsync();
Console.WriteLine($"Line: {line}");
You can also pass a CancellationToken:
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
var line = await port.ReadLineAsync(cts.Token);
ReadToAsync
Read data up to a specific delimiter asynchronously:
// Read until '>' delimiter
var data = await port.ReadToAsync(">");
Console.WriteLine($"Received: {data}");
// With cancellation
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
var data = await port.ReadToAsync(">", cts.Token);
Line streaming with Lines
Subscribe to Lines to get a continuous stream of complete lines:
port.NewLine = "\n";
port.Lines.Subscribe(line => Console.WriteLine($"LINE: {line}"));
Writing
port.Write(string text)- Write a stringport.WriteLine(string text)- Write a string followed by NewLineport.Write(byte[] buffer)- Write entire byte arrayport.Write(byte[] buffer, int offset, int count)- Write portion of byte arrayport.Write(char[] buffer)- Write entire char arrayport.Write(char[] buffer, int offset, int count)- Write portion of char array
Modern .NET Write Overloads (net8.0+)
On modern .NET targets, additional Span-based overloads are available:
// Write from ReadOnlySpan<byte>
ReadOnlySpan<byte> data = stackalloc byte[] { 0x01, 0x02, 0x03 };
port.Write(data);
// Write from ReadOnlyMemory<byte>
ReadOnlyMemory<byte> memory = new byte[] { 0x01, 0x02, 0x03 };
port.Write(memory);
// Write from ReadOnlySpan<char>
ReadOnlySpan<char> chars = "Hello".AsSpan();
port.Write(chars);
Error handling and state
- Subscribe to
port.ErrorReceivedfor exceptions and serial errors. - Subscribe to
port.IsOpenObservableto react to open/close transitions. - Call
port.Close()or dispose subscriptions (DisposeWith) to release the port.
Buffer Management
// Discard pending input data
port.DiscardInBuffer();
// Discard pending output data
port.DiscardOutBuffer();
// Check buffer sizes
Console.WriteLine($"Bytes to read: {port.BytesToRead}");
Console.WriteLine($"Bytes to write: {port.BytesToWrite}");
Windows-only: Pin Changed Events
On Windows targets, subscribe to pin state changes:
#if HasWin
