老王:小陈啊,上一章节我们对LongAdder的底层源码、实现机制进行了深入了剖析,包括AtomicInteger在高并发竞争下导致的大量自旋的问题,以及LongAdder是怎么使用分段锁优化这个问题的。
老王:我们还以银行办事大厅的常规窗口、备用窗口为例子说明了什么是分段锁机制,这些东西啊,都要记得啊;有上一章的基础、懂得什么是分段锁,这一章我才能带你来深入的分析一下Striped64的分段锁机制的实现。
上章内容回顾
小陈:老王,放心吧,上一章的内容我还记着呢,我现在就可以给你总结一下上一章的内容
首先是分段锁的内容:
(1)首先银行办事大厅在访客比较少的时候,只开放常规窗口,就足以处理用户请求了
(2)但是由于一个窗口同一时间只能处理一个请求,所以在高并发的时候,容易造成大量的用户在等待常规窗口
(3)于是大流量的时候就开启备用窗口,比如有4个备用窗口,然后在备用窗口已经开启的时候,就会使用用户id % 4 的算法将用户派到不同的备用窗口,这样减少对锁的竞争,大大提升了并发性能。
老王:没错,对分段锁的内容记得很牢固啊,看来我果然没有看错你啊。
老王:那你继续来说说LongAdder底层最核心的add源码流程是咋样的?
小陈:嘿嘿,这也难不倒我,我先给你上代码:
public void add(long x) {
// as就类似我们上述说的备用窗口列表
Cell[] as;
// 这里的b就是常规窗口的值,v是你被分派到的那个窗口的value值(可能是常规窗口也可能是备用窗口)
long b, v;
// m是备用窗口的长度,
// 上面我们讲过getProbe()方法就是获取用户id的方法
// getProbe() & m 其实就是 用户id % 窗口总数,窗口分派的算法
int m;
// a就是你被派遣到的那个窗口
Cell a;
// 1.首先如果cells==null,说明备用窗口没有开放,
// 全都执行casBase争抢常规窗口,cas成功则争抢成功,然后办完事就退出了
// 如果争抢失败 casBase == false,则会进入if代码内重试
// 2. 如果cells != null,说明备用窗口开发了,不用去争抢常规窗口了,
// 直接就进入争抢备用窗口
if ((as = cells) != null || !casBase(b = base, b + x)) {
boolean uncontended = true;
// 3. as == null || as.length - 1 < 0 说明备用窗口列表尚未开放
if (as == null || (m = as.length - 1) < 0 ||
// 4. as[getProbe() & m] 是你被派遣到的那个备用窗口
// (a = as[getProbe() & m]) == null 你被派遣到的那个备用窗口还没人工作
(a = as[getProbe() & m]) == null ||
// 5. a.cas() 就是你被分派到窗口a后,去尝试争抢窗口a的权限
// 如果 uncontented就是你争抢的结果,如果!uncnotented == true,说明你争抢失败了
!(uncontended = a.cas(v = a.value, v + x)))
// 相当于上面操作都失败之后的一种兜底方案
longAccumulate(x, null, uncontended);
}
}
小陈:然后给你重新画一下上面的这个代码的流程图:
老王:嗯嗯,可以的,看来你记得相当牢固了,好了,那我也就不多废话了,直接进入本章主题吧。
老王:上一章我们讲LongAdder的时候,讲了怎么使用分段锁、怎么减少并发的竞争,以及它在竞争窗口的时候具体的流程我们上面已经画图讲解了 ,最后剩下的只是没有讲解的longAccumulate的源码了。
老王:那这一章我们就来分析分析,从longAccmulate方法内部源码角度了解的分段锁实现机制到底是怎么样子的?
Striped64底层实现
longAccumulate底层源码:
我们首先来看一下longAccumulate底层使用分段锁实现加减操作的源码,源码稍微有点复杂,但是我们慢慢来分析,别害怕哈。
我们先从整体来看下这个方法内部做了几件事:
longAccumulate里面做了几件事情,先从大角度分析一下:
(1)进入方法,黄色部分,首先就是获取一下用户id,如果为0,先强制初始化一下用户id
(2)然后就是进入for死循环里面,只有用户的请求成功处理了,才会退出循环
(3)然后就是下面比较重要的三个大的分支条件
进入备用窗口处理
// cells是备用窗口列表,如果备用窗口不等于null,
// 并且是length>0 说明备用窗口开启了
// 则用户进入这个条件,去备用窗口列表里面处理
if ((as = cells) != null && (n = as.length) > 0)
初始化备用窗口
如果上面的分支不满足,说明 cells == null 或者 cells.length 0) {
// 如果自己被派到的那个备用窗口为null,说明窗口还么人工作,则叫一个工作人员过来负责
if ((a = as[(n - 1) & h]) == null) {
// 如果没人在初始化,下面则创建一个窗口(叫一个工作人员过来处理)
if (cellsBusy == 0) {
// 创建一个新的窗口,同时提交自己的请求x
Cell r = new Cell(x);
// 如果没人在初始化
if (cellsBusy == 0 && casCellsBusy()) {
boolean created = false;
try {
Cell[] rs; int m, j;
// 再次检查备用窗口列表初始化好了
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
// 设置自己创建的新窗口
rs[j] = r;
created = true;
}
} finally {
cellsBusy = 0;
}
if (created)
break;
continue; // Slot is now non-empty
}
}
collide = false;
}
// 如果到这里已经知道cas操作失败了,则重新设置一下失败标识,进入下一循环重试
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
// 根据 用户id % 备用窗口总数 算法,自己应该争抢哪个备用窗口
// 如果争抢成功,则操作结束了,如果失败进入下一循环重试
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break;
else if (n >= NCPU || cells != as)
collide = false;
else if (!collide)
collide = true;
// 走到这里,说明上面的操作都失败了,备用窗口竞争还是很激烈
// 所以需要扩容了,多增加一些备用窗口,缓解竞争激烈的情况
else if (cellsBusy == 0 && casCellsBusy()) {
try {
// 这里就是具体的扩容操作了
if (cells == as) { // Expand table unless stale
// 新的备用窗口是原来的2倍
Cell[] rs = new Cell[n