ReadWriteLock
使用C#实现的读写锁,完成基本读写同步需求
Install / Use
/learn @RuiTan/ReadWriteLockREADME
ReadWriteLock
使用C#实现的可重入、非公平的读写锁,主要目的实现并发读写以及读写同步问题。为了减少读等待时间以及防止出现写饥饿现象,本锁使用了非同步锁的实现,允许读优先(提高效率)的同时使用一个阈值限定读者的最大数量(防止写饥饿);重入机制允许某个线程可以获取锁多次(如多次函数调用导致的锁重入),每次获取都需要有对应的释放,否则会出错。
本锁的目的是实现对读写线程队列的调度,而不是对线程获取锁的顺序进行调度。要注意读写线程进入队列的顺序是系统调度的(意即创建多个线程并Start时,其进入线程的时机是系统决定的,可能最后创建的线程最先执行),这里实现的是对进入队列后的线程进行阻塞、唤醒等操作。
本锁使用了C#中的lock原语,目的是为了实现一些队列操作的原子性。
Node节点
Node节点是实现本读写锁的关键所在,读写线程队列也是基于Node节点实现的。Node内部持有了一个线程,同时有很多指向其他Node的引用。Node类实现如下:
public class Node
{
// 读节点
public static readonly Node SHARED = new Node();
// 写节点
public static readonly Node EXCLUSIVE = null;
// 读链长度阈值
public static int Threshold = 3;
// 节点状态
public static readonly int CANCELLED = 1;
public static readonly int RUNNING = -1;
public static readonly int WAITING = -2;
public static readonly int SIGNAL = -3;
public int waitStatus;
// 前驱节点
public Node prev;
// 后继节点
public Node next;
// 持有线程
public Thread thread;
//以下参数仅对读节点适用
// 读链头
public Node readerHead;
// 后继读节点
public Node nextReader;
// 读链长度
public int readerCount = 1;
// 节点类型
public Node mode;
// 是否是共享节点
public bool isShared()
{
return mode == SHARED;
}
public Node()
{
}
// 创建共享或独占节点
public Node(Thread thread, Node mode)
{
this.mode = mode;
this.thread = thread;
}
// 获取状态
public static string GetStatus(int status)
{
switch (status)
{
case 1:
return "CANCELLED";
case -1:
return "RUNNING";
case -2:
return "WAITING";
case -3:
return "SIGNAL";
}
return "DEFAULT";
}
}
各属性及方法的涵义如上述代码注释所示,而由Node组成的等待队列结构(双向链表)如下图所示(可能出现的一种情况):

对于写节点来说,锁是独占的,一次仅能有一个写线程在执行;而对于读节点来说,锁是共享的,在同一读链上的所有节点都可同时进行读。
重要变量
头节点 head
头节点是个傀儡节点,private volatile Node head,其无实际涵义,只是为了作为队列头而存在;其使用了惰性初始化的方法,仅在第一个节点入队列时初始化。
volatile关键字在是为了实现变量的内存可见性,使用该关键字修饰的变量的修改会直接反映到内存中而不是缓存中。
尾节点 tail
尾节点是队列的最后一个节点,private volatile Node tail,引入尾节点是为了防止在新节点入列时遍历队列,提高了效率。
重入量 reentrants
对于写锁(独占锁)来说,当一个写线程获取锁时,reentrants为1,后续每当锁重入一次,reentrants增加1;释放锁时,每释放一次reentrants减少1,直到reentrants为0时该线程释放当前锁,唤醒后续线程;
对于读锁(共享锁)来说,由于读链中可能会有不超过Node.Threshold个数的读节点,且每个读节点都可能会产生重入,这里会将reentrants初始化为读链长度,在Node.RUNNING``时读链每增加一个读节点会增加一个reentrants,读链中的每个节点多一次重入也会导致reentrants增加1。
写锁
获取 WriteLock()
先看获取写锁的流程:
/**
* 获取写锁
*/
public void WriteLock()
{
// 当前节点
Node currentNode = null;
// 这里使用了惰性初始化,如果头节点为空,则初始化头节点
lock (this)
{
if (head == null)
head = new Node();
}
// 如果尾节点为空,说明当前队列为空,此时线程直接入队列设置为RUNNING状态
lock (this)
{
if (tail == null)
{
EnqWhenTailNull(Node.EXCLUSIVE);
return;
}
}
lock (this)
{
// 如果当前线程就是持有线程,说明锁在重入,reentrants加1
if (owner == Thread.CurrentThread)
{
reentrants += 1;
return;
}
// 否则,当前线程需要进入等待队列进行等待
else
{
// 获取尾节点
Node t = tail;
// 由于可能出现其他线程的介入,需要再次检测为节点是否为空
if (t == null)
{
EnqWhenTailNull(Node.EXCLUSIVE);
return;
}
// 否则直接入队列等待
else
currentNode = Enq(Node.EXCLUSIVE);
}
}
// 检测当前节点是否可以被唤醒
while (currentNode.waitStatus != Node.SIGNAL) { }
// 此时线程已被唤醒,设置重入量及状态等
reentrants = 1;
currentNode.waitStatus = Node.RUNNING;
owner = Thread.CurrentThread;
}
基本流程如下:
- 先判断队列是否已初始化,若未初始化,先初始化head节点
- 判断队列是否为空(即判断
tail==null),若是直接进入队列,成功获取锁,返回;否则进入下一步 - 判断当前线程是否为持有锁的线程,若是说明线程在重入当前锁,直接
reentrants加1,成功获取锁,返回;否则进入下一步 - 获取尾节点,由于第2步结束时可能有其他线程的介入,因此需再次判断
tail==null,若是直接进入队列,成功获取锁,返回;否则以当前线程创建节点并入队列,同时阻塞 - 循环检测当前节点是否可以被唤醒,若可则解除阻塞状态,设置节点状态等信息,并成功获取独占锁
释放 WriteUnlock()
释放写锁的逻辑很简单:
/**
* 释放写锁
*/
public void WriteUnlock()
{
// 加锁是为了同步修改队列信息
lock (this)
{
// 当前锁持有者重入量直接-1
reentrants -= 1;
// 获取队列头(除了head之外的头)
Node node = GetHolderNode();
// 如果后续节点为空,则释放完成,队列已空;若后续节点不为空,则可能需要唤醒后续节点
if (node != null)
{
// 若当前锁的重入量为0,说明锁已经完全释放,则需要唤醒后继有效节点(否则可能只是释放了一个锁内部的锁)
if (reentrants == 0)
{
// 当前节点置为无效
node.waitStatus = Node.CANCELLED;
// 唤醒后继有效节点
AwakeNext();
}
}
}
}
逻辑如下:
- 锁
reentrants减1 - 获取当前锁持有的节点
- 判断上述获取的节点是否为空,若为空说明队列已空,已无等待节点,直接返回;否则检查
reentrants是否已减至0 - 若是则当前节点线程已完成,置为
Node.CANCELLED状态,并唤醒后继有效节点
读锁
获取 ReadLock()
相比于写锁的获取,读锁的获取要复杂一点,因为涉及到读的共享以及阈值的控制:
/**
* 获取读锁
*/
public void ReadLock()
{
Node reader = null;
// 这里基本的初始化方式和读锁相似,不赘述
lock (this)
{
if (head == null)
head = new Node();
}
Thread current = Thread.CurrentThread;
lock (this)
{
if (tail == null)
{
EnqWhenTailNull(Node.SHARED);
return;
}
}
Node currentNode = null;
lock (this)
{
// 读可重入
if (ReaderCanReentranted())
{
reentrants += 1;
return;
}
// 获取当前持有锁节点
Node node = head.next;
if (node == null)
{
EnqWhenTailNull(Node.SHARED);
return;
}
else
{
// 队列中有写者的情况
if (HasWriter())
{
// 创建当前持有当前线程的读节点
reader = new Node(current, Node.SHARED);
// 获取第一个读节点(可能为null)
while (node != null && (!node.isShared() || node.waitStatus == Node.CANCELLED))
{
if (node.isShared())
{
if (!CheckReadChainCancelled(node))
break;
else
node = node.next;
}
else
node = node.next;
}
// 未找到读节点,说明队列中只有写节点,此时直接添加到队列尾
if (node == null)
{
currentNode = Enq(Node.SHARED);
}
// 否则说明找到了有效的读节点,此时此读节点为队列中第一个读节点,只需要判断此读节点链长是否达到了阈值,
// 若未超过阈值,直接添加到读链中,并判断当前读链头是否正在读,则可以直接读,否则需要循环检测;
// 若超过了阈值:
// 若队尾为写节点,则添加到队列尾,并循环等待;
// 若队尾为读节点,判断是否到达阈值:
// 若是则添加到队列尾,并循环等待;
// 否则添加到读链中。
else
{
// 未超过阈值
if (node.readerCount < Node.Threshold)
{
// 添加到读链中
currentNode = AddReader(node, reader);
// 链头正在读,此节点也直接读,且reentrants+1
if (node.waitStatus == Node.RUNNING)
{
reader.waitStatus = Node.RUNNING;
reentrants += 1;
return;
}
}
// 超过了阈值
else
{
// 判断队尾节点类型
Node t = tail;
// 队尾为“写节点”或“达到阈值的读节点”
if (!t.isShared() || (t.isShared() && t.readerCount >= Node.Threshold))
currentNode = Enq(Node.SHARED);
// 队尾为“未达到阈值的读节点”
else
{
// 添加到队尾节点所在的读链中
currentNode = AddReader(t, reader);
// 链头正在读,此节点也直接读,且reentrants+1
if (t.waitStatus == Node.RUNNING)
{
reader.waitStatus = Node.RUNNING;
reentrants += 1;
return;
}
}
}
}
}
// 队列中无写者,说明队列中要么为空,要么全为读者,直接加到队尾或者队尾所在的读链
else
{
// 队列为空,直接入队
if (tail == null)
{
EnqWhenTailNull(Node.SHARED);
return;
}
// 队尾节点读链长度未达到阈值
if (tail.readerCount < Node.Threshold)
{
reader = new Node(current, Node.SHARED);
// 添加到链尾
currentNode = AddReader(tail, reader);
// 是否需要循环等待
if (tail.waitStatus == Node.RUNNING)
{
reader.waitStatus = Node.RUNNING;
reentrants += 1;
return;
}
}
// 队尾节点读链达到了阈值,直接加入队尾,并等待唤醒
else
{
currentNode = Enq(Node.SHARED);
}
}
}
}
while (currentNode.waitStatus != Node.SIGNAL) { }
currentNode.waitStatus = Node.RUNNING;
reentrants = currentNode.readerHead.readerCount;
owner = currentNode.readerHead == null ? null : currentNode.readerHead.thread;
}
逻辑如下:
- 先判断队列是否已初始化,若未初始化,先初始化head节点
- 判断队列是否为空(即判断
tail==null),若是直接进入队列,成功获取锁,返回;否则进入下一步 - 判断当前线程是否可重入锁(读线程的可重入判断与写不同,会在后面对
ReaderCanReentranted()方法解释时介绍到),若是reentrants加1,返回;否则进入下一步 - 获取当前持有锁节点,若为空说明队列为空,直接进入队列,成功获取锁,返回;否
