ConcurrentNativeQueue
Lock-free MPSC (Multi-Producer, Single-Consumer) native queue for .NET 6.0+ — zero GC, fully native (NativeMemory), exponentially growing segments.
Install / Use
/learn @VAllens/ConcurrentNativeQueueREADME
English | 简体中文
ConcurrentNativeQueue<T>
高性能无锁 MPSC(多生产者单消费者)原生队列,基于分段链表设计,适用于 .NET 6+。
所有内存(段头结构体 + 槽位数组)均通过 NativeMemory 分配,零 GC 压力、零托管堆分配;段满时自动分配新段链接到尾部,槽位数组在段消费完后优先回收到单槽缓存(用于复用),段头在 Dispose 时统一回收。
注意:这是一个完全使用
Claude Opus 4.6 High Thinking编写的项目。
虽然通过单元测试,但暂未投入生产使用。
若要使用,请自行严谨测试,风险自负,本仓库作者和AI概不负责。
特性
- 无锁并发 — 入队通过
Volatile.Read+Interlocked.CompareExchange(CAS) 占位,段满检测为纯读操作,不产生原子写开销 - 批量入队 —
EnqueueRange单次 CAS 占位多个槽位,分摊原子操作开销;跨段时自动分批写入 - 分段链表 — 段按需分配并链接;默认模式容量指数增长(上限 1M),稳态模式容量保持稳定
- 容量策略开关 —
preferSteadyStateCapacity=false(默认)偏向峰值吞吐;true偏向稳态复用与缓存命中 - 全 Native 内存 — 段头结构体与槽位数组均通过
NativeMemory分配,不产生托管堆分配 - 单槽复用缓存 — 段消费完后槽位数组回收到单槽缓存;新段分配优先复用;缓存被替换时释放旧槽位数组
- 段头延迟回收 — 段头可能被生产者暂时持有,统一在
Dispose释放 - False Sharing 防护 — 生产者/消费者热点字段通过缓存行填充隔离,
EnqueuePos使用 128 字节独占布局 - FIFO 保序 — 单生产者视角下严格先入先出,多生产者时每个生产者的消息顺序不变
- 非托管类型约束 —
where T : unmanaged,支持int、long、自定义值类型结构体等
快速开始
using ConcurrentNativeQueueLibrary;
// 创建队列(默认段大小 32,默认高吞吐扩容模式)
using var queue = new ConcurrentNativeQueue<long>();
// 生产者线程入队
queue.Enqueue(42);
queue.Enqueue(100);
// 批量入队
queue.EnqueueRange(new long[] { 200, 300, 400 });
// 查看头部元素(不移除)
if (queue.TryPeek(out long head))
Console.WriteLine(head); // 42
// 消费者线程出队
if (queue.TryDequeue(out long item))
Console.WriteLine(item); // 42
构造参数示例
// 指定初始段大小(默认:后续段容量指数增长,上限 1M)
using var q1 = new ConcurrentNativeQueue<int>(128);
// 指定稳态容量模式(后续段保持同容量,提升复用命中)
using var q2 = new ConcurrentNativeQueue<int>(128, preferSteadyStateCapacity: true);
// 使用默认段大小 + 指定模式
using var q3 = new ConcurrentNativeQueue<int>(preferSteadyStateCapacity: true);
MPSC 并发模式
using var queue = new ConcurrentNativeQueue<long>();
const int producerCount = 4;
const int itemsPerProducer = 50_000;
int totalItems = producerCount * itemsPerProducer;
var barrier = new Barrier(producerCount + 1);
var tasks = new Task[producerCount];
for (int p = 0; p < producerCount; p++)
{
int pid = p;
tasks[p] = Task.Run(() =>
{
barrier.SignalAndWait();
for (int i = 0; i < itemsPerProducer; i++)
queue.Enqueue((long)pid * itemsPerProducer + i);
});
}
barrier.SignalAndWait();
int consumed = 0;
while (consumed < totalItems)
{
if (queue.TryDequeue(out long item))
consumed++;
}
Task.WaitAll(tasks);
API
| 成员 | 说明 |
|---|---|
| ConcurrentNativeQueue() | 以默认段大小 32 创建队列,默认高吞吐扩容模式 |
| ConcurrentNativeQueue(int segmentSize) | 指定初始段大小创建队列(最小为 2) |
| ConcurrentNativeQueue(bool preferSteadyStateCapacity) | 使用默认段大小 32,并指定容量策略模式 |
| ConcurrentNativeQueue(int segmentSize, bool preferSteadyStateCapacity) | 指定初始段大小与容量策略模式 |
| void Enqueue(T item) | 入队。线程安全,支持多生产者并发调用。段满时自动分配新段 |
| void EnqueueRange(ReadOnlySpan<T> items) | 批量入队。单次 CAS 占位多个槽位,跨段时自动分批。线程安全 |
| bool TryPeek(out T item) | 查看头部元素但不移除。成功返回 true;队列为空返回 false。仅限单消费者调用 |
| bool TryDequeue(out T item) | 出队。成功返回 true;队列为空返回 false。仅限单消费者调用 |
| int Count | 当前元素数量(并发场景下为近似值) |
| bool IsEmpty | 队列是否为空 |
| void Dispose() | 释放所有段的原生内存。可多次调用,不会抛出异常 |
| GetDebugSlotStats() | 仅 DEBUG 编译下可用,返回槽位分配/复用/释放计数(用于验证复用) |
设计原理
分段链表 + 状态标记
队列由原生段组成,每个段包含一个 NativeMemory 分配的槽位数组。每个槽位有 State 字段标记写入状态:
State == 0:槽位空闲,尚未写入State == 1:数据已就绪,可读取
生产者通过 Volatile.Read 读取当前 EnqueuePos,再用 Interlocked.CompareExchange (CAS) 原子占位。段满检测为纯读操作,不产生原子写开销。写入数据后通过 Volatile.Write 设置 State = 1 通知消费者。
EnqueueRange 进一步优化:单次 CAS 占位 N 个槽位,先批量写入 Value,经 Thread.MemoryBarrier() 后再批量设置 State,将 N 次原子操作分摊为 ⌈N/段剩余容量⌉ 次。
段容量模式
- 吞吐模式(默认):
preferSteadyStateCapacity=false,NextCapacity指数增长(上限 1M),减少段切换频率 - 稳态模式:
preferSteadyStateCapacity=true,段容量保持稳定,更容易命中复用缓存
段生命周期(全 Native + 单槽复用缓存)
段头结构体与槽位数组均通过 NativeMemory 分配,无托管对象参与。
核心生命周期挑战:生产者可能在任意时刻被抢占并持有旧段指针,因此段头不能在消费后立即释放。
- 段满 — 生产者检测到
offset >= capacity,通过 CAS 创建新段并链接到Next,再推进_tail。若 CAS 失败,未发布的新段头会立即释放,未使用的槽位数组回收到单槽缓存 - 段消费完毕 — 消费者检测到
offset >= capacity,将该段槽位数组回收到单槽缓存(若缓存已有旧槽位则释放旧槽位),再前进到Next - 新段分配 — 优先从单槽缓存取回槽位数组复用;不满足容量条件时再申请新内存
- Dispose — 从
_origin遍历整条链表,释放所有段头和剩余槽位数组(含缓存中的槽位)
相比旧版环形缓冲区的改进
| 维度 | 旧实现(环形缓冲区) | 新实现(分段链表) |
|---|---|---|
| 每次操作原子指令 | Interlocked.Inc + CAS + Interlocked.Dec = 3次 | CAS 占位 = 1次(EnqueueRange 可 1 次占 N 个) |
| 扩容方式 | Stop-the-world + O(N) 迁移 | 分配新段 O(1),CAS 链接 |
| 容量策略 | 单一策略 | 吞吐模式(指数增长)/稳态模式(固定容量)可切换 |
| 槽位回收 | 显式缩容 + 迁移 | 消费后回收到单槽缓存,优先复用 |
项目结构
ConcurrentNativeQueue/
├── ConcurrentNativeQueueLibrary/ # 核心库
│ └── ConcurrentNativeQueue.cs
├── ConcurrentNativeQueueDemo/ # MPSC 演示程序(支持 AOT 发布)
│ └── Program.cs
├── ConcurrentNativeQueueBenchmark/ # BenchmarkDotNet 性能基准
│ ├── MpscBenchmark.cs # 多生产者单消费者并发吞吐量 vs ConcurrentQueue
│ ├── SequentialBenchmark.cs # 单线程顺序吞吐量 vs ConcurrentQueue(含模式参数)
│ ├── BatchEnqueueBenchmark.cs # EnqueueRange vs 逐条 Enqueue 吞吐量对比
│ ├── SegmentSizeBenchmark.cs # 不同段大小对吞吐量的影响
│ ├── FixedDepthLoopBenchmark.cs # 固定深度稳态场景(含模式参数)
│ └── NonSteadyStateBenchmark.cs # 非稳态波动场景(含模式参数)
└── ConcurrentNativeQueueUnitTest/ # xUnit 单元测试
└── ConcurrentNativeQueueUnitTest.cs
运行
运行演示
dotnet run --project ConcurrentNativeQueueDemo
运行测试
dotnet test
运行基准测试
dotnet run --project ConcurrentNativeQueueBenchmark -c Release -- --filter *
历史报告、结果说明及大小核绑定方式见 benchmark-results/README.md。
要求
- .NET SDK 6.0 或更高。
- 允许
unsafe代码(已在项目文件中启用)
许可证
MIT
Related Skills
node-connect
344.1kDiagnose OpenClaw node connection and pairing failures for Android, iOS, and macOS companion apps
frontend-design
96.8kCreate distinctive, production-grade frontend interfaces with high design quality. Use this skill when the user asks to build web components, pages, or applications. Generates creative, polished code that avoids generic AI aesthetics.
openai-whisper-api
344.1kTranscribe audio via OpenAI Audio Transcriptions API (Whisper).
qqbot-media
344.1kQQBot 富媒体收发能力。使用 <qqmedia> 标签,系统根据文件扩展名自动识别类型(图片/语音/视频/文件)。
