RxJavaExtensions
RxJava 2.x & 3.x extra sources, operators and components and ports of many 1.x companion libraries.
<a href='https://github.com/akarnokd/RxJavaExtensions/actions?query=workflow%3A%22Java+CI+with+Gradle%22'><img src='https://github.com/akarnokd/RxJavaExtensions/workflows/Java%20CI%20with%20Gradle/badge.svg'></a>
RxJava 3.x implementation of extra sources, operators and components and ports of many 1.x companion libraries.
Releases
gradle
dependencies {
implementation "com.github.akarnokd:rxjava3-extensions:3.1.1"
}
Javadoc: https://akarnokd.github.io/RxJavaExtensions/javadoc/index.html
Maven search:
Features
- Extra functional interfaces
- Mathematical operations over numerical sequences
- String operations
- Asynchronous jumpstarting a sequence
- Computational expressions
- Join patterns
- Debug support
- Custom Processors and Subjects
- FlowableProcessor utils
- Custom Schedulers
- Custom operators and transformers
- valve(), orderedMerge(), bufferWhile(),
- bufferUntil(), bufferSplit(), spanout(),
- mapFilter(), onBackpressureTimeout(), repeat(),
- repeatCallable(), every(), intervalBackpressure(),
- cacheLast(), timeoutLast(), timeoutLastAbsolute(),
- debounceFirst(), switchFlatMap(), flatMapSync(),
- flatMapAsync(), switchIfEmpty(),
- expand(), mapAsync(), filterAsync(),
- zipLatest(), coalesce(),
- windowWhile(), windowUntil(), windowSplit(),
- indexOf(), requestObserveOn(), requestSample()
- observeOnDrop(), observeOnLatest(), generateAsync(),
- partialCollect(), flatMapDrop(), flatMapLatest(),
- errorJump(), flatMap on signal type, switchOnFirst()
- Custom parallel operators and transformers
- Special Publisher implementations
- Custom consumers
Extra functional interfaces
Functional interfaces that support the join patterns and the async utilities: consumers with 3 to 9 type arguments,
and function types declared without throws Exception.
- SimpleCallable<T> - Callable<T> without throws Exception
- Consumer3 - 3 argument Consumer
- Consumer4 - 4 argument Consumer
- Consumer5 - 5 argument Consumer
- Consumer6 - 6 argument Consumer
- Consumer7 - 7 argument Consumer
- Consumer8 - 8 argument Consumer
- Consumer9 - 9 argument Consumer
- PlainFunction - Function without throws Exception
- PlainBiFunction - BiFunction without throws Exception
- PlainFunction3 - Function3 without throws Exception
- PlainFunction4 - Function4 without throws Exception
- PlainFunction5 - Function5 without throws Exception
- PlainFunction6 - Function6 without throws Exception
- PlainFunction7 - Function7 without throws Exception
- PlainFunction8 - Function8 without throws Exception
- PlainFunction9 - Function9 without throws Exception
Utility functions supporting these can be found in the FunctionsEx class.
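To illustrate why variants without throws Exception are convenient, here is a minimal JDK-only sketch. The interfaces ThrowingFn, PlainFn and Consumer3Sketch are hypothetical stand-ins declared locally for this example; they are not copied from the library and only mirror the general shape described above.

```java
import java.util.ArrayList;
import java.util.List;

class PlainInterfacesSketch {

    /** Hypothetical stand-in for a function type whose method declares a checked exception. */
    @FunctionalInterface
    interface ThrowingFn<T, R> {
        R apply(T t) throws Exception;
    }

    /** Hypothetical "plain" variant: same shape, but no checked exception on the method. */
    @FunctionalInterface
    interface PlainFn<T, R> extends ThrowingFn<T, R> {
        @Override
        R apply(T t);
    }

    /** Hypothetical three-argument consumer. */
    @FunctionalInterface
    interface Consumer3Sketch<A, B, C> {
        void accept(A a, B b, C c);
    }

    /** Calling the plain variant needs no try/catch. */
    static int lengthPlain(PlainFn<String, Integer> f, String s) {
        return f.apply(s);
    }

    /** Calling the throwing variant forces the caller to deal with Exception. */
    static int lengthThrowing(ThrowingFn<String, Integer> f, String s) {
        try {
            return f.apply(s);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    /** Feeds three differently typed arguments into a single consumer. */
    static List<String> collect3() {
        List<String> out = new ArrayList<>();
        Consumer3Sketch<String, Integer, Boolean> c = (a, b, flag) -> out.add(a + b + flag);
        c.accept("x", 1, true);
        return out;
    }

    public static void main(String[] args) {
        System.out.println(lengthPlain(String::length, "abc"));
        System.out.println(lengthThrowing(String::length, "abcd"));
        System.out.println(collect3());
    }
}
```

Because PlainFn extends ThrowingFn in this sketch, a plain function can still be passed wherever the throwing type is expected.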
Mathematical operations over numerical sequences
Although most of the operations can be performed with reduce, these operators have lower overhead
as they cut out the reboxing of primitive intermediate values.
The following operations are available in MathFlowable for Flowable sequences and in MathObservable for Observable
sequences:
- averageDouble()
- averageFloat()
- max()
- min()
- sumDouble()
- sumFloat()
- sumInt()
- sumLong()
Example
MathFlowable.averageDouble(Flowable.range(1, 10))
    .test()
    .assertResult(5.5);

Flowable.just(5, 1, 3, 2, 4)
    .to(MathFlowable::min)
    .test()
    .assertResult(1);
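The reboxing point made earlier in this section can be shown without RxJava. The following JDK-only sketch contrasts a reduce-style accumulation over boxed values with a primitive accumulator. BoxingSketch and its method names are made up for this illustration and do not reflect how MathFlowable is implemented.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.BinaryOperator;

class BoxingSketch {

    /** Reduce-style: every step unboxes both operands and boxes a new Integer. */
    static Integer sumBoxed(List<Integer> values) {
        BinaryOperator<Integer> add = (a, b) -> a + b;
        Integer acc = 0;
        for (Integer v : values) {
            acc = add.apply(acc, v);
        }
        return acc;
    }

    /** Dedicated-operator style: the running total stays a primitive int. */
    static int sumPrimitive(List<Integer> values) {
        int acc = 0;
        for (Integer v : values) {
            acc += v; // only the incoming element is unboxed
        }
        return acc;
    }

    /** Average with primitive sum and count; assumes a non-empty list. */
    static double averageDouble(List<? extends Number> values) {
        double sum = 0;
        long count = 0;
        for (Number n : values) {
            sum += n.doubleValue();
            count++;
        }
        return sum / count;
    }

    public static void main(String[] args) {
        List<Integer> oneToTen = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        System.out.println(sumBoxed(oneToTen));      // 55
        System.out.println(sumPrimitive(oneToTen));  // 55
        System.out.println(averageDouble(oneToTen)); // 5.5
    }
}
```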
String operations
characters
The StringFlowable and StringObservable support streaming the characters of a CharSequence:
StringFlowable.characters("Hello world")
    .map(v -> Character.toLowerCase((char)v.intValue()))
    .subscribe(System.out::print, Throwable::printStackTrace, System.out::println);
split
Splits an incoming sequence of Strings around matches of a regex pattern, including matches that span the boundary between consecutive elements.
Flowable.just("abqw", "ercdqw", "eref")
    .compose(StringFlowable.split("qwer"))
    .test()
    .assertResult("ab", "cd", "ef");

Flowable.just("ab", ":cde:", "fg")
    .compose(StringFlowable.split(":"))
    .test()
    .assertResult("ab", "cde", "fg");
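One way to picture splitting "within and between" elements is to carry the unfinished tail of each element over to the next one. The JDK-only sketch below does this over a plain List; SplitSketch and splitAcross are hypothetical names, and the handling of empty segments is an assumption of this sketch rather than a statement about StringFlowable.split.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

class SplitSketch {

    /**
     * Splits the concatenation of the chunks around regex matches while
     * processing one chunk at a time and keeping the unfinished tail.
     */
    static List<String> splitAcross(List<String> chunks, String regex) {
        Pattern pattern = Pattern.compile(regex);
        List<String> out = new ArrayList<>();
        String rest = "";
        for (String chunk : chunks) {
            // limit -1 keeps trailing empty strings, so the last part is always the open tail
            String[] parts = pattern.split(rest + chunk, -1);
            for (int i = 0; i < parts.length - 1; i++) {
                out.add(parts[i]);
            }
            rest = parts[parts.length - 1];
        }
        // assumption of this sketch: a non-empty tail is emitted at the end
        if (!rest.isEmpty()) {
            out.add(rest);
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(splitAcross(Arrays.asList("abqw", "ercdqw", "eref"), "qwer")); // [ab, cd, ef]
        System.out.println(splitAcross(Arrays.asList("ab", ":cde:", "fg"), ":"));         // [ab, cde, fg]
    }
}
```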
Asynchronous jumpstarting a sequence
Wrap functions and consumers into Flowables and Observables or into another layer of Functions.
Most of these can now be achieved via fromCallable and some function composition in plain RxJava.
start
Run a function or action once on a background thread and cache its result.
AtomicInteger counter = new AtomicInteger();

Flowable<Integer> source = AsyncFlowable.start(() -> counter.incrementAndGet());

source.test()
    .awaitDone(5, TimeUnit.SECONDS)
    .assertResult(1);

source.test()
    .awaitDone(5, TimeUnit.SECONDS)
    .assertResult(1);
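As a rough JDK-only analogy (not how AsyncFlowable.start is implemented), a CompletableFuture also runs its supplier once on a background thread and hands the same cached result to every later reader. StartSketch and runTwice are hypothetical names used only here.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

class StartSketch {

    /** Returns {first read, second read, number of times the supplier actually ran}. */
    static int[] runTwice() {
        AtomicInteger counter = new AtomicInteger();
        // the supplier is scheduled once, on the common pool by default
        CompletableFuture<Integer> cached = CompletableFuture.supplyAsync(counter::incrementAndGet);
        int first = cached.join();  // waits for the background computation
        int second = cached.join(); // served from the completed future, no re-run
        return new int[] { first, second, counter.get() };
    }

    public static void main(String[] args) {
        int[] r = runTwice();
        System.out.println(r[0] + " " + r[1] + " " + r[2]); // 1 1 1
    }
}
```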
toAsync
Wrap a function (with parameters) so that calling it returns a Flowable/Observable which invokes the original
function with the same parameters on a background thread and emits its result.
Function<Integer, Flowable<String>> func = AsyncFlowable.toAsync(
    param -> "[" + param + "]"
);

func.apply(1)
    .test()
    .awaitDone(5, TimeUnit.SECONDS)
    .assertResult("[1]");
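The same wrapping idea can be sketched with JDK types only: a synchronous function is turned into one that returns an asynchronous handle for each call. ToAsyncSketch.toAsync below is a hypothetical helper based on CompletableFuture, shown as an analogy and not as the library's implementation.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

class ToAsyncSketch {

    /** Wraps f so that each call runs f on a background thread with the given parameter. */
    static <T, R> Function<T, CompletableFuture<R>> toAsync(Function<T, R> f) {
        return param -> CompletableFuture.supplyAsync(() -> f.apply(param));
    }

    public static void main(String[] args) {
        Function<Integer, CompletableFuture<String>> func = toAsync(param -> "[" + param + "]");
        System.out.println(func.apply(1).join()); // [1]
    }
}
```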
startFuture
Run a Supplier that returns a Future, then call the blocking get() on that Future to obtain its single value or exception.
ExecutorService exec = Executors.newSingleThreadExecutor();

AsyncFlowable.startFuture(() -> exec.submit(() -> 1))
    .test()
    .awaitDone(5, TimeUnit.SECONDS)
    .assertResult(1);

exec.shutdown();
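The "blocking get() on a supplied Future" step can be sketched with the JDK alone. StartFutureSketch and blockingValue are hypothetical names, and the 5 second timeout and the wrapping of failures in IllegalStateException are choices of this sketch, not documented behavior of AsyncFlowable.startFuture.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

class StartFutureSketch {

    /** Obtains the Future from the supplier and blocks until its value or failure is available. */
    static <T> T blockingValue(Supplier<Future<T>> futureSupplier) {
        try {
            return futureSupplier.get().get(5, TimeUnit.SECONDS);
        } catch (ExecutionException ex) {
            // the task itself failed: surface its cause
            throw new IllegalStateException(ex.getCause());
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        } catch (TimeoutException ex) {
            throw new IllegalStateException(ex);
        }
    }

    /** Runs a trivial task on a single-thread executor and returns its value. */
    static int demo() {
        ExecutorService exec = Executors.newSingleThreadExecutor();
        try {
            Callable<Integer> task = () -> 1;
            Supplier<Future<Integer>> supplier = () -> exec.submit(task);
            return blockingValue(supplier);
        } finally {
            exec.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // 1
    }
}
```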
deferFuture
Run a Supplier that returns a Future, then call the blocking get() on that Future to obtain a Publisher whose items are streamed back.
ExecutorService exec = Executors.newSingleThreadExecutor();

AsyncFlowable.deferFuture(() -> exec.submit(() -> Flowable.range(1, 5)))
    .test()
    .awaitDone(5, TimeUnit.SECONDS)
    .assertResult(1, 2, 3, 4, 5);

exec.shutdown();
forEachFuture
Consume a Publisher and get a Future that completes when the consumption ends with onComplete or onError.
Future<Object> f = AsyncFlowable.forEachFuture(Flowable.range(1, 100), System.out::println);
f.get();
runAsync
Allows emitting multiple values through a Processor mediator from a background thread and allows disposing the sequence externally.
AsyncFlowable.runAsync(Schedulers.single(),
UnicastProcessor.<Object>create
