概诉
上一节,我们讨论了Atomic类的实现原理,以及代码分析。这一节我来看 LongAddr
类。LongAdder
是JDK8添加到JUC中的。它是一个线程安全的、比Atomic*系工具性能更好的"计数器"。
首先我们来看一下 Doug Lea 大神对这个类的概述 LongAddr
LongAdder
中会维护一个或多个变量,这些变量共同组成一个long型的和(这个和就是 LongAddr
的值)。
当多个线程同时更新值时,为了减少竞争,可能会动态地增加这组变量的数量。“sum”方法(等效于longValue方法)返回这组变量的“和”值。
当我们的场景是为了统计技术,而不是为了更细粒度的同步控制时,并且是在多线程更新的场景时,LongAdder
类比AtomicLong
更好用。
在小并发的环境下,论更新的效率,两者都差不多。但是高并发的场景下,LongAdder有着明显更高的吞吐量,但是有着更高的空间复杂度。
Atomic缺陷
AtomicLong 的 Add() 是依赖自旋不断的 CAS 去累加一个 Long 值。如果在竞争激烈的情况下,CAS 操作不断的失败,就会有大量的线程不断的自旋尝试 CAS 会造成 CPU 的极大的消耗。
LongAddr详解
LongAddr 类图
这里主要涉及两个主要的类
Striped64
LongAddr
如果我们要了解 LongAddr
, 就要先了解 LongAddr
如何使用。
LongAddr的使用
LongAddr 在主要方法体现在一下几个地方
1 2 3 4 5 6 7 LongAdder longAdder = new LongAdder(); longAdder.add(20L ); longAdder.increment(); longAdder.decrement(); long sum = longAdder.sum(); long sumThenReset = longAdder.sumThenReset(); longAdder.reset();
Add 方法
对于累加器 最核心的方法就是 add 方法了,我们来看一下add 方法的实现代码
1 2 3 4 5 6 7 8 9 10 public void add (long x) { Cell[] as; long b, v; int m; Cell a; if ((as = cells) != null || !casBase(b = base, b + x)) { boolean uncontended = true ; if (as == null || (m = as.length - 1 ) < 0 || (a = as[getProbe() & m]) == null || !(uncontended = a.cas(v = a.value, v + x))) longAccumulate(x, null , uncontended); } }
如果 cells 数组不为空,对参数进行 casBase 操作,如果 casBase 操作失败。可能是竞争激烈,进入第二步。
如果 cells 为空,直接进入 longAccumulate()
;
m = cells 数组长度减一,如果数组长度小于 1,则进入 longAccumulate()
如果都没有满足以上条件,则对当前线程进行某种 hash 生成一个数组下标,对下标保存的值进行 cas 操作。如果操作失败,则说明竞争依然激烈,则进入 longAccumulate()
.
从类图上来看,LongAdder
没有任何的成员比变量,那么add 方法所用的基本上都是Striped64
类中的变量。LongAddr
继承了 Striped64
类,而 Striped64
是计数器的核心功能点.
这里我们提一个问题:
Cell 是什么?
CasBase 方法是什么?
LongAccumulate 方法又在干什么?
要解答以上三个问题,我们需要了解一下 Striped64 到底干了什么事情。
Striped64
Striped64
是在java8中添加用来支持累加器的并发组件,它可以在并发环境下使用来做某种计数。
设计思路
Striped64
: 设计思路是在竞争激烈的时候尽量分散竞争,在实现上 Striped64
维护了一个base Count
和一个Cell
数组。
计数线程会首先试图更新 base
变量,如果成功则退出计数,否则会认为当前竞争是很激烈的,那么就会通过 Cell
数组来分散计数
Striped64
根据线程来计算哈希,然后将不同的线程分散到不同的Cell数组的 index
上,然后这个线程的计数内容就会保存在该 Cell
的位置上面,基于这种设计,最后的总计数需要结合 base
以及散落在Cell数组中的计数内容。
我们来看一下代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 abstract class Striped64 extends Number { @sun .misc.Contended static final class Cell { volatile long value; Cell(long x) { value = x; } final boolean cas (long cmp, long val) { return UNSAFE.compareAndSwapLong(this , valueOffset, cmp, val); } private static final sun.misc.Unsafe UNSAFE; private static final long valueOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> ak = Cell.class ; valueOffset = UNSAFE.objectFieldOffset (ak.getDeclaredField("value" )); } catch (Exception e) { throw new Error(e); } } } static final int NCPU = Runtime.getRuntime().availableProcessors(); transient volatile Cell[] cells; transient volatile long base; transient volatile int cellsBusy; Striped64() {} final boolean casBase (long cmp, long val) { return UNSAFE.compareAndSwapLong(this , BASE, cmp, val); } final boolean casCellsBusy () { return UNSAFE.compareAndSwapInt(this , CELLSBUSY, 0 , 1 ); } static final int getProbe () { return UNSAFE.getInt(Thread.currentThread(), PROBE); } static final int advanceProbe (int probe) { probe ^= probe << 13 ; probe ^= probe >>> 17 ; probe ^= probe << 5 ; UNSAFE.putInt(Thread.currentThread(), PROBE, probe); return probe; } private static final sun.misc.Unsafe UNSAFE; private static final long BASE; private static final long CELLSBUSY; private static final long PROBE; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> sk = Striped64.class ; BASE = UNSAFE.objectFieldOffset (sk.getDeclaredField("base" )); CELLSBUSY = UNSAFE.objectFieldOffset (sk.getDeclaredField("cellsBusy" )); Class<?> tk = Thread.class ; PROBE = UNSAFE.objectFieldOffset (tk.getDeclaredField("threadLocalRandomProbe" )); } catch (Exception e) { throw new Error(e); } } }
看到这,我们可以回答一下上面的几个问题了
我们在看一下 add
的代码,分析得到了一下结论
1、 Cell
是什么
Cell
就是加了 避免为共享的类 Cell
里面只有一个value值, 更新Cell 的value 值值能通过 cas 方法
2、 casBase
是更新 base value
的方法, Striped64
有 cell
数组,如果竞争激烈,也就是 casBase
方法 失败,那么意味着多个线程竞争修改 base
value值
3、发生竞争,就去 cell
数组中选择一个 cell
去更新,选择计算 getProbe() & m
的值,其中m为 cell
数组个数。
4、 如果找到了 cell 则执行cell 的cas操作,如果找不到,或者cas操作也失败了则执行 longAccumulate
方法
longAccumulate
来看一下 longAccumulate
的方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 final void longAccumulate (long x, LongBinaryOperator fn, boolean wasUncontended) { int h; if ((h = getProbe()) == 0 ) { ThreadLocalRandom.current(); h = getProbe(); wasUncontended = true ; } boolean collide = false ; for (;;) { Cell[] as; Cell a; int n; long v; if ((as = cells) != null && (n = as.length) > 0 ) { if ((a = as[(n - 1 ) & h]) == null ) { if (cellsBusy == 0 ) { Cell r = new Cell(x); if (cellsBusy == 0 && casCellsBusy()) { boolean created = false ; try { Cell[] rs; int m, j; if ((rs = cells) != null && (m = rs.length) > 0 && rs[j = (m - 1 ) & h] == null ) { rs[j] = r; created = true ; } } finally { cellsBusy = 0 ; } if (created) break ; continue ; } } collide = false ; } else if (!wasUncontended) wasUncontended = true ; else if (a.cas(v = a.value, ((fn == null ) ? v + x : fn.applyAsLong(v, x)))) break ; else if (n >= NCPU || cells != as) collide = false ; else if (!collide) collide = true ; else if (cellsBusy == 0 && casCellsBusy()) { try { if (cells == as) { Cell[] rs = new Cell[n << 1 ]; for (int i = 0 ; i < n; ++i) rs[i] = as[i]; cells = rs; } } finally { cellsBusy = 0 ; } collide = false ; continue ; } h = advanceProbe(h); } else if (cellsBusy == 0 && cells == as && casCellsBusy()) { boolean init = false ; try { if (cells == as) { Cell[] rs = new Cell[2 ]; rs[h & 1 ] = new Cell(x); cells = rs; init = true ; } } finally { cellsBusy = 0 ; } if (init) break ; } else if (casBase(v = base, ((fn == null ) ? v + x : fn.applyAsLong(v, x)))) break ; } }
从判断上来看,我们一共发现了三个主要逻辑判断
在 longAccumulate 中有几个标记位,我们也先理解一下
cellsBusy
cells
的操作标记位,如果正在修改、新建、操作 cells
数组中的元素会,会将其 cas 为 1,否则为0。
wasUncontended
表示 cas
是否失败,如果失败则考虑操作升级。
collide
是否冲突,如果冲突,则考虑扩容 cells
的长度。
整个 for(;;) 死循环,都是以 cas 操作成功而告终。否则则会修改上述描述的几个标记位,重新进入循环。
1、如果Cell不为空 (if ((as = cells) != null && (n = as.length) > 0)
)
果 cell[i] 某个下标为空,则 new 一个 cell,并初始化值,然后退出
如果 上层传过来的结果是 cas 失败,继续循环。
如果 cell 不为空,且 cell cas 成功,退出
如果 cell 的数量,大于等于 cpu 数量或者已经扩容了,继续重试。(扩容没意义)
设置 collide 为 true。
获取 cellsBusy 成功就对 cell 进行扩容,获取 cellBusy 失败则重新 hash 再重试。
2、 是否能 更新 cellsBusy
值 ,走到这里表示 如果Cell 为空, 这个步骤讲的是需要开始初始化 cell((cellsBusy == 0 && cells == as && casCellsBusy())
)
3、cellsBusy
获取失败,表示有可能别的进程在扩容,则自己去更新 base 值, baseCas
,操作成功退出,不成功则重试
if (casBase(v = base, ((fn == null) ? v + x :fn.applyAsLong(v, x))
至此 longAccumulate 就分析完了。之所以这个方法那么复杂,我认为有两个原因
是因为并发环境下要考虑各种操作的原子性,所以对于锁都进行了 double check。
操作都是逐步升级,以最小的代价实现功能。
Sum 方法
它需要累计base和Cell数组中的Cell中的计数,base中的计数为线程竞争不是很激烈的时候累计的数,而在线程竞争比较激烈的时候就会将计数的任务分散到Cell数组中,所以在sum方法里,需要合并两处的计数值。
除了获取总计数,我们有时候想reset一下,下面的代码展示了这种操作:
1 2 3 4 5 6 7 8 9 10 11 public long sum () { Cell[] as = cells; Cell a; long sum = base; if (as != null ) { for (int i = 0 ; i < as.length; ++i) { if ((a = as[i]) != null ) sum += a.value; } } return sum; }
注意: 大家注意到,这里并没有加锁,也就是标明,sum 调用的时候是允许,其他线程继续更新的。这种只是调用返回的一个快照值,在返回sum的过程中,有可能更新还在继续 。
reset 方法
reset 方法将 base 和每个cell 当中的 value 值重置成0
但是有几个需要注意的地方
reset 方法不会重置 cell 数组的数量。
reset 调用的前提 是没有线程再去更新值。
1 2 3 4 5 6 7 8 9 10 public void reset () { Cell[] as = cells; Cell a; base = 0L ; if (as != null ) { for (int i = 0 ; i < as.length; ++i) { if ((a = as[i]) != null ) a.value = 0L ; } } }
Striped64在ConcurrentHashMap
Striped64
的计数方法在java8的 ConcurrentHashMap
中也有使用,具体的实现细节可以参考 addCount
方法,下面来看一下 ConcurrentHashMap
的size方法的实现细节:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 private transient volatile int cellsBusy;private transient volatile CounterCell[] counterCells;private final void addCount (long x, int check) { CounterCell[] as; long b, s; if ((as = counterCells) != null || !U.compareAndSwapLong(this , BASECOUNT, b = baseCount, s = b + x)) { CounterCell a; long v; int m; boolean uncontended = true ; if (as == null || (m = as.length - 1 ) < 0 || (a = as[ThreadLocalRandom.getProbe() & m]) == null || !(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) { fullAddCount(x, uncontended); return ; } if (check <= 1 ) return ; s = sumCount(); } if (check >= 0 ) { Node<K,V>[] tab, nt; int n, sc; while (s >= (long )(sc = sizeCtl) && (tab = table) != null && (n = tab.length) < MAXIMUM_CAPACITY) { int rs = resizeStamp(n); if (sc < 0 ) { if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0 ) break ; if (U.compareAndSwapInt(this , SIZECTL, sc, sc + 1 )) transfer(tab, nt); } else if (U.compareAndSwapInt(this , SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2 )) transfer(tab, null ); s = sumCount(); } } }
参考