基于ReentrantLock的AQS的源码分析(独占、非中断、不超时部分)


刚刚看完了并发实践这本书,算是理论具备了,看到了AQS的介绍,再看看源码,发现要想把并发理解透还是很难得,花了几个小时细分析了一下把可能出现的场景尽可能的往代码中去套,还是有些收获,但是真的很费脑,还是对多线程的理解太浅了,不多说了,直接上代码吧。

这段代码不是为跑通,只是把AQS,ReentrantLock中的部分源码合并到了一起,便于理解。

  1 package com.yb.interview.concurrent;
  2 
  3 
  4 import java.util.concurrent.locks.LockSupport;
  5 
  6 public class AQSSourceStudy {
  7 
  8     abstract static class AQS {
  9         /**
 10          * 这个状态是有子类来维护的,AQS不会用这个状态做什么
 11          */
 12         private volatile int state;
 13         /**
 14          * 队尾节点
 15          */
 16         private volatile Node tail;
 17         /**
 18          * 可能情况
 19          */
 20         private volatile Node head;
 21         /**
 22          * 独占线程
 23          */
 24         private Thread exclusiveOwnerThread;
 25 
 26 
 27         /**
 28          * 由子类实现
 29          * 判断当前线程是否需要排队
 30          */
 31         abstract boolean tryAcquire(int i);
 32 
 33         public int getState() {
 34             return state;
 35         }
 36 
 37         public void setState(int state) {
 38             this.state = state;
 39         }
 40 
 41         /**
 42          * 主方法
 43          * 可能的情况
 44          * 当前状态可以直接运行
 45          * 当前状态要放入队列里等待
 46          * 状态->子类获取
 47          * 过程,尽可能的不要去阻塞,循环多次,竞争多次
 48          * 创建节点
 49          * 节点入队,队尾
 50          * 判断新节点的前一个节点的状态,更新,前一个节点,因为在入队的过程中每个节点的状态是动的
 51          * 最后,阻塞当前线程
 52          */
 53         public final void acquire(int arg) {
 54             if (!tryAcquire(arg) &&
 55                     acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
 56                 // 中断状态传播
 57                 // 实时或者将来阻塞,抛中断异常
 58                 selfInterrupt();
 59         }
 60 
 61         /**
 62          * 当有新节点入队时,循环的把新节点关联到一个有效节点的后面
 63          * 然后,阻塞这个节点的线程(当前线程)
 64          */
 65         private boolean acquireQueued(Node node, int arg) {
 66             boolean failed = true;
 67             try {
 68                 boolean interrupted = false;
 69                 for (; ; ) {
 70                     final Node p = node.predecessor();
 71                     // 新节点的前个节点是头结点,如果头结点的线程释放,新节点接可以直接执行
 72                     // 所有不要着急阻塞,在判断一次,头结点释放没有,如果头结点释放,新节点不阻塞,把新节点设为头结点
 73                     // 当新节点没有排队直接运行了,之后要将节点标记为无效 cancelAcquire
 74                     if (p == head && tryAcquire(arg)) {
 75                         // 想了很久这段代码发生的情况
 76                         // 这段代码发生的情况
 77                         // 1.node在入队列时,有不同的线程在获得了锁,且队列中没有节点
 78                         // 2.当执行到这里再次tryAcquire之前,之前释放了锁
 79                         // 3.这时hasQueuedPredecessors中的判断,头结点的后一个节点,是新建的这个节点,满足s.thread==Thread.currentThread(不考虑这时有其他线程进入,或者进入无效)
 80                         // 满足了tryAcquire返回true的情况
 81                         // 将头结点改为新节点
 82                         /****
 83                          * head          tail
 84                          * |               |
 85                          * |               |
 86                          * ----------    ---------
 87                          * nullNode      newNode
 88                          * ---------     ----------
 89                          * next=newNode  prev=nullNode
 90                          * prev=null     next=null
 91                          * -------       ----------
 92                          *
 93                          * 改完后
 94                          *
 95                          *             head tail
 96                          *               |    |
 97                          *               |    |
 98                          * ---------    ---------
 99                          * nullNode      newNode
100                          * ---------     ---------
101                          * next=newNode  prev=nullNode
102                          * prev=null     next=null
103                          * ---------     ----------
104                          * */
105 
106                         setHead(node);
107                         p.next = null;
108                         failed = false;
109                         return interrupted;
110                     }
111                     // 之前的节点不是正在执行线程的节点,调整位置和状态再阻塞
112                     // 在线程解除阻塞后,使者节点失效
113                     if (shouldParkAfterFailedAcquire(p, node) &&
114                             parkAndCheckInterrupt())
115                         interrupted = true;
116                 }
117             } finally {
118                 if (failed)
119                     // 节点解除阻塞后,可能是中断或者超时
120                     // 非unlock的解锁
121                     cancelAcquire(node);
122             }
123         }
124 
125         private void cancelAcquire(Node node) {
126             if (node == null)
127                 return;
128             node.thread = null;
129             Node pred = node.prev;
130             // 那个空的节点会保证终止
131             while (pred.waitStatus > 0)
132                 // 将节点的prev关联到最近的有效节点
133                 node.prev = pred = pred.prev;
134             Node predNext = pred.next;
135             // 任何情况都执行的
136             node.waitStatus = Node.CANCELLED;
137 
138             // 如果取消的节点是队尾节点,并且将前节点设为队尾节点
139             if (node == tail && compareAndSetTail(node, pred)) {
140                 // cancel的节点和cancel之前的无效节点会移出队列
141                 compareAndSetNext(pred, predNext, null);
142             } else {
143                 // 如果不是队尾节点
144                 int ws;
145                 if (pred != head &&
146                         ((ws = pred.waitStatus) == Node.SIGNAL ||
147                                 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
148                         pred.thread != null) {
149                     Node next = node.next;
150                     if (next != null && next.waitStatus <= 0)
151                         // prev->node->next  改为 prev->next
152                         compareAndSetNext(pred, predNext, next);
153                 } else {
154                     // 判断锁定的状态
155                     // 如果前节点是头结点,或者不是SIGNAL状态并且无法设置为SIGNAL状态
156                     // 总结,取消一个节点是,要保证这个节点能被释放,要不通过前节点通知,在锁锁,对应release
157                     unparkSuccessor(node);
158                 }
159 
160                 node.next = node; // help GC
161             }
162         }
163 
164         private void unparkSuccessor(Node node) {
165             // 解锁节点的线程
166             // 当node时头节点时,是当前获取线程释放的炒作
167             // 不是偷节点
168             int ws = node.waitStatus;
169             if (ws < 0)
170                 // 不用再去通知下个节点了,即将释放node了
171                 compareAndSetWaitStatus(node, ws, 0);
172             Node s = node.next;
173             if (s == null || s.waitStatus > 0) {
174                 s = null;
175                 // 从队尾向前找到最前有效的节点
176                 for (Node t = tail; t != null && t != node; t = t.prev)
177                     if (t.waitStatus <= 0)
178                         s = t;
179             }
180             if (s != null)
181                 LockSupport.unpark(s.thread);
182 
183         }
184 
185         private void compareAndSetNext(Node pred, Node predNext, Object o) {
186 
187         }
188 
189         private boolean parkAndCheckInterrupt() {
190             // 阻塞
191             LockSupport.park(this);
192             // 当前前程标记中断
193             return Thread.interrupted();
194         }
195 
196         private boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
197             int ws = pred.waitStatus;
198             // 如果前节点是需要被通知的,前节点正在被阻塞,阻塞当先线程
199             if (ws == Node.SIGNAL)
200                 return true;
201             // 如果前节点是无效的,找到最近的一个有效节点,并关联,返回,在外部调用方法中会再次调用这个方法
202             if (ws > 0) {
203                 do {
204                     node.prev = pred = pred.prev;
205                 } while (pred.waitStatus > 0);
206                 // 这是个切断调用链的过程
207                 pred.next = node;
208             } else {
209                 // 更新前节点的状态,释放时通知新节点
210                 compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
211             }
212             return false;
213         }
214 
215         /**
216          * 创建节点
217          * 节点入队
218          *
219          * @return 新节点
220          */
221         private Node addWaiter(Node mode) {
222             Node node = new Node(Thread.currentThread(), mode);
223             Node pred = tail;
224             // 之前有节点在队列中
225             if (pred != null) {
226                 node.prev = pred;
227                 // 直接修改队尾,不成功要进入接下类的循环,循环中也有类型的判断,这里添加会减少一些逻辑(这样说可能是理解的有偏差)
228                 if (compareAndSetTail(pred, node)) {
229                     pred.next = node;
230                     return node;
231                 }
232             }
233             enq(node);
234             return node;
235         }
236 
237         /**
238          * 节点入队
239          * 循环,直到把新节点放到队尾,在多线程中这个过程是不确定的
240          */
241         private Node enq(Node node) {
242             for (; ; ) {
243                 Node t = tail;
244                 // Must initialize
245                 // 队尾没值,新节点是第一个入队的节点,创建一个空的节点,头尾都指向这个空节点
246                 if (t == null) {
247                     if (compareAndSetHead(new Node()))
248                         tail = head;
249                 } else {
250                     node.prev = t;
251                     if (compareAndSetTail(t, node)) {
252                         t.next = node;
253                         return t;
254                     }
255                 }
256             }
257         }
258 
259         /**
260          * 字面理解,是否有已经排队的线程
261          * 实际意义,有重入锁的情况,在这里要考虑到
262          * 没有节点在排队的情况,头结点与未节点是相同的
263          * 判断重入,当前线程是头结点的线程.
264          */
265         protected boolean hasQueuedPredecessors() {
266             Node t = tail;
267             Node h = head;
268             Node s;
269             //为什么是头结点的线程,而不是exclusiveOwnerThread,因为只有在
270             // 当前队列里没有值得时候才回设置独占线程,如果是通过节点释放的线
271             // 程还会和节点绑定,不会映射到exclusiveOwnerThread
272             return h != t &&
273                     ((s = h.next) == null || s.thread != Thread.currentThread());
274         }
275 
276         public final boolean release(int arg) {
277             if (tryRelease(arg)) {
278                 Node h = head;
279                 // 在独占锁的时候,waitStatus只能为0 -1 -2 -3
280                 // 这个里不为0代表头节点是空节点
281                 // 空节点不需要释放
282                 // 头节点是释放锁的时候,最先被考虑的
283                 if (h != null && h.waitStatus != 0)
284                     unparkSuccessor(h);
285                 return true;
286             }
287             return false;
288         }
289 
290         protected abstract boolean tryRelease(int arg);
291 
292 
293         public void setHead(Node head) {
294             this.head = head;
295         }
296 
297         private boolean compareAndSetHead(Node node) {
298             return (true || false);
299         }
300 
301         private boolean compareAndSetTail(Node pred, Node node) {
302             return (true || false);
303         }
304 
305         protected void selfInterrupt() {
306             Thread.currentThread().interrupt();
307         }
308 
309 
310         /**
311          * CAS更新队列状态,CAS的问题在其他的机会介绍
312          */
313         boolean compareAndSetState(int o, int n) {
314             return (false || true);
315         }
316 
317         /**
318          * 独占线程标记改为指定线程
319          */
320         void setExclusiveOwnerThread(Thread t) {
321             exclusiveOwnerThread = t;
322         }
323 
324         /**
325          * 返回独占线程
326          */
327         Thread getExclusiveOwnerThread() {
328             return exclusiveOwnerThread;
329         }
330 
331         // 修改节点的状态
332         private boolean compareAndSetWaitStatus(Node pred, int ws, int signal) {
333             return (true || false);
334         }
335 
336         static class Node {
337 
338             public int waitStatus;
339 
340             Node() {
341             }
342 
343             /**
344              * @param thread
345              * @param mode   SHARED or  EXCLUSIVE
346              */
347             Node(Thread thread, Node mode) {
348                 this.thread = Thread.currentThread();
349                 this.mode = mode;
350             }
351 
352             // 共享模式标记
353             static final Node SHARED = new Node();
354             // 独占模式标记
355             static final Node EXCLUSIVE = null;
356 
357             // 节点被取消,因为超时或者中断
358             static final int CANCELLED = 1;
359             // next被阻塞,当节点释放时,notice next
360             static final int SIGNAL = -1;
361             // 在条件队列中,等待某个条件被阻塞
362             static final int CONDITION = -2;
363             // 节点在共享模式下,可以传播锁
364             static final int PROPAGATE = -3;
365 
366             volatile Node next;
367             volatile Node prev;
368             Node mode;
369 
370             public Thread thread;
371 
372             public Node predecessor() {
373                 Node p = prev;
374                 if (p == null)
375                     throw new NullPointerException();
376                 else
377                     return p;
378             }
379         }
380 
381 
382     }
383 
384     /**
385      * 这是一个独占锁的实现,从ReentrantLock中粘贴出来的部分代码
386      */
387     class SYC extends AQS {
388 
389         public void lock() {
390             acquire(1);
391         }
392 
393         public void unlock() {
394             release(1);
395         }
396 
397         protected final boolean tryAcquire(int acquires) {
398             final Thread current = Thread.currentThread();
399             int c = getState();
400             // 如果当前的状态
401             if (c == 0) {
402                 if (!hasQueuedPredecessors() &&
403                         compareAndSetState(0, acquires)) {
404                     setExclusiveOwnerThread(current);
405                     return true;
406                 }
407             } else if (current == getExclusiveOwnerThread()) {
408                 int nextc = c + acquires;
409                 if (nextc < 0)
410                     throw new Error("Maximum lock count exceeded");
411                 setState(nextc);
412                 return true;
413             }
414             return false;
415         }
416 
417         protected final boolean tryRelease(int releases) {
418             int c = getState() - releases;
419             if (Thread.currentThread() != getExclusiveOwnerThread())
420                 throw new IllegalMonitorStateException();
421             boolean free = false;
422             if (c == 0) {
423                 free = true;
424                 setExclusiveOwnerThread(null);
425             }
426             setState(c);
427             return free;
428         }
429 
430 
431     }
432 }

优质内容筛选与推荐>>
1、扩展欧几里得算法(extgcd)
2、嵊州普及Day3T4
3、Nodejs介绍及npm工具使用
4、文本框自动匹配
5、磁盘管理综合测试题


长按二维码向我转账

受苹果公司新规定影响,微信 iOS 版的赞赏功能被关闭,可通过二维码转账支持公众号。

    阅读
    好看
    已推荐到看一看
    你的朋友可以在“发现”-“看一看”看到你认为好看的文章。
    已取消,“好看”想法已同步删除
    已推荐到看一看 和朋友分享想法
    最多200字,当前共 发送

    已发送

    朋友将在看一看看到

    确定
    分享你的想法...
    取消

    分享想法到看一看

    确定
    最多200字,当前共

    发送中

    网络异常,请稍后重试

    微信扫一扫
    关注该公众号