JUC: AQS框架

  • 时间:
  • 来源:互联网
  • 文章标签:

AbstractQueuedSynchronizer:AQS框架

阅读前需知:此文章是我在学习过程中做的笔记,可能有许多错误之处,欢迎大家指出并提出宝贵的意见,同时感谢参考的原创文章的作者们!

更多个人笔记以及面试技巧可以到本人的Github上观看:杰哥的Java_Note

一、基础知识

1、基本介绍

中文翻译:抽象的队列式的同步器,队列同步器

并发编程中实现同步器的一个框架

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {

一个非常有用的超类,可用来定义锁以及依赖于排队阻塞线程的其他同步器;

ReentrantLock,ReentrantReadWriteLock,CountDownLatch,CyclicBarrier和Semaphore信号量等这些类都是基于AQS类实现的。

2、资源共享方式

AQS维护了一个volatile int state(代表共享资源)和一个**FIFO线程等待队列CLH(**多线程争用资源被阻塞时会进入此队列)

state:一个int类型的成员变量,用来控制同步状态,当state=0时,则说明没有任何线程占有共享资源的锁,当state=1时,则说明有线程目前正在使用共享变量,其他线程必须加入同步队列进行等待

2.1 state的访问方式有三种

  • getState()
  • setState()
  • compareAndSetState()

2.2 AQS定义两种资源共享方式

在Node类里面定义了:

Exclusive:独占模式,表示共享状态值state每次能由一条线程持有,其他线程如果需要获取,则需要阻塞。如JUC中的ReentrantLock

Share:共享模式,表示共享状态值state每次可以由多个线程持有,如JUC中的CountDownLatch

3、自定义同步器方法

不同的自定义同步器争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源state的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在顶层实现好了。

自定义同步器实现时主要实现以下几种方法:

  • isHeldExclusively():该线程是否正在独占资源。只有用到condition才需要去实现它。
  • tryAcquire(int):**独占方式。**尝试获取资源,成功则返回true,失败则返回false。
  • tryRelease(int):独占方式。尝试释放资源,成功则返回true,失败则返回false。
  • tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
  • tryReleaseShared(int):共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。

二、底层实现

0、基础知识

AQS中只能存在一个同步队列,但可拥有多个等待队列

AQS内部通过内部类Node构成FIFO的同步队列来完成线程获取锁的排队工作,同时利用内部类ConditionObject构建等待队列.

同步队列的首节点调用await()方法,该节点相关的线程会被封装成新节点进入等待队列,当前线程调用signal()方法,会取出等待队列中的首节点插入到同步队列的队尾,同步队列和等待队列就是如此做交互的。

CLH锁: 是一个自旋锁,能确保无饥饿性,提供先来先服务FIFO的公平性。

1、节点类Node

static final class Node {
    //共享模式
    static final Node SHARED = new Node();
    //独占模式
    static final Node EXCLUSIVE = null;

    //标识线程已处于结束状态
    static final int CANCELLED =  1;
    //等待被唤醒状态
    static final int SIGNAL    = -1;
    //条件状态,
    static final int CONDITION = -2;
    //在共享模式中使用表示获得的同步状态会被传播
    static final int PROPAGATE = -3;

    //等待状态,存在CANCELLED、SIGNAL、CONDITION、PROPAGATE 4种
    volatile int waitStatus;

    //同步队列中前驱结点
    volatile Node prev;

    //同步队列中后继结点
    volatile Node next;

    //请求锁的线程
    volatile Thread thread;

    //等待队列中的后继结点,这个与Condition有关,稍后会分析
    Node nextWaiter;

    //判断是否为共享模式
    final boolean isShared() {
        return nextWaiter == SHARED;
    }

    //获取前驱结点
    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }

    //.....
}

waitStatus:

  • CANCELLED:值为1,在同步队列中等待的线程等待超时或被中断,需要从同步队列中取消该Node的结点,其结点的waitStatus为CANCELLED,即结束状态,进入该状态后的结点将不会再变化。
  • SIGNAL:值为-1,被标识为该等待唤醒状态的后继结点,当其前继结点的线程释放了同步锁或被取消,将会通知该后继结点的线程执行。说白了,就是处于唤醒状态,只要前继结点释放锁,就会通知标识为SIGNAL状态的后继结点的线程执行。
  • CONDITION:值为-2,与Condition相关,该标识的结点处于等待队列中,结点的线程等待在Condition上,当其他线程调用了Condition的signal()方法后,CONDITION状态的结点将从等待队列转移到同步队列中,等待获取同步锁。
  • PROPAGATE:值为-3,与共享模式相关,在共享模式中,该状态标识结点的线程处于可运行状态。
  • 0状态:值为0,代表初始化状态。

2、同步队列

2.1 基本介绍

节点Node构成FIFO的同步队列

节点的数据结构是双向链表

img
/**
 * AQS抽象类
 */
public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer{
//指向同步队列队头
private transient volatile Node head;

//指向同步的队尾
private transient volatile Node tail;

//同步状态,0代表锁未被占用,1代表锁已被占用
private volatile int state;

//省略其他代码......
}

2.2 整体流程图

img

2.3 主要实现方法: acquire()

在NonfairSync获取锁失败会再次请求获取

//参数arg表示要获取同步状态后设置的值(即要设置state的值)
//status为0时是释放锁,1则是获取锁,所以这里一般传递参数为1
public final void acquire(int arg) { 
        
        //再次尝试获取同步状态
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

1) tryAcquire(arg)

tryAcquire(arg)方法在AQS中并没有具体实现,而是交由子类实现,因此该方法是由ReetrantLock类内部sync实现的

compareAndSetState:CAS操作设置state值

//NonfairSync类
static final class NonfairSync extends Sync {

    protected final boolean tryAcquire(int acquires) {
         return nonfairTryAcquire(acquires);
     }
 }


//Sync类
abstract static class Sync extends AbstractQueuedSynchronizer {

  //nonfairTryAcquire方法
  final boolean nonfairTryAcquire(int acquires) {
      final Thread current = Thread.currentThread();
      int c = getState();
      //判断同步状态是否为0,并尝试再次获取同步状态
      if (c == 0) {
          //执行CAS操作
          if (compareAndSetState(0, acquires)) {
              setExclusiveOwnerThread(current);
              return true;
          }
      }
      //如果当前线程已获取锁,属于重入锁,再次获取锁后将status值加1
      else if (current == getExclusiveOwnerThread()) {
          int nextc = c + acquires;
          if (nextc < 0) // overflow
              throw new Error("Maximum lock count exceeded");
          //设置当前同步状态,当前只有一个线程持有锁,因为不会发生线程安全问题,可以直接执行 setState(nextc);
          setState(nextc);
          return true;
      }
      
      return false;
  }
  //省略其他代码
}

这里做了两件事,一是尝试再次获取同步状态,如果获取成功则将当前线程设置为OwnerThread,否则失败;

二是判断当前线程current是否为OwnerThread,如果是则属于重入锁,state自增1,并获取锁成功,返回true

反之失败,返回false,也就是**tryAcquire(arg)**执行失败,返回false。

2) addWaiter()

如果tryAcquire(arg)返回true,acquireQueued自然不会执行,这是最理想的,因为毕竟当前线程已获取到锁,如果tryAcquire(arg)返回false,则会执行addWaiter(Node.EXCLUSIVE)进行入队操作,由于ReentrantLock属于独占锁,因此结点类型为Node.EXCLUSIVE,下面看看addWaiter方法具体实现:

private Node addWaiter(Node mode) {
    //将请求同步状态失败的线程封装成结点
    Node node = new Node(Thread.currentThread(), mode);

    Node pred = tail;
    //如果是第一个结点加入pred肯定为空,跳过。
    //如果非第一个结点则直接执行CAS入队操作,尝试在尾部快速添加
    if (pred != null) {
        node.prev = pred;
        //使用CAS执行尾部结点替换,尝试在尾部快速添加
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    //如果第一次加入或者CAS操作失败执行enq入队操作
    enq(node);
    return node;
}

如果是第一个节点加入或者CAS操作失败就执行enq入队操作

private Node enq(final Node node) {
    //死循环
    for (;;) {
         Node t = tail;
         //如果队列为null,即没有头结点
         if (t == null) { // Must initialize
             //创建并使用CAS设置头结点
             if (compareAndSetHead(new Node()))
                 tail = head;
         } else {//队尾添加新结点
             node.prev = t;
             if (compareAndSetTail(t, node)) {
                 t.next = node;
                 return t;
             }
         }
     }
    }

这个方法使用一个死循环进行CAS操作,可以解决多线程并发问题。

这里做了两件事:一是如果还没有初始同步队列则创建新结点并使用compareAndSetHead设置头结点,tail也指向head;

二是队列已存在,则将新结点node添加到队尾。

注意这两个步骤都存在同一时间多个线程操作的可能,如果有一个线程修改head和tail成功,那么其他线程将继续循环,直到修改成功,这里使用CAS原子操作进行头结点设置和尾结点tail替换可以保证线程安全,从这里也可以看出head结点本身不存在任何数据,它只是作为一个牵头结点,而tail永远指向尾部结点(前提是队列不为null)。

3)acquireQueued(final Node node, int arg)

添加到同步队列后,结点就会进入一个自旋过程,即每个结点都在观察时机待条件满足获取同步状态:前驱节点是否是head,然后从同步队列退出并结束自旋,回到之前的acquire()方法

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        //自旋,死循环
        for (;;) {
            //获取前驱结点
            final Node p = node.predecessor();
            //当且仅当p为头结点才尝试获取同步状态
            if (p == head && tryAcquire(arg)) {
                //将node设置为头结点
                setHead(node);
                //清空原来头结点的引用便于GC
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            //如果前驱结点不是head,判断是否挂起线程
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            //最终都没能获取同步状态,结束该线程的请求
            cancelAcquire(node);
    }
}

3、等待队列(独占模式)

每个Condition维护着一个队列,该队列的作用是维护一个等待singal信号的队列。

3.1 Condition:等待唤醒机制

每个Condition都对应着一个等待队列,也就是说如果一个锁上创建了多个Condition对象,那么也就存在多个等待队列。

等待队列是一个FIFO的队列,在队列中每一个节点都包含了一个线程的引用,而该线程就是Condition对象上等待的线程。当一个线程调用了await()相关的方法,那么该线程将会释放锁,并构建一个Node节点封装当前线程的相关信息加入到等待队列中进行等待,直到被唤醒、中断、超时才从队列中移出。

  • 通过Condition能够精细的控制多线程的休眠与唤醒。
  • 对于一个锁,我们可以为多个线程间建立不同的Condition。
public interface Condition {

 /**
  * 使当前线程进入等待状态直到被通知(signal)或中断
  * 当其他线程调用singal()或singalAll()方法时,该线程将被唤醒
  * 当其他线程调用interrupt()方法中断当前线程
  * await()相当于synchronized等待唤醒机制中的wait()方法
  */
 void await() throws InterruptedException;

 //当前线程进入等待状态,直到被唤醒,该方法不响应中断要求
 void awaitUninterruptibly();

 //调用该方法,当前线程进入等待状态,直到被唤醒或被中断或超时
 //其中nanosTimeout指的等待超时时间,单位纳秒
 long awaitNanos(long nanosTimeout) throws InterruptedException;

  //同awaitNanos,但可以指明时间单位
  boolean await(long time, TimeUnit unit) throws InterruptedException;

 //调用该方法当前线程进入等待状态,直到被唤醒、中断或到达某个时
 //间期限(deadline),如果没到指定时间就被唤醒,返回true,其他情况返回false
  boolean awaitUntil(Date deadline) throws InterruptedException;

 //唤醒一个等待在Condition上的线程,该线程从等待方法返回前必须
 //获取与Condition相关联的锁,功能与notify()相同
  void signal();

 //唤醒所有等待在Condition上的线程,该线程从等待方法返回前必须
 //获取与Condition相关联的锁,功能与notifyAll()相同
  void signalAll();
}

3.2 ConditionObject

Condition的实现类是AQS的内部类ConditionObject

等待队列中结点node的状态只有两种即CANCELLED和CONDITION,前者表示线程已结束需要从等待队列中移除,后者表示条件结点等待被唤醒。

 public class ConditionObject implements Condition, java.io.Serializable {
    //等待队列第一个等待结点
    private transient Node firstWaiter;
    //等待队列最后一个等待结点
    private transient Node lastWaiter;
    //省略其他代码.......
}

ConditionObject中的等待队列模型如下

img

3.3 await():等待操作

await()方法主要做了3件事:

  • 调用addConditionWaiter()方法将当前线程封装成node结点加入等待队列
  • 调用fullyRelease(node)方法释放同步状态并唤醒后继结点的线程
  • 调用isOnSyncQueue(node)方法判断结点是否在同步队列中,如果在同步队列中没有该结点就直接挂起该线程,直到被中断唤醒。如果线程被唤醒后就调用acquireQueued(node, savedState)执行自旋操作争取锁,即当前线程结点从等待队列转移到同步队列并开始努力获取锁
public final void await() throws InterruptedException {
      //判断线程是否被中断
      if (Thread.interrupted())
          throw new InterruptedException();
      //创建新结点加入等待队列并返回
      Node node = addConditionWaiter();
      //释放当前线程锁即释放同步状态
      int savedState = fullyRelease(node);
      int interruptMode = 0;
      //结点如果不在同步队列(SyncQueue)中,即是否被唤醒
      while (!isOnSyncQueue(node)) {
          //不在则挂起线程
          LockSupport.park(this);
          //如果被中断唤醒,如果是退出循环。
          if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
              break;
      }
      //被唤醒后执行自旋操作争取获得锁,同时判断线程是否被中断
      if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
          interruptMode = REINTERRUPT;
       // clean up if cancelled
      if (node.nextWaiter != null) 
          //清理等待队列中不为CONDITION状态的结点
          unlinkCancelledWaiters();
      if (interruptMode != 0)
          reportInterruptAfterWait(interruptMode);
  }

执行addConditionWaiter()添加到等待队列。

 private Node addConditionWaiter() {
    Node t = lastWaiter;
      // 判断是否为结束状态的结点并移除
      if (t != null && t.waitStatus != Node.CONDITION) {
          unlinkCancelledWaiters();
          t = lastWaiter;
      }
      //创建新结点状态为CONDITION
      Node node = new Node(Thread.currentThread(), Node.CONDITION);
      //加入等待队列
      if (t == null)
          firstWaiter = node;
      else
          t.nextWaiter = node;
      lastWaiter = node;
      return node;
        }

3.4 singal() 唤醒操作

 public final void signal() {
     //判断是否持有独占锁,如果不是抛出异常
   if (!isHeldExclusively())
          throw new IllegalMonitorStateException();
      Node first = firstWaiter;
      //唤醒等待队列第一个结点的线程
      if (first != null)
          doSignal(first);
 }

signal()方法做了两件事:

一是判断当前线程是否持有独占锁,没有就抛出异常,从这点也可以看出只有独占模式先采用等待队列,而共享模式下是没有等待队列的,也就没法使用Condition。

二是唤醒等待队列的第一个结点,即执行doSignal(first)

 private void doSignal(Node first) {
     do {
             //移除条件等待队列中的第一个结点,
             //如果后继结点为null,那么说没有其他结点将尾结点也设置为null
            if ( (firstWaiter = first.nextWaiter) == null)
                 lastWaiter = null;
             first.nextWaiter = null;
          //如果被通知节点没有进入到同步队列并且条件等待队列还有不为空的节点,则继续循环通知后续结点
         } while (!transferForSignal(first) &&
                  (first = firstWaiter) != null);
        }

//transferForSignal方法
final boolean transferForSignal(Node node) {
    //尝试设置唤醒结点的waitStatus为0,即初始化状态
    //如果设置失败,说明当期结点node的waitStatus已不为
    //CONDITION状态,那么只能是结束状态了,因此返回false
    //返回doSignal()方法中继续唤醒其他结点的线程,注意这里并
    //不涉及并发问题,所以CAS操作失败只可能是预期值不为CONDITION,
    //而不是多线程设置导致预期值变化,毕竟操作该方法的线程是持有锁的。
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
         return false;

        //加入同步队列并返回前驱结点p
        Node p = enq(node);
        int ws = p.waitStatus;
        //判断前驱结点是否为结束结点(CANCELLED=1)或者在设置
        //前驱节点状态为Node.SIGNAL状态失败时,唤醒被通知节点代表的线程
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            //唤醒node结点的线程
            LockSupport.unpark(node.thread);
        return true;
    }

doSignal(first)方法中做了两件事:

  • 从条件等待队列移除被唤醒的节点,然后重新维护条件等待队列的firstWaiter和lastWaiter的指向
  • 从等待队列移除的结点加入同步队列(在transferForSignal()方法中完成的),如果进入到同步队列失败并且条件等待队列还有不为空的节点,则继续循环唤醒后续其他结点的线程。

4、等待唤醒机制流程图

img

参考文章

深入剖析基于并发AQS的(独占锁)重入锁(ReetrantLock)及其Condition实现原理

Java并发之AQS详解

本文链接http://www.taodudu.cc/news/show-1782153.html