Conseq4j

A Java concurrent API to asynchronously execute related tasks sequentially, and unrelated tasks concurrently.

  • conseq is short for concurrent sequencer.

User stories

  1. As an API client, I want to summon a sequential task executor by a sequence key, so that all the tasks sequentially submitted under the same sequence key will be executed by the same executor in the same order as submitted; meanwhile, the tasks with different sequence keys can be executed concurrently by different executors even when submitted sequentially.
  2. As an API client, I want to asynchronously submit a task for execution together with a sequence key, so that, across all such submissions, tasks submitted sequentially under the same sequence key are executed in the same order as submitted; meanwhile, tasks of different sequence keys are executed concurrently even when submitted sequentially.

Consider using conseq4j to achieve asynchronous concurrent processing globally while preserving meaningful local execution order at the same time.

Prerequisite

  • Java 8+ for versions before 20230922.20230925.0 (exclusive)
  • Java 21+ for versions after 20230922.20230925.0 (inclusive)

Get it...

Maven Central

Install as a compile-scope dependency in Maven or a similar build tool.


<dependency>
   <groupId>io.github.q3769</groupId>
   <artifactId>conseq4j</artifactId>
   <version>...</version>
</dependency>

Use it...

Some general notes:

  • Sequence keys

A sequence key cannot be null. Any two keys, sequenceKey1 and sequenceKey2, are considered "the same sequence key" if and only if Objects.equals(sequenceKey1, sequenceKey2) returns true.
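The key-equality rule above can be illustrated with plain JDK calls. This is a minimal sketch, not conseq4j code: the class and the `sameSequenceKey` helper are hypothetical names introduced here only to mirror the rule as stated.

```java
import java.util.Objects;

final class SequenceKeyEquality {
    /**
     * Hypothetical helper mirroring the stated rule: keys must be non-null,
     * and two keys are "the same sequence key" iff Objects.equals says so.
     */
    static boolean sameSequenceKey(Object key1, Object key2) {
        Objects.requireNonNull(key1, "sequence key cannot be null");
        Objects.requireNonNull(key2, "sequence key cannot be null");
        return Objects.equals(key1, key2);
    }

    public static void main(String[] args) {
        // Distinct String instances with equal content count as the same key.
        System.out.println(sameSequenceKey("cart-42", new String("cart-42")));
        // An Integer and a Long never equal each other, even for the same number.
        System.out.println(sameSequenceKey(42, 42L));
    }
}
```

One practical consequence: a key type should have consistent `equals` and `hashCode`, and mixing key types for the same logical entity (for example `Integer` in one place and `Long` in another) would produce different sequence keys.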

  • Thread safety

A conseq4j instance is thread-safe in and of itself. The usual thread-safety rules and concerns, however, still apply when programming the executable tasks. Moreover, in the context of concurrency and sequencing, the thread-safety concern goes beyond concurrent modification of individual-task data, into that of meaningful execution order among multiple related tasks.

  • Concurrency and sequencing

First of all, by definition, there is no such thing as order or sequence among tasks submitted concurrently by different threads. No particular execution order is guaranteed on those concurrent tasks, regardless of their sequence keys. The conseq4j API only manages sequentially-submitted tasks - those that are submitted by a single thread, or by each single thread in case of multi-threading. To execute those sequential tasks, the conseq4j API provides both concurrency and sequencing: The tasks will be executed sequentially if they have the same sequence key, and concurrently if they have different sequence keys.

Technically, to form a sequence, the client task-submitting thread only needs to be "logically" single. It does not always have to be the same physical thread; for example, one thread may sometimes need to be replaced by another for various reasons. The conseq4j API should function correctly as long as the related tasks are submitted by at most one thread at any time, and in the right order over time. Fortunately, that is often naturally the case for the API client, e.g. when the task submission is managed by a messaging provider such as Kafka, JMS, x-MQ, TIBCO EMS, etc.
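The "logically single" submitter can be sketched with the JDK alone. In the example below, a plain single-thread executor stands in for a conseq4j sequential executor (an assumption made so the sketch runs without the library), and two physical threads submit one after the other with a `join()` as the hand-off point. All class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class LogicalSubmitterDemo {
    /** Two physical threads submit in turn; returns the order in which tasks ran. */
    static List<Integer> runHandOff() {
        // Stand-in for the sequential executor of a single sequence key.
        ExecutorService sequential = Executors.newSingleThreadExecutor();
        List<Integer> executed = Collections.synchronizedList(new ArrayList<>());
        try {
            Thread first = new Thread(() -> {
                sequential.execute(() -> executed.add(1));
                sequential.execute(() -> executed.add(2));
            });
            first.start();
            first.join(); // hand-off: the second thread starts only after the first is done

            Thread second = new Thread(() -> {
                sequential.execute(() -> executed.add(3));
                sequential.execute(() -> executed.add(4));
            });
            second.start();
            second.join();

            sequential.shutdown(); // lets the queued tasks finish
            if (!sequential.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            sequential.shutdownNow(); // harmless if already terminated
        }
        return new ArrayList<>(executed);
    }

    public static void main(String[] args) {
        System.out.println(runHandOff()); // [1, 2, 3, 4]
    }
}
```

If the two threads were started together without the `join()` hand-off, their submissions would be concurrent, and no particular order between the two groups could be expected.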

Style 1: summon a sequential executor by its sequence key, then use the executor as with a JDK ExecutorService

  • API

@FunctionalInterface
public interface ExecutorServiceFactory {
  /**
   * Produces sequential executor for the specified sequence key. All calls with the same sequence
   * key argument return the same executor. For concurrency, calls with different sequence key
   * arguments may return different executors that can run in parallel.
   *
   * @param sequenceKey an {@link Object} instance whose hash code is used to summon the
   *     corresponding executor.
   * @return the sequential executor of type {@link java.util.concurrent.ExecutorService} that
   *     executes all tasks of this sequence key in the same order as they are submitted.
   */
  ExecutorService getExecutorService(Object sequenceKey);
}

This API style loosely takes the form of "thread affinity". Sequence keys are used to summon executors of the ExecutorService JDK type. The same sequence key always gets back the same sequential executor. All tasks of that sequence key can then be "affined" to and executed sequentially by the summoned executor in the same submission order.

The total number of executors concurrently available at runtime is configurable, and defaults to the number of processors available to the JVM. Since each executor is sequential, the number of available executors equals the number of tasks that can be executed in parallel.

Consider using this style when the summoned executor needs to provide the syntax and semantic richness of the JDK ExecutorService API.

  • Sample usage

public class MessageConsumer {
  /**
   * Default conseq's concurrency is java.lang.Runtime.availableProcessors.
   * <p>
   * Or to set the global concurrency to 10, for example:
   * <code>
   * private final ExecutorServiceFactory conseqFactory = ConseqFactory.instance(10);
   * </code>
   */
  private final ExecutorServiceFactory conseqFactory = ConseqFactory.instance();

  @Autowired
  private ShoppingEventProcessor shoppingEventProcessor;

  /**
   * Suppose run-time invocation of this method is managed by the messaging provider. This is usually via a single 
   * caller thread.
   * <p>
   * Concurrency is achieved when shopping events of different shopping cart IDs are processed in parallel, by 
   * different executors. Sequence is maintained on all shopping events of the same shopping cart ID, by the same 
   * executor.
   */
  public void onMessage(Message shoppingEvent) {
    conseqFactory.getExecutorService(shoppingEvent.getShoppingCartId())
        .execute(() -> shoppingEventProcessor.process(shoppingEvent));
  }
}

The implementation of this thread-affinity style relies on hashing the sequence keys into a fixed number of "buckets". Each bucket is associated with a sequential executor. The same sequence key is always hashed to, and summons back, the same executor. Being single-threaded, each executor ensures that its tasks execute in the same order as they are submitted; excess tasks pending execution are buffered in a FIFO task queue. Thus, the total number of buckets (i.e. the max number of available executors and the general concurrency) is the maximum number of tasks that can be executed in parallel at any given time.
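The bucket idea described above might look roughly like the following. This is an illustrative sketch built only on JDK classes, not conseq4j's actual implementation: the class name `BucketedExecutors`, the use of `Math.floorMod` on `hashCode()`, and the `shutdownAll` helper are all assumptions made for the example.

```java
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Illustrative only: a fixed array of single-thread executors selected by key hash. */
final class BucketedExecutors {
    private final ExecutorService[] buckets;

    BucketedExecutors(int concurrency) {
        if (concurrency < 1) {
            throw new IllegalArgumentException("concurrency must be positive");
        }
        buckets = new ExecutorService[concurrency];
        for (int i = 0; i < concurrency; i++) {
            // One thread and one FIFO queue per bucket keeps each bucket sequential.
            buckets[i] = Executors.newSingleThreadExecutor();
        }
    }

    /** Maps a key to a bucket index in [0, concurrency); floorMod handles negative hashes. */
    int bucketOf(Object sequenceKey) {
        Objects.requireNonNull(sequenceKey, "sequenceKey");
        return Math.floorMod(sequenceKey.hashCode(), buckets.length);
    }

    /** Equal keys always map to the same bucket, hence the same executor instance. */
    ExecutorService getExecutorService(Object sequenceKey) {
        return buckets[bucketOf(sequenceKey)];
    }

    /** Demo-only cleanup; already queued tasks still run to completion. */
    void shutdownAll() {
        for (ExecutorService bucket : buckets) {
            bucket.shutdown();
        }
    }

    public static void main(String[] args) {
        BucketedExecutors factory =
                new BucketedExecutors(Runtime.getRuntime().availableProcessors());
        for (int i = 0; i < 3; i++) {
            int step = i;
            factory.getExecutorService("cart-1")
                    .execute(() -> System.out.println("cart-1 step " + step));
            factory.getExecutorService("cart-2")
                    .execute(() -> System.out.println("cart-2 step " + step));
        }
        factory.shutdownAll();
    }
}
```

In this sketch the steps of each cart print in order, while the interleaving between the two carts is not determined (and they would share one thread if both keys happened to land in the same bucket).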

As with any hashing scheme, collisions may occur among different sequence keys. When a hash collision happens, tasks of different sequence keys are assigned to the same executor. Due to the single-thread setup, the executor still ensures the local sequential execution order for each individual sequence key's tasks. However, unrelated tasks of different sequence keys now assigned to the same bucket/executor may inadvertently delay each other's execution while waiting in the executor's task queue. Consider this a trade-off for getting the syntax and semantic richness of the JDK ExecutorService.
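A collision is easy to reproduce with two well-known strings. The sketch below assumes a bucket index computed as `Math.floorMod(hashCode, bucketCount)`, which is a common choice but not necessarily what conseq4j does internally; the class and method names are hypothetical.

```java
final class HashCollisionDemo {
    /** Assumed bucket mapping for illustration: non-negative remainder of the hash code. */
    static int bucketOf(Object sequenceKey, int bucketCount) {
        return Math.floorMod(sequenceKey.hashCode(), bucketCount);
    }

    public static void main(String[] args) {
        // "Aa" and "BB" are unequal strings that share the same String.hashCode().
        String key1 = "Aa";
        String key2 = "BB";
        System.out.println("equal keys?   " + key1.equals(key2));
        System.out.println("same hash?    " + (key1.hashCode() == key2.hashCode()));
        System.out.println("same bucket?  " + (bucketOf(key1, 8) == bucketOf(key2, 8)));
    }
}
```

Because the two hash codes are identical, these keys share a bucket for every bucket count under this mapping. Keys with merely different hash codes can also collide once the remainder is taken, and a larger bucket count makes that second kind of collision less likely.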

To account for hash collisions, this conseq4j style does not support any shutdown action on the executor (ExecutorService) instances it provides; attempting one throws a runtime exception. This prevents unintended task cancellation across different sequence keys in the same bucket. The Future instances subsequently produced by the executor, however, are still cancellable. Hash collisions may not be an issue for workloads that are asynchronous and focused on overall throughput, but they are something to be aware of.
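One way such a shutdown guard could be built is a thin wrapper over a backing executor. This is a sketch under assumptions, not the library's code: the class name is hypothetical, and `UnsupportedOperationException` is chosen here only as a plausible runtime exception, since the text above does not name the exact type.

```java
import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Illustrative wrapper: delegates task execution but rejects shutdown requests. */
final class ShutdownDisabledExecutor extends AbstractExecutorService {
    private final ExecutorService delegate;

    ShutdownDisabledExecutor(ExecutorService delegate) {
        this.delegate = delegate;
    }

    @Override
    public void execute(Runnable command) {
        delegate.execute(command);
    }

    @Override
    public void shutdown() {
        // Assumed exception type; the executor may be shared by several sequence keys.
        throw new UnsupportedOperationException("shutdown is not supported");
    }

    @Override
    public List<Runnable> shutdownNow() {
        throw new UnsupportedOperationException("shutdownNow is not supported");
    }

    @Override
    public boolean isShutdown() {
        return delegate.isShutdown();
    }

    @Override
    public boolean isTerminated() {
        return delegate.isTerminated();
    }

    @Override
    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
        return delegate.awaitTermination(timeout, unit);
    }

    public static void main(String[] args) {
        ExecutorService backing = Executors.newSingleThreadExecutor();
        ExecutorService guarded = new ShutdownDisabledExecutor(backing);
        try {
            guarded.shutdown();
        } catch (UnsupportedOperationException e) {
            System.out.println("rejected: " + e.getMessage());
        } finally {
            backing.shutdown(); // only the owner of the backing executor shuts it down
        }
    }
}
```

Since `AbstractExecutorService.submit` wraps each task in a `FutureTask`, the futures returned through this wrapper can still be cancelled individually, which matches the behavior described above.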

The default general concurrency is the JVM run-time's availableProcessors:

ConseqFactory.instance();

The concurrency can be customized:

ConseqFactory.instance(10);

Style 2: submit each task directly for execution, together with its sequence key

  • API

@FunctionalInterface
public interface TaskExecutor {
  /**
   * Asynchronously executes specified command in sequence regulated by specified key while
   * providing concurrent execution capability to other tasks with different keys.
   *
   * @param command the Runnable task to run sequentially with others under the same sequence key
   * @param sequenceKey the key under which all tasks are executed sequentially
   * @return future holding run status of the submitted command
   */
  default Future<Void> execute(Runnable command, Object sequenceKey) {
    return submit(Executors.callable(command, null), sequenceKey);
  }

  /**
   * Asynchronously executes specified task in sequence regulated by specified key while providing
   * concurrent execution capability to other tasks with different keys.
   *
   * @param <T> the type of the task's result
   * @param task the Callable task to run sequentially with others under the same sequence key
   * @param sequenceKey the key under which all tasks are 