Java并发-AQS核心原理深度剖析 发表于 2024-12-19 更新于 2025-11-27
字数总计: 1.8k 阅读时长: 7分钟 上海
AQS 核心原理深度剖析 AQS 是 JUC 包的基石,理解 AQS 就理解了 Java 并发工具类的一半。
一、什么是 AQS? AQS(AbstractQueuedSynchronizer,抽象队列同步器)是 Doug Lea 设计的一个用于构建锁和同步器的框架 。它抽象了锁的核心要素:状态管理 + 线程排队 + 阻塞唤醒 。
1.1 基于 AQS 的实现类 同步器 使用模式 说明 ReentrantLock 独占 可重入互斥锁 ReentrantReadWriteLock 独占 + 共享 读写分离锁 Semaphore 共享 信号量,控制并发数 CountDownLatch 共享 倒计数门闩 CyclicBarrier 独占 循环屏障
1.2 核心设计思想 AQS 采用模板方法模式 ,将通用逻辑(入队、出队、阻塞、唤醒)封装在父类,将同步状态的获取与释放逻辑留给子类实现。
1 2 3 4 5 6 protected boolean tryAcquire (int arg) protected boolean tryRelease (int arg) protected int tryAcquireShared (int arg) protected boolean tryReleaseShared (int arg) protected boolean isHeldExclusively ()
二、AQS 核心组件 2.1 同步状态(state) 1 2 3 4 5 6 7 8 9 private volatile int state;protected final int getState () { return state; }protected final void setState (int newState) { state = newState; }protected final boolean compareAndSetState (int expect, int update) { return unsafe.compareAndSwapInt(this , stateOffset, expect, update); }
state 在不同锁中的含义:
锁类型 state 含义 ReentrantLock 锁的重入次数(0 表示未锁定) Semaphore 可用许可数 CountDownLatch 剩余计数值 ReadWriteLock 高 16 位:读锁持有数;低 16 位:写锁重入数
2.2 CLH 同步队列 AQS 维护了一个变体 CLH(Craig, Landin, and Hagersten)双向链表 作为同步队列:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 static final class Node { volatile int waitStatus; volatile Node prev; volatile Node next; volatile Thread thread; Node nextWaiter; static final Node SHARED = new Node (); static final Node EXCLUSIVE = null ; static final int CANCELLED = 1 ; static final int SIGNAL = -1 ; static final int CONDITION = -2 ; static final int PROPAGATE = -3 ; }
队列结构示意图:
1 2 3 4 5 6 +------+ prev +------+ +------+ head | | <----- | | <---- | | tail | Node | next | Node | | Node | | | -----> | | ----> | | +------+ +------+ +------+ 哨兵节点 等待节点 等待节点
注意 :head 节点是一个哨兵节点(dummy node),不存储线程信息。
2.3 等待状态详解 waitStatus 值 说明 SIGNAL -1 当前节点释放锁时需要唤醒后继节点 CANCELLED 1 节点被取消(超时或中断) CONDITION -2 节点在条件队列中等待 PROPAGATE -3 共享模式下,释放需要传播 初始值 0 新节点的初始状态
三、独占模式源码分析 3.1 获取锁流程 1 2 3 4 5 6 7 8 public final void acquire (int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
addWaiter:入队 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 private Node addWaiter (Node mode) { Node node = new Node (Thread.currentThread(), mode); Node pred = tail; if (pred != null ) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; } private Node enq (final Node node) { for (;;) { Node t = tail; if (t == null ) { if (compareAndSetHead(new Node ())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
acquireQueued:自旋 + 阻塞 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 final boolean acquireQueued (final Node node, int arg) { boolean failed = true ; try { boolean interrupted = false ; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null ; failed = false ; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true ; } } finally { if (failed) cancelAcquire(node); } } private static boolean shouldParkAfterFailedAcquire (Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) return true ; if (ws > 0 ) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0 ); pred.next = node; } else { compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false ; }
3.2 释放锁流程 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 public final boolean release (int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0 ) unparkSuccessor(h); return true ; } return false ; } private void unparkSuccessor (Node node) { int ws = node.waitStatus; if (ws < 0 ) compareAndSetWaitStatus(node, ws, 0 ); Node s = node.next; if (s == null || s.waitStatus > 0 ) { s = null ; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0 ) s = t; } if (s != null ) LockSupport.unpark(s.thread); }
四、共享模式 4.1 获取共享锁 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 public final void acquireShared (int arg) { if (tryAcquireShared(arg) < 0 ) doAcquireShared(arg); } private void doAcquireShared (int arg) { final Node node = addWaiter(Node.SHARED); boolean failed = true ; try { boolean interrupted = false ; for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0 ) { setHeadAndPropagate(node, r); p.next = null ; if (interrupted) selfInterrupt(); failed = false ; return ; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true ; } } finally { if (failed) cancelAcquire(node); } }
4.2 独占模式 vs 共享模式 对比维度 独占模式 共享模式 典型实现 ReentrantLock Semaphore、CountDownLatch 获取结果 boolean int(剩余资源数) 唤醒方式 仅唤醒一个后继 链式唤醒多个后继 适用场景 互斥访问 并发访问控制
五、Condition 条件队列 5.1 工作原理 每个 Condition 对象维护一个单向链表(条件队列),与同步队列配合实现等待/通知机制。
1 2 3 4 5 6 7 同步队列(双向链表) 条件队列(单向链表) +------+ +------+ +------+ +------+ | head | <-> | Node | ... | first| --> | Node | ... +------+ +------+ +------+ +------+ | ^ | await() | signal() +--------------------+
5.2 await() 流程 将当前线程封装成 Node 加入条件队列 完全释放锁(state 清零) 阻塞等待被 signal() 唤醒 被唤醒后重新竞争锁 5.3 signal() 流程 将条件队列的首节点移动到同步队列尾部 唤醒该节点的线程重新竞争锁 六、AQS 设计精髓 模板方法模式 :框架负责排队、阻塞、唤醒,子类只需实现 try 方法CLH 队列变体 :自旋 + 阻塞的混合策略,平衡 CPU 和响应volatile + CAS :无锁化实现状态管理和队列操作双向链表 :便于取消节点的处理SIGNAL 状态 :避免无效唤醒,提高效率七、要点 Q1:AQS 的核心是什么?
state 同步状态 + CLH 同步队列 + 模板方法模式
Q2:为什么用双向链表?
方便处理取消的节点,从尾部向前遍历更安全(入队时先设置 prev)
Q3:独占模式和共享模式的区别?
独占只允许一个线程持有;共享允许多个线程同时持有,且会链式传播唤醒