MessagePipe

High performance in-memory/distributed messaging pipeline for .NET and Unity.

MessagePipe is a high-performance in-memory/distributed messaging pipeline for .NET and Unity. It supports all cases of Pub/Sub usage, the mediator pattern for CQRS, Prism's EventAggregator (V-VM decoupling), IPC (interprocess communication) RPC, etc.

  • Dependency-injection first
  • Filter pipeline
  • better event
  • sync/async
  • keyed/keyless
  • buffered/bufferless
  • singleton/scoped
  • broadcast/response(+many)
  • in-memory/interprocess/distributed

MessagePipe is faster than a standard C# event and 78 times faster than Prism's EventAggregator.

Memory allocation per publish operation is also lower (zero).

A Roslyn analyzer is also provided to prevent subscription leaks.

Getting Started

For .NET, use NuGet. For Unity, see the Unity section.

PM> Install-Package MessagePipe

MessagePipe is built on top of Microsoft.Extensions.DependencyInjection (for Unity: VContainer, Zenject, or the built-in tiny DI), so it is set up through ConfigureServices in the .NET Generic Host. The Generic Host is widely used in .NET, for example in ASP.NET Core, MagicOnion, ConsoleAppFramework, MAUI, and WPF (with external support), so setup is easy.

using MessagePipe;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting; // provides Host.CreateDefaultBuilder

Host.CreateDefaultBuilder()
    .ConfigureServices((ctx, services) =>
    {
        services.AddMessagePipe(); // use AddMessagePipe(options => { }) to configure options
    })

Inject IPublisher<T> to publish and ISubscriber<T> to subscribe, much like a Logger<T>. T can be any type: a primitive (int, string, etc.), struct, class, enum, and so on.

using System; // IDisposable, Console
using MessagePipe;

public struct MyEvent { }

public class SceneA
{
    readonly IPublisher<MyEvent> publisher;
    
    public SceneA(IPublisher<MyEvent> publisher)
    {
        this.publisher = publisher;
    }

    void Send()
    {
        this.publisher.Publish(new MyEvent());
    }
}

public class SceneB
{
    readonly ISubscriber<MyEvent> subscriber;
    readonly IDisposable disposable;

    public SceneB(ISubscriber<MyEvent> subscriber)
    {
        var bag = DisposableBag.CreateBuilder(); // composite disposable to manage subscriptions
        
        subscriber.Subscribe(x => Console.WriteLine("here")).AddTo(bag);

        disposable = bag.Build();
    }

    void Close()
    {
        disposable.Dispose(); // unsubscribe; every subscription must be disposed when it is no longer needed
    }
}

This is similar to an event, but decoupled by using the type as the key. Subscribe returns an IDisposable, which makes unsubscribing easier than with an event. You can release many subscriptions at once with DisposableBag (a CompositeDisposable). See the Managing Subscription and Diagnostics section for more details.
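As a minimal sketch of that subscription lifetime outside the Generic Host, the following resolves the publisher and subscriber straight from a ServiceCollection. It assumes the MessagePipe and Microsoft.Extensions.DependencyInjection NuGet packages are referenced; the class name SubscriptionLifetimeSketch and the int message type are illustrative only.

```csharp
using System;
using System.Collections.Generic;
using MessagePipe;
using Microsoft.Extensions.DependencyInjection;

public static class SubscriptionLifetimeSketch
{
    // Returns every value the handlers observed.
    public static List<int> Run()
    {
        var services = new ServiceCollection();
        services.AddMessagePipe();
        using var provider = services.BuildServiceProvider();

        var publisher = provider.GetRequiredService<IPublisher<int>>();
        var subscriber = provider.GetRequiredService<ISubscriber<int>>();

        var received = new List<int>();

        // Collect two subscriptions in one bag so they can be released together.
        var bag = DisposableBag.CreateBuilder();
        subscriber.Subscribe(x => received.Add(x)).AddTo(bag);
        subscriber.Subscribe(x => received.Add(x * 10)).AddTo(bag);
        IDisposable subscriptions = bag.Build();

        publisher.Publish(1);    // both handlers are still subscribed
        subscriptions.Dispose(); // unsubscribes both handlers at once
        publisher.Publish(2);    // no handler is left to receive this

        return received;
    }
}
```

Only the values derived from the first Publish call should end up in the returned list, since both handlers are released before the second call.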

The publisher/subscriber (internally called MessageBroker) is managed by DI, so each scope can have a different broker. In addition, all subscriptions are unsubscribed when the scope is disposed, which prevents subscription leaks.

The default lifetime is singleton. You can set MessagePipeOptions.InstanceLifetime to Singleton or Scoped.
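To illustrate the scoped lifetime, here is a small sketch that sets InstanceLifetime to Scoped and publishes inside one of two DI scopes. It assumes the same two NuGet packages as a plain .NET setup; ScopedBrokerSketch and the string message type are made up for the example.

```csharp
using System;
using MessagePipe;
using Microsoft.Extensions.DependencyInjection;

public static class ScopedBrokerSketch
{
    // Returns how many messages the subscriber in each scope received.
    public static (int ScopeA, int ScopeB) Run()
    {
        var services = new ServiceCollection();
        services.AddMessagePipe(options =>
        {
            // One broker per DI scope instead of one shared singleton.
            options.InstanceLifetime = InstanceLifetime.Scoped;
        });
        using var provider = services.BuildServiceProvider();

        int countA = 0;
        int countB = 0;

        using var scopeA = provider.CreateScope();
        using var scopeB = provider.CreateScope();

        using var subA = scopeA.ServiceProvider
            .GetRequiredService<ISubscriber<string>>()
            .Subscribe(_ => { countA++; });
        using var subB = scopeB.ServiceProvider
            .GetRequiredService<ISubscriber<string>>()
            .Subscribe(_ => { countB++; });

        // Publish through scope A's broker only.
        scopeA.ServiceProvider
            .GetRequiredService<IPublisher<string>>()
            .Publish("hello");

        return (countA, countB);
    }
}
```

With scoped brokers, the message published in scope A is expected to reach only the subscriber resolved from scope A.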

IPublisher<T>/ISubscriber<T> is keyless (type only). MessagePipe also has a similar keyed (topic) interface, IPublisher<TKey, TMessage>/ISubscriber<TKey, TMessage>.
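A short sketch of the keyed form, assuming a plain ServiceCollection setup: the key plays the role of a topic, and a subscriber only sees messages published under the key it subscribed to. The names KeyedTopicSketch, "room-1", and "room-2" are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using MessagePipe;
using Microsoft.Extensions.DependencyInjection;

public static class KeyedTopicSketch
{
    // Returns the messages delivered to the "room-1" subscriber.
    public static List<int> Run()
    {
        var services = new ServiceCollection();
        services.AddMessagePipe();
        using var provider = services.BuildServiceProvider();

        // TKey = string (the topic), TMessage = int (the payload).
        var publisher = provider.GetRequiredService<IPublisher<string, int>>();
        var subscriber = provider.GetRequiredService<ISubscriber<string, int>>();

        var received = new List<int>();
        using var subscription = subscriber.Subscribe("room-1", x => received.Add(x));

        publisher.Publish("room-1", 100); // matches the subscribed key
        publisher.Publish("room-2", 200); // different key, nobody is listening

        return received;
    }
}
```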

As a real use case, we have an application that connects Unity and MagicOnion (a real-time communication framework like SignalR) and delivers the result to a browser through Blazor. We needed something to connect Blazor's page (browser lifecycle) and MagicOnion's Hub (connection lifecycle) in order to transmit data. We also needed to route messages to connections by their IDs.

Browser <-> Blazor <- [MessagePipe] -> MagicOnion <-> Unity

We solved this with the following code.

// MagicOnion (similar to SignalR; a real-time event framework for .NET and Unity)
public class UnityConnectionHub : StreamingHubBase<IUnityConnectionHub, IUnityConnectionHubReceiver>, IUnityConnectionHub
{
    readonly IPublisher<Guid, UnitEventData> eventPublisher;
    readonly IPublisher<Guid, ConnectionClose> closePublisher;
    Guid id;

    public UnityConnectionHub(IPublisher<Guid, UnitEventData> eventPublisher, IPublisher<Guid, ConnectionClose> closePublisher)
    {
        this.eventPublisher = eventPublisher;
        this.closePublisher = closePublisher;
    }

    protected override async ValueTask OnConnected()
    {
        this.id = Guid.Parse(Context.Headers["id"]);
    }

    protected override async ValueTask OnDisconnected()
    {
        this.closePublisher.Publish(id, new ConnectionClose()); // publish to browser (Blazor)
    }

    // called from the client (Unity)
    public Task<UnitEventData> SendEventAsync(UnitEventData data)
    {
        this.eventPublisher.Publish(id, data); // publish to browser (Blazor)
        return Task.FromResult(data); // assumption: echo the data back to satisfy the declared return type
    }
}

// Blazor
public partial class BlazorPage : ComponentBase, IDisposable
{
    [Parameter]
    public Guid ID { get; set; }

    [Inject]
    ISubscriber<Guid, UnitEventData> UnityEventSubscriber { get; set; }

    [Inject]
    ISubscriber<Guid, ConnectionClose> ConnectionCloseSubscriber { get; set; }

    IDisposable subscription;

    protected override void OnInitialized()
    {
        // receive event from MagicOnion(that is from Unity)
        var d1 = UnityEventSubscriber.Subscribe(ID, x =>
        {
            // do anything...
        });

        var d2 = ConnectionCloseSubscriber.Subscribe(ID, _ =>
        {
            // show disconnected thing to view...
            subscription?.Dispose(); // and unsubscribe events.
        });

        subscription = DisposableBag.Create(d1, d2); // combine disposable.
    }
    
    public void Dispose()
    {
        // unsubscribe event when browser is closed.
        subscription?.Dispose();
    }
}

The main difference from the Subject of Reactive Extensions is that there is no OnCompleted. OnCompleted may or may not be used, which makes it very difficult for the observer (subscriber) to determine the intent. Also, we usually subscribe to multiple events (of different event types) from the same publisher, and handling duplicate OnCompleted calls is difficult in that case. For this reason, MessagePipe only provides a simple Publish (OnNext). If you want to convey completion, publish a separate event and handle it with dedicated processing.

In other words, this is the equivalent of Relay in RxSwift.
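One way to convey completion without OnCompleted is to treat it as its own message type, as in the sketch below. ProgressChanged, Completed, and CompletionAsEventSketch are hypothetical names invented for this illustration, and the setup again assumes a plain ServiceCollection.

```csharp
using System;
using System.Collections.Generic;
using MessagePipe;
using Microsoft.Extensions.DependencyInjection;

// Hypothetical message types: data and completion travel as separate events.
public readonly struct ProgressChanged
{
    public ProgressChanged(int percent) { Percent = percent; }
    public int Percent { get; }
}

public readonly struct Completed { }

public static class CompletionAsEventSketch
{
    public static (List<int> Progress, bool IsCompleted) Run()
    {
        var services = new ServiceCollection();
        services.AddMessagePipe();
        using var provider = services.BuildServiceProvider();

        var progress = new List<int>();
        bool isCompleted = false;

        var bag = DisposableBag.CreateBuilder();
        provider.GetRequiredService<ISubscriber<ProgressChanged>>()
            .Subscribe(p => progress.Add(p.Percent))
            .AddTo(bag);
        provider.GetRequiredService<ISubscriber<Completed>>()
            .Subscribe(_ => { isCompleted = true; }) // dedicated completion handling
            .AddTo(bag);
        using var subscriptions = bag.Build();

        var progressPublisher = provider.GetRequiredService<IPublisher<ProgressChanged>>();
        var completedPublisher = provider.GetRequiredService<IPublisher<Completed>>();

        progressPublisher.Publish(new ProgressChanged(50));
        progressPublisher.Publish(new ProgressChanged(100));
        completedPublisher.Publish(new Completed()); // completion is just another message

        return (progress, isCompleted);
    }
}
```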

In addition to standard Pub/Sub, MessagePipe supports async handlers, mediator patterns with handlers that accept return values, and filters for pre-and-post execution customization.

The following image visualizes the connections between all of those interfaces.

[Image: diagram of the connections between the MessagePipe interfaces]

You may be confused by the number of interfaces, but many functions can be written with a similar, unified API.

Publish/Subscribe

The Publish/Subscribe interfaces come in keyed (topic) and keyless variants, each with sync and async versions.

// keyless-sync
public interface IPublisher<TMessage>
{
    void Publish(TMessage message);
}

public interface ISubscriber<TMessage>
{
    IDisposable Subscribe(IMessageHandler<TMessage> handler, params MessageHandlerFilter<TMessage>[] filters);
}

// keyless-async
public interface IAsyncPublisher<TMessage>
{
    // async interface's publish is fire-and-forget
    void Publish(TMessage message, CancellationToken cancellationToken = default(CancellationToken));
    ValueTask PublishAsync(TMessage message, CancellationToken cancellationToken = default(CancellationToken));
    ValueTask PublishAsync(TMessage message, AsyncPublishStrategy publishStrategy, CancellationToken cancellationToken = default(CancellationToken));
}

public interface IAsyncSubscriber<TMessage>
{
    IDisposable Subscribe(IAsyncMessageHandler<TMessage> asyncHandler, params AsyncMessageHandlerFilter<TMessage>[] filters);
}

// keyed-sync
public interface IPublisher<TKey, TMessage>
    where TKey : notnull
{
    void Publish(TKey key, TMessage message);
}

public interface ISubscriber<TKey, TMessage>
    where TKey : notnull
{
    IDisposable Subscribe(TKey key, IMessageHandler<TMessage> handler, params MessageHandlerFilter<TMessage>[] filters);
}

// keyed-async
public interface IAsyncPublisher<TKey, TMessage>
    where TKey : notnull
{
    void Publish(TKey key, TMessage message, CancellationToken cancellationToken = default(CancellationToken));
    ValueTask PublishAsync(TKey key, TMessage message, CancellationToken cancellationToken = default(CancellationToken));
    ValueTask PublishAsync(TKey key, TMessage message, AsyncPublishStrategy publishStrategy, CancellationToken cancellationToken = default(CancellationToken));
}

public interface IAsyncSubscriber<TKey, TMessage>
    where TKey : notnull
{
    IDisposable Subscribe(TKey key, IAsyncMessageHandler<TMessage> asyncHandler, params AsyncMessageHandlerFilter<TMessage>[] filters);
}
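The async interfaces above can be exercised with a sketch like the following, which awaits PublishAsync so the caller resumes only after the handler has finished. It assumes a plain ServiceCollection setup; AsyncPublishSketch and the "job-1" message are illustrative, and the strategy is passed explicitly through the PublishAsync overload listed above.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using MessagePipe;
using Microsoft.Extensions.DependencyInjection;

public static class AsyncPublishSketch
{
    // Returns the order in which the handler and the caller made progress.
    public static async Task<List<string>> RunAsync()
    {
        var services = new ServiceCollection();
        services.AddMessagePipe();
        using var provider = services.BuildServiceProvider();

        var publisher = provider.GetRequiredService<IAsyncPublisher<string>>();
        var subscriber = provider.GetRequiredService<IAsyncSubscriber<string>>();

        var log = new List<string>();

        // Async handler: receives the message and a CancellationToken.
        using var subscription = subscriber.Subscribe(async (message, cancellationToken) =>
        {
            await Task.Delay(10, cancellationToken); // simulate asynchronous work
            lock (log) { log.Add("handled " + message); }
        });

        // Awaiting PublishAsync waits for the subscriber to complete.
        await publisher.PublishAsync("job-1", AsyncPublishStrategy.Sequential);
        lock (log) { log.Add("publish returned"); }

        return log;
    }
}
```

The Publish method on the async publisher, by contrast, is fire-and-forget, so it gives the caller no way to wait for handlers.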

All of them are available from DI in the form IPublisher<T>/ISubscriber<T>. With an async handler, awaiting PublishAsync waits until all subscribers have completed. Asynchro
