R3

The new future of dotnet/reactive and UniRx, which supports many platforms including Unity, Godot, Avalonia, WPF, WinForms, WinUI3, Stride, LogicLooper, MAUI, MonoGame, Blazor, and Uno.

I have over 10 years of experience with Rx, experience implementing a custom Rx runtime (UniRx) for game engines, and experience implementing an asynchronous runtime (UniTask) for game engines. Based on those experiences, I came to believe that there is a need for a new Reactive Extensions for .NET, one that reflects modern C# and returns to the core values of Rx.

  • Stopping the pipeline at OnError is a mistake.
  • IScheduler is the root of poor performance.
  • Frame-based operations, a missing feature in Rx, are especially important in game engines.
  • Single asynchronous operations should be entirely left to async/await.
  • Synchronous APIs should not be implemented.
  • Query syntax is a bad notation except for SQL.
  • A subscription list is necessary to prevent subscription leaks (similar to a Parallel Debugger).
  • Backpressure should be left to IAsyncEnumerable and Channels.
  • For distributed processing and queries, there are GraphQL, Kubernetes, Orleans, Akka.NET, gRPC, MagicOnion.

In other words, LINQ is not for everything, and we believe that the essence of Rx lies in the processing of in-memory messaging (LINQ to Events), which will be our focus. We are not concerned with communication processes like Reactive Streams.

To address the shortcomings of dotnet/reactive, we have made changes to the core interfaces. In recent years, Rx-like frameworks optimized for language features, such as Kotlin Flow and Swift Combine, have been standardized. C# has also evolved significantly, now at C# 12, and we believe there is a need for an Rx that aligns with the latest C#.

Improving performance was also a theme in the reimplementation. For example, the following result shows how poorly IScheduler performs and how much performance improves when it is removed.

[Image: Observable.Range(1, 10000).Subscribe()]

You can also see interesting results in allocations when subscriptions are added to and removed from a Subject.

[Image: x10000 subject.Subscribe() -> x10000 subscription.Dispose()]

This is because dotnet/reactive has adopted ImmutableArray (or its equivalent) for Subject, which results in the allocation of a new array every time a subscription is added or removed. Depending on the design of the application, a large number of subscriptions can occur (we have seen this especially in complex games), which can make this a critical issue. In R3, we have devised a way to achieve high performance while avoiding ImmutableArray.
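To make the allocation cost concrete, here is a minimal sketch that uses only System.Collections.Immutable. It is not taken from dotnet/reactive or R3, and the names (observers, snapshot1, and so on) are illustrative. It shows that every Add or Remove on an ImmutableArray returns a new instance, so each subscribe or dispose has to copy the list.

```csharp
using System;
using System.Collections.Immutable;

// Illustrative stand-in for a subject's observer list (not dotnet/reactive's actual code).
ImmutableArray<string> observers = ImmutableArray<string>.Empty;

ImmutableArray<string> snapshot1 = observers.Add("observer-1");    // new 1-element array
ImmutableArray<string> snapshot2 = snapshot1.Add("observer-2");    // new 2-element array
ImmutableArray<string> snapshot3 = snapshot2.Remove("observer-1"); // new 1-element array

// Earlier snapshots are untouched, which is why every change needs a fresh copy.
Console.WriteLine(snapshot1.Length); // 1
Console.WriteLine(snapshot2.Length); // 2
Console.WriteLine(snapshot3.Length); // 1
```

With 10,000 subscriptions followed by 10,000 disposals, a list like this would be copied 20,000 times, which is the pattern the allocation result above reflects.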

For those interested in learning more about the implementation philosophy and comparisons, please refer to my blog article R3 — A New Modern Reimplementation of Reactive Extensions for C#.

Core Interface

This library is distributed via NuGet packages/R3, supporting .NET Standard 2.0, .NET Standard 2.1, .NET 6(.NET 7) and .NET 8 or above.

dotnet add package R3

Some platforms (WPF, Avalonia, Unity, Godot, etc.) require additional installation steps. Please see the Platform Supports section below.

R3 code is mostly the same as standard Rx. Create an Observable via factory methods (Timer, Interval, FromEvent, Subject, etc.) and chain operators via LINQ methods. Therefore, your knowledge of Rx and the existing documentation on Rx can be applied almost directly. If you are new to Rx, the ReactiveX website and Introduction to Rx.NET are useful references.

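using System;
using System.Threading;
using System.Threading.Tasks;
using R3;

// Emit every second, project to the running index, and keep only even indices.
var subscription = Observable.Interval(TimeSpan.FromSeconds(1))
    .Select((_, i) => i)
    .Where(x => x % 2 == 0)
    .Subscribe(x => Console.WriteLine($"Interval:{x}"));

// Cancel the timer pipeline below when the user presses Enter.
var cts = new CancellationTokenSource();
_ = Task.Run(() => { Console.ReadLine(); cts.Cancel(); });

// First tick after 1 second, then every 3 seconds, until the token is cancelled.
await Observable.Timer(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(3))
    .TakeUntil(cts.Token)
    .ForEachAsync(x => Console.WriteLine("Timer"));

subscription.Dispose();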

The surface API remains the same as normal Rx, but the interfaces used internally are different and are not IObservable<T>/IObserver<T>.

IObservable<T> being the dual of IEnumerable<T> is a beautiful definition, but it was not very practical in use.

public abstract class Observable<T>
{
    public IDisposable Subscribe(Observer<T> observer);
}

public abstract class Observer<T> : IDisposable
{
    public void OnNext(T value);
    public void OnErrorResume(Exception error);
    public void OnCompleted(Result result); // Result is (Success | Failure)
}

The biggest difference is that in normal Rx, when an exception occurs in the pipeline, it flows to OnError and the subscription is unsubscribed, but in R3, it flows to OnErrorResume and the subscription is not unsubscribed.

I consider the automatic unsubscription by OnError to be a bad design for event handling. It's very difficult and risky to resolve it within an operator like Retry, and it also led to poor performance (there are many questions and complex answers about stopping and resubscribing all over the world). Also, converting OnErrorResume to OnError(OnCompleted(Result.Failure)) is easy and does not degrade performance, but the reverse is impossible. Therefore, the design was changed to not stop by default and give users the choice to stop.

Since the original Rx contract was OnError | OnCompleted, it was changed to OnCompleted(Result result) to consolidate into one method. Result is a readonly struct with two states: Success() | Failure(Exception).
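The following sketch illustrates the behavior described above. It assumes the R3 package is referenced and relies on the Subscribe overload that takes onNext, onErrorResume, and onCompleted callbacks, as well as the Result.Success and Result.IsSuccess members; the variable names are illustrative.

```csharp
using System;
using System.Collections.Generic;
using R3;

var log = new List<string>();
var subject = new Subject<int>();

using var subscription = subject
    .Select(x => 100 / x) // throws DivideByZeroException when x == 0
    .Subscribe(
        x => log.Add($"OnNext: {x}"),
        ex => log.Add($"OnErrorResume: {ex.GetType().Name}"),
        result => log.Add($"OnCompleted: IsSuccess={result.IsSuccess}"));

subject.OnNext(10);                   // delivered to OnNext
subject.OnNext(0);                    // the exception is routed to OnErrorResume
subject.OnNext(5);                    // still delivered, because the subscription is kept
subject.OnCompleted(Result.Success);  // the single terminal notification

foreach (var line in log) Console.WriteLine(line);
```

If stop-on-error semantics are preferred for a particular pipeline, the error can be converted into OnCompleted(Result.Failure(...)) at that point instead, which is the user-controlled choice the design aims for.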

The reason for changing to an abstract class instead of an interface is that Rx has implicit complex contracts that interfaces do not guarantee. By making it an abstract class, we fully controlled the behavior of Subscribe, OnNext, and Dispose. This made it possible to manage the list of all subscriptions and prevent subscription leaks.


Subscription leaks are a common problem in applications with long lifecycles, such as GUIs or games. Tracking all subscriptions makes it easy to prevent leaks.
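As a rough sketch of how the tracking can be used, the snippet below turns it on and lists the live subscriptions. It assumes the R3 package exposes an ObservableTracker type with EnableTracking, EnableStackTrace, and ForEachActiveTask members; treat those names as assumptions and check them against the version you install.

```csharp
using System;
using R3;

// Tracking is opt-in; these members are assumed from the R3 package.
ObservableTracker.EnableTracking = true;
ObservableTracker.EnableStackTrace = true; // also record where each subscription was created

using var subscription = Observable.Interval(TimeSpan.FromSeconds(1))
    .Where(_ => true)
    .Take(10000)
    .Subscribe();

// Enumerate every subscription that is currently alive and print its tracking state.
ObservableTracker.ForEachActiveTask(state => Console.WriteLine(state));
```

Because each operator's Observer doubles as a subscription, a chain like this is expected to appear as several tracked entries rather than one.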

Internally, when subscribing, an Observer is always linked to the target Observable and doubles as a Subscription. This ensures that Observers are reliably connected from top to bottom, which makes tracking dependable and makes it clear that they are released on OnCompleted/Dispose. In terms of performance, because the Observer itself always becomes a Subscription, there is no need for unnecessary IDisposable allocations.

TimeProvider instead of IScheduler

In traditional Rx, IScheduler was used as an abstraction for time-based processing, but in R3, we have discontinued its use and instead opted for the TimeProvider introduced in .NET 8. For example, the operators are defined as follows:

public static Observable<Unit> Interval(TimeSpan period, TimeProvider timeProvider);
public static Observable<T> Delay<T>(this Observable<T> source, TimeSpan dueTime, TimeProvider timeProvider);
public static Observable<T> Debounce<T>(this Observable<T> source, TimeSpan timeSpan, TimeProvider timeProvider); // same as Throttle in dotnet/reactive

Originally, IScheduler had performance issues, and the internal implementation of dotnet/reactive was peppered with code that circumvented these issues using PeriodicTimer and IStopwatch, leading to unnecessary complexity. These can be better expressed with TimeProvider (TimeProvider.CreateTimer(), TimeProvider.GetTimestamp()).
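For readers who have not used TimeProvider directly, here is a small sketch of the two members mentioned above, using only the .NET 8 base class library. It does not show R3's internals; the tick count and the 50 ms period are arbitrary values chosen for illustration.

```csharp
using System;
using System.Threading;

TimeProvider provider = TimeProvider.System;

// GetTimestamp() returns a high-resolution timestamp for measuring elapsed time.
long start = provider.GetTimestamp();

int ticks = 0;
using var done = new ManualResetEventSlim();

// CreateTimer() schedules a periodic callback through the provider.
using ITimer timer = provider.CreateTimer(
    _ =>
    {
        // Signal the main thread once three ticks have been observed.
        if (Interlocked.Increment(ref ticks) == 3) done.Set();
    },
    state: null,
    dueTime: TimeSpan.FromMilliseconds(50),
    period: TimeSpan.FromMilliseconds(50));

done.Wait();

TimeSpan elapsed = provider.GetElapsedTime(start);
Console.WriteLine($"ticks: {Volatile.Read(ref ticks)}, elapsed: {elapsed.TotalMilliseconds:F0} ms");
```

Because both the timer and the clock come from the same TimeProvider instance, swapping in a different provider changes how time advances without changing this code.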

TimeProvider is an abstraction for asynchronous operations (apart from the fake used for testing), whereas IScheduler also included synchronous schedulers like ImmediateScheduler and CurrentThreadScheduler. However, these were meaningless for time-based operators because applying them would cause blocking, and CurrentThreadScheduler had poor performance.

[Image: Observable.Range(1, 10000).Subscribe()]

In R3, anything that requires synchronous execution (like Range) is treated as Immediate, and everything else is considered asynchronous and handled through TimeProvider.

As for the implementation of TimeProvider, the standard TimeProvider.System using the ThreadPool is the default. For unit testing, FakeTimeProvider (Microsoft.Extensions.TimeProvider.Testing) is available. Additionally, many TimeProvider implementations are provided for different platforms, such as DispatcherTimeProvider for WPF and UpdateTimeProvider for Unity, enhancing ease of use tailored to each platform.
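As a sketch of what this looks like in a unit test, the snippet below drives Interval with a FakeTimeProvider so that no real waiting is needed. It assumes the R3 and Microsoft.Extensions.TimeProvider.Testing packages are referenced; the variable names are illustrative.

```csharp
using System;
using Microsoft.Extensions.Time.Testing; // from the Microsoft.Extensions.TimeProvider.Testing package
using R3;

var fakeTime = new FakeTimeProvider();
int ticks = 0;

// Pass the provider explicitly, using the Interval signature shown earlier.
using var subscription = Observable
    .Interval(TimeSpan.FromSeconds(1), fakeTime)
    .Subscribe(_ => ticks++);

// No virtual time has passed yet, so nothing has been emitted.
Console.WriteLine($"before advance: {ticks}");

// Move virtual time forward; due timers fire without the test actually sleeping.
fakeTime.Advance(TimeSpan.FromSeconds(3));
Console.WriteLine($"after advance: {ticks}");
```

The same pipeline runs on the ThreadPool with TimeProvider.System, or on a platform loop with a provider such as DispatcherTimeProvider, by changing only the argument.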

Frame based operations

In GUI applications, there's the message loop,
