MPMCQueue.NET
Bounded multiple producers multiple consumers queue for .NET
Note: ConcurrentQueue<T> has used this algorithm since .NET Core, including .NET 5.0 and later. Please use it instead, unless you want to modify this implementation for your needs, e.g. to build an MPSC or SPMC variant.
Overview
This is an attempt to port [the famous Bounded MPMC queue algorithm by Dmitry Vyukov][1024-mpmc] to .NET. All credit goes to Dmitry Vyukov. Here is his description of it:
> According to the classification it's MPMC, array-based, fails on overflow, does not require GC, w/o priorities, causal FIFO, blocking producers and consumers queue. The algorithm is pretty simple and fast. It's not lockfree in the official meaning, just implemented by means of atomic RMW operations w/o mutexes.
>
> The cost of enqueue/dequeue is 1 CAS per operation. No amortization, just 1 CAS. No dynamic memory allocation/management during operation. Producers and consumers are separated from each other (as in the two-lock queue), i.e. do not touch the same data while queue is not empty.
Implementation
The queue class layout is shown below. The _buffer field stores the enqueued elements together with their sequence numbers, and its size is a power of two. This lets the _bufferMask field (the buffer size minus one) turn a position into a buffer index with a cheap bitwise AND instead of an expensive modulo operation. The _enqueuePos and _dequeuePos counters are padded apart to avoid [false sharing][false-sharing], and [Volatile.Read/Write is used to prevent memory reordering around reads and writes of cell.Sequence][memory-barriers-in-dot-net].
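```csharp
using System.Runtime.InteropServices;

[StructLayout(LayoutKind.Explicit, Size = 192, CharSet = CharSet.Ansi)]
public class MPMCQueue
{
    [FieldOffset(0)]
    private readonly Cell[] _buffer;   // ring buffer; its length is a power of two
    [FieldOffset(8)]
    private readonly int _bufferMask;  // _buffer.Length - 1
    [FieldOffset(64)]
    private int _enqueuePos;           // kept 64 bytes away from the other fields
    [FieldOffset(128)]
    private int _dequeuePos;           // kept 64 bytes away from _enqueuePos
    ...
}
```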
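Two details of this layout can be illustrated with a small sketch: why a power-of-two size lets `pos & mask` replace `pos % size`, and how the cells might start out. The `Cell` definition is not shown above, so the `Cell` struct here is a hypothetical stand-in; `RingBufferSketch` and its methods are illustrative names, and the initial `Sequence == i` for slot i follows Vyukov's original C++ code rather than anything confirmed for this library.

```csharp
using System;

// Hypothetical stand-in for the elided Cell type: one slot of the ring buffer.
public struct Cell
{
    public int Sequence;   // slot state, compared against enqueue/dequeue positions
    public object Element; // stored item, or null when the slot is free
}

// Hypothetical helpers, not part of MPMCQueue.NET.
public static class RingBufferSketch
{
    // Rounds a requested capacity up to the next power of two.
    public static int RoundUpToPowerOfTwo(int capacity)
    {
        if (capacity < 2 || capacity > (1 << 30))
            throw new ArgumentOutOfRangeException(nameof(capacity));
        int size = 1;
        while (size < capacity) size <<= 1;
        return size;
    }

    // With size a power of two, pos & (size - 1) equals pos % size for pos >= 0,
    // and it still yields a valid index after pos overflows to negative values.
    public static int IndexFor(int pos, int bufferMask) => pos & bufferMask;

    // Assumption (as in Vyukov's original): slot i starts with Sequence == i,
    // meaning "free and ready for the producer at position i".
    public static Cell[] CreateBuffer(int capacity)
    {
        var buffer = new Cell[RoundUpToPowerOfTwo(capacity)];
        for (int i = 0; i < buffer.Length; i++)
            buffer[i].Sequence = i;
        return buffer;
    }
}
```

For example, `CreateBuffer(5)` rounds up to 8 slots with sequences 0 through 7, and `IndexFor(int.MaxValue, 7)` followed by `IndexFor(int.MinValue, 7)` gives 7 and then 0, so indices keep cycling through the buffer even after an `int` position overflows.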
The enqueue algorithm:
```csharp
public bool TryEnqueue(object item)
{
    do
    {
        var buffer = _buffer; // prefetch the buffer pointer
        var pos = _enqueuePos; // fetch the current position where to enqueue the item
        var index = pos & _bufferMask; // precalculate the index in the buffer for that position
        var cell = buffer[index]; // fetch the cell by the index
        // If its sequence wasn't touched by other producers
        // and we can increment the enqueue position
        if (cell.Sequence == pos && Interlocked.CompareExchange(ref _enqueuePos, pos + 1, pos) == pos)
        {
            // write the item we want to enqueue
            buffer[index].Element = item;
            // publish the item by bumping the sequence (release write)
            Volatile.Write(ref buffer[index].Sequence, pos + 1);
            return true;
        }

        // If the queue is full we cannot enqueue and just return false
        if (cell.Sequence < pos)
        {
            return false;
        }

        // repeat the process if another producer managed to enqueue before us
    } while (true);
}
```
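To see how the `cell.Sequence == pos` and `cell.Sequence < pos` checks play out, here is a single-threaded model of the enqueue path. `EnqueueTrace` is a hypothetical name, and a plain field write stands in for `Interlocked.CompareExchange`, so it only traces the state changes and is not thread-safe.

```csharp
using System;

// Single-threaded model of the sequence checks in TryEnqueue (hypothetical helper).
// No Interlocked/Volatile calls: NOT thread-safe, it only traces slot sequences.
public sealed class EnqueueTrace
{
    private readonly int[] _sequence;   // per-slot sequence numbers
    private readonly object[] _element; // per-slot items
    private readonly int _mask;
    private int _enqueuePos;

    public EnqueueTrace(int size)
    {
        if (size < 2 || (size & (size - 1)) != 0)
            throw new ArgumentException("size must be a power of two >= 2", nameof(size));
        _sequence = new int[size];
        _element = new object[size];
        _mask = size - 1;
        for (int i = 0; i < size; i++) _sequence[i] = i; // slot i is free for position i
    }

    public int SequenceAt(int index) => _sequence[index];

    public bool TryEnqueue(object item)
    {
        int pos = _enqueuePos;
        int index = pos & _mask;
        if (_sequence[index] == pos)    // slot is free for this position: claim it
        {
            _enqueuePos = pos + 1;      // stands in for the CompareExchange
            _element[index] = item;
            _sequence[index] = pos + 1; // a consumer at `pos` now sees pos + 1
            return true;
        }
        // With one thread the only other case is Sequence < pos: the slot still
        // holds the item written one lap earlier, so the queue is full.
        return false;
    }
}
```

With a capacity of 2, the first two calls succeed and move the slot sequences from [0, 1] to [1, 2]. The third call wraps back to slot 0 at position 2, sees Sequence 1 < 2, and returns false.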
The dequeue algorithm:
```csharp
public bool TryDequeue(out object result)
{
    do
    {
        var buffer = _buffer; // prefetch the buffer pointer
        var bufferMask = _bufferMask; // prefetch the buffer mask
        var pos = _dequeuePos; // fetch the current position from where we can dequeue an item
        var index = pos & bufferMask; // precalculate the index in the buffer for that position
        var cell = buffer[index]; // fetch the cell by the index
        // If its sequence was changed by a producer and wasn't changed by other consumers
        // and we can increment the dequeue position
        if (cell.Sequence == pos + 1 && Interlocked.CompareExchange(ref _dequeuePos, pos + 1, pos) == pos)
        {
            // read the item from the buffer, not from the `cell` copy: that copy was taken
            // before the sequence check, so its Element is not guaranteed to be the published one
            result = buffer[index].Element;
            // drop the reference so the dequeued object can be collected
            buffer[index].Element = null;
            // free the slot for the producer one lap later (pos + buffer size)
            Volatile.Write(ref buffer[index].Sequence, pos + bufferMask + 1);
            return true;
        }

        // If the queue is empty return false
        if (cell.Sequence < pos + 1)
        {
            result = default(object);
            return false;
        }

        // repeat the process if another consumer managed to dequeue before us
    } while (true);
}
```
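Putting both operations together, here is a generic sketch of the same algorithm. `BoundedMpmcQueue<T>` is a hypothetical name, not this library's API, and the sketch deliberately differs from the listings above in a few assumptions: the sequence is read with `Volatile.Read`, the element is read from the buffer only after the CAS succeeds, positions are compared through signed differences (as in Vyukov's C++ version) so the checks keep working after the `int` counters wrap around, and the padding between the counters is omitted for brevity.

```csharp
using System;
using System.Threading;

// Hypothetical generic sketch of the bounded MPMC algorithm (not the library's API).
// Relies on C#'s default unchecked arithmetic so positions may wrap around.
public sealed class BoundedMpmcQueue<T>
{
    private struct Cell
    {
        public int Sequence;
        public T Element;
    }

    private readonly Cell[] _buffer;
    private readonly int _bufferMask;
    private int _enqueuePos; // unpadded here, unlike MPMCQueue, to keep the sketch short
    private int _dequeuePos;

    public BoundedMpmcQueue(int size)
    {
        if (size < 2 || (size & (size - 1)) != 0)
            throw new ArgumentException("size must be a power of two >= 2", nameof(size));
        _buffer = new Cell[size];
        _bufferMask = size - 1;
        for (int i = 0; i < size; i++) _buffer[i].Sequence = i; // slot i free for position i
    }

    public bool TryEnqueue(T item)
    {
        var buffer = _buffer;
        while (true)
        {
            int pos = Volatile.Read(ref _enqueuePos);
            int index = pos & _bufferMask;
            int diff = Volatile.Read(ref buffer[index].Sequence) - pos; // acquire read
            if (diff == 0)
            {
                if (Interlocked.CompareExchange(ref _enqueuePos, pos + 1, pos) == pos)
                {
                    buffer[index].Element = item;
                    Volatile.Write(ref buffer[index].Sequence, pos + 1); // publish (release)
                    return true;
                }
            }
            else if (diff < 0)
            {
                return false; // slot not yet consumed from the previous lap: full
            }
            // otherwise another producer got here first: retry with a fresh position
        }
    }

    public bool TryDequeue(out T result)
    {
        var buffer = _buffer;
        while (true)
        {
            int pos = Volatile.Read(ref _dequeuePos);
            int index = pos & _bufferMask;
            int diff = Volatile.Read(ref buffer[index].Sequence) - (pos + 1); // acquire read
            if (diff == 0)
            {
                if (Interlocked.CompareExchange(ref _dequeuePos, pos + 1, pos) == pos)
                {
                    result = buffer[index].Element; // read after the CAS, not from a stale copy
                    buffer[index].Element = default;
                    Volatile.Write(ref buffer[index].Sequence, pos + _bufferMask + 1); // next lap
                    return true;
                }
            }
            else if (diff < 0)
            {
                result = default;
                return false; // producer has not published this slot yet: empty
            }
            // otherwise another consumer got here first: retry with a fresh position
        }
    }
}
```

Like the original, a full queue makes TryEnqueue return false immediately, so callers decide whether to spin, yield, or drop the item.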
Benchmarks
```
Host Process Environment Information:
BenchmarkDotNet.Core=v0.9.9.0
OS=Microsoft Windows NT 6.2.9200.0
Processor=Intel(R) Core(TM) i7-4600U CPU 2.10GHz, ProcessorCount=4
Frequency=2630626 ticks, Resolution=380.1377 ns, Timer=TSC
CLR=MS.NET 4.0.30319.42000, Arch=64-bit RELEASE [RyuJIT]
GC=Concurrent Workstation
JitModules=clrjit-v4.6.1586.0
```
MPMCQueue.NET.MPMCQueue

| Method | NumberOfThreads | Median | StdDev |
|---------------- |----------------:|-----------:|-----------:|
| EnqueueDequeue | 1 | 24.5941 ns | 6.0686 ns |
| EnqueueDequeue | 2 | 45.5109 ns | 12.0462 ns |
| EnqueueDequeue | 4 | 49.1997 ns | 4.3251 ns |

System.Collections.Concurrent.ConcurrentQueue

| Method | NumberOfThreads | Median | StdDev |
|---------------- |----------------:|-----------:|----------:|
| EnqueueDequeue | 1 | 34.1918 ns | 0.5379 ns |
| EnqueueDequeue | 2 | 72.1948 ns | 2.8465 ns |
| EnqueueDequeue | 4 | 63.6846 ns | 3.6718 ns |
On many-core and multi-socket systems MPMCQueue performs worse than ConcurrentQueue, because the cmpxchg instruction doesn't scale well there.
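The scaling argument can be illustrated with a rough experiment: several threads bump one shared counter with the same CompareExchange retry loop that TryEnqueue uses for _enqueuePos, and count how often a CAS loses the race. `CasContention` is a hypothetical helper, and this is not the BenchmarkDotNet benchmark behind the tables above.

```csharp
using System;
using System.Threading;

// Hypothetical contention demo: a CAS retry loop on one shared int, counting failures.
public static class CasContention
{
    public static (int Final, long FailedCas) Run(int threads, int incrementsPerThread)
    {
        int counter = 0;
        long failed = 0;
        var workers = new Thread[threads];
        for (int t = 0; t < threads; t++)
        {
            workers[t] = new Thread(() =>
            {
                long localFailed = 0;
                for (int i = 0; i < incrementsPerThread; i++)
                {
                    while (true)
                    {
                        int seen = Volatile.Read(ref counter);
                        if (Interlocked.CompareExchange(ref counter, seen + 1, seen) == seen)
                            break;     // our CAS won
                        localFailed++; // another thread changed the counter first: retry
                    }
                }
                Interlocked.Add(ref failed, localFailed);
            });
        }
        foreach (var w in workers) w.Start();
        foreach (var w in workers) w.Join();
        return (counter, failed);
    }
}
```

With a single thread the failure count is always zero. With more threads it is usually nonzero and tends to grow with the thread count, since every failed attempt means another trip for the contended cache line; the exact numbers depend on the machine and the scheduler.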
Assembly (RyuJIT x64, clrjit-v4.6.1586.0)
MPMCQueue.NET.MPMCQueue.TryEnqueue(System.Object)

```
var buffer = _buffer;
>>>
00007ffb`3a790660 57 push rdi
00007ffb`3a790661 56 push rsi
00007ffb`3a790662 53 push rbx
00007ffb`3a790663 4883ec20 sub rsp,20h
00007ffb`3a790667 4c8b4108 mov r8,qword ptr [rcx+8]
var pos = _enqueuePos;
00007ffb`3a79066b 8b7148 mov esi,dword ptr [rcx+48h]
var index = pos & _bufferMask;
00007ffb`3a79066e 448bce mov r9d,esi
00007ffb`3a790671 44234910 and r9d,dword ptr [rcx+10h]
var cell = buffer[index];
00007ffb`3a790675 453b4808 cmp r9d,dword ptr [r8+8]
00007ffb`3a790679 735e jae 00007ffb`3a7906d9
00007ffb`3a79067b 4963c1 movsxd rax,r9d
00007ffb`3a79067e 48c1e004 shl rax,4
00007ffb`3a790682 498d7c0010 lea rdi,[r8+rax+10h]
00007ffb`3a790687 488bc7 mov rax,rdi
00007ffb`3a79068a 8b5808 mov ebx,dword ptr [rax+8]
if (cell.Sequence == pos && Interlocked.CompareExchange(ref _enqueuePos, pos + 1, pos) == pos)
00007ffb`3a79068d 3bde cmp ebx,esi
00007ffb`3a79068f 753a jne 00007ffb`3a7906cb
00007ffb`3a790691 4c8d5148 lea r10,[rcx+48h]
00007ffb`3a790695 448d5e01 lea r11d,[rsi+1]
00007ffb`3a790699 8bc6 mov eax,esi
00007ffb`3a79069b f0450fb11a lock cmpxchg dword ptr [r10],r11d
00007ffb`3a7906a0 3bc6 cmp eax,esi
00007ffb`3a7906a2 7527 jne 00007ffb`3a7906cb
buffer[index].Element = item;
00007ffb`3a7906a4 4963c9 movsxd rcx,r9d
00007ffb`3a7906a7 48c1e104 shl rcx,4
00007ffb`3a7906ab 498d4c0810 lea rcx,[r8+rcx+10h]
00007ffb`3a7906b0 e83b37605f call clr+0x3df0 (00007ffb`99d93df0) (JitHelp: CORINFO_HELP_ASSIGN_REF)
Volatile.Write(ref buffer[index].Sequence, pos + 1);
00007ffb`3a7906b5 488d4708 lea rax,[rdi+8]
00007ffb`3a7906b9 8d5601 lea edx,[rsi+1]
00007ffb`3a7906bc 8910 mov dword ptr [rax],edx
return true;
00007ffb`3a7906be b801000000 mov eax,1
src\MPMCQueue.NET\MPMCQueue.cs @ 54: (return false;)
00007ffb`3a7906c3 4883c420 add rsp,20h
00007ffb`3a7906c7 5b pop rbx
00007ffb`3a7906c8 5e pop rsi
00007ffb`3a7906c9 5f pop rdi
00007ffb`3a7906ca c3 ret
if (cell.Sequence < pos)
00007ffb`3a7906cb 3bde cmp ebx,esi
00007ffb`3a7906cd 7d98 jge 00007ffb`3a790667
src\MPMCQueue.NET\MPMCQueue.cs @ 54: (return false;)
00007ffb`3a7906cf 33c0 xor eax,eax
00007ffb`3a7906d1 4883c420 add rsp,20h
00007ffb`3a7906d5 5b pop rbx
00007ffb`3a7906d6 5e pop rsi
00007ffb`3a7906d7 5f pop rdi
00007ffb`3a7906d8 c3 ret
src\MPMCQueue.NET\MPMCQueue.cs @ 41:
00007ffb`3a7906d9 e8e21caa5f call clr!TranslateSecurityAttributes+0x88050 (00007ffb`9a2323c0) (JitHelp: CORINFO_HELP_RNGCHKFAIL)
00007ffb`3a7906de cc int 3
```
MPMCQueue.NET.MPMCQueue.TryDequeue(System.Object ByRef)

```
var buffer = _buffer;
>>>
00007ffb`3a790700 4157 push r15
00007ffb`3a790702 4156 push r14
00007ffb`3a790704 4154 push r12
00007ffb`3a790706 57 push rdi
00007ffb`3a790707 56 push rsi
00007ffb`3a790708 55 push rbp
00007ffb`3a790709 53 push rbx
00007ffb`3a79070a 4883ec20 sub rsp,20h
00007ffb`3a79070e 4c8bc2 mov r8,rdx
00007ffb`3a790711 488b7108 mov rsi,qword ptr [rcx+8]
var bufferMask = _bufferMask;
00007ffb`3a790715 8b7910 mov edi,dword ptr [rcx+10h]
var pos = _dequeuePos;
00007ffb`3a790718 8b9988000000 mov ebx,dword ptr [rcx+88h]
var index = pos & bufferMask;
00007ffb`3a79071e 8beb mov ebp,ebx
00007ffb`3a790720 23ef and ebp,edi
var cell = buffer[index];
00007ffb`3a790722 3b6e08 cmp ebp,dword ptr [rsi+8]
00007ffb`3a790725 0f83840000
```
