无锁队列的实现

无锁队列的实现

关于无锁队列的实现,网上有很多文章,虽然本文可能和那些文章有所重复,但是我还是想以我自己的方式把这些文章中的重要的知识点串起来和大家讲一讲这个技术。下面开始正文。

关于CAS等原子操作

在开始说无锁队列之前,我们需要知道一个很重要的技术就是CAS操作——Compare & Set,或是 Compare & Swap,现在几乎所有的CPU指令都支持CAS的原子操作,X86下对应的是 CMPXCHG 汇编指令。有了这个原子操作,我们就可以用其来实现各种无锁(lock free)的数据结构。

这个操作用C语言来描述就是下面这个样子:(代码来自Wikipedia的Compare And Swap词条)意思就是说,看一看内存*reg里的值是不是oldval,如果是的话,则对其赋值newval。

int compare_and_swap (int* reg, int oldval, int newval)
{
  int old_reg_val = *reg;
  if (old_reg_val == oldval)
     *reg = newval;
  return old_reg_val;
}

这个操作可以变种为返回bool值的形式(返回 bool值的好处在于,可以调用者知道有没有更新成功):

bool compare_and_swap (int *accum, int *dest, int newval)
{
  if ( *accum == *dest ) {
      *dest = newval;
      return true;
  }
  return false;
}

与CAS相似的还有下面的原子操作:(这些东西大家自己看Wikipedia吧)

注:在实际的C/C++程序中,CAS的各种实现版本如下:

1)GCC的CAS

GCC4.1+版本中支持CAS的原子操作(完整的原子操作可参看 GCC Atomic Builtins

bool __sync_bool_compare_and_swap (type *ptr, type oldval type newval, ...)
type __sync_val_compare_and_swap (type *ptr, type oldval type newval, ...)

2)Windows的CAS

在Windows下,你可以使用下面的Windows API来完成CAS:(完整的Windows原子操作可参看MSDN的InterLocked Functions

 InterlockedCompareExchange ( __inout LONG volatile *Target,
                                 __in LONG Exchange,
                                 __in LONG Comperand);

3) C++11中的CAS

C++11中的STL中的atomic类的函数可以让你跨平台。(完整的C++11的原子操作可参看 Atomic Operation Library

template< class T >
bool atomic_compare_exchange_weak( std::atomic* obj,
                                   T* expected, T desired );
template< class T >
bool atomic_compare_exchange_weak( volatile std::atomic* obj,
                                   T* expected, T desired );

无锁队列的链表实现

下面的东西主要来自John D. Valois 1994年10月在拉斯维加斯的并行和分布系统系统国际大会上的一篇论文——《Implementing Lock-Free Queues》。

我们先来看一下进队列用CAS实现的方式:

EnQueue(x) //进队列
{
    //准备新加入的结点数据
    q = new record();
    q->value = x;
    q->next = NULL;

    do {
        p = tail; //取链表尾指针的快照
    } while( CAS(p->next, NULL, q) != TRUE); //如果没有把结点链在尾指针上,再试

    CAS(tail, p, q); //置尾结点
}

我们可以看到,程序中的那个 do- while 的 Re-Try-Loop。就是说,很有可能我在准备在队列尾加入结点时,别的线程已经加成功了,于是tail指针就变了,于是我的CAS返回了false,于是程序再试,直到试成功为止。这个很像我们的抢电话热线的不停重播的情况。

你会看到,为什么我们的“置尾结点”的操作(第12行)不判断是否成功,因为:

  1. 如果有一个线程T1,它的while中的CAS如果成功的话,那么其它所有的 随后线程的CAS都会失败,然后就会再循环,
  2. 此时,如果T1 线程还没有更新tail指针,其它的线程继续失败,因为tail->next不是NULL了。
  3. 直到T1线程更新完tail指针,于是其它的线程中的某个线程就可以得到新的tail指针,继续往下走了。

这里有一个潜在的问题——如果T1线程在用CAS更新tail指针的之前,线程停掉或是挂掉了,那么其它线程就进入死循环了。下面是改良版的EnQueue()

EnQueue(x) //进队列改良版
{
    q = new record();
    q->value = x;
    q->next = NULL;

    p = tail;
    oldp = p
    do {
        while (p->next != NULL)
            p = p->next;
    } while( CAS(p.next, NULL, q) != TRUE); //如果没有把结点链在尾上,再试

    CAS(tail, oldp, q); //置尾结点
}

我们让每个线程,自己fetch 指针 p 到链表尾。但是这样的fetch会很影响性能。而通实际情况看下来,99.9%的情况不会有线程停转的情况,所以,更好的做法是,你可以接合上述的这两个版本,如果retry的次数超了一个值的话(比如说3次),那么,就自己fetch指针。

好了,我们解决了EnQueue,我们再来看看DeQueue的代码:(很简单,我就不解释了)

DeQueue() //出队列
{
    do{
        p = head;
        if (p->next == NULL){
            return ERR_EMPTY_QUEUE;
        }
    while( CAS(head, p, p->next) != TRUE );
    return p->next->value;
}

我们可以看到,DeQueue的代码操作的是 head->next,而不是head本身。这样考虑是因为一个边界条件,我们需要一个dummy的头指针来解决链表中如果只有一个元素,head和tail都指向同一个结点的问题,这样EnQueue和DeQueue要互相排斥了

注:上图的tail正处于更新之前的装态。

CAS的ABA问题

所谓ABA(见维基百科的ABA词条),问题基本是这个样子:

  1. 进程P1在共享变量中读到值为A
  2. P1被抢占了,进程P2执行
  3. P2把共享变量里的值从A改成了B,再改回到A,此时被P1抢占。
  4. P1回来看到共享变量里的值没有被改变,于是继续执行。

虽然P1以为变量值没有改变,继续执行了,但是这个会引发一些潜在的问题。ABA问题最容易发生在lock free 的算法中的,CAS首当其冲,因为CAS判断的是指针的地址。如果这个地址被重用了呢,问题就很大了。(地址被重用是很经常发生的,一个内存分配后释放了,再分配,很有可能还是原来的地址)

比如上述的DeQueue()函数,因为我们要让head和tail分开,所以我们引入了一个dummy指针给head,当我们做CAS的之前,如果head的那块内存被回收并被重用了,而重用的内存又被EnQueue()进来了,这会有很大的问题。(内存管理中重用内存基本上是一种很常见的行为

这个例子你可能没有看懂,维基百科上给了一个活生生的例子——

你拿着一个装满钱的手提箱在飞机场,此时过来了一个火辣性感的美女,然后她很暖昧地挑逗着你,并趁你不注意的时候,把用一个一模一样的手提箱和你那装满钱的箱子调了个包,然后就离开了,你看到你的手提箱还在那,于是就提着手提箱去赶飞机去了。

这就是ABA的问题。

解决ABA的问题

维基百科上给了一个解——使用double-CAS(双保险的CAS),例如,在32位系统上,我们要检查64位的内容

1)一次用CAS检查双倍长度的值,前半部是指针,后半部分是一个计数器。

2)只有这两个都一样,才算通过检查,要吧赋新的值。并把计数器累加1。

这样一来,ABA发生时,虽然值一样,但是计数器就不一样(但是在32位的系统上,这个计数器会溢出回来又从1开始的,这还是会有ABA的问题)

当然,我们这个队列的问题就是不想让那个内存重用,这样明确的业务问题比较好解决,论文《Implementing Lock-Free Queues》给出一这么一个方法——使用结点内存引用计数refcnt

SafeRead(q)
{
    loop:
        p = q->next;
        if (p == NULL){
            return p;
        }

        Fetch&Add(p->refcnt, 1);

        if (p == q->next){
            return p;
        }else{
            Release(p);
        }
    goto loop;
}

其中的 Fetch&Add和Release分是是加引用计数和减引用计数,都是原子操作,这样就可以阻止内存被回收了。

用数组实现无锁队列

本实现来自论文《Implementing Lock-Free Queues

使用数组来实现队列是很常见的方法,因为没有内存的分部和释放,一切都会变得简单,实现的思路如下:

1)数组队列应该是一个ring buffer形式的数组(环形数组)

2)数组的元素应该有三个可能的值:HEAD,TAIL,EMPTY(当然,还有实际的数据)

3)数组一开始全部初始化成EMPTY,有两个相邻的元素要初始化成HEAD和TAIL,这代表空队列。

4)EnQueue操作。假设数据x要入队列,定位TAIL的位置,使用double-CAS方法把(TAIL, EMPTY) 更新成 (x, TAIL)。需要注意,如果找不到(TAIL, EMPTY),则说明队列满了。

5)DeQueue操作。定位HEAD的位置,把(HEAD, x)更新成(EMPTY, HEAD),并把x返回。同样需要注意,如果x是TAIL,则说明队列为空。

算法的一个关键是——如何定位HEAD或TAIL?

1)我们可以声明两个计数器,一个用来计数EnQueue的次数,一个用来计数DeQueue的次数。

2)这两个计算器使用使用Fetch&ADD来进行原子累加,在EnQueue或DeQueue完成的时候累加就好了。

3)累加后求个模什么的就可以知道TAIL和HEAD的位置了。

如下图所示:

 小结

以上基本上就是所有的无锁队列的技术细节,这些技术都可以用在其它的无锁数据结构上。

1)无锁队列主要是通过CAS、FAA这些原子操作,和Retry-Loop实现。

2)对于Retry-Loop,我个人感觉其实和锁什么什么两样。只是这种“锁”的粒度变小了,主要是“锁”HEAD和TAIL这两个关键资源。而不是整个数据结构。

还有一些和Lock Free的文章你可以去看看:

注:我配了一张look-free的自行车,寓意为——如果不用专门的车锁,那么自行得自己锁自己!

 (全文完)


关注CoolShell微信公众账号可以在手机端搜索文章

(转载本站文章请注明作者和出处 酷 壳 – CoolShell ,请勿用于任何商业用途)

——=== 访问 酷壳404页面 寻找遗失儿童。 ===——
好烂啊有点差凑合看看还不错很精彩 (18 人打了分,平均分: 4.67 )
Loading...

无锁队列的实现》的相关评论

  1. EnQueue(x) //进队列
    {
    //准备新加入的结点数据
    q = new record();
    q->value = x;
    q->next = NULL;

    do {
    p = tail; //取链表尾指针的快照
    } while( CAS(&tail, p, q) != TRUE); //检查尾端改变没,没有赋值,有重复
    }
    有没有什么问题? ABA问题也没有了吧?

  2. EnQueue(x) //进队列改良版
    {
    q = new record();
    q->value = x;
    q->next = NULL;

    do {
    p = tail;
    } while( CAS(tail, p, q) != TRUE); //如果没有把结点链在尾上,再试

    p.next = q;
    }

    先修改tail的值,再建立旧tail到新增节点的连接?一次cas就够用的吧

  3. 这个无锁队列的实现方式可能存在一个问题,就是不能删除节点,如果删除节点可能会引起core dump,所以从这点上说,这个实现方式的运用场景可能有限。

  4. 同意freewill的观点,如果删除节点,可能会core dump:
    EnQueue(x) //进队列
    {
    //准备新加入的结点数据
    q = new record();
    q->value = x;
    q->next = NULL;
    do {
    p = tail; //取链表尾指针的快照
    // 如果这个时候线程被抢占,p所指的tail被其它线程删除,紧跟的CAS指令就会导致crash.
    } while( CAS(p->next, NULL, q) != TRUE); //如果没有把结点链在尾指针上,再试

    CAS(tail, p, q); //置尾结点
    }

    而且,个人认为:传统的锁(pthread_mutex_lock/pthread_mutex_unlock, 或者EnterCriticalSection/LeaveCriticalSection) 在不发生锁的争用的情况下,其开销和一个原子操作相当,一般来说高效的锁的实现,如果没有锁的争用,不会导致系统调用的开销。只有在发生锁的争用的时候,才会使不能即时获得锁的线程进入睡眠并等待另一个获得锁的线程唤醒(在这种情况下才会产生系统调用)。
    我的观点是,如果锁的争用的概率很低的话,使用无锁算法相对于使用传统的锁在性能上不会有太大的优势。
    如果锁的争用概率很高的话,无锁算法Busy Wait的方式有可能会导致更高的CPU使用率。

    当然,一些便捷的原子操作,比如sync_add_and_fetch/sync_add_and_sub用于实现一些引用计数,我还是很喜欢的。

  5. 同觉得有问题,改良版最后一行,可能会两个线程竞争tail,其中一个更新失败

    Marvin :
    我也觉得改良版存在问题。
    进程1 插入数据q1, 循环执行后,tail->next = q1. 然后被进程2抢占。
    进程2 插入数据q2, 开始执行, while(p->next != NULL)成立, p等于q1, 然后q1->next = q2. 被进程1被抢占。
    进程1 判断tail == oldp, 将tail改为q1。
    进程2 判断tail != oldp, 不改变tail。
    这时候tail不就错了吗?
    是不是我那个地方忽略了?

    @Marvin

  6. @nadir

    我考虑了一下,觉得其实都是ok的。

    使用 CAS(tail, oldp, q) 的暗示是:只有 happy path 才能够将 tail 变成 q;如果不是 happy path,那么我们就假定认为,其他线程已经更改了 tail,因此,咱们这儿就不用再多此一举更新 tail 了,于是减少了一次更改 tail 的操作。

    相应的,基于这种「必须是 happy path 才会更新 tail」的假定,肯定会出现「谁都不更新 tail 的情况」,如 A B C 三个线程,没有任何一个跑出了完整的 happy path,都是第一个原子 CAS 操作完成后,就被人抢占了(不妨假设是 A -> B -> C -> A 的循环抢占关系……),那么大家都不更新 tail 了……

    但是,这也没什么关系,由于 do {} while () 里另一个 while 的存在,使得总能够在某个时间,tail 被正确的更新。

    P.S.

    如果是 CAS(tail, p, q) 操作,则更新 tail 的几率更大一些(上面那种足够规律的循环抢占的场景里,大家都能够更新 tail……),当然某些更新 tail 的操作并不一定是(理论上)完全正确的。

    简而言之,两种做法都是 ok 的……

  7. Pingback: lock free的理解
  8. 讲的不错,受教育。
    无锁队列其实主要用在多线程中,尤其是多核,目前的所有实现都应该考虑cache MESI协议问题

  9. 改良版也是有问题的吧。
    while (p->next != NULL)
    p = p->next;
    让p自己fetch到尾部的前提是当前的p到尾部之间的节点没有被free。
    但如果当前的p与tail之间有节点在DeQueue中被free的话,p->next就会造成访问越界了。
    虽然说文章中的DeQueue没有free的操作,但理论上需要时间这样的操作吧?

  10. EnQueue(x) //进队列
    {
    q = new record();
    q->value = x;
    q->next = NULL;

    do {
    p = tail;
    // 如果这个时候线程被抢占,p所指的tail被其它线程删除,即便没有free,cas也可能执行成功,这种情况下,
    会将一个delete节点的next指向q,并执行成功,这也不是我们想看的的结果。
    解决办法是在delete 节点前先mark节点 p->next &= 0x1
    } while( CAS(p->next, NULL, q) != TRUE);

    CAS(tail, p, q);
    }

  11. EnQueue(x) //进队列
    {
        //准备新加入的结点数据
        q = new record();
        q-&gt;value = x;
        q-&gt;next = NULL;
     
        do {
            p = tail; //取链表尾指针的快照
        } while( CAS(p-&gt;next, NULL, q) != TRUE); //如果没有把结点链在尾指针上,再试
     
        CAS(tail, p, q); //置尾结点
    }

    这样实现是否等价:

    EnQueue(x) //进队列
    {
        //准备新加入的结点数据
        q = new record();
        q-&gt;value = x;
        q-&gt;next = NULL;
     
        do {
            // p = tail; //取链表尾指针的快照
        } while( CAS(tail , NULL, q) != TRUE); //如果没有把结点链在尾指针上,再试
     
        // CAS(tail, p, q); //置尾结点
        tail = q;
    }
  12. chinaxing :

    EnQueue(x) //进队列
    {
        //准备新加入的结点数据
        q = new record();
        q-&gt;value = x;
        q-&gt;next = NULL;
        do {
            p = tail; //取链表尾指针的快照
        } while( CAS(p-&gt;next, NULL, q) != TRUE); //如果没有把结点链在尾指针上,再试
        CAS(tail, p, q); //置尾结点
    }
    这样实现是否等价:
    EnQueue(x) //进队列
    {
        //准备新加入的结点数据
        q = new record();
        q-&gt;value = x;
        q-&gt;next = NULL;
        do {
            // p = tail; //取链表尾指针的快照
        } while( CAS(tail-&gt;next , NULL, q) != TRUE); //如果没有把结点链在尾指针上,再试
        // CAS(tail, p, q); //置尾结点
        tail = q;
    }
  13. 之前面试就被问过无锁队列,当时我就怀疑,这貌似理论上无法实现吧,当然我也不知道如何证明。但后来看了很多人写的所谓无锁队列,其实还是有锁啊,只不过这个锁是指令集实现的而已。有真正的无锁队列吗?在理论上是不是也可以证明有或没有?

  14. EnQueue(x) //进队列
    {
        //准备新加入的结点数据
        q = new record();
        q->value = x;
        q->next = NULL;
     
        do {
            p = tail; //取链表尾指针的快照
        } while( CAS(p->next, NULL, q) != TRUE); //如果没有把结点链在尾指针上,再试
     
        CAS(tail, p, q); //置尾结点
    }

    感觉等价于

    EnQueue(x) //进队列
    {
        //准备新加入的结点数据
        q = new record();
        q->value = x;
        q->next = NULL;
     
        do {
            p = tail; //取链表尾指针的快照
            p->next=q;
        }while(CAS(tail,p,q));
        
    }
    因为cas本质上是一种乐观锁嘛。

发表评论

电子邮件地址不会被公开。 必填项已用*标注

*