Rx
Reactive Extensions for Go
rx
import "github.com/reactivego/rx"
Package rx provides Reactive Extensions, a powerful API for asynchronous programming in Go, built around observables and operators to process streams of data seamlessly.
Prerequisites
You’ll need Go 1.23 or later, as the implementation depends on language support for generics and iterators.
Observables
In rx, an Observable represents a stream of data that can emit items over time, while an Observer subscribes to it to receive and react to those emissions. This reactive approach enables asynchronous and concurrent operations without blocking execution. Instead of waiting for values to become available, an observer passively listens and responds whenever the observable emits data, errors, or a completion signal.
This page introduces the reactive pattern, explaining what Observables and Observers are and how subscriptions work. Other sections explore the powerful set of Operators that allow you to transform, combine, and control data streams efficiently.
An Observable:
- is a stream of events.
- emits zero or more values over time.
- pushes values to its observers.
- can take any amount of time to complete (or may never complete).
- is cancellable.
- is lazy (it doesn't do anything until you subscribe).
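The properties above can be illustrated with a minimal sketch in plain Go. It does not use the rx package; the names Observer, Observable, FromValues and Collect are hypothetical stand-ins chosen for this illustration, and the real rx types may be shaped differently. The point is only that building the observable does nothing, and values are pushed to the observer once it subscribes.

```go
package main

import "fmt"

// Observer receives pushed notifications: a value, or a final error/done signal.
// Illustrative only; this is not the rx package's Observer type.
type Observer[T any] func(next T, err error, done bool)

// Observable is a function that starts pushing values once an observer subscribes.
type Observable[T any] func(observe Observer[T])

// FromValues returns a lazy observable; nothing runs until it is called
// with an observer.
func FromValues[T any](values ...T) Observable[T] {
	return func(observe Observer[T]) {
		for _, v := range values {
			observe(v, nil, false)
		}
		var zero T
		observe(zero, nil, true) // completion signal
	}
}

// Collect subscribes and gathers every emitted value into a slice.
func Collect[T any](o Observable[T]) []T {
	var out []T
	o(func(next T, err error, done bool) {
		if !done {
			out = append(out, next)
		}
	})
	return out
}

func main() {
	numbers := FromValues(1, 2, 3) // lazy: no values have been emitted yet
	fmt.Println(Collect(numbers))  // subscribing triggers emission: [1 2 3]
}
```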
Example
package main

import "github.com/reactivego/rx"

func main() {
	rx.From[any](1, "hi", 2.3).Println().Wait()
}
Note the program creates a mixed-type any observable from an int, a string and a float64.
Output
1
hi
2.3
Example
package main

import "github.com/reactivego/rx"

func main() {
	rx.From(1, 2, 3).Println().Wait()
}
Note the program uses the inferred type int for the observable.
Output
1
2
3
Observables in rx offer several advantages over standard Go channels:
Hot vs Cold Observables
- Hot Observables emit values regardless of subscription status. Like a live broadcast, any values emitted when no subscribers are listening are permanently missed. Examples include system events, mouse movements, or real-time data feeds.
- Cold Observables begin emission only when subscribed to, ensuring subscribers receive the complete data sequence from the beginning. Examples include file contents, database queries, or HTTP requests that are executed on-demand.
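The difference can be sketched in plain Go without the rx package. The cold function and the hot type below are hypothetical helpers written for this illustration only: the cold source replays its full sequence to each subscriber, while the hot source emits whether or not anyone is listening, so a late subscriber misses the earlier value.

```go
package main

import "fmt"

// cold returns a source that replays the full sequence to every subscriber.
func cold(values ...int) func(observe func(int)) {
	return func(observe func(int)) {
		for _, v := range values {
			observe(v)
		}
	}
}

// hot is a hypothetical broadcaster: it emits whether or not anyone listens.
type hot struct {
	observers []func(int)
}

func (h *hot) subscribe(observe func(int)) {
	h.observers = append(h.observers, observe)
}

func (h *hot) emit(v int) {
	for _, observe := range h.observers {
		observe(v)
	}
}

// coldSubscriber subscribes to a cold source and returns what it saw.
func coldSubscriber() []int {
	var seen []int
	cold(1, 2, 3)(func(v int) { seen = append(seen, v) })
	return seen
}

// lateSubscriber subscribes to a hot source after the first emission
// and returns what it saw.
func lateSubscriber() []int {
	var seen []int
	h := &hot{}
	h.emit(1) // nobody is listening, so this value is lost
	h.subscribe(func(v int) { seen = append(seen, v) })
	h.emit(2)
	h.emit(3)
	return seen
}

func main() {
	fmt.Println("cold:", coldSubscriber()) // cold: [1 2 3]
	fmt.Println("hot:", lateSubscriber())  // hot: [2 3]
}
```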
Rich Lifecycle Management
Observables offer comprehensive lifecycle handling. They can complete normally, terminate with errors, or continue indefinitely. Subscriptions provide fine-grained control, allowing subscribers to cancel at any point, preventing resource leaks and unwanted processing.
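A rough sketch of the three ways a stream can end (completion, error, cancellation) is shown below in plain Go. The subscription type and the run function are hypothetical names invented for this example and are not the rx Subscription API; the sketch is synchronous to keep the outcome easy to follow.

```go
package main

import (
	"errors"
	"fmt"
)

// subscription is a hypothetical handle that lets a subscriber cancel.
type subscription struct{ canceled bool }

func (s *subscription) Unsubscribe() { s.canceled = true }

// run pushes values to observe until the source is exhausted, fails at
// index failAt, or the subscription is canceled. It reports how the
// stream ended.
func run(values []int, failAt int, sub *subscription, observe func(int)) (string, error) {
	for i, v := range values {
		if sub.canceled {
			return "canceled", nil
		}
		if i == failAt {
			return "error", errors.New("source failed")
		}
		observe(v)
	}
	return "completed", nil
}

func main() {
	values := []int{1, 2, 3}

	// Normal completion: a failAt of -1 never matches an index.
	state, _ := run(values, -1, &subscription{}, func(int) {})
	fmt.Println(state) // completed

	// Termination with an error at index 1.
	state, err := run(values, 1, &subscription{}, func(int) {})
	fmt.Println(state, err) // error source failed

	// Cancellation from inside the observer after the first value.
	sub := &subscription{}
	state, _ = run(values, -1, sub, func(int) { sub.Unsubscribe() })
	fmt.Println(state) // canceled
}
```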
Time-Varying Data Model
Unlike traditional variables that represent static values, Observables elegantly model how values evolve over time. They represent the entire progression of a value's state changes, not just its current state, making them ideal for reactive programming paradigms.
Native Concurrency Support
Concurrency is built into the Observable paradigm. Each Observable conceptually operates as an independent process that asynchronously pushes values to subscribers. This approach naturally aligns with concurrent programming models while abstracting away much of the complexity typically associated with managing concurrent operations.
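To make the idea of independent processes pushing values more concrete, here is a small sketch using only goroutines and the sync package. It is not how rx schedules work internally; sumConcurrently is a hypothetical function in which each source runs in its own goroutine and pushes values into a shared observer.

```go
package main

import (
	"fmt"
	"sync"
)

// sumConcurrently runs each source in its own goroutine. Every source
// pushes its values into a shared observer that is guarded by a mutex.
func sumConcurrently(sources ...[]int) int {
	var (
		mu    sync.Mutex
		wg    sync.WaitGroup
		total int
	)
	observe := func(v int) {
		mu.Lock()
		defer mu.Unlock()
		total += v
	}
	for _, src := range sources {
		wg.Add(1)
		go func(values []int) {
			defer wg.Done()
			for _, v := range values {
				observe(v) // push, rather than waiting to be asked
			}
		}(src)
	}
	wg.Wait() // block until every source has completed
	return total
}

func main() {
	fmt.Println(sumConcurrently([]int{1, 2, 3}, []int{10, 20})) // 36
}
```

The order in which values arrive is not deterministic here, which is why the example aggregates them with an order-independent sum.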
Operators
Operators form a language for expressing programs with Observables. They transform, filter, and combine one or more Observables into new Observables, allowing for powerful data stream processing. Each operator performs a specific function in the reactive pipeline, enabling you to compose complex asynchronous workflows through method chaining.
Index
All converts an Observable stream into a Go 1.23+ iterator sequence that provides each emitted value paired with its sequential zero-based index.
Append creates a pipe that appends emitted values to a provided slice while forwarding them to the next observer, with a method variant available for chaining.
AsObservable provides type conversion between observables, allowing you to safely cast an Observable of one type to another, and to convert a typed Observable to an Observable of 'any' type (and vice versa).
AsObserver converts an Observer of type any to an Observer of a specific type T.
Assign stores each emitted value from an Observable into a provided pointer variable while passing all emissions through to the next observer, enabling value capture during stream processing.
AutoConnect makes a (Connectable) Multicaster behave like an ordinary Observable that automatically connects the multicaster to its source when the specified number of observers have subscribed to it.
Catch recovers from an error notification by continuing the sequence without emitting the error, switching to the catch Observable to provide items.
CatchError catches errors on the Observable to be handled by returning a new Observable or throwing an error.
CombineAll
CombineLatest combines multiple Observables into one by emitting an array containing the latest values from each source whenever any input Observable emits a value, with variants (CombineLatest2, CombineLatest3, CombineLatest4, CombineLatest5) that return strongly-typed tuples for 2-5 input Observables respectively.
Concat combines multiple Observables sequentially by emitting all values from the first Observable before proceeding to the next one, ensuring emissions never overlap.
ConcatAll transforms a higher-order Observable (an Observable that emits other Observables) into a first-order Observable by subscribing to each inner Observable only after the previous one completes.
ConcatMap projects each source value to an Observable, subscribes to it, and emits its values, waiting for each one to complete before processing the next source value.
ConcatWith extends an Observable by appending additional Observables, ensuring that emissions from each Observable only begin after the previous one completes.
Connectable is an Observable with delayed connection to its source, combining both Observable and Connector interfaces. It separates the subscription process into two parts: observers can register via Subscribe, but the Observable won't subscribe to its source until Connect is explicitly called. This enables multiple observers to subscribe before any emissions begin (multicast behavior), allowing a single source Observable to be efficiently shared among multiple consumers. Besides inheriting all methods from Observable and Connector, Connectable provides the convenience methods AutoConnect and RefCount to manage connection behavior.
Connect establishes a connection to the source Observable and returns a Subscription that can be used to cancel the connection when no longer needed.
Connector provides a mechanism for controlling when a Connectable Observable subscribes to its source, allowing you to connect the Observable independently from when observers subscribe to it. This separation enables multiple subscribers to prepare their subscriptions before the source begins emitting items. It has a single method Connect.
Constraints provides the type constraints Signed, Unsigned, Integer and Float, copied verbatim from golang.org/x/exp so that the dependency on that package could be dropped.
Count returns an Observable that emits a single value representing the total number of items emitted by the source Observable before it completes.
Create constructs a new Observable from a Creator function, providing a bridge between imperative code and the reactive Observable pattern. The Observable will continue producing values until the Creator signals completion, the Observer unsubscribes, or the Creator returns an error.
Creator is a function type that generates values for an Observable stream. It receives a zero-based index for the current iteration and returns a tuple containing the next value to emit, any error that occurred, and a boolean flag indicating whether the sequence is complete.
Defer
Delay
DistinctUntilChanged only emits when the current value is different from the last.
Do calls a function for each next value passing through the observable.
ElementAt emits only item n emitted by an Observable.
Empty creates an Observable that emits no items but terminates normally.
EndWith
Equal
Err
ExhaustAll
ExhaustMap
Filter emits only those items from an observable that pass a predicate test.
First emits only the first item from an Observable.
Fprint
Fprintf
Fprintln
From creates an observable from multiple values passed in.
Go subscribes to the observable and starts execution on a separate goroutine, ignoring all emissions from the observable sequence. This makes it useful when you only care about side effects and not the actual values. Returns a Subscription that can be used to cancel the subscription when no longer needed.
Ignore[T] creates
