在Java中,对于锁的默认实现是synchronized实现的加锁和解锁,而在JDK1.5之后Java中实现了ReentrantLock,ReentrantLock是当内置加锁机制不适用的时候,作为一种可以选择的高级功能。
构建ReentrantLock的时候,采用了AQS(AbstractQueuedSynchronized),也就是队列同步器作为基础框架。
AbstractQueuedSynchronized
AbstractQueuedSynchronized是一个用来构建缩合同步组件的基础框架,它是一个高度抽象的类,除去ReentrantLock外,ReentrantReadWriteLock和CountDownLatch等一些不同类型的同步组件也是基于AQS实现的。AQS中使用了一个int类型的成员变量来表示同步状态,并且通过内置的FIFO队列来实现资源获取线程的排队动作。
1 2 3 4 5 6 7 8 9 10 11
| state变量: private volatile int state; protected final int getState() { return state; } protected final void setState(int newState) { state = newState; } protected final boolean compareAndSetState(int expect, int update) { return STATE.compareAndSet(this, expect, update); }
|
AQS定义了一个volatile的int类型的表示同步状态变量,提供了同步状态的访问和修改方法:
getState():获取当前的同步状态
setState(int newState):设置当前的同步状态
compareAndSetState(int expect,int update):使用CAS设置当前状态,这种方法能够保证状态的原子性
AQS还提供了可以重写的许多方法以及模板方法:
可重写方法:
方法名称 |
描述 |
protected boolean tryAcquire(int arg) |
独占式获取同步状态,实现方法查询当前状态并判断同步状态是否符合预期,使用CAS进行同步状态的设置 |
protected boolean tryRelease(int arg) |
独占式释放同步状态,等待获取状态的线程将有机会获取同步状态 |
protected int tryAcquireShared(int arg) |
共享式获得同步状态 |
protected boolean tryReleaseShared(int arg) |
共享式释放同步状态 |
protected boolean isHeldExclusively() |
判断同步器是否在独占模式下被线程占用 |
模板方法:
方法名称 |
描述 |
public final void acquire(int arg) |
抢占式获取同步状态 |
public final void acquireInterruptibly(int arg) |
响应中断的抢占式获取同步状态 |
public final boolean tryAcquireNanos(int arg, long nanosTimeout) |
在acquireInterruptibly的基础上加入超时时间,在时间内获取到锁才成功 |
public final boolean release(int arg) |
独占式释放同步状态,当释放成功之后,会唤醒在同步队列中的第一个节点中的线程 |
public final boolean releaseShared(int arg) |
共享式的释放同步状态 |
public final void acquireShared(int arg) |
共享式的获取同步状态,在同一时刻内能够有多个线程获取到同步状态 |
public final void acquireSharedInterruptibly(int arg) |
响应中断的共享获取同步状态 |
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) |
在响应中断的共享获取同步状态的基础上加入超时时间 |
public final Collection getSharedQueuedThreads() |
获取在同步队列上的线程的集合 |
同步队列
AQS通过在内部维护一个同步队列来实现多个线程对同一个资源访问情况下的线程状态。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57
| AQS中同步队列的实现: static final class Node { static final Node SHARED = new Node(); static final Node EXCLUSIVE = null; static final int CANCELLED = 1; static final int SIGNAL = -1; static final int CONDITION = -2; static final int PROPAGATE = -3; volatile int waitStatus; volatile Node prev; volatile Node next; volatile Thread thread; Node nextWaiter; final boolean isShared() { return nextWaiter == SHARED; } final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } Node() {} Node(Node nextWaiter) { this.nextWaiter = nextWaiter; THREAD.set(this, Thread.currentThread()); } Node(int waitStatus) { WAITSTATUS.set(this, waitStatus); THREAD.set(this, Thread.currentThread()); } final boolean compareAndSetWaitStatus(int expect, int update) { return WAITSTATUS.compareAndSet(this, expect, update); } final boolean compareAndSetNext(Node expect, Node update) { return NEXT.compareAndSet(this, expect, update); } final void setPrevRelaxed(Node p) { PREV.set(this, p); } private static final VarHandle NEXT; private static final VarHandle PREV; private static final VarHandle THREAD; private static final VarHandle WAITSTATUS; static { try { MethodHandles.Lookup l = MethodHandles.lookup(); NEXT = l.findVarHandle(Node.class, "next", Node.class); PREV = l.findVarHandle(Node.class, "prev", Node.class); THREAD = l.findVarHandle(Node.class, "thread", Thread.class); WAITSTATUS = l.findVarHandle(Node.class, "waitStatus", int.class); } catch (ReflectiveOperationException e) { throw new Error(e); } } }
|
在同步队列里面,waitStatus为线程的等待状态,包括了这样几种情况:
CANCELLED,值为1,线程从同步队列中取消等待
SIGNAL,值为-1,后继节点的线程处于等待状态
CONDITION,值为-2,节点等待在Condition上,等待其他线程对Condition的调用,进而进入到同步队列
PROPAGATE,值为-3,笑一次共享式同步状态会无条件地被传播下去
INITIAL,值为0,表示初始状态
在同步队列中,包含了两个节点类型的引用:头节点和尾节点。添加尾节点通过CAS添加,只有当前节点符合预期才会添加进去,并且尾节点引用指向队列的尾节点。
同步队列遵循FIFO,也就是先到达同步队列的线程将优先得到同步状态,当头节点释放同步状态之后,将会唤醒在同步队列中等待的下一个节点的线程,并且当后继节点在获取同步状态成功后会把自己设置为头节点。
再同步队列中的线程会采用自旋的方式来判断自己是否能够获取到同步状态,判断的方式是只有判断前驱节点是头节点的情况下才开始自旋获取同步状态,否则节点中的线程将会进入等待状态。并且通过只有前驱节点是头节点,才尝试获取同步状态,只有成功以后才会把自己设置为头节点,这样提供了并发条件下出现竞争的安全。
共享式获取同步状态或者释放同步状态的情况即读写锁的情况:只有在存在读锁或者不上锁的情况下,读锁的申请才会成功,而只有不上锁的时候写锁才能够获取成功。并且在Java中,ReentrantReadWriteLock的实现就是通过AQS来实现的。
在独占式超时获取同步状态的时候,需要判断当前自旋时间是否超过超时等待时间,如果已经超过,那么获取同步状态就不会成功,否则在未超时的时候同获取独占式同步状态。
Codition
除去维护一个同步队列,AQS还维护一个Codition(等待队列)来存储等待状态的线程。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235
| public class ConditionObject implements Condition, java.io.Serializable { private static final long serialVersionUID = 1173984872572414699L; private transient Node firstWaiter; private transient Node lastWaiter; public ConditionObject() { } private Node addConditionWaiter() { Node t = lastWaiter; if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; }
Node node = new Node(Node.CONDITION);
if (t == null) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; } private void doSignal(Node first) { do { if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null); } private void doSignalAll(Node first) { lastWaiter = firstWaiter = null; do { Node next = first.nextWaiter; first.nextWaiter = null; transferForSignal(first); first = next; } while (first != null); } private void unlinkCancelledWaiters() { Node t = firstWaiter; Node trail = null; while (t != null) { Node next = t.nextWaiter; if (t.waitStatus != Node.CONDITION) { t.nextWaiter = null; if (trail == null) firstWaiter = next; else trail.nextWaiter = next; if (next == null) lastWaiter = trail; } else trail = t; t = next; } } public final void signal() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignal(first); } public final void signalAll() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignalAll(first); } public final void awaitUninterruptibly() { Node node = addConditionWaiter(); int savedState = fullyRelease(node); boolean interrupted = false; while (!isOnSyncQueue(node)) { LockSupport.park(this); if (Thread.interrupted()) interrupted = true; } if (acquireQueued(node, savedState) || interrupted) selfInterrupt(); } private static final int REINTERRUPT = 1; private static final int THROW_IE = -1; private int checkInterruptWhileWaiting(Node node) { return Thread.interrupted() ? (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0; } private void reportInterruptAfterWait(int interruptMode) throws InterruptedException { if (interruptMode == THROW_IE) throw new InterruptedException(); else if (interruptMode == REINTERRUPT) selfInterrupt(); } public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); int interruptMode = 0; while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); } public final long awaitNanos(long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); final long deadline = System.nanoTime() + nanosTimeout; long initialNanos = nanosTimeout; Node node = addConditionWaiter(); int savedState = fullyRelease(node); int interruptMode = 0; while (!isOnSyncQueue(node)) { if (nanosTimeout <= 0L) { transferAfterCancelledWait(node); break; } if (nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD) LockSupport.parkNanos(this, nanosTimeout); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; nanosTimeout = deadline - System.nanoTime(); } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); long remaining = deadline - System.nanoTime(); return (remaining <= initialNanos) ? remaining : Long.MIN_VALUE; } public final boolean awaitUntil(Date deadline) throws InterruptedException { long abstime = deadline.getTime(); if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); boolean timedout = false; int interruptMode = 0; while (!isOnSyncQueue(node)) { if (System.currentTimeMillis() >= abstime) { timedout = transferAfterCancelledWait(node); break; } LockSupport.parkUntil(this, abstime); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); return !timedout; } public final boolean await(long time, TimeUnit unit) throws InterruptedException { long nanosTimeout = unit.toNanos(time); if (Thread.interrupted()) throw new InterruptedException(); final long deadline = System.nanoTime() + nanosTimeout; Node node = addConditionWaiter(); int savedState = fullyRelease(node); boolean timedout = false; int interruptMode = 0; while (!isOnSyncQueue(node)) { if (nanosTimeout <= 0L) { timedout = transferAfterCancelledWait(node); break; } if (nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD) LockSupport.parkNanos(this, nanosTimeout); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; nanosTimeout = deadline - System.nanoTime(); } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); return !timedout; } final boolean isOwnedBy(AbstractQueuedSynchronizer sync) { return sync == AbstractQueuedSynchronizer.this; } protected final boolean hasWaiters() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); for (Node w = firstWaiter; w != null; w = w.nextWaiter) { if (w.waitStatus == Node.CONDITION) return true; } return false; } protected final int getWaitQueueLength() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); int n = 0; for (Node w = firstWaiter; w != null; w = w.nextWaiter) { if (w.waitStatus == Node.CONDITION) ++n; } return n; } protected final Collection<Thread> getWaitingThreads() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); ArrayList<Thread> list = new ArrayList<>(); for (Node w = firstWaiter; w != null; w = w.nextWaiter) { if (w.waitStatus == Node.CONDITION) { Thread t = w.thread; if (t != null) list.add(t); } } return list; } }
|
等待队列也是一个FIFO队列,在每个节点上包括了一个线程的引用,这个引用是在Codition对象上等待的线程,当有一个线程调用了Codition对象的await()方法,那么这个线程将会释放锁,构造更节点并且进入等待队列。
跟同步队列不同的是,在等待队列中,每个节点只是存储下一个节点的引用,而不存储前驱节点的引用。添加一个尾节点需要将当前尾节点的引用指向添加节点,并且在Condition里将尾节点修改。更新过程是由锁保证的,并不需要CAS进行更新。
当调用Codition的signal()方法时,会在唤醒首节点前,将节点移到同步队列。而调用signalAll()方法会将整个等待队列中的节点移到同步队列中。
Codition和Object的监视器模型
在Object中,存在默认的监视器,即wait()、notify()和notifyALl()方法构成。在监视器模型中。一个对象拥有一个同步队列和等待队列,而在AQS中,可以拥有一个同步队列和多个等待队列,通过这种方式将等待的线程可以按照条件进行区分,即不满足不同条件的线程存储在不同的等待队列中。并且Codition可以精确的控制多个线程的休眠和唤醒。