JAVA多线程之Atomic和LongAddr类(2)

LongAddr 类详解

Posted by Jason Lee on 2020-05-15

概诉

上一节,我们讨论了Atomic类的实现原理,以及代码分析。这一节我来看 LongAddr 类。LongAdder 是JDK8添加到JUC中的。它是一个线程安全的、比Atomic*系工具性能更好的"计数器"。

首先我们来看一下 Doug Lea 大神对这个类的概述 LongAddr

LongAdder 中会维护一个或多个变量,这些变量共同组成一个long型的和(这个和就是 LongAddr 的值)。
当多个线程同时更新值时,为了减少竞争,可能会动态地增加这组变量的数量。“sum”方法(等效于longValue方法)返回这组变量的“和”值。

当我们的场景是为了统计技术,而不是为了更细粒度的同步控制时,并且是在多线程更新的场景时,LongAdder 类比AtomicLong更好用。

在小并发的环境下,论更新的效率,两者都差不多。但是高并发的场景下,LongAdder有着明显更高的吞吐量,但是有着更高的空间复杂度。

Atomic缺陷

AtomicLong 的 Add() 是依赖自旋不断的 CAS 去累加一个 Long 值。如果在竞争激烈的情况下,CAS 操作不断的失败,就会有大量的线程不断的自旋尝试 CAS 会造成 CPU 的极大的消耗。

LongAddr详解

LongAddr 类图

这里主要涉及两个主要的类

  • Striped64
  • LongAddr
    如果我们要了解 LongAddr, 就要先了解 LongAddr 如何使用。

LongAddr的使用

LongAddr 在主要方法体现在一下几个地方

1
2
3
4
5
6
7
LongAdder longAdder = new LongAdder();
longAdder.add(20L); // 增加20
longAdder.increment(); // +1
longAdder.decrement(); // -1
long sum = longAdder.sum(); // 获取 longaddr 值
long sumThenReset = longAdder.sumThenReset(); // 将long addr 置零并且获取值
longAdder.reset(); //置零

Add 方法

对于累加器 最核心的方法就是 add 方法了,我们来看一下add 方法的实现代码

1
2
3
4
5
6
7
8
9
10
public void add(long x) {
Cell[] as; long b, v; int m; Cell a;
if ((as = cells) != null || !casBase(b = base, b + x)) {
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[getProbe() & m]) == null ||
!(uncontended = a.cas(v = a.value, v + x)))
longAccumulate(x, null, uncontended);
}
}
  1. 如果 cells 数组不为空,对参数进行 casBase 操作,如果 casBase 操作失败。可能是竞争激烈,进入第二步。
  2. 如果 cells 为空,直接进入 longAccumulate();
  3. m = cells 数组长度减一,如果数组长度小于 1,则进入 longAccumulate()

如果都没有满足以上条件,则对当前线程进行某种 hash 生成一个数组下标,对下标保存的值进行 cas 操作。如果操作失败,则说明竞争依然激烈,则进入 longAccumulate().

从类图上来看,LongAdder 没有任何的成员比变量,那么add 方法所用的基本上都是Striped64 类中的变量。LongAddr 继承了 Striped64 类,而 Striped64 是计数器的核心功能点.

这里我们提一个问题:

  1. Cell 是什么?
  2. CasBase 方法是什么?
  3. LongAccumulate 方法又在干什么?

要解答以上三个问题,我们需要了解一下 Striped64 到底干了什么事情。

Striped64

Striped64 是在java8中添加用来支持累加器的并发组件,它可以在并发环境下使用来做某种计数。

设计思路

Striped64: 设计思路是在竞争激烈的时候尽量分散竞争,在实现上 Striped64 维护了一个base Count和一个Cell数组。

计数线程会首先试图更新 base 变量,如果成功则退出计数,否则会认为当前竞争是很激烈的,那么就会通过 Cell 数组来分散计数

Striped64 根据线程来计算哈希,然后将不同的线程分散到不同的Cell数组的 index 上,然后这个线程的计数内容就会保存在该 Cell 的位置上面,基于这种设计,最后的总计数需要结合 base 以及散落在Cell数组中的计数内容。

我们来看一下代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
abstract class Striped64 extends Number {
// Cell组
@sun.misc.Contended static final class Cell {
volatile long value;
Cell(long x) { value = x; }
final boolean cas(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
}
// 获取Unsafe方法,用于
private static final sun.misc.Unsafe UNSAFE;
private static final long valueOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> ak = Cell.class;
valueOffset = UNSAFE.objectFieldOffset
(ak.getDeclaredField("value"));
} catch (Exception e) {
throw new Error(e);
}
}
}
/** CPU数量 */
static final int NCPU = Runtime.getRuntime().availableProcessors();
/**Cell数组 2的平方. */
transient volatile Cell[] cells;
/** Base value 基础值 */
transient volatile long base;
/** cells 正在扩容的标志位*/
transient volatile int cellsBusy;
Striped64() {}
// Cas基础值
final boolean casBase(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, BASE, cmp, val);
}
/** casCellsBusy cas替换,表示正在扩容*/
final boolean casCellsBusy() {
return UNSAFE.compareAndSwapInt(this, CELLSBUSY, 0, 1);
}
/** 获取线程的 Probe 可以理解为随机数种子 */
static final int getProbe() {
return UNSAFE.getInt(Thread.currentThread(), PROBE);
}

static final int advanceProbe(int probe) {
probe ^= probe << 13; // xorshift
probe ^= probe >>> 17;
probe ^= probe << 5;
UNSAFE.putInt(Thread.currentThread(), PROBE, probe);
return probe;
}

// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long BASE;
private static final long CELLSBUSY;
private static final long PROBE;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> sk = Striped64.class;
BASE = UNSAFE.objectFieldOffset
(sk.getDeclaredField("base"));
CELLSBUSY = UNSAFE.objectFieldOffset
(sk.getDeclaredField("cellsBusy"));
Class<?> tk = Thread.class;
PROBE = UNSAFE.objectFieldOffset
(tk.getDeclaredField("threadLocalRandomProbe"));
} catch (Exception e) {
throw new Error(e);
}
}
// ... ... ... ...
}

看到这,我们可以回答一下上面的几个问题了
我们在看一下 add 的代码,分析得到了一下结论

1、 Cell 是什么
Cell 就是加了 避免为共享的类 Cell 里面只有一个value值, 更新Cell 的value 值值能通过 cas 方法

2、 casBase 是更新 base value 的方法, Striped64cell 数组,如果竞争激烈,也就是 casBase 方法 失败,那么意味着多个线程竞争修改 base value值

3、发生竞争,就去 cell 数组中选择一个 cell 去更新,选择计算 getProbe() & m的值,其中m为 cell 数组个数。

4、 如果找到了 cell 则执行cell 的cas操作,如果找不到,或者cas操作也失败了则执行 longAccumulate 方法

longAccumulate

来看一下 longAccumulate 的方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended) {
int h;
if ((h = getProbe()) == 0) {
ThreadLocalRandom.current(); // ThreadLocalRandom 初始化主要是为了得到 threadLocalRandomProbe 值
h = getProbe();
wasUncontended = true;
}
boolean collide = false; // True if last slot nonempty
for (;;) {
Cell[] as; Cell a; int n; long v;
if ((as = cells) != null && (n = as.length) > 0) { // cells 数组不为空
if ((a = as[(n - 1) & h]) == null) {
if (cellsBusy == 0) { // 判断是否有其他线程在扩容
Cell r = new Cell(x); // 创建一个 Cell
if (cellsBusy == 0 && casCellsBusy()) { // casCellsBusy
boolean created = false;
try { // 在cas 下做二次校验
Cell[] rs; int m, j;
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true; // 赋值成功
}
} finally {
cellsBusy = 0; // cells 变成0 以防出异常退出
}
if (created)
break; // 如果创建了这退出
continue; // 没有创建成功则继续循环,说明在复制的时候出错了
}
}
collide = false;
}
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x)))) // 如果找到cell 直接用cas操作
break;
else if (n >= NCPU || cells != as)
collide = false; // At max size or stale
else if (!collide)
collide = true;
else if (cellsBusy == 0 && casCellsBusy()) {
try { // 这里仅仅是扩容 没有负责
if (cells == as) { // Cell数组扩容
Cell[] rs = new Cell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
cells = rs;
}
} finally {
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
h = advanceProbe(h);
}
// 一下代表cell 为空 cells == as 表示前面已经经历过 第一层的判断了
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
boolean init = false;
try { // 初始化
if (cells == as) {
Cell[] rs = new Cell[2];
rs[h & 1] = new Cell(x);
cells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
}
// case操作
else if (casBase(v = base, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break; // Fall back on using base
}
}

从判断上来看,我们一共发现了三个主要逻辑判断
在 longAccumulate 中有几个标记位,我们也先理解一下

  1. cellsBusy cells 的操作标记位,如果正在修改、新建、操作 cells 数组中的元素会,会将其 cas 为 1,否则为0。
  2. wasUncontended 表示 cas 是否失败,如果失败则考虑操作升级。
  3. collide 是否冲突,如果冲突,则考虑扩容 cells 的长度。

整个 for(;;) 死循环,都是以 cas 操作成功而告终。否则则会修改上述描述的几个标记位,重新进入循环。

  • 1、如果Cell不为空 (if ((as = cells) != null && (n = as.length) > 0))

    • 果 cell[i] 某个下标为空,则 new 一个 cell,并初始化值,然后退出
    • 如果 上层传过来的结果是 cas 失败,继续循环。
    • 如果 cell 不为空,且 cell cas 成功,退出
    • 如果 cell 的数量,大于等于 cpu 数量或者已经扩容了,继续重试。(扩容没意义)
    • 设置 collide 为 true。
    • 获取 cellsBusy 成功就对 cell 进行扩容,获取 cellBusy 失败则重新 hash 再重试。
  • 2、 是否能 更新 cellsBusy 值 ,走到这里表示 如果Cell 为空, 这个步骤讲的是需要开始初始化 cell((cellsBusy == 0 && cells == as && casCellsBusy()))

  • 3、cellsBusy 获取失败,表示有可能别的进程在扩容,则自己去更新 base 值, baseCas ,操作成功退出,不成功则重试
    if (casBase(v = base, ((fn == null) ? v + x :fn.applyAsLong(v, x))

至此 longAccumulate 就分析完了。之所以这个方法那么复杂,我认为有两个原因

  1. 是因为并发环境下要考虑各种操作的原子性,所以对于锁都进行了 double check。
  2. 操作都是逐步升级,以最小的代价实现功能。

Sum 方法

它需要累计base和Cell数组中的Cell中的计数,base中的计数为线程竞争不是很激烈的时候累计的数,而在线程竞争比较激烈的时候就会将计数的任务分散到Cell数组中,所以在sum方法里,需要合并两处的计数值。

除了获取总计数,我们有时候想reset一下,下面的代码展示了这种操作:

1
2
3
4
5
6
7
8
9
10
11
public long sum() {
Cell[] as = cells; Cell a;
long sum = base;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}

注意: 大家注意到,这里并没有加锁,也就是标明,sum 调用的时候是允许,其他线程继续更新的。这种只是调用返回的一个快照值,在返回sum的过程中,有可能更新还在继续

reset 方法

reset 方法将 base 和每个cell 当中的 value 值重置成0
但是有几个需要注意的地方

  1. reset 方法不会重置 cell 数组的数量。
  2. reset 调用的前提 是没有线程再去更新值。
1
2
3
4
5
6
7
8
9
10
public void reset() {
Cell[] as = cells; Cell a;
base = 0L;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
a.value = 0L;
}
}
}

Striped64在ConcurrentHashMap

Striped64 的计数方法在java8的 ConcurrentHashMap 中也有使用,具体的实现细节可以参考 addCount 方法,下面来看一下 ConcurrentHashMap 的size方法的实现细节:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
private transient volatile int cellsBusy;
private transient volatile CounterCell[] counterCells;

private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell a; long v; int m;
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
fullAddCount(x, uncontended);
return;
}
if (check <= 1)
return;
s = sumCount();
}
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
int rs = resizeStamp(n);
if (sc < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount();
}
}
}

参考



支付宝打赏 微信打赏

赞赏一下