SkillAgentSearch skills...

Aioreactive

Async/await reactive tools for Python 3.10+

Install / Use

/learn @dbrattli/Aioreactive
About this skill

Quality Score

0/100

Supported Platforms

Universal

README

<img src="logo/logo.jpg" alt="drawing" width="200"/>

aioreactive - ReactiveX for asyncio using async and await

PyPI Python package Publish Package codecov

NEWS: Project rebooted Nov. 2020. Rebuilt using Expression.

Aioreactive is RxPY for asyncio. It's an asynchronous and reactive Python library for asyncio using async and await. Aioreactive is built on the Expression functional library and, integrates naturally with the Python language.

aioreactive is the unification of RxPY and reactive programming with asyncio using async and await.

The design goals for aioreactive

  • Python 3.10+ only. We have a hard dependency Expression v5.
  • All operators and tools are implemented as plain old functions.
  • Everything is async. Sending values is async, subscribing to observables is async. Disposing subscriptions is async.
  • One scheduler to rule them all. Everything runs on the asyncio base event-loop.
  • No multi-threading. Only async and await with concurrency using asyncio. Threads are hard, and in many cases it doesn’t make sense to use multi-threading in Python applications. If you need to use threads you may wrap them with concurrent.futures and compose them into the chain with flat_map() or similar. See parallel.py for an example.
  • Simple, clean and use few abstractions. Try to align with the itertools package, and reuse as much from the Python standard library as possible.
  • Support type hints and static type checking using Pylance.
  • Implicit synchronous back-pressure ™. Producers of events will simply be awaited until the event can be processed by the down-stream consumers.

AsyncObservable and AsyncObserver

With aioreactive you subscribe observers to observables, and the key abstractions of aioreactive can be seen in this single line of code:

subscription = await observable.subscribe_async(observer)

The difference from RxPY can be seen with the await expression. Aioreactive is built around the asynchronous duals, or opposites of the AsyncIterable and AsyncIterator abstract base classes. These async classes are called AsyncObservable and AsyncObserver.

AsyncObservable is a producer of events. It may be seen as the dual or opposite of AsyncIterable and provides a single setter method called subscribe_async() that is the dual of the __aiter__() getter method:

from abc import ABC, abstractmethod

class AsyncObservable(ABC):
    @abstractmethod
    async def subscribe_async(self, observer):
        return NotImplemented

AsyncObserver is a consumer of events and is modeled after the so-called consumer interface, the enhanced generator interface in PEP-342 and async generators in PEP-525. It is the dual of the AsyncIterator __anext__() method, and expands to three async methods asend(), that is the opposite of __anext__(), athrow() that is the opposite of an raise Exception() and aclose() that is the opposite of raise StopAsyncIteration:

from abc import ABC, abstractmethod

class AsyncObserver(ABC):
    @abstractmethod
    async def asend(self, value):
        return NotImplemented

    @abstractmethod
    async def athrow(self, error):
        return NotImplemented

    @abstractmethod
    async def aclose(self):
        return NotImplemented

Subscribing to observables

An observable becomes hot and starts streaming items by using the subscribe_async() method. The subscribe_async() method takes an observable and returns a disposable subscription. So the subscribe_async() method is used to attach a observer to the observable.

async def asend(value):
    print(value)

disposable = await subscribe_async(source, AsyncAnonymousObserver(asend))

AsyncAnonymousObserver is an anonymous observer that constructs an AsyncObserver out of plain async functions, so you don't have to implement a new named observer every time you need one.

The subscription returned by subscribe_async() is disposable, so to unsubscribe you need to await the dispose_async() method on the subscription.

await subscription.dispose_async()

Asynchronous iteration

Even more interesting, with to_async_iterable you can flip around from AsyncObservable to an AsyncIterable and use async-for to consume the stream of events.

import aioreactive as rx

xs = rx.from_iterable([1, 2, 3])
async for x in xs:
    print(x)

They effectively transform us from an async push model to an async pull model, and lets us use the awesome new language features such as async for and async-with. We do this without any queueing, as a push by the AsyncObservable will await the pull by the `AsyncIterator. This effectively applies so-called "back-pressure" up the subscription as the producer will await the iterator to pick up the item send.

The for-loop may be wrapped with async-with to control the lifetime of the subscription:

import aioreactive as rx

xs = rx.from_iterable([1, 2, 3])
result = []

obv = rx.AsyncIteratorObserver(xs)
async with await xs.subscribe_async(obv) as subscription:
    async for x in obv:
        result.append(x)

assert result == [1, 2, 3]

Async streams

An async stream is both an async observer and an async observable. Aioreactive lets you create streams explicitly.

import aioreactive as rx

stream = AsyncSubject()  # Alias for AsyncMultiStream

sink = rx.AsyncAnonymousObserver()
await stream.subscribe_async(sink)
await stream.asend(42)

You can create streams directly from AsyncMultiStream or AsyncSingleStream. AsyncMultiStream supports multiple observers, and is hot in the sense that it will drop any event that is sent if there are currently no observers attached. AsyncSingleStream on the other hand supports a single observer, and is cold in the sense that it will await any producer until there is an observer attached.

Operators

The Rx operators in aioreactive are all plain old functions. You can apply them to an observable and compose it into a transformed, filtered, aggregated or combined observable. This transformed observable can be streamed into an observer.

Observable -> Operator -> Operator -> Operator -> Observer

Aioreactive contains many of the same operators as you know from RxPY. Our goal is not to implement them all, but to provide the most essential ones.

  • concat -- Concatenates two or more observables.
  • choose -- Filters and/or transforms the observable.
  • choose_asnc -- Asynchronously filters and/or transforms the observable.
  • debounce -- Throttles an observable.
  • delay -- delays the items within an observable.
  • distinct_until_changed -- an observable with continuously distinct values.
  • filter -- filters an observable.
  • filteri -- filters an observable with index.
  • flat_map -- transforms an observable into a stream of observables and flattens the resulting observable.
  • flat_map_latest -- transforms an observable into a stream of observables and flattens the resulting observable by producing values from the latest observable.
  • from_iterable -- Create an observable from an (async) iterable.
  • subscribe -- Subscribes an observer to an observable. Returns a subscription.
  • map -- transforms an observable.
  • mapi -- transforms an observable with index.
  • map_async -- transforms an observable asynchronously.
  • mapi_async -- transforms an observable asynchronously with index.
  • merge_inner -- Merges an observable of observables.
  • merge -- Merge one observable with another observable.
  • merge_seq -- Merge a sequence of observables.
  • run -- Awaits the future returned by subscribe. Returns when the subscription closes.
  • slice -- Slices an observable.
  • skip -- Skip items from the start of the observable stream.
  • skip_last -- Skip items from the end of the observable stream.
  • starfilter -- Filters an observable with a predicate and spreads the arguments.
  • starmap -- Transforms and async observable and spreads the arguments to the mapper.
  • switch_latest -- Merges the latest stream in an observable of streams.
  • take -- Take a number of items from the start of the observable stream.
  • take_last -- Take a number of items from the end of the observable stream.
  • unit -- Converts a value or future to an observable.
  • with_latest_from -- Combines two observables into one.

Functional or object-oriented, reactive or interactive

With aioreactive you can choose to program functionally with plain old functions, or object-oriented with classes and methods. Aioreactive supports both method chaining or forward pipe progr

Related Skills

View on GitHub
GitHub Stars400
CategoryDevelopment
Updated7d ago
Forks25

Languages

Python

Security Score

100/100

Audited on Mar 31, 2026

No findings