概诉
上一节,我们讨论了Atomic类的实现原理,以及代码分析。这一节我来看 LongAddr 类。LongAdder 是JDK8添加到JUC中的。它是一个线程安全的、比Atomic*系工具性能更好的"计数器"。
首先我们来看一下 Doug Lea 大神对这个类的概述 LongAddr
LongAdder 中会维护一个或多个变量,这些变量共同组成一个long型的和(这个和就是 LongAddr 的值)。
当多个线程同时更新值时,为了减少竞争,可能会动态地增加这组变量的数量。“sum”方法(等效于longValue方法)返回这组变量的“和”值。
当我们的场景是为了统计技术,而不是为了更细粒度的同步控制时,并且是在多线程更新的场景时,LongAdder 类比AtomicLong更好用。
在小并发的环境下,论更新的效率,两者都差不多。但是高并发的场景下,LongAdder有着明显更高的吞吐量,但是有着更高的空间复杂度。
Atomic缺陷
AtomicLong 的 Add() 是依赖自旋不断的 CAS 去累加一个 Long 值。如果在竞争激烈的情况下,CAS 操作不断的失败,就会有大量的线程不断的自旋尝试 CAS 会造成 CPU 的极大的消耗。
LongAddr详解
LongAddr 类图

这里主要涉及两个主要的类
- Striped64
- LongAddr
如果我们要了解 LongAddr, 就要先了解 LongAddr 如何使用。
LongAddr的使用
LongAddr 在主要方法体现在一下几个地方
1 2 3 4 5 6 7
| LongAdder longAdder = new LongAdder(); longAdder.add(20L); longAdder.increment(); longAdder.decrement(); long sum = longAdder.sum(); long sumThenReset = longAdder.sumThenReset(); longAdder.reset();
|
Add 方法
对于累加器 最核心的方法就是 add 方法了,我们来看一下add 方法的实现代码
1 2 3 4 5 6 7 8 9 10
| public void add(long x) { Cell[] as; long b, v; int m; Cell a; if ((as = cells) != null || !casBase(b = base, b + x)) { boolean uncontended = true; if (as == null || (m = as.length - 1) < 0 || (a = as[getProbe() & m]) == null || !(uncontended = a.cas(v = a.value, v + x))) longAccumulate(x, null, uncontended); } }
|
- 如果 cells 数组不为空,对参数进行 casBase 操作,如果 casBase 操作失败。可能是竞争激烈,进入第二步。
- 如果 cells 为空,直接进入
longAccumulate();
- m = cells 数组长度减一,如果数组长度小于 1,则进入
longAccumulate()
如果都没有满足以上条件,则对当前线程进行某种 hash 生成一个数组下标,对下标保存的值进行 cas 操作。如果操作失败,则说明竞争依然激烈,则进入 longAccumulate().
从类图上来看,LongAdder 没有任何的成员比变量,那么add 方法所用的基本上都是Striped64 类中的变量。LongAddr 继承了 Striped64 类,而 Striped64 是计数器的核心功能点.
这里我们提一个问题:
- Cell 是什么?
- CasBase 方法是什么?
- LongAccumulate 方法又在干什么?
要解答以上三个问题,我们需要了解一下 Striped64 到底干了什么事情。
Striped64
Striped64 是在java8中添加用来支持累加器的并发组件,它可以在并发环境下使用来做某种计数。
设计思路
Striped64: 设计思路是在竞争激烈的时候尽量分散竞争,在实现上 Striped64 维护了一个base Count和一个Cell数组。
计数线程会首先试图更新 base 变量,如果成功则退出计数,否则会认为当前竞争是很激烈的,那么就会通过 Cell 数组来分散计数
Striped64 根据线程来计算哈希,然后将不同的线程分散到不同的Cell数组的 index 上,然后这个线程的计数内容就会保存在该 Cell 的位置上面,基于这种设计,最后的总计数需要结合 base 以及散落在Cell数组中的计数内容。

我们来看一下代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75
| abstract class Striped64 extends Number { @sun.misc.Contended static final class Cell { volatile long value; Cell(long x) { value = x; } final boolean cas(long cmp, long val) { return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val); } private static final sun.misc.Unsafe UNSAFE; private static final long valueOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> ak = Cell.class; valueOffset = UNSAFE.objectFieldOffset (ak.getDeclaredField("value")); } catch (Exception e) { throw new Error(e); } } } static final int NCPU = Runtime.getRuntime().availableProcessors(); transient volatile Cell[] cells; transient volatile long base; transient volatile int cellsBusy; Striped64() {} final boolean casBase(long cmp, long val) { return UNSAFE.compareAndSwapLong(this, BASE, cmp, val); } final boolean casCellsBusy() { return UNSAFE.compareAndSwapInt(this, CELLSBUSY, 0, 1); } static final int getProbe() { return UNSAFE.getInt(Thread.currentThread(), PROBE); }
static final int advanceProbe(int probe) { probe ^= probe << 13; probe ^= probe >>> 17; probe ^= probe << 5; UNSAFE.putInt(Thread.currentThread(), PROBE, probe); return probe; }
private static final sun.misc.Unsafe UNSAFE; private static final long BASE; private static final long CELLSBUSY; private static final long PROBE; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> sk = Striped64.class; BASE = UNSAFE.objectFieldOffset (sk.getDeclaredField("base")); CELLSBUSY = UNSAFE.objectFieldOffset (sk.getDeclaredField("cellsBusy")); Class<?> tk = Thread.class; PROBE = UNSAFE.objectFieldOffset (tk.getDeclaredField("threadLocalRandomProbe")); } catch (Exception e) { throw new Error(e); } } }
|
看到这,我们可以回答一下上面的几个问题了
我们在看一下 add 的代码,分析得到了一下结论
1、 Cell 是什么
Cell 就是加了 避免为共享的类 Cell 里面只有一个value值, 更新Cell 的value 值值能通过 cas 方法
2、 casBase 是更新 base value 的方法, Striped64 有 cell 数组,如果竞争激烈,也就是 casBase 方法 失败,那么意味着多个线程竞争修改 base value值
3、发生竞争,就去 cell 数组中选择一个 cell 去更新,选择计算 getProbe() & m 的值,其中m为 cell 数组个数。
4、 如果找到了 cell 则执行cell 的cas操作,如果找不到,或者cas操作也失败了则执行 longAccumulate 方法
longAccumulate
来看一下 longAccumulate 的方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82
| final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) { int h; if ((h = getProbe()) == 0) { ThreadLocalRandom.current(); h = getProbe(); wasUncontended = true; } boolean collide = false; for (;;) { Cell[] as; Cell a; int n; long v; if ((as = cells) != null && (n = as.length) > 0) { if ((a = as[(n - 1) & h]) == null) { if (cellsBusy == 0) { Cell r = new Cell(x); if (cellsBusy == 0 && casCellsBusy()) { boolean created = false; try { Cell[] rs; int m, j; if ((rs = cells) != null && (m = rs.length) > 0 && rs[j = (m - 1) & h] == null) { rs[j] = r; created = true; } } finally { cellsBusy = 0; } if (created) break; continue; } } collide = false; } else if (!wasUncontended) wasUncontended = true; else if (a.cas(v = a.value, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) break; else if (n >= NCPU || cells != as) collide = false; else if (!collide) collide = true; else if (cellsBusy == 0 && casCellsBusy()) { try { if (cells == as) { Cell[] rs = new Cell[n << 1]; for (int i = 0; i < n; ++i) rs[i] = as[i]; cells = rs; } } finally { cellsBusy = 0; } collide = false; continue; } h = advanceProbe(h); } else if (cellsBusy == 0 && cells == as && casCellsBusy()) { boolean init = false; try { if (cells == as) { Cell[] rs = new Cell[2]; rs[h & 1] = new Cell(x); cells = rs; init = true; } } finally { cellsBusy = 0; } if (init) break; } else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) break; } }
|
从判断上来看,我们一共发现了三个主要逻辑判断
在 longAccumulate 中有几个标记位,我们也先理解一下
cellsBusy cells 的操作标记位,如果正在修改、新建、操作 cells 数组中的元素会,会将其 cas 为 1,否则为0。
wasUncontended 表示 cas 是否失败,如果失败则考虑操作升级。
collide 是否冲突,如果冲突,则考虑扩容 cells 的长度。
整个 for(;;) 死循环,都是以 cas 操作成功而告终。否则则会修改上述描述的几个标记位,重新进入循环。
-
1、如果Cell不为空 (if ((as = cells) != null && (n = as.length) > 0) )
- 果 cell[i] 某个下标为空,则 new 一个 cell,并初始化值,然后退出
- 如果 上层传过来的结果是 cas 失败,继续循环。
- 如果 cell 不为空,且 cell cas 成功,退出
- 如果 cell 的数量,大于等于 cpu 数量或者已经扩容了,继续重试。(扩容没意义)
- 设置 collide 为 true。
- 获取 cellsBusy 成功就对 cell 进行扩容,获取 cellBusy 失败则重新 hash 再重试。
-
2、 是否能 更新 cellsBusy 值 ,走到这里表示 如果Cell 为空, 这个步骤讲的是需要开始初始化 cell((cellsBusy == 0 && cells == as && casCellsBusy()))
-
3、cellsBusy 获取失败,表示有可能别的进程在扩容,则自己去更新 base 值, baseCas ,操作成功退出,不成功则重试
if (casBase(v = base, ((fn == null) ? v + x :fn.applyAsLong(v, x))
至此 longAccumulate 就分析完了。之所以这个方法那么复杂,我认为有两个原因
- 是因为并发环境下要考虑各种操作的原子性,所以对于锁都进行了 double check。
- 操作都是逐步升级,以最小的代价实现功能。
Sum 方法
它需要累计base和Cell数组中的Cell中的计数,base中的计数为线程竞争不是很激烈的时候累计的数,而在线程竞争比较激烈的时候就会将计数的任务分散到Cell数组中,所以在sum方法里,需要合并两处的计数值。
除了获取总计数,我们有时候想reset一下,下面的代码展示了这种操作:
1 2 3 4 5 6 7 8 9 10 11
| public long sum() { Cell[] as = cells; Cell a; long sum = base; if (as != null) { for (int i = 0; i < as.length; ++i) { if ((a = as[i]) != null) sum += a.value; } } return sum; }
|
注意: 大家注意到,这里并没有加锁,也就是标明,sum 调用的时候是允许,其他线程继续更新的。这种只是调用返回的一个快照值,在返回sum的过程中,有可能更新还在继续。
reset 方法
reset 方法将 base 和每个cell 当中的 value 值重置成0
但是有几个需要注意的地方
- reset 方法不会重置 cell 数组的数量。
- reset 调用的前提 是没有线程再去更新值。
1 2 3 4 5 6 7 8 9 10
| public void reset() { Cell[] as = cells; Cell a; base = 0L; if (as != null) { for (int i = 0; i < as.length; ++i) { if ((a = as[i]) != null) a.value = 0L; } } }
|
Striped64在ConcurrentHashMap
Striped64 的计数方法在java8的 ConcurrentHashMap 中也有使用,具体的实现细节可以参考 addCount 方法,下面来看一下 ConcurrentHashMap 的size方法的实现细节:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| private transient volatile int cellsBusy; private transient volatile CounterCell[] counterCells;
private final void addCount(long x, int check) { CounterCell[] as; long b, s; if ((as = counterCells) != null || !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) { CounterCell a; long v; int m; boolean uncontended = true; if (as == null || (m = as.length - 1) < 0 || (a = as[ThreadLocalRandom.getProbe() & m]) == null || !(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) { fullAddCount(x, uncontended); return; } if (check <= 1) return; s = sumCount(); } if (check >= 0) { Node<K,V>[] tab, nt; int n, sc; while (s >= (long)(sc = sizeCtl) && (tab = table) != null && (n = tab.length) < MAXIMUM_CAPACITY) { int rs = resizeStamp(n); if (sc < 0) { if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0) break; if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) transfer(tab, nt); } else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)) transfer(tab, null); s = sumCount(); } } }
|
参考