RxJavaExtensions

RxJava 3.x implementation of extra sources, operators and components and ports of many 1.x companion libraries.

Releases

gradle

dependencies {
    implementation "com.github.akarnokd:rxjava3-extensions:3.1.1"
}

Javadoc: https://akarnokd.github.io/RxJavaExtensions/javadoc/index.html

Maven search:

http://search.maven.org

Features

Extra functional interfaces

Supports the join-patterns and async-util features with consumer interfaces that take 3 to 9 arguments, and provides function interfaces whose methods do not declare throws Exception.

  • SimpleCallable<T> - Callable<T> without throws Exception
  • Consumer3 - 3 argument Consumer
  • Consumer4 - 4 argument Consumer
  • Consumer5 - 5 argument Consumer
  • Consumer6 - 6 argument Consumer
  • Consumer7 - 7 argument Consumer
  • Consumer8 - 8 argument Consumer
  • Consumer9 - 9 argument Consumer
  • PlainFunction - Function without throws Exception
  • PlainBiFunction - BiFunction without throws Exception
  • PlainFunction3 - Function3 without throws Exception
  • PlainFunction4 - Function4 without throws Exception
  • PlainFunction5 - Function5 without throws Exception
  • PlainFunction6 - Function6 without throws Exception
  • PlainFunction7 - Function7 without throws Exception
  • PlainFunction8 - Function8 without throws Exception
  • PlainFunction9 - Function9 without throws Exception

Utility functions supporting these can be found in the FunctionsEx class.
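
As a rough illustration of the shape such interfaces take, here is a minimal plain-Java sketch. The names `TriConsumer` and `NoThrowFunction3` are hypothetical stand-ins invented for this example, not the library's declarations; consult the Javadoc for the real signatures.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins, NOT the library's declarations: they only show the
// general shape of a 3-argument consumer and of a function whose method
// does not declare "throws Exception".
final class FunctionalInterfaceSketch {

    @FunctionalInterface
    interface TriConsumer<A, B, C> {
        void accept(A a, B b, C c);
    }

    @FunctionalInterface
    interface NoThrowFunction3<A, B, C, R> {
        R apply(A a, B b, C c);
    }

    // apply() declares no checked exception, so no try/catch is needed here.
    static int applyTo123(NoThrowFunction3<Integer, Integer, Integer, Integer> f) {
        return f.apply(1, 2, 3);
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        TriConsumer<String, Integer, Boolean> consumer = (s, i, b) -> log.add(s + i + b);
        consumer.accept("x", 1, true);
        System.out.println(log);
        System.out.println(applyTo123((a, b, c) -> a + b + c));
    }
}
```

The point of the second interface is only that a lambda can be invoked from ordinary code without wrapping the call in exception handling.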

Mathematical operations over numerical sequences

Although most of the operations can be performed with reduce, these operators have lower overhead as they cut out the reboxing of primitive intermediate values.

The following operations are available in MathFlowable for Flowable sequences and in MathObservable for Observable sequences:

  • averageDouble()
  • averageFloat()
  • max()
  • min()
  • sumDouble()
  • sumFloat()
  • sumInt()
  • sumLong()

Example

MathFlowable.averageDouble(Flowable.range(1, 10))
.test()
.assertResult(5.5);

Flowable.just(5, 1, 3, 2, 4)
.to(MathFlowable::min)
.test()
.assertResult(1);
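
The reboxing remark at the top of this section can be pictured with a plain-Java sketch that does not use RxJava at all. The class and method names below are hypothetical; the sketch only contrasts a boxed accumulator (what a generic reduce-style step works with) against a primitive one, and says nothing about how the library's operators are actually written.

```java
import java.util.List;

// Hypothetical illustration of boxed versus primitive accumulation.
final class SumSketch {

    // Boxed accumulation: every step unboxes both operands, adds them,
    // and boxes the result into a new Integer intermediate value.
    static Integer sumBoxed(List<Integer> values) {
        Integer acc = 0;
        for (Integer v : values) {
            acc = acc + v;
        }
        return acc;
    }

    // Primitive accumulation: the running total stays an int, so no
    // intermediate Integer objects are created for the accumulator.
    static int sumPrimitive(List<Integer> values) {
        int acc = 0;
        for (int v : values) {
            acc += v;
        }
        return acc;
    }

    public static void main(String[] args) {
        List<Integer> values = List.of(5, 1, 3, 2, 4);
        System.out.println(sumBoxed(values));
        System.out.println(sumPrimitive(values));
    }
}
```

Both methods return the same total; the difference is only in how the intermediate value is held.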

String operations

characters

StringFlowable and StringObservable support streaming the characters of a CharSequence:

StringFlowable.characters("Hello world")
.map(v -> Character.toLowerCase((char)(int)v))
.subscribe(System.out::print, Throwable::printStackTrace, System.out::println);

split

Splits an incoming sequence of Strings on a regex pattern, matching both within an element and, if necessary, across the boundary between subsequent elements.

Flowable.just("abqw", "ercdqw", "eref")
.compose(StringFlowable.split("qwer"))
.test()
.assertResult("ab", "cd", "ef");

Flowable.just("ab", ":cde:", "fg")
.compose(StringFlowable.split(":"))
.test()
.assertResult("ab", "cde", "fg");
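
One way to picture splitting across element boundaries is to carry the unfinished tail of each element over to the next one. The following plain-Java sketch works on a List instead of a Flowable; `SplitSketch` and its `split` method are hypothetical and are not the library's implementation. It assumes that an empty trailing piece is dropped and it does not handle patterns whose match could grow when more input arrives.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical sketch of boundary-aware splitting, not the library's code.
final class SplitSketch {

    static List<String> split(List<String> chunks, String regex) {
        Pattern pattern = Pattern.compile(regex);
        List<String> out = new ArrayList<>();
        String leftover = "";
        for (String chunk : chunks) {
            // Prepend the unfinished tail so a delimiter that straddles
            // two chunks is seen as one contiguous match.
            String[] parts = pattern.split(leftover + chunk, -1);
            // Every piece except the last is complete and can be emitted.
            for (int i = 0; i < parts.length - 1; i++) {
                out.add(parts[i]);
            }
            // The last piece may still continue in the next chunk.
            leftover = parts[parts.length - 1];
        }
        // Assumption of this sketch: an empty trailing piece is not emitted.
        if (!leftover.isEmpty()) {
            out.add(leftover);
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(split(List.of("abqw", "ercdqw", "eref"), "qwer"));
        System.out.println(split(List.of("ab", ":cde:", "fg"), ":"));
    }
}
```

Run on the inputs of the two examples above, the sketch yields the same pieces those examples assert.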

Asynchronous jumpstarting a sequence

Wrap functions and consumers into Flowables and Observables or into another layer of Functions. Most of these can now be achieved via fromCallable and some function composition in plain RxJava.

start

Run a function or action once on a background thread and cache its result.

AtomicInteger counter = new AtomicInteger();

Flowable<Integer> source = AsyncFlowable.start(() -> counter.incrementAndGet());

source.test()
    .awaitDone(5, TimeUnit.SECONDS)
    .assertResult(1);

source.test()
    .awaitDone(5, TimeUnit.SECONDS)
    .assertResult(1);
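
The run-once-and-cache behavior shown above has a loose analogy in the JDK's CompletableFuture, sketched below without RxJava. This is only an analogy under the assumption that "cache" means every later consumer sees the first result; `StartSketch` is a hypothetical name and the code is not how AsyncFlowable.start is implemented.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical JDK-only analogy for "run once in the background, cache the result".
final class StartSketch {

    // Returns { first read, second read, number of times the supplier ran }.
    static int[] runOnceReadTwice() {
        AtomicInteger counter = new AtomicInteger();
        // The supplier is scheduled once on a background pool thread.
        CompletableFuture<Integer> cached =
                CompletableFuture.supplyAsync(counter::incrementAndGet);
        // Both reads observe the stored result; the supplier is not re-run.
        int first = cached.join();
        int second = cached.join();
        return new int[] { first, second, counter.get() };
    }

    public static void main(String[] args) {
        int[] r = runOnceReadTwice();
        System.out.println(r[0] + " " + r[1] + " " + r[2]);
    }
}
```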

toAsync

Wrap a function (with parameters) so that calling the wrapper returns a Flowable/Observable which invokes the function with the same parameters on a background thread and emits its result.

Function<Integer, Flowable<String>> func = AsyncFlowable.toAsync(
    param -> "[" + param + "]"
);

func.apply(1)
    .test()
    .awaitDone(5, TimeUnit.SECONDS)
    .assertResult("[1]")
;
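
The wrapping idea can also be sketched with JDK types only: a function from a parameter to a result becomes a function from a parameter to an asynchronous holder of the result. `ToAsyncSketch` below is a hypothetical helper that uses CompletableFuture in place of a Flowable; it is an analogy, not the library's API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical JDK-only analogy for wrapping a function into an async one.
final class ToAsyncSketch {

    // Each call of the returned function schedules f on a background pool
    // thread with the given parameter and returns a future of its result.
    static <T, R> Function<T, CompletableFuture<R>> toAsync(Function<T, R> f) {
        return param -> CompletableFuture.supplyAsync(() -> f.apply(param));
    }

    public static void main(String[] args) {
        Function<Integer, CompletableFuture<String>> func =
                toAsync(param -> "[" + param + "]");
        System.out.println(func.apply(1).join());
    }
}
```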

startFuture

Run a Supplier that returns a Future, then call the blocking get() on that Future to obtain its single value or exception.

ExecutorService exec = Executors.newSingleThreadExecutor();

AsyncFlowable.startFuture(() -> exec.submit(() -> 1))
    .test()
    .awaitDone(5, TimeUnit.SECONDS)
    .assertResult(1);
    
exec.shutdown();

deferFuture

Run a Supplier that returns a Future, then call the blocking get() on that Future to obtain a Publisher whose items are streamed back.

ExecutorService exec = Executors.newSingleThreadExecutor();

AsyncFlowable.deferFuture(() -> exec.submit(() -> Flowable.range(1, 5)))
    .test()
    .awaitDone(5, TimeUnit.SECONDS)
    .assertResult(1, 2, 3, 4, 5);
    
exec.shutdown();

forEachFuture

Consume a Publisher and get a Future that completes when the consumption ends with onComplete or onError.

Future<Object> f = AsyncFlowable.forEachFuture(Flowable.range(1, 100), System.out::println);

f.get();

runAsync

Allows emitting multiple values through a Processor mediator from a background thread and allows disposing the sequence externally.

AsyncFlowable.runAsync(Schedulers.single(),
        UnicastProcessor.<Object>create