本文是学习《多处理器编程的艺术》的笔记。

下面主要介绍了一些常用的锁的原理,这些只是一些理论,离我们实际使用还是有一些差距的。不过这种理论也往往是相对比较好容易理解了掌握的,只有了解了这些理论,可以加深我们对锁原理知识的理解。有能力的同学更可能根据这些理论定制开发符合自己场景的高性能的锁。

本篇文章并没有具体讲到ava中锁的实现。但是通过本篇文章,相信大家对Java中锁的实现也会有更深入的理解。因为Java中Lock的底层实现AbstractQueuedSynchronizer的原型在这里就会讲到。

同样的这些原理在其他很多地方也能见到,比如TCP中关于碰撞的处理就和这里提到的指数后退锁是一样的。zookeeper的分布式锁也就是通过队列的方式来实现的。

测设-设置锁

import java.util.concurrent.atomic.AtomicBoolean;

public class TASLock {
    private AtomicBoolean state=new AtomicBoolean(false);
    public void lock(){
        while(state.getAndSet(true)){

        }
    }
    public void unlock(){
        state.set(false);
    }
}

TASLock通过将state变量的值由false变为true来设置加锁。

注意:只有将state变量从false变为true的线程,可以获得锁,其他线程都会在state.getAndSet(true)上一直自旋

import java.util.concurrent.atomic.AtomicBoolean;

public class TTASLock {
    private AtomicBoolean state=new AtomicBoolean(false);
    public void lock(){
        while(true){
            while(state.get()){}
            if(!state.getAndSet(true)){
                return;
            }
        }
    }
    public void unlock(){
        state.set(false);
    }
}

TTASLock和TASLock基本一样,只不过,TTASLock在设置state的值为true之前,首先会判断state当前的值是不是false,如果不是false就会在state.getAndSet(true)上自旋等待;如果将state的值从false设置为true失败就又会重新开始上述操作。

TASLock和TTASLock它们的性能如何呢?

从图上的结果就能看出TASLock和TTASLock它们直接的性能差距还是比较明显的。

为什么会有这么大的性能差距呢?

主要的原因是为了CPU访问数据的速度,CPU内部都有一个小容量的存储器cache,CPU只会跟cache通信,并不会直接跟内存通信。

getAndSet设置的变量是有volatile修饰的。根据缓存一致性协议,这事getAndSet会直接将数据写回内存,同时迫使其他处理器cache中的对应的副本无效,这时其他处理器就需要去内存中重新读取对应变量的新值。

处理器写回数据到内存、处理器从内存中读取数据都是通过总线来完成的。是通过总线将数据写到内存中的。而在任何时刻值只能有一个处理器占用总线和内存通信,其他想要读写内存的处理器会被阻塞。

这就是TASLock性能为什么差的原因。

再来看看TTASLock,线程B第一次读锁时发生cache缺失,会从内存中读取值到它的cache中。只要线程A持有锁,线程B就会不断重新该值,这时每次都是cache命中,不会占用总线从内存中读取,也不会降低其他线程的内存访问速度。此外,释放锁的线程也不会被正在该锁上自旋的线程所延迟。

然而,当锁被释放的情况并不理想。锁的持有者将false写入锁变量来释放锁。该操作会使自旋线程的cache副本立刻失效。每个线程都将放生一次cache缺失并重新读取新值,它们会同时调用getAndSet以获取锁。第一个成功的线程将再次使其他线程中cache中的值失效。这些失效线程接下来又重新从内存中读取那个值,从而引起一场总线流量风暴。最终,所有线程再此平静,进入本地自旋。

从上面也可以看出,本地自旋每次重复读取被缓存的值而不是反复使用总线从内存中读取,对设计高效的自旋锁非常关键。

指数后退

TTASLock类中,性能消耗主要在当锁看似空闲时,调用getAndSet来获取锁,这个过程极有可能存在高争用。因为每个获取锁的线程几乎都是同时看到锁空闲,同时调用getAndSet来获取锁,但是这时每个获取锁的线程获得锁的机会都是非常小的。所以我们可以想办法来避免每个获取锁的线程同时调用getAndSet,让它们在不同的时间调用getAndSet,给正在竞争的线程以结束的机会,将会更有效。

所以当线程调用getAndSet获取锁失败时,让它们随机暂停一段时间来减少它们的获取锁的争用。暂停的时间太短,没有意义;太长,会影响整个获取锁的时间。暂停时间固定的话,也没有意义,因为下次它们又会是同时调用getAndSet。所以我们需要指定最小暂停时间minDelay,最大暂停时间maxDelay。限制暂停时间在它们之间的一个随机值。

说了这么多了,我们直接看代码吧。

首先定义一个暂停时间的类Backoff

import java.util.Random;

public class Backoff {
    final int minDelay,maxDelay;
    int limit;
    final Random random;
    public Backoff(int min,int max){
        minDelay=min;
        maxDelay=min;
        limit=minDelay;
        random=new Random();
    }
    public void backoff() throws InterruptedException {
        int delay=random.nextInt(limit);
        limit=Math.min(maxDelay,2*limit);
        Thread.sleep(delay);
    }
}

这里的暂停时间上限每次都是*2,所以也叫指数后退。

这种指数后退的处理算法TCP的碰撞处理也用到了

下面我们修改TTASLock类获取锁的getAndSet方法中返回false时,调用backoff方法随机暂停个时间。我们把这个修改的类叫BackoffLock,它的完整代码就是下面这个样子了

import java.util.concurrent.atomic.AtomicBoolean;

public class BackoffLock implements Lock {
    private AtomicBoolean state = new AtomicBoolean(false);
    private static final int MIN_DELAY = 1;
    private static final int MAX_DELAY = 10;

    public void lock() {
        Backoff backoff = new Backoff(MIN_DELAY, MAX_DELAY);
        while (true) {
            while (state.get()) {
            }
            if (!state.getAndSet(true)) {
                return;
            } else {
                try {
                    backoff.backoff();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }

    }

    public void unlock() {
        state.set(false);
    }
}

BackoffLock这种指数后退的锁易于实现,且在许多系统结构中要比TASLock、TTASLock的性能要好。但是它的暂停时间的最优值的选择与处理器的个数以及速度密切相关,因此,很难调整BackoffLock类以使它与各种不同的机器相互兼容。

队列锁

这是一种实现了可扩展自旋锁的方法,比BackoffLock这种指数后退锁复杂一些,但却有更好的移植性。在BackoffLock中有两个问题

  • cache一致性流量:所有线程都在同一个共享存储单元上自旋,每一次成功的锁访问都会产生cache一致性流量
  • 临界区利用率低:线程延迟过长,导致临界区利用率低下。

可以将线程组织成一个队列来克服这些缺点。在队列中,每个线程检测其前驱线程是否已完成来判断是否轮到它自己。获取锁,释放锁只有当前节点的后续节点的cache失效,不会影响到其他节点;这样就只有这个后续节点会去通过总线访问内存,其他节点的cache依旧是有效的。

基于数组的锁

import java.util.concurrent.atomic.AtomicInteger;

public class ALock implements Lock{
    private ThreadLocal<Integer>  mySlotIndex=ThreadLocal.withInitial(()->0);
    private AtomicInteger tail;
    private volatile boolean[] flag;
    private int size;
    public ALock(int capacity){
        size=capacity;
        tail=new AtomicInteger(0);
        flag=new boolean[capacity];
        flag[0]=true;
    }
    public void lock(){
        int slot=tail.getAndIncrement()%size;
        mySlotIndex.set(slot);
        while(!flag[slot]){};
    }
    public void unlock(){
        int slot= mySlotIndex.get();
        flag[slot]=false;
        flag[(slot+1)%size]=true;
    }
}

在上面的类中,有一个boolean类型的flag数组(如上图所示,上图中的数组大小为128),数组中每个位置对应一个要加锁的线程。只有当数组中值为true的线程可以获取锁。有一个AtomicInteger类型的tail,每个加锁的线程通过修改它的值,来获得当前线程所对应的flag数组中的位置。

tail属性是一个成员变量,可以被多个线程共享,初始值是0。为了获得锁,每个线程原子地增加tail1域的值。所得的结果值作为线程的槽,也就是对应数组flag的下标。同时这个结果值也通过ThreadLocal和对应的线程做了绑定。

如果flag[i]的值为true,那么取得结果值为i的线程就有权获得锁。

在初始状态时,只有flag[0]为true,其他的都是false。(java中原始变量的boolean的默认值是false)。表明只有当前只有增加tail的返回值是0的线程可以获取锁。

在释放锁值,线程把自己对应的数组位置值设置成false,同时将数组下一个位置的值设置成true。确保取的数组下一个位置的线程可以获得锁。

但是这种数组结构有两个问题:

1 由于我们是先创建ALock的对象,这时flag数组的容量就确定了,但是我们每个获取锁的线程都需要去通过得到数据下标的方式来获取锁。但是我们有多少个线程来获取锁,常常在创建ALock对象时未知的,为了避免数组越界,我们通过取余的方式来得到线程在数组中的位置,但这就有了一个问题,多个线程可能会对应到一个数组位置,也就是可能存在多个线程同时获得锁的问题。

2 从上面右边的图上可以看到flag数组中多个元素在同一个缓存行,也就有了伪共享的问题。

要避免伪共享的问题,那就需要每个线程在数组中更新的值(tail.getAndIncrement()%size;得到的值)都在单独的一个缓存行。换句话说也就是每个线程更新的值代表一个缓存行。

每次获取锁时调用tail.getAndIncrement()%size;的返回值*(一个缓存行的字节数/数组中单个元素的字节数)。这样就可以确保每个线程修改数组的位置都在单独的一个缓存行。

CLH队列锁

上面可以看到基于数组的锁的两个问题:

1.存在多个线程可能同时获得锁的问题。

2.为了解决伪共享的问题,需要浪费内存和cache。

CLH队列锁就可以解决这个问题。

说到CLH,如果你对Java的锁有一定的了解,那你对这个名字至少应该不会感到陌生,因为在AbstractQueuedSynchronizer中的队列就是参考CLH队列来实现的。

CLH队列锁是一个单向链表结构,每个节点都有一个指向它前驱节点的引用。每个节点的初始值都是true。当它的前驱节点的值是false时,代表它的前驱节点释放了锁,当前节点可以获得锁。

下面看看CLH队列锁的实现代码:

public class CLHLock {
    private AtomicReference<QNode> tail = new AtomicReference<QNode>(new QNode());
    ThreadLocal<QNode> myPred;
    ThreadLocal<QNode> myNode;

    public CLHLock() {
        tail = new AtomicReference<>(new QNode());
        myNode = ThreadLocal.withInitial(() -> new QNode());
        myPred = ThreadLocal.withInitial(() -> null);
    }

    public void lock() {
        QNode qNode = myNode.get();
        qNode.locked = true;
        QNode pred = tail.getAndSet(qNode);
        myPred.set(pred);
        while (pred.locked) {
        }
    }

    public void unlock() {
        QNode qNode = myNode.get();
        qNode.locked = false;
    }
    privaet static class QNode {
        public volatile boolean locked;
    }
}

队列的每个节点都是一个QNode。这种其实是一种隐式队列。因为它的队列节点QNode并没有属性指向其前驱节点或者后续节点。

myPred 和 myNode都是ThreadLocal变量。它们分别存储队列中每个线程的在队列上的前驱节点和它自己的节点。

它的前后节点是这样串起来的:

  1. 13行首先myNode.get();返回myNode这个ThreadLocal变量存储的节点。

  2. 每个线程加锁的时候都会调用 tail.getAndSet(qNode),将队列中的尾节点tail替换成第1步得到的节点

  3. “myPred.set(pred);这句代码将原来的tail节点存储到myPred`这个ThreadLocal变量中。

这里执行完之后,原来的tail节点就存储到myPred中,而新的tail节点就是myNode存储的节点。原来的tail节点也就成了新的tail节点的前驱节点。

这里需要注意的是就算有多个线程同时调用tail.getAndSet(qNode),这行代码也是串行执行的。它们只会修改最新的tail节点。也就是只会影响到myNode存储的节点是不是当前最新的tail节点。并不会影响到myNode存储的节点和myNode存储的节点的关系。也就是说不会改变myNode存储的节点是myNode存储的节点的前驱节点这个事实。

tail代表队列的尾节点。初始时,队列中只有一个tail节点。

在加锁过程中,每个线程通过将自己添加到tail节点后面,将自己的节点(myNode)作为tail节点的值,同时将原来的tail节点的值更新尾当前线程的前驱节点(myPred)的值。通过这种方式,当前线程就将自己的节点加入到了队列中。它去观察它前驱节点(myPred)的值,当myPred节点的值为false时,表明当前线程可以获得锁。

在解锁的过程中,只需要将当前线程节点(myNode)的值更新为false,它的后续线程就可以获得锁。

初始状态:

​ 这时,只有一个tail节点。它的值false。

加锁阶段:

​ 首先会创建一个当前节点myNode,将它的值设置为True。

​ 同时调用tail.getAndSet(qNode)将自己更新为tail节点,它的返回值pred是原来的tail节点。

​ 再执行myPred.set(pred);将原来的tail节点设置成当前节点的前驱节点。

​ 当前节点通过自旋的方式去观察前驱节点的值是否为false,只有当前驱节点为false时,表示前驱节点已经释放锁,当前节点可以获得锁。

解锁阶段:

​ 这里只需要将当前节点myNode的值设置成false就可以了。

​ 当前线程结束后ThreadLocal变量也会自动被垃圾回收器回收。这里的myPred节点由于没有其他引用,就会被垃圾回收器回收。myNode节点要不就是tail节点,要不就是它的后续节点的前驱节点,所以myNode节点并不会被垃圾回收器回收。

CLH队列锁可以解决ALock的两个问题。

1.由于我们当前时一个链表,所以不存在容量不够的问题。

2.我们CLH队列中的每个节点都是在调用加锁阶段才创建的。而且由于每个节点都是由不同线程创建的。所以我可以认为它们不会在同一个缓存行中。

但是CLH队列锁也有一个问题。

1.由于每个线程都要去观察它的前驱节点,进行自旋。所以在无cache的NUMA系统结构下,就会出现前驱节点可能在其他CPU内存上,性能就会比较差。

MCS队列锁:

MCS队列锁也被标识为QNode对象的链表,其中的每个QNode要么表示一个锁的持有者,要么表示一个正在等待获得锁的线程。与CLHLock队列锁相比,它的链表时显式的而不是虚拟的:整个链表通过QNode对象里的next属性作为后续节点的引用,而在CLH队列锁中时通过ThreadLocal变量myPred来指向前驱节点的。

下面我们看看MCS队列锁的代码:

import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;

public class MCSLock implements Lock {

    private final AtomicReference<QNode> queue;
    private AtomicReference<QNode> tail = new AtomicReference<QNode>(new QNode());
    ThreadLocal<QNode> myNode;
    public MCSLock(){
        queue=new AtomicReference<QNode>();
        myNode=ThreadLocal.withInitial(()->new QNode());
    }


    @Override
    public void lock() {
      QNode qNode=myNode.get();
      QNode pred=tail.getAndSet(qNode);
      if(Objects.nonNull(pred)){
          qNode.locked=true;
          pred.next=qNode;
          while(qNode.locked){

          }
      }
    }

    @Override
    public void unlock() {
        QNode qNode=myNode.get();
        if(Objects.isNull(qNode.next)){
            if(tail.compareAndSet(qNode,null)){
                return ;
            }
            while(Objects.isNull(qNode.next)){

            }
        }
        qNode.next.locked=false;
        qNode.next=null;
    }
    
    private static class QNode {
    public volatile boolean locked;
    public volatile QNode next;
}
}

加锁阶段:

每个线程的 节点还是调用tail.getAndSet(qNode)将自己添加到整个队列的尾部,同时修改自己为tail节点。它的返回值就是原来tail保存的节点。由于在初始化时未给tail分配节点(,所以tail的值有可能为空unlock也可能导致tail节点为空)。

接下来判断原来的tail节点是否为空:

​ 1.如果为空,表示当前节点就是整个队列的头节点。这时就可以获取锁。

​ 2.如果原来的tail节点不为空,就将当前节点的locked属性设置为true,同时将原来的tail节点的next属性设置为当前节点。

在当前节点上自旋,等待当前节点的locked属性为false,以便获取锁。

解锁阶段:

首先判断当前节点的后续节点next属性是否为空,如果为空,那就有两种情况。

  1. 当前节点就是队尾节点。
  2. 已经有了后续节点,但是后续节点还没有调用pred.next=qNode;将当前要释放锁的节点的后续节点设置成它自己。

对于这两种情况,都会通过调用tail.compareAndSet(qNode,null)来判断当前节点是否是tail节点,并将tail节点由当前节点设置成null,然后直接返回。如果这句能执行成功,那就说明当前节点就是队尾节点,也就是在当前解锁的线程执行完tail.getAndSet(qNode)这就后,一直没有其他线程执行过这行代码。这也就是解决上面next为空的第一种场景。

如果tail.compareAndSet(qNode,null)这句执行不成功,说明已经有其他线程执行了tail.getAndSet(qNode)这句代码,当前的tail节点已经不是当前线程自己的节点了 。这时就需要等待执行完tail.getAndSet(qNode)代码的线程直到它执行完pred.next=qNode;这句代码,这里是通过自旋进行等待的。这也是解决上面说的第二种场景。

上面的第一种场景 在34行已经return,代码能执行到40行,就表示当前节点已经不是队尾tail节点了。这时只需要将当前节点的后续节点next节点的locked值设置为false,从而唤醒next节点获得锁。

qNode.next=null;将后续节点和当前节点的链接断开,方便当前节点被垃圾回收器回收。

对比CLH队列锁,MCS队列锁在加锁阶段只是在自己的节点上自旋,也就是没有了无cache的NUMA问题。但是它解锁的时候也需要在后续节点上进行自旋,不过这个自旋的时候就非常短,无cache的NUMA问题的问题基本也可以忽略吧。

时限队列锁

Java的Lock接口中有个boolean tryLock(long time, TimeUnit unit) throws InterruptedException;方法,调用者可以指定一个时限:调用这个方法为获得锁而准备等待的最大时间,如果在调用者获得锁之前超时,调用者则放弃获得锁的尝试。

用CLH队列锁来举例,我们可以重载lock方法,传入最大的等待时间,线程在它的前驱节点自旋的时候判断是否超时,如果超时就返回。

    public void lock(long   time) {
        long start=System.currentTimeMillis();
        QNode qNode = myNode.get();
        qNode.locked = true;
        QNode pred = tail.getAndSet(qNode);
        myPred.set(pred);
        while (pred.locked) {
            //超时
            if((System.currentTimeMillis()-start)>time){
                return;
            }
        }
    }

上面的代码是我自己写的,书上没有。上面的代码只是在超时时直接return,看着和正常获取锁的基本没区别。严格来说,在超时时,我们还需要将当前节点的前驱节点和当前节点的后续节点连起来,从整个队列中删除掉当前节点。这里有两个问题:

1.CLH队列锁都只有前驱节点的引用,没有后续节点的引用。所以如果要删除当前节点,让当前节点的前驱节点和后续节点连起来,就需要每个节点判断它的前驱节点是不是由于超时等原因实效了,如果失效了 就要将它的前驱节点剔除掉,将它的前驱节点设置成它前驱节点的前驱节点。这个无疑增加了加锁节点代码的复杂度。

2.从一个队列节点中删除一个节点还会扰乱并发锁的释放,这个解决起来也会比较困难。

所以我们需要采用一种迂回的方法,不是在超时时主动的将当前节点从队列中删除掉,而是采用惰性方法:若一个线程超时,则该线程将它的节点标记为已放弃。这样该线程在队列中的后续(如果有)将会注意到它正在自旋的节点已经被放弃,于是开始在被放弃节点的前驱上自旋。

下面还是直接看代码吧

import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class TOLock implements Lock {
    static QNode AVAILABLE = new QNode();
    AtomicReference<QNode> tail;
    ThreadLocal<QNode> myNode;

    public TOLock() {
        tail = new AtomicReference<>();
        myNode = ThreadLocal.withInitial(() -> new QNode());
    }

    @Override
    public void lock() {

    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) {
        long startTime = System.currentTimeMillis();
        long patience = TimeUnit.MILLISECONDS.convert(time, unit);
        QNode qNode = new QNode();
        myNode.set(qNode);
        qNode.pred = null;
        QNode myPred = tail.getAndSet(qNode);
        if (Objects.isNull(myPred) || myPred.pred == AVAILABLE) {
            return true;
        }
        while (System.currentTimeMillis() - startTime < patience) {
            QNode predPred = myPred.pred;
            if (predPred == AVAILABLE) {
                return true;
            } else if (Objects.nonNull(predPred)) {
                myPred = predPred;
            }

        }
        if (!tail.compareAndSet(qNode, myPred)) {
            qNode.pred = myPred;
        }
        return false;


    }

    @Override
    public void unlock() {
        QNode qNode = myNode.get();
        if (!tail.compareAndSet(qNode, null)) {
            qNode.pred = AVAILABLE;
        }
    }

    private static class QNode {
        public volatile QNode pred = null;
    }

下面我们简单分析下上面的代码

首先定义了一个QNode对象(57行),作为当前线程加锁状态的节点,其中它有个pred属性(58行)。关于pred的取值,分为下面几种情况:

  1. 当前线程已经获得了锁,或者当前线程正在获取锁(此时还没有超时),这时pred的值就是null
  2. 如果当前线程在获取锁的过程中超时了,这时pred的值就是它的前驱节点的pred的值。
  3. 如果当前线程解锁时,队列中已经有了后续线程,就将pred的值设置成AVAILABLE,指示它的后续节点可以获得锁。

加锁的代码:

1.首先我们创建一个QNode对象,存储到threadLocal变量myNode中。然后通过原子的方式将队列中的tail尾节点更新为当前线程的QNode对象(28行),同时将原来的tail节点存储到myPred变量中。

2.接下来判断myPred的值:

​ (1).如果myPred==null,表示当前队列中只有当前线程一个节点,此时它就可以获取到锁。

​ (2).如果myPred.pred == AVAILABLE,表示当前节点的前驱节点已经释放了锁,此时当前线程也就可以获取到锁。

​ (3).对于其他的情况,都不能直接获取锁。

3.在32行开始,使用while循环来等待它的前驱节点释放锁或者超时。

​ 在这里面主要是通过对myPred变量的pred值进行判断。在进入while循环时,myPred表示当前线程的前驱节点。前面已经说过pred的取值有3中情况,下面我们对while循环中pred取值的3中情况简单分析下:

QNode predPred = myPred.pred;由于myPred是当前线程的前驱节点,所以这里的predPred就是当前线程的前驱节点的pred属性。

​ (1).predPred==null,表示当前线程的前驱节点也在等待获取锁,这时当前线程需要继续在自己的节点上循环,等待前驱节点超时(36行),或者前驱节点获得锁后并释放锁(34行)。

​ (2).predPred==AVAILABLE表示前驱节点已经释放了锁,当前节点就可以获取锁。

​ (3)其他取值,这种其实也就只有一种取值了,就是42行的代码。不过这里需要注意的是42行是当前线程节点超时的操作。while 循环中这里就是当前线程的前驱节点超时的情况了。也就是36行的代码,直接修改myPred = predPred;跳过超时的节点,继续 执行后续操作。

4.在41行,对超时进行一个处理。如果当前线程的节点依旧是队列中的尾节点(在28行到41行之间,没有其他线程执行第28行代码),就直接将当前节点从队列中去掉。如果当前线程的节点已经不是尾节点,就将当前线程节点的pred的前驱节点。

解锁的代码

这个就比较简单了,直接判断队列中是否有后续节点,如果有后续节点,就将当前节点的pred设置成AVAILABLE,唤醒队列中的后续节点。