SkillAgentSearch skills...

ReadWriteLock

使用C#实现的读写锁,完成基本读写同步需求

Install / Use

/learn @RuiTan/ReadWriteLock
About this skill

Quality Score

0/100

Supported Platforms

Universal

README

ReadWriteLock

使用C#实现的可重入非公平读写锁,主要目的实现并发读写以及读写同步问题。为了减少读等待时间以及防止出现写饥饿现象,本锁使用了非同步锁的实现,允许读优先(提高效率)的同时使用一个阈值限定读者的最大数量(防止写饥饿);重入机制允许某个线程可以获取锁多次(如多次函数调用导致的锁重入),每次获取都需要有对应的释放,否则会出错。

本锁的目的是实现对读写线程队列的调度,而不是对线程获取锁的顺序进行调度。要注意读写线程进入队列的顺序是系统调度的(意即创建多个线程并Start时,其进入线程的时机是系统决定的,可能最后创建的线程最先执行),这里实现的是对进入队列后的线程进行阻塞、唤醒等操作

本锁使用了C#中的lock原语,目的是为了实现一些队列操作的原子性

Node节点

Node节点是实现本读写锁的关键所在,读写线程队列也是基于Node节点实现的。Node内部持有了一个线程,同时有很多指向其他Node的引用。Node类实现如下:

public class Node
    {
        // 读节点
        public static readonly Node SHARED = new Node();
        // 写节点
        public static readonly Node EXCLUSIVE = null;
        // 读链长度阈值
        public static int Threshold = 3;

        // 节点状态
        public static readonly int CANCELLED = 1;
        public static readonly int RUNNING = -1;
        public static readonly int WAITING = -2;
        public static readonly int SIGNAL = -3;
    
        public int waitStatus;
        // 前驱节点
        public Node prev;
        // 后继节点
        public Node next;
        // 持有线程
        public Thread thread;

        //以下参数仅对读节点适用
        // 读链头
        public Node readerHead;
        // 后继读节点
        public Node nextReader;
        // 读链长度
        public int readerCount = 1;

        // 节点类型
        public Node mode;
        // 是否是共享节点
        public bool isShared()
        {
            return mode == SHARED;
        }
        public Node()
        {
        }

        // 创建共享或独占节点
        public Node(Thread thread, Node mode)
        {
            this.mode = mode;
            this.thread = thread;
        }
    
    	// 获取状态
        public static string GetStatus(int status)
        {
            switch (status)
            {
                case 1:
                    return "CANCELLED";
                case -1:
                    return "RUNNING";
                case -2:
                    return "WAITING";
                case -3:
                    return "SIGNAL";
            }
            return "DEFAULT";
        }
    }

各属性及方法的涵义如上述代码注释所示,而由Node组成的等待队列结构(双向链表)如下图所示(可能出现的一种情况):

队列示意图.png

对于写节点来说,锁是独占的,一次仅能有一个写线程在执行;而对于读节点来说,锁是共享的,在同一读链上的所有节点都可同时进行读。

重要变量

头节点 head

头节点是个傀儡节点,private volatile Node head,其无实际涵义,只是为了作为队列头而存在;其使用了惰性初始化的方法,仅在第一个节点入队列时初始化。

volatile关键字在是为了实现变量的内存可见性,使用该关键字修饰的变量的修改会直接反映到内存中而不是缓存中。

尾节点 tail

尾节点是队列的最后一个节点,private volatile Node tail,引入尾节点是为了防止在新节点入列时遍历队列,提高了效率。

重入量 reentrants

对于写锁(独占锁)来说,当一个写线程获取锁时,reentrants1,后续每当锁重入一次,reentrants增加1;释放锁时,每释放一次reentrants减少1,直到reentrants0时该线程释放当前锁,唤醒后续线程;

对于读锁(共享锁)来说,由于读链中可能会有不超过Node.Threshold个数的读节点,且每个读节点都可能会产生重入,这里会将reentrants初始化为读链长度,在Node.RUNNING``时读链每增加一个读节点会增加一个reentrants,读链中的每个节点多一次重入也会导致reentrants增加1

写锁

获取 WriteLock()

先看获取写锁的流程:

/**
 * 获取写锁
 */
public void WriteLock()
{
    // 当前节点
    Node currentNode = null;
    // 这里使用了惰性初始化,如果头节点为空,则初始化头节点
    lock (this)
    {
        if (head == null)
            head = new Node();
    }
    // 如果尾节点为空,说明当前队列为空,此时线程直接入队列设置为RUNNING状态
    lock (this)
    {
        if (tail == null)
        {
            EnqWhenTailNull(Node.EXCLUSIVE);
            return;
        }
    }
    lock (this)
    {
        // 如果当前线程就是持有线程,说明锁在重入,reentrants加1
        if (owner == Thread.CurrentThread)
        {
            reentrants += 1;
            return;
        }
        // 否则,当前线程需要进入等待队列进行等待
        else
        {
            // 获取尾节点
            Node t = tail;
            // 由于可能出现其他线程的介入,需要再次检测为节点是否为空
            if (t == null)
            {
                EnqWhenTailNull(Node.EXCLUSIVE);
                return;
            }
            // 否则直接入队列等待
            else
                currentNode = Enq(Node.EXCLUSIVE);
        }
    }
    // 检测当前节点是否可以被唤醒
    while (currentNode.waitStatus != Node.SIGNAL) { }
    // 此时线程已被唤醒,设置重入量及状态等
    reentrants = 1;
    currentNode.waitStatus = Node.RUNNING;
    owner = Thread.CurrentThread;
}

基本流程如下:

  1. 先判断队列是否已初始化,若未初始化,先初始化head节点
  2. 判断队列是否为空(即判断tail==null),若是直接进入队列,成功获取锁,返回;否则进入下一步
  3. 判断当前线程是否为持有锁的线程,若是说明线程在重入当前锁,直接reentrants1,成功获取锁,返回;否则进入下一步
  4. 获取尾节点,由于第2步结束时可能有其他线程的介入,因此需再次判断tail==null,若是直接进入队列,成功获取锁,返回;否则以当前线程创建节点并入队列,同时阻塞
  5. 循环检测当前节点是否可以被唤醒,若可则解除阻塞状态,设置节点状态等信息,并成功获取独占锁

释放 WriteUnlock()

释放写锁的逻辑很简单:

/**
 * 释放写锁
 */
public void WriteUnlock()
{
    // 加锁是为了同步修改队列信息
    lock (this)
    {
        // 当前锁持有者重入量直接-1
        reentrants -= 1;
        // 获取队列头(除了head之外的头)
        Node node = GetHolderNode();
        // 如果后续节点为空,则释放完成,队列已空;若后续节点不为空,则可能需要唤醒后续节点
        if (node != null)
        {
            // 若当前锁的重入量为0,说明锁已经完全释放,则需要唤醒后继有效节点(否则可能只是释放了一个锁内部的锁)
            if (reentrants == 0)
            {
                // 当前节点置为无效
                node.waitStatus = Node.CANCELLED;
                // 唤醒后继有效节点
                AwakeNext();
            }
        }
    }
}

逻辑如下:

  1. reentrants1
  2. 获取当前锁持有的节点
  3. 判断上述获取的节点是否为空,若为空说明队列已空,已无等待节点,直接返回;否则检查reentrants是否已减至0
  4. 若是则当前节点线程已完成,置为Node.CANCELLED状态,并唤醒后继有效节点

读锁

获取 ReadLock()

相比于写锁的获取,读锁的获取要复杂一点,因为涉及到读的共享以及阈值的控制:

/**
 * 获取读锁
 */
public void ReadLock()
{
    Node reader = null;
    // 这里基本的初始化方式和读锁相似,不赘述
    lock (this)
    {
        if (head == null)
            head = new Node();
    }
    Thread current = Thread.CurrentThread;
    lock (this)
    {
        if (tail == null)
        {
            EnqWhenTailNull(Node.SHARED);
            return;
        }
    }
    Node currentNode = null;
    lock (this)
    {
        // 读可重入
        if (ReaderCanReentranted())
        {
            reentrants += 1;
            return;
        }
        // 获取当前持有锁节点
        Node node = head.next;
        if (node == null)
        {
            EnqWhenTailNull(Node.SHARED);
            return;
        }
        else
        {
            // 队列中有写者的情况
            if (HasWriter())
            {
                // 创建当前持有当前线程的读节点
                reader = new Node(current, Node.SHARED);
                // 获取第一个读节点(可能为null)
                while (node != null && (!node.isShared() || node.waitStatus == Node.CANCELLED))
                {
                    if (node.isShared())
                    {
                        if (!CheckReadChainCancelled(node))
                            break;
                        else
                            node = node.next;
                    }
                    else
                        node = node.next;
                }
                // 未找到读节点,说明队列中只有写节点,此时直接添加到队列尾
                if (node == null)
                {
                    currentNode = Enq(Node.SHARED);
                }
                // 否则说明找到了有效的读节点,此时此读节点为队列中第一个读节点,只需要判断此读节点链长是否达到了阈值,
                //  若未超过阈值,直接添加到读链中,并判断当前读链头是否正在读,则可以直接读,否则需要循环检测;
                //  若超过了阈值:
                //      若队尾为写节点,则添加到队列尾,并循环等待;
                //      若队尾为读节点,判断是否到达阈值:
                //          若是则添加到队列尾,并循环等待;
                //          否则添加到读链中。
                else
                {
                    // 未超过阈值
                    if (node.readerCount < Node.Threshold)
                    {
                        // 添加到读链中
                        currentNode = AddReader(node, reader);
                        // 链头正在读,此节点也直接读,且reentrants+1
                        if (node.waitStatus == Node.RUNNING)
                        {
                            reader.waitStatus = Node.RUNNING;
                            reentrants += 1;
                            return;
                        }
                    }
                    // 超过了阈值
                    else
                    {
                        // 判断队尾节点类型
                        Node t = tail;
                        // 队尾为“写节点”或“达到阈值的读节点”
                        if (!t.isShared() || (t.isShared() && t.readerCount >= Node.Threshold))
                            currentNode = Enq(Node.SHARED);
                        // 队尾为“未达到阈值的读节点”
                        else
                        {
                            // 添加到队尾节点所在的读链中
                            currentNode = AddReader(t, reader);
                            // 链头正在读,此节点也直接读,且reentrants+1
                            if (t.waitStatus == Node.RUNNING)
                            {
                                reader.waitStatus = Node.RUNNING;
                                reentrants += 1;
                                return;
                            }
                        }
                    }
                }
            }
            // 队列中无写者,说明队列中要么为空,要么全为读者,直接加到队尾或者队尾所在的读链
            else
            {
                // 队列为空,直接入队
                if (tail == null)
                {
                    EnqWhenTailNull(Node.SHARED);
                    return;
                }
                // 队尾节点读链长度未达到阈值
                if (tail.readerCount < Node.Threshold)
                {
                    reader = new Node(current, Node.SHARED);
                    // 添加到链尾
                    currentNode = AddReader(tail, reader);
                    // 是否需要循环等待
                    if (tail.waitStatus == Node.RUNNING)
                    {
                        reader.waitStatus = Node.RUNNING;
                        reentrants += 1;
                        return;
                    }
                }
                // 队尾节点读链达到了阈值,直接加入队尾,并等待唤醒
                else
                {
                    currentNode = Enq(Node.SHARED);
                }
            }
        }
    }
    while (currentNode.waitStatus != Node.SIGNAL) { }
    currentNode.waitStatus = Node.RUNNING;
    reentrants = currentNode.readerHead.readerCount;
    owner = currentNode.readerHead == null ? null : currentNode.readerHead.thread;
}

逻辑如下:

  1. 先判断队列是否已初始化,若未初始化,先初始化head节点
  2. 判断队列是否为空(即判断tail==null),若是直接进入队列,成功获取锁,返回;否则进入下一步
  3. 判断当前线程是否可重入锁(读线程的可重入判断与写不同,会在后面对ReaderCanReentranted()方法解释时介绍到),若是reentrants1,返回;否则进入下一步
  4. 获取当前持有锁节点,若为空说明队列为空,直接进入队列,成功获取锁,返回;否
View on GitHub
GitHub Stars14
CategoryDevelopment
Updated1mo ago
Forks3

Languages

C#

Security Score

75/100

Audited on Feb 28, 2026

No findings