SkillAgentSearch skills...

PipelinR

PipelinR is a lightweight command processing pipeline ❍ ⇢ ❍ ⇢ ❍ for your Java awesome app.

Install / Use

/learn @sizovs/PipelinR

README

PipelinR

Build Status Test Coverage Maven Central Version libs.tech recommends

PipelinR is a lightweight command processing pipeline ❍ ⇢ ❍ ⇢ ❍ for your awesome Java app.

PipelinR has been battle-proven on production as a service layer for some serious FinTech apps. PipelinR has helped teams switch from giant service classes handling all use cases to small handlers, following the single responsibility principle. It's similar to a popular MediatR .NET library.

⚡ Tested and works with plain Java, Kotlin, Spring, and Jakarta EE.

Table of contents

How to use

PipelinR has no dependencies and weights only ~30kb. All you need is a single library:

Maven:

<dependency>
  <groupId>net.sizovs</groupId>
  <artifactId>pipelinr</artifactId>
  <version>0.11</version>
</dependency>

Gradle:

dependencies {
    compile 'net.sizovs:pipelinr:0.11'
}

Java version required: 1.8+.

Commands

Commands is a request that can return a value. The following Ping command returns a string:

class Ping implements Command<String> {
    public final String host;

    public Ping(String host) {
        this.host = host;
    }
}

If a command has nothing to return, use a built-in Voidy return type:

class Ping implements Command<Voidy> {
    public final String host;

    public Ping(String host) {
        this.host = host;
    }
}

Handlers

For every command you must define a Handler, that knows how to handle the command.

Create a handler by implementing Command.Handler<C, R> interface, where C is a command type and R is a return type. Handler's return type must match command's return type:

class Pong implements Command.Handler<Ping, String> {
    @Override
    public String handle(Ping command) {
        return "Pong from " + command.host;
    }
}

Pipeline

A pipeline mediates between commands and handlers. You send commands to the pipeline. When the pipeline receives a command, it sends the command through a sequence of middlewares and finally invokes the matching command handler. Pipelinr is a default implementation of Pipeline interface.

To construct a Pipeline, create an instance of Pipelinr and provide a list of command handlers:

Pipeline pipeline = new Pipelinr().with(() -> Stream.of(new Pong()));

Send a command for handling:

pipeline.send(new Ping("localhost"));

since v0.4, you can execute commands more naturally:

new Ping("localhost").execute(pipeline);

Pipelinr can receive an optional, ordered list of middlewares. Every command will go through the middlewares before being handled. Use middlewares to add extra behavior to command handlers, such as validation, logging, transactions, or metrics:

class LoggingMiddleware implements Command.Middleware {
    @Override
    public <R, C extends Command<R>> R invoke(C command, Next<R> next) {
        // log command
        R response = next.invoke();
        // log response
        return response;
    }
}

class TxMiddleware implements Command.Middleware {
    @Override
    public <R, C extends Command<R>> R invoke(C command, Next<R> next) {
        // start tx
        R response = next.invoke();
        // end tx
        return response;
    }
}

class ValidationMiddleware implements Command.Middleware {
   private final ObjectProvider<CommandValidator> validators; // requires Spring 5+. For older versions, use BeanFactory.

   ValidationMiddleware(ObjectProvider<CommandValidator> validators) {
      this.validators = validators;
    }

    @Override
    public <R, C extends Command<R>> R invoke(C command, Next<R> next) {
        validators.stream().filter(v -> v.matches(command)).findFirst().ifPresent(v -> v.validate(command));
        return next.invoke();
    }
}

interface CommandValidator<C extends Command<R>, R> {
    void validate(C command);

    default boolean matches(C command) {
        Generic<C> commandType = new Generic<C>(getClass()) { // since 0.10
        };

        return commandType.resolve().isAssignableFrom(command.getClass());
    }
}

In the following pipeline, every command will be logged, wrapped in a transaction, and validated (in that order):

Pipeline pipeline = new Pipelinr()
    .with(() -> Stream.of(new Pong()))
    .with(() -> Stream.of(new LoggingMiddleware(), new TxMiddleware(), new ValidationMiddleware(...)));

By default, command handlers are resolved using generics. By overriding command handler's matches method, you can dynamically select a matching handler:

class LocalhostPong implements Command.Handler<Ping, String> {
    @Override
    public boolean matches(Ping command) {
        return command.host.equals("localhost");
    }
}
class RemotePong implements Command.Handler<Ping, String> {
    @Override
    public boolean matches(Ping command) {
        return !command.host.equals("localhost");
    }
}

Notifications

Since version 0.5, PipelinR supports Notifications, dispatched to multiple handlers.

For notifications, first create your notification message:

class Ping implements Notification {}

Next, create zero or more handlers for your notification:

public class Pong1 implements Notification.Handler<Ping> {
    @Override
    public void handle(Ping notification) {
      System.out.printn("Pong 1");
    }
}

public class Pong2 implements Notification.Handler<Ping> {
    @Override
    public void handle(Ping notification) {
      System.out.printn("Pong 2");
    }
}

Finally, send notification to the pipeline:

new Ping().send(pipeline);

💡 Remember to provide notification handlers to PipelinR:

new Pipelinr().with(() -> Stream.of(new Pong1(), new Pong2()))

Notification middlewares

Notifications, like commands, support middlewares. Notification middlewares will run before every notification handler:

class MyNotificationMiddleware implements Notification.Middleware {
    @Override
    public <N extends Notification> void invoke(N notification, Next next) {
        // ... code that runs before invocation
        next.invoke();
        // ... code that runs after invocation
    }
}

new Pipelinr().with(() -> Stream.of(new MyNotificationMiddleware()))

Notification handling strategies

The default implementation loops through the notification handlers and awaits each one. This ensures each handler is run after one another.

Depending on your use-case for sending notifications, you might need a different strategy for handling the notifications, such running handlers in parallel.

PipelinR supports the following strategies:

  • an.awesome.pipelinr.StopOnException runs each notification handler after one another; returns when all handlers are finished or an exception has been thrown; in case of an exception, any handlers after that will not be run; this is a default strategy.
  • an.awesome.pipelinr.ContinueOnException runs each notification handler after one another; returns when all handlers are finished; in case of any exception(s), they will be captured in an AggregateException.
  • an.awesome.pipelinr.Async runs all notification handlers asynchronously; returns when all handlers are finished; in case of any exception(s), they will be captured in an AggregateException.
  • an.awesome.pipelinr.ParallelNoWait runs each notification handler in a thread pool; returns immediately and does not wait for any handlers to finish; cannot capture any exceptions.
  • an.awesome.pipelinr.ParallelWhenAny runs each notification handler in a thread pool; returns when any thread (handler) is finished; all exceptions that happened before returning are captured in an AggregateException.
  • an.awesome.pipelinr.ParallelWhenAll runs each notification handler in a thread pool; returns when all threads (handlers) are finished; in case of any exception(s), they are captured in an AggregateException.

You can override default strategy via:

new Pipelinr().with(new ContinueOnException());

Spring Example

PipelinR works well with Spring and Spring Boot.

Start by configuring a Pipeline. Create an instance of Pipelinr and inject all command handlers and ordered middlewares:

@Configuration
class PipelinrConfiguration {
    @Bean
    Pipeline pipeline(ObjectProvider<Command.Handler> commandHandlers, ObjectProvider<Notification.Handler> notificationHandlers, ObjectProvider<Command.Middleware> middlewares) {
      return new Pipelinr()
        .with(() -> commandHandlers.stream())
        .with(() -> notificationHandlers.stream())
        .with(() -> middlewares.orderedStream());
    }
}

Define a command:

class Wave implements Command<String> {}

Define a handler and annotate it with @Component annotation:

@Component
class WaveBack implements Command.Handler<Wave, String> {
    // ...
}

Optionally, define Order-ed middlewares:

@Component
@Order(1)
class Loggable implements Command.Middleware {
    // ...
}

@Component
@Order(2)
class Transactional implements Command.Middleware {
    // ...
}

To use notifications, define a notification:

View on GitHub
GitHub Stars490
CategoryDevelopment
Updated21h ago
Forks65

Languages

Java

Security Score

100/100

Audited on Mar 29, 2026

No findings