CoralQueue
CoralQueue is an ultra-low-latency, lock-free, garbage-free, batching and concurrent collection of circular data structures for inter-thread communication in Java. It uses memory barriers through <i>volatile sequences</i> instead of locks to allow Java threads (producers and consumers) to exchange messages as fast as possible. All data structures are circular and bounded, requiring producer/consumer waiting (but not locking) when they are full/empty through a wait strategy or busy spinning.
For some performance numbers you can check this link.
The data structures are: Queue (one-producer-to-one-consumer), Multiplexer (many-producers-to-one-consumer), Demultiplexer (one-producer-to-many-consumers), MpMc (many-producers-to-many-consumers), Broadcaster (one-producer-to-many-consumers), MpMcBroadcaster (many-producers-to-many-consumers), Diamond (worker threads) and RawQueue (binary raw).
<pre> <b>Note:</b> CoralQueue allows the exchange of messages <i>inside the same JVM</i>. For a multicast message middleware solution to build distributed systems across multiple machines using the <i>sequencer architecture</i> you should refer to <a href="https://www.coralblocks.com/index.php/state-of-the-art-distributed-systems-with-coralmq/">this article</a>. </pre>
Queue
<img src="images/Queue.png" alt="Queue" width="50%" height="50%" />
The Queue allows a single producer thread to send messages to the queue and a single consumer thread to receive messages from the queue, both running inside the same JVM. The consumer reads all the messages in the same order in which the producer sent them.
- Click here for a minimal example of using the Queue
- Click here for a basic example of using the Queue
All about using the Queue
The queue is a circular data structure with pre-allocated <i>data transfer mutable objects</i>. You should see these data transfer mutable objects as <i>carriers of data</i>: they are there to let you transfer <i>data</i> (and not object references) from producers to consumers. The steps are:
- A producer fetches an available data transfer mutable object from the queue
- The producer populates the mutable object with the data it wants to transfer (i.e. send) to the consumer(s)
- The producer flushes to notify the consumer(s)
- A consumer fetches an available data transfer mutable object from the queue
- The consumer reads the data from the mutable object
- The consumer calls <code>doneFetching()</code> to notify the producer(s)
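The steps above can be sketched with a tiny single-producer, single-consumer ring. This is only an illustration and not CoralQueue's actual implementation: the class name `SpscRingSketch` is made up, the method names are borrowed from this README, and the two `AtomicLong` sequences stand in for the <i>volatile sequences</i> mentioned earlier.

```java
import java.util.concurrent.atomic.AtomicLong;

// Illustrative sketch only, NOT CoralQueue's implementation.
final class SpscRingSketch {
    private final StringBuilder[] slots;                    // pre-allocated carriers of data
    private final int mask;                                 // capacity - 1, valid because capacity is a power of two
    private final AtomicLong published = new AtomicLong(0); // written only by the producer
    private final AtomicLong consumed = new AtomicLong(0);  // written only by the consumer
    private long claimed = 0;                               // producer-private counter
    private long fetched = 0;                               // consumer-private counter

    SpscRingSketch(int capacity) {
        if (capacity <= 0 || (capacity & (capacity - 1)) != 0) {
            throw new IllegalArgumentException("capacity must be a power of two");
        }
        slots = new StringBuilder[capacity];
        for (int i = 0; i < capacity; i++) slots[i] = new StringBuilder(64);
        mask = capacity - 1;
    }

    // Producer: returns null when every slot is still owned by the consumer (queue full).
    StringBuilder nextToDispatch() {
        if (claimed - consumed.get() == slots.length) return null;
        return slots[(int) (claimed++ & mask)];
    }

    // Producer: one volatile write publishes everything claimed so far.
    void flush() { published.set(claimed); }

    // Consumer: number of published slots not yet fetched.
    long availableToFetch() { return published.get() - fetched; }

    StringBuilder fetch() { return slots[(int) (fetched++ & mask)]; }

    // Consumer: one volatile write hands the fetched slots back to the producer.
    void doneFetching() { consumed.set(fetched); }

    // Sends `messages` integers from a producer thread and checks their order on the calling thread.
    static int run(final int messages) {
        final SpscRingSketch q = new SpscRingSketch(8);
        Thread producer = new Thread(() -> {
            for (int i = 0; i < messages; i++) {
                StringBuilder sb;
                while ((sb = q.nextToDispatch()) == null) { } // busy spin while full
                sb.setLength(0);
                sb.append(i);
                q.flush();
            }
        });
        producer.setDaemon(true);
        producer.start();
        int received = 0;
        while (received < messages) {
            long avail;
            while ((avail = q.availableToFetch()) == 0) { } // busy spin while empty
            for (long n = 0; n < avail; n++) {
                StringBuilder sb = q.fetch();
                int value = 0;
                for (int k = 0; k < sb.length(); k++) value = value * 10 + (sb.charAt(k) - '0');
                if (value != received) throw new IllegalStateException("out of order");
                received++;
            }
            q.doneFetching();
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run(10_000));
    }
}
```

The power-of-two capacity lets the sketch turn an ever-growing sequence into a slot index with a single bitwise AND instead of a modulo.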
Below we use a <code>StringBuilder</code> as our data transfer mutable object to create an <code>AtomicQueue</code>:
final Queue<StringBuilder> queue = new AtomicQueue<StringBuilder>(StringBuilder.class); // default queue capacity is 1024
You can also specify the capacity of the queue, which must be a power of two:
final Queue<StringBuilder> queue = new AtomicQueue<StringBuilder>(512, StringBuilder.class); // specifying the queue capacity
The code above creates a queue with 512 pre-allocated StringBuilders. Note that it uses the default constructor of StringBuilder, which creates a StringBuilder with an initial capacity of 16 characters. That may be too small for our data transfer objects, as we don’t want the StringBuilder resizing itself at runtime and creating garbage. So to create a bigger StringBuilder we can use a <code>com.coralblocks.coralqueue.util.Builder</code> like below:
Builder<StringBuilder> builder = new Builder<StringBuilder>() {
    @Override
    public StringBuilder newInstance() {
        return new StringBuilder(1024);
    }
};
And pass this builder to the constructor of our <code>AtomicQueue</code>:
final Queue<StringBuilder> queue = new AtomicQueue<StringBuilder>(512, builder); // using a builder instead of the class
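To see why the initial capacity matters, the small sketch below uses only the JDK (no CoralQueue classes) to compare a default-sized `StringBuilder` with a pre-sized one. The class and method names are made up for illustration.

```java
// Illustrative sketch using only java.lang.StringBuilder.
final class StringBuilderCapacitySketch {

    // Appends `chars` characters to a builder created with `initialCapacity`
    // and returns the capacity of its internal buffer afterwards.
    static int capacityAfterAppending(int initialCapacity, int chars) {
        StringBuilder sb = new StringBuilder(initialCapacity);
        for (int i = 0; i < chars; i++) sb.append('x');
        return sb.capacity();
    }

    // Clears a builder that holds `chars` characters and returns its capacity:
    // setLength(0) keeps the buffer, so the object can be reused as is.
    static int capacityAfterReset(int initialCapacity, int chars) {
        StringBuilder sb = new StringBuilder(initialCapacity);
        for (int i = 0; i < chars; i++) sb.append('x');
        sb.setLength(0);
        return sb.capacity();
    }

    public static void main(String[] args) {
        System.out.println(new StringBuilder().capacity());    // default capacity
        System.out.println(capacityAfterAppending(16, 100));   // larger than 16: the buffer was reallocated
        System.out.println(capacityAfterAppending(1024, 100)); // still 1024: no reallocation
        System.out.println(capacityAfterReset(1024, 100));     // still 1024 after setLength(0)
    }
}
```

Every reallocation leaves the old internal buffer behind as garbage, which is what pre-sizing through a builder avoids.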
Sending messages to the queue
To send a message to the queue, you grab a data transfer mutable object from the queue, fill it with your data and call <code>flush()</code> as the code below illustrates:
StringBuilder sb;
while((sb = queue.nextToDispatch()) == null); // busy spin...
sb.setLength(0);
sb.append("Hello there!");
queue.flush();
Note that if the queue is full we just <i>busy spin</i> until a data transfer object becomes available. Later we will see how we can also use a <code>WaitStrategy</code> instead of busy spinning.
You can (and should) send messages in batches:
StringBuilder sb;
while((sb = queue.nextToDispatch()) == null); // busy spin...
sb.setLength(0);
sb.append("Hello there!");
while((sb = queue.nextToDispatch()) == null); // busy spin...
sb.setLength(0);
sb.append("Hello again!");
queue.flush();
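The batching pattern above can be wrapped in a small helper. The sketch below is an assumption-heavy illustration: `DispatchSide` is a hypothetical interface that only borrows the `nextToDispatch()` and `flush()` names from this README, and `CountingFake` is a stand-in used to count flushes, not a real queue. It also assumes that messages become visible to the consumer only after `flush()`, so it flushes pending messages once before spinning on a full queue.

```java
// Hypothetical producer-side view of a queue; only the method names come from the README.
interface DispatchSide {
    StringBuilder nextToDispatch(); // returns null when the queue is full
    void flush();
}

final class BatchSendSketch {

    // Copies every message into its own carrier object and publishes the batch with one final flush().
    static void sendBatch(DispatchSide queue, CharSequence[] messages) {
        for (CharSequence msg : messages) {
            StringBuilder sb = queue.nextToDispatch();
            if (sb == null) {
                queue.flush(); // assumption: publish what is pending so the consumer can free slots
                while ((sb = queue.nextToDispatch()) == null) { } // busy spin
            }
            sb.setLength(0);
            sb.append(msg);
        }
        queue.flush();
    }

    public static void main(String[] args) {
        CountingFake fake = new CountingFake();
        sendBatch(fake, new CharSequence[] {"Hello there!", "Hello again!"});
        System.out.println(fake.handedOut + " messages, " + fake.flushes + " flush");
    }
}

// Stand-in with four slots that never fills up in this demo; it only counts calls.
final class CountingFake implements DispatchSide {
    final StringBuilder[] slots = {
        new StringBuilder(64), new StringBuilder(64), new StringBuilder(64), new StringBuilder(64)
    };
    int handedOut;
    int flushes;

    @Override
    public StringBuilder nextToDispatch() {
        return handedOut < slots.length ? slots[handedOut++] : null;
    }

    @Override
    public void flush() { flushes++; }
}
```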
Reading messages from the queue
To read messages from the queue you fetch them from a consumer thread, as the code below shows:
long avail;
while((avail = queue.availableToFetch()) == 0); // busy spin
for(int i = 0; i < avail; i++) {
    StringBuilder sb = queue.fetch();
    // do whatever you want with the StringBuilder
    // just do not create garbage
    // copy char by char if needed
    // or copy the contents to an external StringBuilder
}
queue.doneFetching();
Again we busy spin if the queue is empty. Later we will see how we can also use a <code>WaitStrategy</code> instead of busy spinning.
Note that we fetch in batches, reducing the number of times we have to check for an empty queue through <code>availableToFetch()</code>.
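The comments in the consumer loop suggest copying the data out of the fetched object. A minimal JDK-only sketch of both options could look like the following. The class and method names are made up, and the destination objects are assumed to be allocated once, outside the fetch loop.

```java
// Illustrative sketch: copying data out of a fetched StringBuilder into caller-owned storage.
final class CopyOutSketch {

    // Copies char by char into a pre-allocated array and returns how many chars were copied.
    static int copyChars(StringBuilder src, char[] dst) {
        int n = Math.min(src.length(), dst.length);
        for (int i = 0; i < n; i++) dst[i] = src.charAt(i);
        return n;
    }

    // Copies the contents into an external StringBuilder that the consumer owns and reuses.
    static void copyInto(StringBuilder src, StringBuilder external) {
        external.setLength(0); // reuse the existing buffer
        external.append(src);  // resolves to append(CharSequence); no toString() call needed
    }

    public static void main(String[] args) {
        StringBuilder fetched = new StringBuilder("Hello there!"); // stands in for queue.fetch()
        char[] buffer = new char[1024];
        StringBuilder external = new StringBuilder(1024);
        int n = copyChars(fetched, buffer);
        copyInto(fetched, external);
        System.out.println(n + " chars copied");
        System.out.println(external);
    }
}
```

What this avoids is calling `toString()` on the fetched object for every message, since that allocates a new `String` each time.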
</details>
<details>
<summary>Click here for all the details of how to use Wait Strategies</summary>
All about using Wait Strategies
By default, you should busy-spin when the queue is full or empty. That’s usually the fastest approach but not always the best as you might want to allow other threads to use the CPU core. CoralQueue comes with a variety of wait strategies that you can use instead of busy spinning, and you can also create your own by implementing the <code>WaitStrategy</code> interface. Below are some examples of wait strategies that come with CoralQueue:
- ParkBackOffWaitStrategy: park (i.e. sleep) for 1 microsecond backing off up to a maximum of 1 millisecond in steps of 1 microsecond. The start, max and step values can be configured.
- BusySpinParkBackOffWaitStrategy: first busy spins for 10,000,000 cycles then it starts to park (i.e. sleep) by using the ParkBackOffWaitStrategy above. This is an example of a composite wait strategy, which combines multiple wait strategies into a single one. The number of busy-spin cycles can be configured.
- BusySpinYieldSleepWaitStrategy: busy spins for 10,000,000 cycles, yields for 100 cycles then starts to sleep for 1 millisecond. All previous values can be changed/configured.
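To give an idea of what a park-and-back-off strategy does, here is a minimal sketch built on `LockSupport.parkNanos`. It is not the source of `ParkBackOffWaitStrategy`: the interface `WaitStrategySketch` is hypothetical (the real `WaitStrategy` interface may declare more than `await()` and `reset()`), and the defaults simply reuse the values listed above.

```java
import java.util.concurrent.locks.LockSupport;

// Hypothetical interface; only the await()/reset() method names come from the README.
interface WaitStrategySketch {
    void await();
    void reset();
}

// Parks for `startNanos`, then for a little longer on every call, up to `maxNanos`.
final class ParkBackOffSketch implements WaitStrategySketch {
    private final long startNanos;
    private final long maxNanos;
    private final long stepNanos;
    private long currentNanos;

    // Defaults taken from the description above: 1 microsecond up to 1 millisecond in 1 microsecond steps.
    ParkBackOffSketch() {
        this(1_000L, 1_000_000L, 1_000L);
    }

    ParkBackOffSketch(long startNanos, long maxNanos, long stepNanos) {
        this.startNanos = startNanos;
        this.maxNanos = maxNanos;
        this.stepNanos = stepNanos;
        this.currentNanos = startNanos;
    }

    @Override
    public void await() {
        LockSupport.parkNanos(currentNanos); // give the CPU core to other threads
        currentNanos = Math.min(maxNanos, currentNanos + stepNanos); // back off
    }

    @Override
    public void reset() {
        currentNanos = startNanos; // work arrived, so start from the shortest park again
    }

    long currentParkNanos() {
        return currentNanos;
    }

    public static void main(String[] args) {
        ParkBackOffSketch ws = new ParkBackOffSketch();
        for (int i = 0; i < 5; i++) ws.await();
        System.out.println(ws.currentParkNanos());
        ws.reset();
        System.out.println(ws.currentParkNanos());
    }
}
```

A composite strategy could be sketched the same way, by counting busy-spin cycles first and delegating to an object like this one once the count is exhausted.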
To use a wait strategy, all you have to do is call its <code>await()</code> and <code>reset()</code> methods instead of busy spinning:
Producer using a Wait Strategy
WaitStrategy producerWaitStrategy = new ParkWaitStrategy();
StringBuilder sb;
while((sb = queue.nextToDispatch()) == null) {
    producerWaitStrategy.await(); // <=====
}
producerWaitStrategy.reset(); // <=====
sb.setLength(0);
sb.append("Hello there!");
queue.flush();
Consumer using a Wait Strategy
WaitStrategy consumerWaitStrategy = new BusySpinYieldSleepWaitStrategy();
long avail;
while((avail = queue.availableToFetch()) == 0) {
    consumerWaitStrategy.await(); // <=====
}
consumerWaitStrategy.reset(); // <=====
for(int i = 0; i < avail; i++) {
    StringBuilder sb = queue.fetch();
    // do whatever you want with the StringBuilder
    // just do not create garbage
    // copy char by char if needed
    // or copy the contents to an external StringBuilder
}
queue.doneFetching();
</details>
<details>
<summary>Click here for all the details of how to use Semi-Volatile Writes</summary>
All about using Semi-Volatile Writes (lazySet)
To squeeze every bit of performance out of CoralQueue, you can use <i>semi-volatile writes</i> (equivalent to <code>VarHandle.setRelease</code>) when sending and receiving messages. Basically, a semi-volatile write is done through the <code>lazySet</code> method from <code>java.util.concurrent.atomic.AtomicLong</code>. It is a faster operation for the thread that’s modifying the variable, at the expense of the thread that’s interested in knowing about updates to the variable. For example, if you want to minimize the latency in the producer, you should use lazySet. On the other hand, if you want to minimize the message transit time, you should not use lazySet, so the consumer is notified as soon as possible about a new message in the queue.
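The difference between the two kinds of write can be shown with a plain `AtomicLong`, outside of CoralQueue. In the sketch below (made-up class name, JDK only), the producer writes a payload into a non-volatile field and then publishes a sequence either with `set` (a full volatile write) or with `lazySet` (a release write). In both cases the consumer that observes the new sequence also observes the payload. What may differ is how much the write costs the producer and how soon the consumer notices it.

```java
import java.util.concurrent.atomic.AtomicLong;

// Illustrative sketch of publishing data through a sequence with set() versus lazySet().
final class LazySetSketch {
    private long payload;                                  // plain field, guarded by the sequence
    private final AtomicLong sequence = new AtomicLong(0);

    // Producer side: write the data first, then publish the sequence.
    void publish(long value, boolean lazy) {
        payload = value;
        if (lazy) {
            sequence.lazySet(1); // release write: cheaper for the writer
        } else {
            sequence.set(1);     // volatile write: more expensive for the writer
        }
    }

    // Consumer side: busy spin on the sequence, then read the data.
    long awaitAndRead() {
        while (sequence.get() == 0) { } // volatile read
        return payload;
    }

    // Publishes `value` from another thread and returns what the calling thread reads.
    static long roundTrip(long value, boolean lazy) {
        LazySetSketch s = new LazySetSketch();
        Thread producer = new Thread(() -> s.publish(value, lazy));
        producer.start();
        return s.awaitAndRead();
    }

    public static void main(String[] args) {
        System.out.println(roundTrip(42L, true));
        System.out.println(roundTrip(42L, false));
    }
}
```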
By default, CoralQueue does not use <code>lazySet</code>, in other words the other thread is notified immediately (or as soon as possible). But you can easily take control of that by using the methods below:
// producer notifying consumer(s)
queue.flush(); // no lazySet by default (notify the consumer thread immediately at the expense of the producer thread)
queue.flush(true); // use lazySet (tak
