
MPMCQueue.NET

Bounded multiple producers multiple consumers queue for .NET

Note: since .NET Core (and therefore in .NET 5.0 and later), ConcurrentQueue<T> uses this algorithm. Please use it instead, unless you want to modify this implementation for your own needs, e.g. to build an MPSC or SPMC variant.
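For readers following the advice in the note, here is a minimal sketch of the equivalent calls on ConcurrentQueue<T> (Enqueue and TryDequeue). ConcurrentQueueExample and DrainInOrder are illustrative names, not part of MPMCQueue.NET or the BCL. Keep in mind that ConcurrentQueue<T> is unbounded: Enqueue never fails, so it does not reproduce the fail-on-overflow behaviour of this queue.

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;

// Illustrative helper, not part of MPMCQueue.NET or the BCL.
public static class ConcurrentQueueExample
{
    // Enqueues all items, then drains the queue with TryDequeue.
    public static int[] DrainInOrder(params int[] items)
    {
        var queue = new ConcurrentQueue<int>();
        foreach (var item in items)
        {
            queue.Enqueue(item); // unbounded: never fails, the queue grows instead
        }

        var drained = new List<int>();
        while (queue.TryDequeue(out var value)) // returns false once the queue is empty
        {
            drained.Add(value);
        }
        return drained.ToArray();
    }
}
```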

Overview

This is an attempt to port [the famous Bounded MPMC queue algorithm by Dmitry Vyukov][1024-mpmc] to .NET. All credit goes to Dmitry Vyukov. I take the liberty of quoting his description:

According to the classification it's MPMC, array-based, fails on overflow, does not require GC, w/o priorities, causal FIFO, blocking producers and consumers queue. The algorithm is pretty simple and fast. It's not lockfree in the official meaning, just implemented by means of atomic RMW operations w/o mutexes.

The cost of enqueue/dequeue is 1 CAS per operation. No amortization, just 1 CAS. No dynamic memory allocation/management during operation. Producers and consumers are separated from each other (as in the two-lock queue), i.e. do not touch the same data while queue is not empty.

Implementation

The queue class layout is shown below. The _buffer field stores the enqueued elements together with their sequence numbers. Its size is a power of two, so the _bufferMask field lets the code replace the expensive modulo operation with a bitwise AND. Padding is applied to avoid [false sharing][false-sharing] between the _enqueuePos and _dequeuePos counters, and [Volatile.Read/Write is used to prevent memory operations from being reordered around reads and writes of cell.Sequence][memory-barriers-in-dot-net].

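using System.Runtime.InteropServices;

[StructLayout(LayoutKind.Explicit, Size = 192, CharSet = CharSet.Ansi)]
public class MPMCQueue
{
    [FieldOffset(0)]
    private readonly Cell[] _buffer;     // ring buffer of cells (element + sequence)
    [FieldOffset(8)]
    private readonly int _bufferMask;    // buffer length - 1; valid because the length is a power of two
    [FieldOffset(64)]
    private int _enqueuePos;             // next position to be claimed by a producer
    [FieldOffset(128)]
    private int _dequeuePos;             // next position to be claimed by a consumer, 64 bytes away from _enqueuePos

    ...
}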
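The power-of-two requirement is what makes the AND trick valid. The sketch below uses hypothetical helper names (RingIndex, MaskFor, IndexOf) that are not part of MPMCQueue.NET; it shows how the mask is derived and why the AND is also safe once the int position counters overflow into negative values.

```csharp
using System;

// Hypothetical helpers illustrating the sizing rule; not part of MPMCQueue.NET.
public static class RingIndex
{
    // A power-of-two size makes (size - 1) an all-ones bit mask, e.g. 8 -> 0b111.
    public static int MaskFor(int size)
    {
        if (size < 2 || (size & (size - 1)) != 0)
        {
            throw new ArgumentException("size must be a power of two and at least 2", nameof(size));
        }
        return size - 1;
    }

    // One AND instead of a modulo. For non-negative positions this equals pos % size.
    // For negative positions (after the int counter wraps) it still lands in [0, size),
    // whereas pos % size can be negative in C# (e.g. -1 % 8 == -1).
    public static int IndexOf(int pos, int mask) => pos & mask;
}
```

Because 2^32 is a multiple of any power-of-two size, the index sequence stays continuous across the wrap: position int.MaxValue maps to the last cell, and the next position, int.MinValue, maps to cell 0.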

The enqueue algorithm:

public bool TryEnqueue(object item)
{
    do
    {
        var buffer = _buffer; // prefetch the buffer pointer
        var pos = _enqueuePos; // fetch the current position where to enqueue the item
        var index = pos & _bufferMask; // precalculate the index in the buffer for that position
        var cell = buffer[index]; // fetch the cell by the index
        // If its sequence wasn't touched by other producers
        // and we can increment the enqueue position
        if (cell.Sequence == pos && Interlocked.CompareExchange(ref _enqueuePos, pos + 1, pos) == pos)
        {
            // write the item we want to enqueue
            buffer[index].Element = item;
            // bump the sequence
            Volatile.Write(ref buffer[index].Sequence, pos + 1);
            return true;
        }

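        // If the cell still holds an item from the previous lap, the queue is full: return false.
        // Comparing the signed difference (rather than cell.Sequence < pos) stays correct
        // after the int positions wrap around from int.MaxValue to int.MinValue.
        if (cell.Sequence - pos < 0)
        {
            return false;
        }

        // Otherwise another producer claimed this position first: retry with a fresh position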
    } while (true);
}
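TryEnqueue distinguishes three cases from the cell's sequence, and TryDequeue does the same with an expected value of pos + 1. The sketch below (hypothetical SequenceCheck and CellState names, not part of MPMCQueue.NET) expresses that test as a signed difference, as Vyukov's C++ original does with intptr_t. With a direct comparison such as cell.Sequence < pos, a producer at position int.MaxValue that finds the cell already advanced to int.MinValue would wrongly report the queue as full; the difference form reports a retry instead.

```csharp
// Hypothetical names, not part of MPMCQueue.NET.
public enum CellState
{
    Ready,       // the cell is waiting for this position: try to claim it with a CAS
    Unavailable, // producer: previous lap not consumed yet (full); consumer: not published yet (empty)
    Retry        // another thread already moved past this position: reload the counter
}

public static class SequenceCheck
{
    // A producer at position pos expects Sequence == pos.
    public static CellState ForProducer(int sequence, int pos) => Classify(unchecked(sequence - pos));

    // A consumer at position pos expects Sequence == pos + 1.
    public static CellState ForConsumer(int sequence, int pos) => Classify(unchecked(sequence - pos - 1));

    // The signed difference stays correct when the int counters wrap from int.MaxValue
    // to int.MinValue, provided the two values are less than 2^31 apart.
    private static CellState Classify(int diff) =>
        diff == 0 ? CellState.Ready :
        diff < 0 ? CellState.Unavailable :
        CellState.Retry;
}
```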

The dequeue algorithm:

public bool TryDequeue(out object result)
{
    do
    {
        var buffer = _buffer; // prefetch the buffer pointer
        var bufferMask = _bufferMask; // prefetch the buffer mask
        var pos = _dequeuePos; // fetch the current position from where we can dequeue an item
        var index = pos & bufferMask; // precalculate the index in the buffer for that position
        var cell = buffer[index]; // fetch the cell by the index
        // If its sequence was changed by a producer and wasn't changed by other consumers
        // and we can increment the dequeue position
        if (cell.Sequence == pos + 1 && Interlocked.CompareExchange(ref _dequeuePos, pos + 1, pos) == pos)
        {
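            // read the item from the array slot, not from the `cell` copy taken before the CAS:
            // that copy may have captured Element before the producer wrote it
            result = buffer[index].Element;
            // drop the reference to the dequeued item so the GC can collect it
            buffer[index].Element = null;
            // hand the cell over to the producer of the next lap (position pos + capacity)
            Volatile.Write(ref buffer[index].Sequence, pos + bufferMask + 1);
            return true;
        }

        // If nothing has been published at this position yet, the queue is empty: return false.
        // The signed difference stays correct after the int positions wrap around.
        if (cell.Sequence - (pos + 1) < 0)
        {
            result = default(object);
            return false;
        }

        // Otherwise another consumer dequeued this position first: retry with a fresh position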
    } while (true);
}
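Putting the pieces together, here is a minimal self-contained sketch of the whole queue. It is not the MPMCQueue.NET source, and it makes several assumptions of its own: MpmcQueueSketch<T> is a hypothetical generic variant, the padding between the counters is omitted, Sequence is read with Volatile.Read directly from the array slot, the element is read from the array only after the CAS has succeeded, and the full/empty checks use the wrap-safe signed difference.

```csharp
using System;
using System.Threading;

// Hypothetical generic sketch of the bounded MPMC algorithm; not the MPMCQueue.NET source.
public sealed class MpmcQueueSketch<T>
{
    private struct Cell
    {
        public int Sequence; // which position this cell is waiting for
        public T Element;
    }

    private readonly Cell[] _buffer;
    private readonly int _bufferMask;
    // Cache-line padding between the two counters is omitted for brevity.
    private int _enqueuePos;
    private int _dequeuePos;

    public MpmcQueueSketch(int capacity)
    {
        if (capacity < 2 || (capacity & (capacity - 1)) != 0)
            throw new ArgumentException("capacity must be a power of two and at least 2", nameof(capacity));
        _buffer = new Cell[capacity];
        _bufferMask = capacity - 1;
        for (var i = 0; i < capacity; i++)
            _buffer[i].Sequence = i; // cell i is first claimed by the producer at position i
    }

    public bool TryEnqueue(T item)
    {
        var buffer = _buffer;
        while (true)
        {
            var pos = Volatile.Read(ref _enqueuePos);
            var index = pos & _bufferMask;
            var diff = unchecked(Volatile.Read(ref buffer[index].Sequence) - pos);
            if (diff == 0)
            {
                // one CAS claims the position; only the winner touches this cell
                if (Interlocked.CompareExchange(ref _enqueuePos, unchecked(pos + 1), pos) == pos)
                {
                    buffer[index].Element = item;
                    Volatile.Write(ref buffer[index].Sequence, unchecked(pos + 1)); // publish to consumers
                    return true;
                }
            }
            else if (diff < 0)
            {
                return false; // the item from the previous lap is still there: full
            }
            // diff > 0 or lost CAS: another producer moved ahead, reload and retry
        }
    }

    public bool TryDequeue(out T result)
    {
        var buffer = _buffer;
        while (true)
        {
            var pos = Volatile.Read(ref _dequeuePos);
            var index = pos & _bufferMask;
            var diff = unchecked(Volatile.Read(ref buffer[index].Sequence) - (pos + 1));
            if (diff == 0)
            {
                if (Interlocked.CompareExchange(ref _dequeuePos, unchecked(pos + 1), pos) == pos)
                {
                    result = buffer[index].Element; // ordered after the acquire read of Sequence
                    buffer[index].Element = default; // drop the reference for the GC
                    Volatile.Write(ref buffer[index].Sequence, unchecked(pos + _bufferMask + 1)); // free for next lap
                    return true;
                }
            }
            else if (diff < 0)
            {
                result = default;
                return false; // nothing published at this position yet: empty
            }
            // diff > 0 or lost CAS: another consumer moved ahead, reload and retry
        }
    }
}
```

The sketch also shows why the quoted description calls the queue blocking rather than lock-free: a producer preempted between its CAS and its Volatile.Write leaves the cell's sequence at pos, so consumers report the queue as empty at that position until the producer resumes.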

Benchmarks


Host Process Environment Information:
BenchmarkDotNet.Core=v0.9.9.0
OS=Microsoft Windows NT 6.2.9200.0
Processor=Intel(R) Core(TM) i7-4600U CPU 2.10GHz, ProcessorCount=4
Frequency=2630626 ticks, Resolution=380.1377 ns, Timer=TSC
CLR=MS.NET 4.0.30319.42000, Arch=64-bit RELEASE [RyuJIT]
GC=Concurrent Workstation
JitModules=clrjit-v4.6.1586.0

MPMCQueue.NET.MPMCQueue

         Method | NumberOfThreads |     Median |     StdDev |
--------------- |---------------- |----------- |----------- |
 EnqueueDequeue |               1 | 24.5941 ns |  6.0686 ns |
 EnqueueDequeue |               2 | 45.5109 ns | 12.0462 ns |
 EnqueueDequeue |               4 | 49.1997 ns |  4.3251 ns |

System.Collections.Concurrent.ConcurrentQueue

         Method | NumberOfThreads |     Median |    StdDev |
--------------- |---------------- |----------- |---------- |
 EnqueueDequeue |               1 | 34.1918 ns | 0.5379 ns |
 EnqueueDequeue |               2 | 72.1948 ns | 2.8465 ns |
 EnqueueDequeue |               4 | 63.6846 ns | 3.6718 ns |

On many-core and multi-socket systems, MPMCQueue shows worse results than ConcurrentQueue, because the cmpxchg instruction does not scale well when many cores contend for the same counter.
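The scalability remark can be explored with a small experiment. The hypothetical CasContention helper below (not part of MPMCQueue.NET or BenchmarkDotNet) increments one shared counter from several threads with the same CompareExchange retry loop that TryEnqueue and TryDequeue apply to their position counters, and counts the attempts that lost the race. The counts depend on hardware and scheduling, so no figures are claimed here.

```csharp
using System.Threading;

// Hypothetical experiment, not part of MPMCQueue.NET or BenchmarkDotNet.
public static class CasContention
{
    // Every thread increments one shared counter with a CompareExchange retry loop.
    // Returns the final counter value and the number of CAS attempts that failed.
    public static (int Final, long FailedCas) Run(int threadCount, int incrementsPerThread)
    {
        var counter = 0;
        long failedCas = 0;
        var workers = new Thread[threadCount];
        for (var t = 0; t < threadCount; t++)
        {
            workers[t] = new Thread(() =>
            {
                long localFailures = 0; // counted locally so the bookkeeping adds no extra contention
                for (var i = 0; i < incrementsPerThread; i++)
                {
                    while (true)
                    {
                        var seen = Volatile.Read(ref counter);
                        // lock cmpxchg on x64: succeeds only if nobody changed counter since it was read
                        if (Interlocked.CompareExchange(ref counter, seen + 1, seen) == seen) break;
                        localFailures++; // another thread updated the counter first
                    }
                }
                Interlocked.Add(ref failedCas, localFailures);
            });
            workers[t].Start();
        }
        foreach (var worker in workers) worker.Join();
        return (counter, Interlocked.Read(ref failedCas));
    }
}
```

Comparing FailedCas for threadCount = 1 and threadCount = Environment.ProcessorCount gives a rough sense of how much work is wasted on retries as contention grows.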

Assembly (RyuJIT x64, clrjit-v4.6.1586.0)

MPMCQueue.NET.MPMCQueue.TryEnqueue(System.Object)

var buffer = _buffer;
>>> 
00007ffb`3a790660 57              push    rdi
00007ffb`3a790661 56              push    rsi
00007ffb`3a790662 53              push    rbx
00007ffb`3a790663 4883ec20        sub     rsp,20h
00007ffb`3a790667 4c8b4108        mov     r8,qword ptr [rcx+8]

var pos = _enqueuePos;
00007ffb`3a79066b 8b7148          mov     esi,dword ptr [rcx+48h]

var index = pos & _bufferMask;
00007ffb`3a79066e 448bce          mov     r9d,esi
00007ffb`3a790671 44234910        and     r9d,dword ptr [rcx+10h]

var cell = buffer[index];
00007ffb`3a790675 453b4808        cmp     r9d,dword ptr [r8+8]
00007ffb`3a790679 735e            jae     00007ffb`3a7906d9
00007ffb`3a79067b 4963c1          movsxd  rax,r9d
00007ffb`3a79067e 48c1e004        shl     rax,4
00007ffb`3a790682 498d7c0010      lea     rdi,[r8+rax+10h]
00007ffb`3a790687 488bc7          mov     rax,rdi
00007ffb`3a79068a 8b5808          mov     ebx,dword ptr [rax+8]

if (cell.Sequence == pos && Interlocked.CompareExchange(ref _enqueuePos, pos + 1, pos) == pos)
00007ffb`3a79068d 3bde            cmp     ebx,esi
00007ffb`3a79068f 753a            jne     00007ffb`3a7906cb
00007ffb`3a790691 4c8d5148        lea     r10,[rcx+48h]
00007ffb`3a790695 448d5e01        lea     r11d,[rsi+1]
00007ffb`3a790699 8bc6            mov     eax,esi
00007ffb`3a79069b f0450fb11a      lock cmpxchg dword ptr [r10],r11d
00007ffb`3a7906a0 3bc6            cmp     eax,esi
00007ffb`3a7906a2 7527            jne     00007ffb`3a7906cb

buffer[index].Element = item;
00007ffb`3a7906a4 4963c9          movsxd  rcx,r9d
00007ffb`3a7906a7 48c1e104        shl     rcx,4
00007ffb`3a7906ab 498d4c0810      lea     rcx,[r8+rcx+10h]
00007ffb`3a7906b0 e83b37605f      call    clr+0x3df0 (00007ffb`99d93df0) (JitHelp: CORINFO_HELP_ASSIGN_REF)

Volatile.Write(ref buffer[index].Sequence, pos + 1);
00007ffb`3a7906b5 488d4708        lea     rax,[rdi+8]
00007ffb`3a7906b9 8d5601          lea     edx,[rsi+1]
00007ffb`3a7906bc 8910            mov     dword ptr [rax],edx

return true;
00007ffb`3a7906be b801000000      mov     eax,1

src\MPMCQueue.NET\MPMCQueue.cs @ 54: (return false;)
00007ffb`3a7906c3 4883c420        add     rsp,20h
00007ffb`3a7906c7 5b              pop     rbx
00007ffb`3a7906c8 5e              pop     rsi
00007ffb`3a7906c9 5f              pop     rdi
00007ffb`3a7906ca c3              ret

if (cell.Sequence < pos)
00007ffb`3a7906cb 3bde            cmp     ebx,esi
00007ffb`3a7906cd 7d98            jge     00007ffb`3a790667

src\MPMCQueue.NET\MPMCQueue.cs @ 54: (return false;)
00007ffb`3a7906cf 33c0            xor     eax,eax
00007ffb`3a7906d1 4883c420        add     rsp,20h
00007ffb`3a7906d5 5b              pop     rbx
00007ffb`3a7906d6 5e              pop     rsi
00007ffb`3a7906d7 5f              pop     rdi
00007ffb`3a7906d8 c3              ret

src\MPMCQueue.NET\MPMCQueue.cs @ 41: 
00007ffb`3a7906d9 e8e21caa5f      call    clr!TranslateSecurityAttributes+0x88050 (00007ffb`9a2323c0) (JitHelp: CORINFO_HELP_RNGCHKFAIL)
00007ffb`3a7906de cc              int     3

MPMCQueue.NET.MPMCQueue.TryDequeue(System.Object ByRef)

var buffer = _buffer;
>>> 
00007ffb`3a790700 4157            push    r15
00007ffb`3a790702 4156            push    r14
00007ffb`3a790704 4154            push    r12
00007ffb`3a790706 57              push    rdi
00007ffb`3a790707 56              push    rsi
00007ffb`3a790708 55              push    rbp
00007ffb`3a790709 53              push    rbx
00007ffb`3a79070a 4883ec20        sub     rsp,20h
00007ffb`3a79070e 4c8bc2          mov     r8,rdx
00007ffb`3a790711 488b7108        mov     rsi,qword ptr [rcx+8]

var bufferMask = _bufferMask;
00007ffb`3a790715 8b7910          mov     edi,dword ptr [rcx+10h]

var pos = _dequeuePos;
00007ffb`3a790718 8b9988000000    mov     ebx,dword ptr [rcx+88h]

var index = pos & bufferMask;
00007ffb`3a79071e 8beb            mov     ebp,ebx
00007ffb`3a790720 23ef            and     ebp,edi

var cell = buffer[index];
00007ffb`3a790722 3b6e08          cmp     ebp,dword ptr [rsi+8]
00007ffb`3a790725 0f83840000