0%

LongAdder

LongAdder

在使用AtomicLong时,如果是在高并发场景下去同时竞争修改同一个原子变量,由于内部使用的是CAS,只会有一个线程修改成功,这就造成了大量的线程竞争失败后,通过无限循环来不断的进行CAS操作,白白的浪费了CPU资源,在JDK8中为了解决这种问题,提供了LongAdder来进行原子性递增递减

当多线程争夺同一个Cell原子变量时如果失败,并不是在当前Cell变量上一直自旋CAS重试,而是会尝试在其他Cell变量上进行CAS尝试,增加了CAS成功的可能性

最终,获取LongAdder的当前值时,会把所有Cell变量的value值累加后再加上base值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
// 真实值为base值与cells数组中所有Cell元素中value值的累加
// 在LongAdder中维护了一个Cell数组,分担对单个变量进行竞争的开销
transient volatile Cell[] cells;
// 基值变量
transient volatile long base;
// 用来实现自旋锁,状态值只有0和1,当创建Cell元素,扩容Cell数组和初始化Cell数组时,使用CAS操作该变量来保证同时只有一个线程可以进行其中一个Cell的操作
transient volatile int cellsBusy;

@sun.misc.Contended static final class Cell {
// 每个Cell中有一个初始值为0的long类型变量value
volatile long value;
Cell(long x) { value = x; }
final boolean cas(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
}

// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long valueOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> ak = Cell.class;
valueOffset = UNSAFE.objectFieldOffset
(ak.getDeclaredField("value"));
} catch (Exception e) {
throw new Error(e);
}
}
}


// 进行加一减一操作
public void add(long x) {
Cell[] as; long b, v; int m; Cell a;
// 如果cells为null的话在base的基础上进行累加
if ((as = cells) != null || !casBase(b = base, b + x)) {
boolean uncontended = true;
// 选择其中一个Cell进行操作
if (as == null || (m = as.length - 1) < 0 ||
(a = as[getProbe() & m]) == null ||
!(uncontended = a.cas(v = a.value, v + x)))
longAccumulate(x, null, uncontended);
}
}


final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended) {
int h;
// 初始化当前线程的变量h,用于计算当前线程应该被分到cells数组中的哪个元素
if ((h = getProbe()) == 0) {
ThreadLocalRandom.current(); // force initialization
h = getProbe();
wasUncontended = true;
}
boolean collide = false; // True if last slot nonempty
for (;;) {
Cell[] as; Cell a; int n; long v;
// cells如果为null,说明还没有初始化
// 不为null,说明已经初始化过了
if ((as = cells) != null && (n = as.length) > 0) {
// 数组cells已经扩容,但是有的Cell还没有填充,此时为null,需要新增一个Cell元素到cells数组中
if ((a = as[(n - 1) & h]) == null) {
if (cellsBusy == 0) { // Try to attach new Cell
Cell r = new Cell(x); // Optimistically create
// 添加新元素需要将cellsBusy置为1
if (cellsBusy == 0 && casCellsBusy()) {
boolean created = false;
try { // Recheck under lock
Cell[] rs; int m, j;
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
cellsBusy = 0;
}
if (created)
break;
continue; // Slot is now non-empty
}
}
collide = false;
}
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
// 当前Cell存在,则执行CAS操作
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break;
// 当前Cell数组元素个数大于CPU个数
else if (n >= NCPU || cells != as)
collide = false; // At max size or stale
// 是否有冲突(多个线程访问了cells数组中的同一个Cell)
else if (!collide)
collide = true;
// 只有当前元素个数没有达到CPU个数,并且有冲突,会进行扩容
// 进行扩容时casCellsBusy方法来将cellsBusy设置为1
else if (cellsBusy == 0 && casCellsBusy()) {
try {

if (cells == as) { // Expand table unless stale
// 扩容为原来的2倍
Cell[] rs = new Cell[n << 1];
// 复制原数组中的Cell元素到新的cells数组中
for (int i = 0; i < n; ++i)
rs[i] = as[i];
cells = rs;
}
} finally {
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
// 重新计算h,找到空闲的Cell
h = advanceProbe(h);
}
// 初始化Cell数组,cellsBusy为0表示当前cells数组没有在被初始化或者扩容,也没有在新建Cell元素;为1表示当前cells数组在被初始化或者扩容,或者在新建Cell元素
// casCellsBusy来切换cellsBusy的状态
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
boolean init = false;
try { // Initialize table
if (cells == as) {
// 初始化的个数为2个
Cell[] rs = new Cell[2];
// 使用h&1来计算当前线程应该访问哪个Cell
rs[h & 1] = new Cell(x);
cells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
}
else if (casBase(v = base, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break; // Fall back on using base
}
}

欢迎关注我的其它发布渠道