Rxbp
A back-pressured rxpy extension
ReactiveX Backpressure Library
rxbp is a Python library that integrates backpressure into the Observable pattern by means of Flowables.
Features
- Observable Pattern: built on the reactive programming model.
- Backpressure: enables memory-safe handling of fast data producers and slow consumers.
- Continuation certificate: ensures that the execution of a Flowable completes, avoiding any continuation deadlock.
- RxPY compatibility: interoperates with RxPY, bridging classic observables and backpressure-aware Flowables.
- Usability first: favors an implementation that is simple, safe, and user-friendly, while accepting some computational overhead.
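To make the backpressure feature above concrete, here is a minimal sketch of the general idea using only the Python standard library. It is a generic illustration, not rxbp code and not how rxbp implements backpressure: a fast producer is forced to wait whenever a bounded buffer is full, so memory use stays capped no matter how slow the consumer is. The names BUFFER_SIZE, producer, and consumer are made up for this example.

```python
import queue
import threading
import time

BUFFER_SIZE = 2                      # hypothetical bound, chosen for illustration
buffer = queue.Queue(maxsize=BUFFER_SIZE)
SENTINEL = object()                  # marks the end of the stream

received = []                        # items seen by the consumer, in order
observed_sizes = []                  # buffer sizes sampled by the consumer


def producer():
    # A fast producer: put() blocks while the buffer is full,
    # which is the backpressure signal in this sketch.
    for i in range(10):
        buffer.put(i)
    buffer.put(SENTINEL)


def consumer():
    # A slow consumer: takes one item at a time and simulates work.
    while True:
        observed_sizes.append(buffer.qsize())
        item = buffer.get()
        if item is SENTINEL:
            break
        time.sleep(0.001)
        received.append(item)


threads = [threading.Thread(target=producer), threading.Thread(target=consumer)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(received)
print(max(observed_sizes))
```

The buffer never holds more than BUFFER_SIZE items, yet every item is delivered in order. Without the bound, the producer would enqueue all items at once.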
Installation
You can install rxbp using pip:
pip install rxbp
Example
import rxbp

source = rxbp.from_iterable(("Alpha", "Beta", "Gamma", "Delta", "Epsilon"))

flowable = (
    source
    .map(lambda s: len(s))
    .filter(lambda i: i >= 5)
    .tap(on_next=lambda v: print(f'Received {v}'))
)

# execute the flowable
rxbp.run(flowable)
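To see which values pass through the map and filter steps of the example, the same transformation can be written over a plain Python iterable. This sketch deliberately does not use rxbp and involves no scheduling or backpressure; it only mirrors the data transformation, so one would expect the Flowable to report the same values.

```python
words = ("Alpha", "Beta", "Gamma", "Delta", "Epsilon")

# map step: replace each word by its length
lengths = (len(s) for s in words)

# filter step: keep only lengths of at least 5 ("Beta" has length 4 and is dropped)
kept = [i for i in lengths if i >= 5]

# tap step: side effect for each value that survives the filter
for v in kept:
    print(f"Received {v}")
```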
<!-- ## Run a Flowable
`result = rxbp.run(flowable)`
## Share a Flowable -->
Operations
Create a Flowable
- connectable - create a Flowable whose source must be specified by the connections argument when calling the run function
- count - create a Flowable emitting 0, 1, 2, ...
- create - create a Flowable from a ContinuationMonad
- empty - create a Flowable emitting no items
- error - create a Flowable emitting an exception
- from_iterable (or from_) - create a Flowable that emits each element of an iterable
- from_value (or return_) - create a Flowable that emits a single element
- from_rx - wrap an rx.Observable and expose it as a Flowable, relaying signals in a backpressure-aware manner
- interval - create a Flowable emitting an item after every time interval
- repeat - create a Flowable that repeats the given element
- schedule_on - schedule task on a dedicated scheduler
- sleep (or delay) - schedule task on a dedicated scheduler after a relative time
Transforming operators
- accumulate - apply an accumulator function over a Flowable sequence and return each intermediate result
- batch - gather items into batches of the provided size
- concat_map - apply a function to each item emitted by the source and flatten the results sequentially
- default_if_empty - emit a given value if the source completes without emitting anything
- filter - emit only those items for which the given predicate holds
- first - emit the first element only
- flat_map - apply a function to each item emitted by the source and flatten the result
- last - emit the last item
- map - map each element emitted by the source by applying the given function
- reduce - apply an accumulator function over a Flowable sequence and emit a single element
- repeat - return a Flowable that resubscribes to the source when the source completes
- repeat_first - return a Flowable that repeats the first element it receives from the source forever (until disposed)
- skip - skip the first n items
- skip_while - skip the first items while the given predicate holds
- take - take the first n items
- take_while - take items while the given predicate holds
- tap - perform side effects for notifications from the source Flowable
- to_list - create a new Flowable that collects the items from the source sequence and emits a single item
- zip_with_index - zip each item emitted by the source with its enumerated index
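The difference between accumulate (every intermediate result), reduce (a single final result), and batch (fixed-size groups) may be easier to see on a plain list. The sketch below uses only the standard library and is an analogy to the descriptions above, not rxbp's implementation. In particular, the batch helper is hypothetical, and emitting a shorter final batch is an assumption; rxbp's operator may treat leftovers differently.

```python
from functools import reduce
from itertools import accumulate, islice

values = [1, 2, 3, 4, 5]

# accumulate-like: one output per input, each the running sum so far
running = list(accumulate(values, lambda acc, x: acc + x))

# reduce-like: a single output, the final sum
total = reduce(lambda acc, x: acc + x, values)


def batch(items, size):
    """Hypothetical helper: group items into lists of at most `size` elements."""
    it = iter(items)
    while chunk := list(islice(it, size)):
        yield chunk


# batch-like: fixed-size groups (assumption: the shorter last group is kept)
batches = list(batch(values, 2))

print(running)   # [1, 3, 6, 10, 15]
print(total)     # 15
print(batches)   # [[1, 2], [3, 4], [5]]
```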
Combining operators
- merge - merge the items of the Flowable sequences into a single Flowable
- zip - create a new Flowable from two Flowables by combining their items in pairs in a strict sequence
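Pairing items "in a strict sequence" can be pictured with Python's built-in zip over two generators: an item is requested from each side in turn, and neither side runs ahead of the other. This is an analogy with plain iterators, not rxbp code; the traced helper is made up for this sketch and only records the order in which items are pulled.

```python
def traced(name, items, log):
    """Hypothetical helper: yield items while logging when each one is pulled."""
    for item in items:
        log.append(f"{name}:{item}")
        yield item


log = []
left = traced("left", [1, 2, 3], log)
right = traced("right", ["a", "b", "c"], log)

# zip pulls one item from each side per pair, in lockstep
pairs = list(zip(left, right))

print(pairs)  # [(1, 'a'), (2, 'b'), (3, 'c')]
print(log)    # ['left:1', 'right:a', 'left:2', 'right:b', 'left:3', 'right:c']
```

A merge, by contrast, forwards items from either side as they arrive, so no fixed interleaving can be assumed.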
Other operators
- share - share a Flowable among possibly multiple subscribers
Output functions
- to_rx - create an rx Observable from a Flowable
RxPY integration
A Flowable can be created from an RxPY Observable using the rxbp.from_rx function.
Likewise, a Flowable can be converted to an RxPY Observable using the rxbp.to_rx function.
The example below demonstrates the two conversions:
import reactivex as rx
import rxbp

rx_source = rx.of("Alpha", "Beta", "Gamma", "Delta", "Epsilon")

# convert Observable to Flowable
source = rxbp.from_rx(rx_source)

flowable = (
    source
    .map(lambda s: len(s))
    .filter(lambda i: i >= 5)
)

# convert Flowable to Observable
rxbp.to_rx(flowable).subscribe(lambda v: print(f"Received {v}"))
Reference
Below are some references related to this project:
- continuationmonad is a Python library that implements stack-safe continuations based on schedulers.
- RxPY is the ReactiveX extension for Python, implementing the Observable pattern (without backpressure).
