Assembler

Assembler is a reactive, functional, type-safe, and stateless data aggregation library for querying and merging data from multiple data sources/services. Assembler enables efficient implementation of the API Composition Pattern and is also designed to solve the N + 1 query problem in a data polyglot environment. Assembler is architecture-agnostic, making it versatile for use in monolithic or microservice architectures, implementing REST or GraphQL endpoints, stream processing, and other scenarios.

Internally, Assembler leverages Project Reactor to implement end-to-end reactive stream pipelines and preserve the reactive properties described in the Reactive Manifesto, including responsiveness, resilience, elasticity, and message-driven, non-blocking communication with back-pressure.

See the demo app for a comprehensive project utilizing Assembler.

Check out this brief presentation for a walkthrough of the Assembler API for the real-time streaming example from the demo app, which integrates Assembler with Spring WebFlux and Spring GraphQL to implement real-time data composition of multiple data sources:

https://github.com/user-attachments/assets/5d9efa18-521f-4bcc-b6ec-5bb0d9ca3a59

You can also view the presentation here and go through each slide at your own pace.

Use Cases

Assembler can be used in situations where an application needs to access data or functionality that is spread across multiple services. Some common use cases include:

  1. CQRS/Event Sourcing: Assembler can be used on the read side of a CQRS and Event Sourcing architecture to efficiently build materialized views that aggregate data from multiple sources.
  2. API Gateway: Assembler can be used in conjunction with an API Gateway, which acts as a single entry point for all client requests. The API Gateway can combine multiple APIs into a single, unified API, simplifying the client's interactions with the underlying services.
  3. Backends for Frontends: Assembler can also be used in conjunction with Backends for Frontends (BFFs). A BFF is a dedicated backend service that provides a simplified and optimized API specifically tailored for a particular client or group of clients.
  4. Reduce network overhead: By combining multiple APIs into a single API, Assembler can reduce the amount of network traffic required for a client to complete a task. This can improve the performance of the client application and reduce the load on the server.
  5. Solve the N + 1 Query Problem: Instead of issuing one query per item, Assembler fetches the related data for a whole batch of items with a single query per data source. Combined with a unified API that lets a client retrieve all the data it needs in a single request, this reduces both the number of client requests and the number of database queries, further optimizing the application's performance.

Basic Usage

Here is an example of how to use Assembler to generate transaction information from a list of customers of an online store. This example assumes the following fictional data model and API to access different services:

```java
import java.util.List;

public record Customer(Long customerId, String name) {}
public record BillingInfo(Long id, Long customerId, String creditCardNumber) {}
public record OrderItem(String id, Long customerId, String orderDescription, Double price) {}
public record Transaction(Customer customer, BillingInfo billingInfo, List<OrderItem> orderItems) {}
```

```mermaid
classDiagram
    direction LR

    class Customer {
        Long customerId
        String name
    }

    class BillingInfo {
        Long id
        Long customerId
        String creditCardNumber
    }

    class OrderItem {
        String id
        Long customerId
        String orderDescription
        Double price
    }

    class Transaction {
        Customer customer
        BillingInfo billingInfo
        List~OrderItem~ orderItems
    }

    Transaction o-- Customer
    Transaction o-- BillingInfo
    Transaction o-- OrderItem
    BillingInfo --> Customer : customerId
    OrderItem --> Customer : customerId
```

```java
Flux<Customer> getCustomers(); // e.g. call to a microservice or a Flux connected to a Kafka source
Flux<BillingInfo> getBillingInfo(List<Long> customerIds); // e.g. connects to a relational database (R2DBC)
Flux<OrderItem> getAllOrders(List<Long> customerIds); // e.g. connects to MongoDB
```

If getCustomers() returns a large number of customers, retrieving the associated BillingInfo one customer at a time requires an additional call per customerId: one call for the customers, then N calls for their billing information. This sharp increase in network calls is the N + 1 query problem. To mitigate it, we can retrieve the BillingInfo for all the customers returned by getCustomers() with a single additional call. The same approach applies to retrieving OrderItem information.
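To make the cost concrete, here is a minimal, stdlib-only Java sketch under stated assumptions: BillingService, getBillingInfoForCustomer, naiveRoundTrips and batchedRoundTrips are hypothetical names, and an in-memory map stands in for the real billing database. It only counts round trips to the billing source, comparing per-customer lookups with one batched lookup shaped like getBillingInfo(List<Long>).

```java
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

// Hypothetical sketch: an in-memory "billing service" that counts round trips,
// used only to compare per-customer lookups with one batched lookup.
class NPlusOneDemo {
    record BillingInfo(Long id, Long customerId, String creditCardNumber) {}

    static final class BillingService {
        private final Map<Long, BillingInfo> byCustomerId;
        int roundTrips = 0;

        BillingService(List<BillingInfo> rows) {
            this.byCustomerId = rows.stream()
                .collect(Collectors.toMap(BillingInfo::customerId, Function.identity()));
        }

        // Naive pattern: one round trip per customerId.
        BillingInfo getBillingInfoForCustomer(Long customerId) {
            roundTrips++;
            return byCustomerId.get(customerId);
        }

        // Batched pattern: one round trip for all customerIds.
        List<BillingInfo> getBillingInfo(List<Long> customerIds) {
            roundTrips++;
            return customerIds.stream().map(byCustomerId::get).filter(Objects::nonNull).toList();
        }
    }

    static List<Long> customerIds(int n) {
        return LongStream.rangeClosed(1, n).boxed().toList();
    }

    static BillingService serviceFor(int n) {
        List<BillingInfo> rows = customerIds(n).stream()
            .map(id -> new BillingInfo(id, id, "card-" + id))
            .toList();
        return new BillingService(rows);
    }

    // Round trips when each customer is looked up individually.
    static int naiveRoundTrips(int n) {
        BillingService service = serviceFor(n);
        customerIds(n).forEach(service::getBillingInfoForCustomer);
        return service.roundTrips;
    }

    // Round trips when all customerIds are sent in a single batched call.
    static int batchedRoundTrips(int n) {
        BillingService service = serviceFor(n);
        service.getBillingInfo(customerIds(n));
        return service.roundTrips;
    }

    public static void main(String[] args) {
        int n = 1_000;
        // getCustomers() adds one more call in both cases: 1 + N versus 1 + 1.
        System.out.println("naive: " + naiveRoundTrips(n) + ", batched: " + batchedRoundTrips(n));
    }
}
```

For 1,000 customers the naive pattern makes 1,000 billing calls while the batched pattern makes 1; counting the initial getCustomers() call, that is 1 + N versus 2.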

Because we are working with three distinct and independent data sources, no single data source can join Customer, BillingInfo, and OrderItem into a Transaction; the join must be performed at the application level. This is the primary objective of Assembler.
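To illustrate what an application-level join involves, here is a plain Java sketch that uses neither Assembler nor Reactor. It is a hypothetical illustration, not Assembler's implementation: join and sample are made-up names, the inputs are in-memory lists rather than Flux results, and a missing BillingInfo simply becomes null. BillingInfo is indexed by customerId (one-to-one), OrderItem values are grouped by customerId (one-to-many), and each Customer is then combined with its matches.

```java
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical sketch of an application-level join; not Assembler's internal implementation.
class ApplicationLevelJoin {
    record Customer(Long customerId, String name) {}
    record BillingInfo(Long id, Long customerId, String creditCardNumber) {}
    record OrderItem(String id, Long customerId, String orderDescription, Double price) {}
    record Transaction(Customer customer, BillingInfo billingInfo, List<OrderItem> orderItems) {}

    static List<Transaction> join(List<Customer> customers,
                                  List<BillingInfo> billingInfos,
                                  List<OrderItem> orderItems) {
        // One-to-one: index BillingInfo by customerId (toMap throws if a customerId repeats).
        Map<Long, BillingInfo> billingByCustomer = billingInfos.stream()
            .collect(Collectors.toMap(BillingInfo::customerId, Function.identity()));
        // One-to-many: group OrderItems by customerId, keeping their encounter order.
        Map<Long, List<OrderItem>> ordersByCustomer = orderItems.stream()
            .collect(Collectors.groupingBy(OrderItem::customerId));

        // Walk the customers in their original order and attach the matching data.
        return customers.stream()
            .map(c -> new Transaction(
                c,
                billingByCustomer.get(c.customerId()),                     // null if no match
                ordersByCustomer.getOrDefault(c.customerId(), List.of()))) // empty if no match
            .toList();
    }

    static List<Transaction> sample() {
        List<Customer> customers = List.of(new Customer(1L, "Alice"), new Customer(2L, "Bob"));
        List<BillingInfo> billing = List.of(
            new BillingInfo(10L, 2L, "card-2"),
            new BillingInfo(20L, 1L, "card-1"));
        List<OrderItem> orders = List.of(
            new OrderItem("o1", 1L, "book", 12.5),
            new OrderItem("o2", 2L, "pen", 1.5),
            new OrderItem("o3", 1L, "lamp", 30.0));
        return join(customers, billing, orders);
    }

    public static void main(String[] args) {
        sample().forEach(System.out::println);
    }
}
```

The lookup maps turn each join into a hash lookup per customer instead of a scan of the other lists, which is why batching the fetches pays off.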

With Assembler, aggregating these reactive data sources and implementing the API Composition Pattern can be accomplished as follows:

```java
import reactor.core.publisher.Flux;
import io.github.pellse.assembler.Assembler;

import static io.github.pellse.assembler.AssemblerBuilder.assemblerOf;
import static io.github.pellse.assembler.RuleMapper.oneToMany;
import static io.github.pellse.assembler.RuleMapper.oneToOne;
import static io.github.pellse.assembler.RuleMapperSource.call;
import static io.github.pellse.assembler.Rule.rule;

Assembler<Customer, Transaction> assembler = assemblerOf(Transaction.class)
  .withCorrelationIdResolver(Customer::customerId)
  .withRules(
    rule(BillingInfo::customerId, oneToOne(call(this::getBillingInfo))),
    rule(OrderItem::customerId, oneToMany(OrderItem::id, call(this::getAllOrders))),
    Transaction::new)
  .build();

Flux<Transaction> transactionFlux = assembler.assemble(getCustomers());
```

The code snippet above first retrieves all customers, then concurrently retrieves all the billing information and all the orders associated with those customers, as defined by the Assembler rules, using a single query per data source. Finally, each customer, their billing information, and their list of order items (related by the same customer id) are aggregated into a Transaction object. The result is a reactive stream (Flux) of Transaction objects.
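The "concurrent retrieval, one query per data source" step can be approximated with the JDK's CompletableFuture. Assembler itself builds on Project Reactor, so this is only an analogy under stated assumptions: getBillingInfo and getAllOrders here are hypothetical in-memory stand-ins that return List instead of Flux, Fetched is a made-up holder record, and join() blocks purely to keep the demo short.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch using CompletableFuture as a stand-in for Reactor; it shows the
// "extract IDs once, fetch each source once, in parallel" shape, not Assembler's implementation.
class ConcurrentFetch {
    record Customer(Long customerId, String name) {}
    record BillingInfo(Long id, Long customerId, String creditCardNumber) {}
    record OrderItem(String id, Long customerId, String orderDescription, Double price) {}
    record Fetched(List<BillingInfo> billingInfos, List<OrderItem> orderItems) {}

    // Hypothetical batched services: each is called once per batch of customerIds.
    static List<BillingInfo> getBillingInfo(List<Long> customerIds) {
        return customerIds.stream().map(id -> new BillingInfo(id * 10, id, "card-" + id)).toList();
    }

    static List<OrderItem> getAllOrders(List<Long> customerIds) {
        return customerIds.stream().map(id -> new OrderItem("o" + id, id, "item-" + id, 9.99)).toList();
    }

    static Fetched fetchConcurrently(List<Customer> customers) {
        // 1. Extract the correlation IDs from the upstream batch once.
        List<Long> ids = customers.stream().map(Customer::customerId).toList();
        // 2. Start both batched calls; neither waits for the other.
        CompletableFuture<List<BillingInfo>> billing =
            CompletableFuture.supplyAsync(() -> getBillingInfo(ids));
        CompletableFuture<List<OrderItem>> orders =
            CompletableFuture.supplyAsync(() -> getAllOrders(ids));
        // 3. Combine both results once they have completed (join() blocks only in this demo).
        return billing.thenCombine(orders, Fetched::new).join();
    }

    static Fetched sample() {
        return fetchConcurrently(List.of(new Customer(1L, "Alice"), new Customer(2L, "Bob")));
    }

    public static void main(String[] args) {
        System.out.println(sample());
    }
}
```

Because both calls depend only on the customer IDs, not on each other, their latencies overlap instead of adding up.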

Default values for missing data

To provide a default value for each missing value in the result of the API call, a factory function can be supplied as a second parameter to the oneToOne() function. For example, when getCustomers() returns three customers [C1, C2, C3] and getBillingInfo([ID1, ID2, ID3]) returns only two associated BillingInfo [B1, B2], the missing value B3 can be generated as a default value. This way, a null BillingInfo is never passed to the Transaction constructor:

```java
rule(BillingInfo::customerId, oneToOne(call(this::getBillingInfo), customerId -> createDefaultBillingInfo(customerId)))
```

or more concisely:

```java
rule(BillingInfo::customerId, oneToOne(call(this::getBillingInfo), this::createDefaultBillingInfo))
```

Unlike oneToOne(), oneToMany() always defaults to an empty collection, so no default factory function is needed. In the example above, an empty List<OrderItem> is passed to the Transaction constructor if getAllOrders([1, 2, 3]) returns no OrderItem for a given customer.
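The two defaulting behaviors can be illustrated with plain Java collections. This is a minimal sketch, not Assembler's code: billingFor and ordersFor are hypothetical helpers, and createDefaultBillingInfo is given an assumed body (a BillingInfo with a null id and the placeholder card number "N/A") because the original does not define one.

```java
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical sketch of one-to-one defaulting (factory) vs one-to-many defaulting (empty list).
class DefaultValues {
    record BillingInfo(Long id, Long customerId, String creditCardNumber) {}
    record OrderItem(String id, Long customerId, String orderDescription, Double price) {}

    // Hypothetical default factory, playing the role of createDefaultBillingInfo above.
    static BillingInfo createDefaultBillingInfo(Long customerId) {
        return new BillingInfo(null, customerId, "N/A");
    }

    // One-to-one: a missing match is replaced by the factory's value, so null never escapes.
    static BillingInfo billingFor(Long customerId, Map<Long, BillingInfo> byCustomer,
                                  Function<Long, BillingInfo> defaultFactory) {
        BillingInfo found = byCustomer.get(customerId);
        return found != null ? found : defaultFactory.apply(customerId);
    }

    // One-to-many: a missing match becomes an empty list; no factory is needed.
    static List<OrderItem> ordersFor(Long customerId, Map<Long, List<OrderItem>> byCustomer) {
        return byCustomer.getOrDefault(customerId, List.of());
    }

    public static void main(String[] args) {
        // Customers 1, 2 and 3, but billing info only for 1 and 2, and orders only for 1.
        Map<Long, BillingInfo> billing = List.of(
                new BillingInfo(10L, 1L, "card-1"),
                new BillingInfo(20L, 2L, "card-2"))
            .stream()
            .collect(Collectors.toMap(BillingInfo::customerId, Function.identity()));
        Map<Long, List<OrderItem>> orders = List.of(new OrderItem("o1", 1L, "book", 12.5))
            .stream()
            .collect(Collectors.groupingBy(OrderItem::customerId));

        for (long id = 1; id <= 3; id++) {
            BillingInfo b = billingFor(id, billing, DefaultValues::createDefaultBillingInfo);
            System.out.println(id + " -> " + b.creditCardNumber() + ", orders: " + ordersFor(id, orders).size());
        }
    }
}
```

Running main prints card-1 and card-2 for customers 1 and 2 and the placeholder N/A for customer 3, while customers 2 and 3 each get an empty order list.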

Infinite Stream of Data

When handling an infinite or very large stream of data, such as 100,000+ customers, Assembler needs to completely drain the upstream from getCustomers() to gather all correlation IDs (customerId). This can lead to resource exhaustion if not handled correctly. To mitigate this, the stream can be split into multiple smaller streams and processed in batches, a concept most reactive libraries already support. The example below uses Project Reactor: windowTimeout(100, ofSeconds(5)) closes each window once it holds 100 customers or has been open for 5 seconds, whichever comes first, and flatMapSequential() assembles each window while emitting the results in the original window order.

```java
import static java.time.Duration.ofSeconds;

Flux<Transaction> transactionFlux = getCustomers()
  .windowTimeout(100, ofSeconds(5))
  .flatMapSequential(assembler::assemble);
```
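The batching idea behind windowTimeout() can be sketched without Reactor over a plain Iterator. This is a simplified, hypothetical analogue (forEachBatch is a made-up name): it enforces only the size limit of 100, not the 5-second time limit, and it processes batches sequentially on the calling thread.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.LongStream;

// Hypothetical sketch of size-bounded batching; models only the maxSize part of
// windowTimeout(100, ofSeconds(5)). The time bound is not modeled.
class Batching {
    // Hands consecutive batches of at most maxSize elements to perBatch and returns the number
    // of batches. Only the current batch is buffered, so work starts before the source is drained;
    // for an unbounded source the loop simply keeps running.
    static <T> int forEachBatch(Iterator<T> source, int maxSize, Consumer<List<T>> perBatch) {
        if (maxSize <= 0) throw new IllegalArgumentException("maxSize must be positive");
        int batches = 0;
        List<T> batch = new ArrayList<>(maxSize);
        while (source.hasNext()) {
            batch.add(source.next());
            if (batch.size() == maxSize) {
                perBatch.accept(batch);
                batches++;
                batch = new ArrayList<>(maxSize);
            }
        }
        if (!batch.isEmpty()) { // flush the final, partial batch of a finite source
            perBatch.accept(batch);
            batches++;
        }
        return batches;
    }

    public static void main(String[] args) {
        Iterator<Long> customerIds = LongStream.rangeClosed(1, 250).boxed().iterator();
        // In the Reactor version, each batch would be one assembler.assemble(...) call.
        int batches = forEachBatch(customerIds, 100,
            batch -> System.out.println("batch of " + batch.size() + " customerIds"));
        System.out.println(batches + " batches");
    }
}
```

With 250 customer IDs and a batch size of 100, main reports batches of 100, 100 and 50. Each data source is therefore queried three times instead of once, in exchange for buffering at most 100 IDs at any moment.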
