JAVA多线程之Atomic和LongAddr类(1)

Atomic类详解

Posted by Jason Lee on 2020-05-03

概述

Atomic与LongAddr解决什么问题

解决变量赋值的原子性问题。 底层采用的是CAS的自旋的方式来实现的。CAS会有ABA和性能的问题,Atomic 采用版本号的方式来解决ABA的问题。ABA问题是否会对业务产生影响,还需要业务方自己评估,大部分情况下比如一些非金融业务系统中,ABA不会对业务造成影响。
至于性能问题,CAS自旋的方式确实会造成一些CPU的空转,因为严格意义上来说CAS不能算是锁,所以说CAS的也有适用范围。

自旋操作适用于线程竞争力度大,线程执行时间比较短的情况。这种情况下,CAS自旋空转的等待时间表少,如线程运行时间比较长,最好的方式还是使用重量级的锁,比如说 synchronized AQS等重量级的锁。

Atomic类与LongAddr类简单对比

我们来看一个实例程序 分别来计算 synchronized,Atomic,LongAdder 运行N秒,累加数量的问题。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
// 测试用例: 同时运行5秒,检查谁的次数最多
public class LongAdderDemo {
// synchronized 方式
private long count = 0;
// Atomic方式
private AtomicLong acount = new AtomicLong(0L);
// LongAdder 方式 (jdk1.8,新计数器)
private LongAdder lacount = new LongAdder();
// 运行时间,毫秒数
private int time=10000;
// 同步代码块的方式
public void testSync() throws InterruptedException {
for (int i = 0; i < 3; i++) {
new Thread(() -> {
long starttime = System.currentTimeMillis();
while (System.currentTimeMillis() - starttime < time) { // 运行两秒
synchronized (this) {
++count;
}
}
long endtime = System.currentTimeMillis();
System.out.println("SyncThread spend:" + (endtime - starttime) + "ms" + " v:" + count);
}).start();
}
}
public void testAtomic() throws InterruptedException {
for (int i = 0; i < 3; i++) {
new Thread(() -> {
long starttime = System.currentTimeMillis();
while (System.currentTimeMillis() - starttime < time) { // 运行两秒
acount.incrementAndGet(); // acount++;
}
long endtime = System.currentTimeMillis();
System.out.println("AtomicThread spend:" + (endtime - starttime) + "ms" + " v:" + acount.incrementAndGet());
}).start();
}
}

public void testLongAdder() throws InterruptedException {
for (int i = 0; i < 3; i++) {
new Thread(() -> {
long starttime = System.currentTimeMillis();
while (System.currentTimeMillis() - starttime < time) { // 运行两秒
lacount.increment();
}
long endtime = System.currentTimeMillis();
System.out.println("LongAdderThread spend:" + (endtime - starttime) + "ms" + " v:" + lacount.sum());
}).start();
}
}

public static void main(String[] args) throws InterruptedException {
LongAdderDemo demo = new LongAdderDemo();
demo.testSync();
demo.testAtomic();
demo.testLongAdder();
}
}

来看耗时

1
2
3
4
5
6
7
8
9
SyncThread spend:10000ms v:102578641
SyncThread spend:10000ms v:102578641
SyncThread spend:10000ms v:102578641
AtomicThread spend:10000ms v:243832099
AtomicThread spend:10000ms v:243832100
AtomicThread spend:10000ms v:243832101
LongAdderThread spend:10000ms v:309048936
LongAdderThread spend:10000ms v:309098061
LongAdderThread spend:10000ms v:309356235

从耗时就可以看出来,在相同的时间内,LongAddr执行的效率最高,可以是sync关键字的2倍,是Atomic的1.5倍左右。
本次的数据仅在这个测试用例中生效。每个单元的执行效率还是要根据具体的业务来看。

类图

Atomic 详解

Atomic 家族概述

  • AtomicBoolean boolean 类型的atomic类
  • AtomicInteger Integer 类型 atomic 类
  • AtomicIntegerArray List元素为Integer ,且元素支持原子更新的类
  • AtomicIntegerFieldUpdater 基于反射的实用工具,可以对指定类的指定 volatile int 字段进行原子更新
  • AtomicLong Long类型的atomic类
  • AtomicLongArray List元素为Long ,且元素支持原子更新的类
  • AtomicLongFieldUpdater 基于反射的实用工具,可以对指定类的指定 volatile long 字段进行原子更新
  • AtomicMarkableReference 是带了布尔型标记位(Boolean mark)的引用型原子量,每次执行CAS操作是需要对比该标记位,如果标记满足要求,则操作成功,否则操作失败。
  • AtomicReference 提供了一个可以原子读写的对象引用变量
  • AtomicReferenceArray List元素为引用类型 ,且元素支持原子更新的类
  • AtomicReferenceFieldUpdater 基于反射的实用工具,可以对指定类的指定 volatile object 字段进行原子更新
  • AtomicStampedReference 是带了整型标记值(int stamp)的引用型原子变量,每次执行CAS操作时需要对比版本,如果版本满足要求,则操作成功,否则操作失败,用于防止CAS操作的ABA问题。

Atomic 详解

下面我们来具体分析一下 AtomicInteger 这个类
AtomicInteger 的核心字段一共有三个

  1. unsafe 对象
  2. value 值
  3. valueOffset在对象中的偏移量

方法如下图:

  • Unsafe 更新值
1
2
3
4
5
6
7
8
9
10
11
12
public class AtomicInteger extends Number implements java.io.Serializable {

private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long valueOffset;
static {
try {
valueOffset = unsafe.objectFieldOffset
(AtomicInteger.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}
// atomic 具体的值
private volatile int value;

几个核心方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public final int getAndSet(int newValue) {
return unsafe.getAndSetInt(this, valueOffset, newValue);
}
public final boolean compareAndSet(int expect, int update) {
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}
public final boolean weakCompareAndSet(int expect, int update) {
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}
public final int getAndIncrement() {
return unsafe.getAndAddInt(this, valueOffset, 1);
}
public final int getAndDecrement() {
return unsafe.getAndAddInt(this, valueOffset, -1);
}
public final int getAndAdd(int delta) {
return unsafe.getAndAddInt(this, valueOffset, delta);
}
public final int incrementAndGet() {
return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}
public final int decrementAndGet() {
return unsafe.getAndAddInt(this, valueOffset, -1) - 1;
}
public final int addAndGet(int delta) {
return unsafe.getAndAddInt(this, valueOffset, delta) + delta;
}
// unsafe 类
public final native boolean compareAndSwapInt(Object var1, long var2, long var4, long var6);

public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
var5 = this.getIntVolatile(var1, var2);
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));

return var5;
}

public final int getAndSetInt(Object var1, long var2, int var4) {
int var5;
do {
var5 = this.getIntVolatile(var1, var2);
} while(!this.compareAndSwapInt(var1, var2, var5, var4));

return var5;
}
// ... 省略其他

从这个方法就能看出,其实Atomic其实调用的是Unsafe类中的 do while 代码块来实现CAS交换的,而Unsafe中这个方法compareAndSwapInt是一个Native的方法。

unsafe对象在类初始化的时候,已经将Atomic 中用于存放直的value 的偏移量生成 valueOffset, 用于更新操作。
再在更新的时候,调用 unsafe.compareAndSwapInt(this, valueOffset, expect, update) 方法,意思是,将本类的valueOffset的偏移量更新成update值,如果this内的valueexpect相等,就证明没有其他线程改变过这个变量,那么就更新它为update,如果这一步的CAS没有成功,那就采用自旋的方式继续进行CAS操作,取出乍一看这也是两个步骤了,其实在JNI里是借助于一个CPU指令完成的。所以还是原子操作。

我们来看一下OpenJDK中的unsafe.cpp 方法

  • CAS底层原理

CAS底层使用JNI调用C代码实现的,如果你有Hotspot源码,那么在Unsafe.cpp里可以找到它的实现:

1
2
3
4
5
6
7
//openjdk-jdk8u源码:/hotspot/src/os_cpu/linux_x86/vm/atomic_linux_x86.inline.hpp
static JNINativeMethod methods_15[] = {
//省略一堆代码...
{CC"compareAndSwapInt", CC"("OBJ"J""I""I"")Z", FN_PTR(Unsafe_CompareAndSwapInt)},
{CC"compareAndSwapLong", CC"("OBJ"J""J""J"")Z", FN_PTR(Unsafe_CompareAndSwapLong)},
//省略一堆代码...
};

我们可以看到 compareAndSwapInt 和 compareAndSwapLong 实现是在 Unsafe_CompareAndSwapIntUnsafe_CompareAndSwapLong
下面我们深入到 Unsafe_CompareAndSwapInt 中去。

Unsafe_CompareAndSwapInt 方法

1
2
3
4
5
6
7
8
9
UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapInt(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint e, jint x))
UnsafeWrapper("Unsafe_CompareAndSwapInt");
// p是取出的对象
oop p = JNIHandles::resolve(obj);
// addr是p中offset处的地址
jint* addr = (jint *) index_oop_from_field_offset_long(p, offset);
// 其中参数x是即将更新的值,参数e是原内存的值
return (jint)(Atomic::cmpxchg(x, addr, e)) == e;
UNSAFE_END

代码中能看到cmpxchg有基于各个平台的实现,这里我选择Linux X86平台下的源码分析:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
inline jint     Atomic::cmpxchg    (jint     exchange_value, volatile jint*     dest, jint     compare_value) {
int mp = os::is_MP(); // >1 = true
// __asm__说明是ASM汇编,__volatile__禁止编译器优化
__asm__ volatile (LOCK_IF_MP(%4) "cmpxchgl %1,(%3)"
: "=a" (exchange_value)
: "r" (exchange_value), "a" (compare_value), "r" (dest), "r" (mp)
: "cc", "memory");
return exchange_value;
}
// http://hg.openjdk.java.net/jdk8/jdk8/hotspot/file/tip/src/share/vm/runtime/os.hpp
static inline bool is_MP() {
assert(_processor_count > 0, "invalid processor count");
// 我们可以利用 -XX:-AssumeMP 这个参数关闭多核运行状态
return _processor_count > 1 || AssumeMP;
}
static int _processor_count; // number of processors

在正式解读这段汇编前,我们来了解下嵌入汇编的基本格式:

1
2
3
4
5
6
7
8
9
10
// 内嵌汇编模板
// 第1行:汇编语句
asm ( assembler template
// 第2行:输出参数,以','分割
: output operands (optional)
// 第3行:输入参数,以','分割
: input operands (optional)
: list of clobbered registers
(optional)
);

对照下面的源码,关键字asm__asm__用于说明随后的字符串是内联汇编代码块。volatile__volatile__ 是可选的,可以将它们添加到 asm 后面,禁止某些编译器的优化。其实,asm__asm__几乎是相同的,惟一的区别是,当预处理程序宏中使用内联汇编时,asm在编译过程中可能会引发警告,volatile也是同样的道理。因此Atomic::cmpxchg函数中汇编指令部分源码可以作如下解读:

1
2
3
4
5
6
7
8
9
10
11
12
// 宏定义表达的意思,后面再说
// 第1行:表示汇编指令 cmpxchgl
__asm__ volatile (LOCK_IF_MP(%4) "cmpxchgl %1,(%3)"
// 输出参数,"=a"表示把数据保存到eax寄存器中,"="表示这类操作数是只写的
// 这里的意思就是把 exchange_value的值保存到CPU的eax寄存器当中 (输出参数)
: "=a" (exchange_value)
// 输入参数:"r"表示任意寄存器,"a"表示eax寄存器
: "r" (exchange_value), "a" (compare_value), "r" (dest), "r" (mp)
// 这里列出需要使用的寄存器:cc表示汇编代码将改变条件寄存器,memory表示有内存被修改
// 前面也提到过,这里仅列出指令改变的寄存器,输入输出使用的寄存器不放入其中,因此'a'不需要放到这儿
// 如果汇编指令修改了eax而输入输出没有用到eax,那么就需要在这儿加上'a'
: "cc", "memory");

这里的 cmpxchgl %1,(%3) 中的 %1 和 %3 中的数字表示参数列表:["=a" (exchange_value),“r” (exchange_value),“a” (compare_value),“r” (dest), “r” (mp)] 的下标,这里指的是"r" (exchange_value)"r" (dest)

  • compare_value 表示 期望内存中的值,即旧值
  • exchange_value 表示 更新后的值,即新值
  • dest 表示 java对象对应属性的内存地址, (dest) 获取地址dest指向的值

cmpxchg指令即交换比较指令,让目标操作数先和AL,AX,EAX寄存器中的值进行比较:

  • 如果相等,将源操作数直接装载到目标操作数,并将寄存器标志位ZF(zero flag)置为1
  • 如果不相等,将目标操作数转载到AL,AX,EAX寄存器,并将ZF清0

那上面的代码中哪一个是源操作数,哪一个是目标操作数?这又涉及到汇编语言编程风格的问题,在DOS/Windows下的汇编语言采用Intel风格,而Unix和Linux系统中,更多采用的是AT&T风格,这两者在语法格式上有很大不同,具体到操作数,有如下区别:

  • AT&T 汇编格式:指令 源操作数 目标操作数
  • Intel汇编格式:指令 目标操作数 源操作数

还有一个需要注意的区别就是:操作数的字长。

  • AT&T 汇编格式:由操作符的最后一个字母决定,后缀b、w、l分别表示操作数为字节(byte,8bit)、字(word,16bit)、长字(long,32bit)
  • Intel汇编格式:用byte ptr、word ptr等前缀来表示,比如:mov al, byte ptr val

那么表达式其实就是cmpxchgl exchange_value ,dest
EAX寄存器中存的是"a" (compare_value), 也就是cmpxchgl中l表示的寄存器的值也就是期望值,目标操作数是dest指向的值,即内存中存储的值,源操作数是exchange_value,即新值。将期望内存中的值与实际内存值比较,如果相等,则把exchange_value新值装载到dest内存中,并把新值写入EAX中;如果不相等,把dest地址的实际值放入EAX中。

而C/C++内联汇编的特性架起了这样一座桥梁:一边是C语言,一边是汇编语言,在汇编中可以接收C传过来的参数,并且汇编指令也可以把值写入C语言变量中。

LOCK_IF_MP 是内联函数,定位如下:

1
2
// Adding a lock prefix to an instruction on MP machine
#define LOCK_IF_MP(mp) "cmp $0, " #mp "; je 1f; lock; 1: "

os::is_MP判断当前系统是否为多核系统,如果是就给总线加锁,所以同一芯片上的其他处理器就暂时不能通过总线访问内存,保证了该指令在多处理器环境下的原子性。

  • cmp $0, #mp
    • cmp是比较命令,两个操作数:$0表示立即操作数0,理解为常量。
    • mp为参数,从%4处引用而来,而%4表示 "r"(mp),即把mp的值存储到任意寄存器。mp如何来?看函数代码的第一句:int mp = os::is_MP(),用于判断是否是多处理器机器, mp 如果为false(表示单核) 那就是0
  • je lf;
    • je为跳转指令,当ZF=1时,跳转到指定位置,这里表示跳转到标签1
  • lock :表示Lock前缀
  • 1
    表示标签,类似于goto语句 1: cmpxchgl %1,(%3)

这里便于大家理解,写一个伪代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
cmp(var1, var2) {
zf ;
if (var1 == var2) {
zf = 0
} else {
zf = 1
}
return zf
}
zf = cmp(0, mp) // zf 标志位
if (zf == 1) {
goto:1
} else {
lock();
goto:1
}

翻译过来就是,如果是在多处理器上运行则先添加LOCK#前缀后,再执行cmpxchgl命令,如果是单处理器,则直接执行 cmpxchgl 命令。那为什么在多处理器机器上要先添加LOCK#前缀。

Lock前缀

Lock前缀指令导致在执行指令期间,声言处理器的LOCK# 信号。在多处理器环境中,LOCK# 信号确保在声言该信号期间,处理器可以独占任何共享内存(因为它会锁住总线,导致其他CPU不能访问总线,也就不能访问系统内存,在Intel486和Pentium处理器中都是这种策略)。

在最近的处理器里,LOCK# 信号一般不锁总线,而是锁缓存,因为锁总线开销的比较大。在P6和目前的处理器中,如果访问的内存区域已经缓存在处理器内部,则不会声言LOCK# 信号。相反,它会锁定这块区域的缓存并回写到内存,并使用缓存一致性机制来确保修改的原子性,此操作被称为“缓存锁定”,缓存一致性机制会阻止同时修改由两个以上的处理器缓存的内存区域数据。

由于我们操作的是count这个值,这个变量是用volatile 变量修饰的,所以保证了线程见的可见性。关于volatile 变量的细节,我会单独另一篇文章讨论。

下面我们来总结一下:
当执行CAS操作的时候,我们先判断机器的核数,如果是多核CPU,则调用 lock cmpxchgl 这个汇编执行,lock执行会根据cpu的型号和架构,来选择总线锁还是缓存锁,来实行原子更新。更新之后,由于更新的值为volatile 变量修饰,(JVM层面规定是内存屏障),强制其他核心从主内存当中读值。这样以来就实现了原子更新

Unsafe_CompareAndSwapLong 方法

unsafe.cpp 的源码中,Int 和 Long 的swap 略有不同。我们来看一下代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 
UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapLong(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jlong e, jlong x))
UnsafeWrapper("Unsafe_CompareAndSwapLong");
Handle p (THREAD, JNIHandles::resolve(obj));
jlong* addr = (jlong*)(index_oop_from_field_offset_long(p(), offset));
if (VM_Version::supports_cx8())
return (jlong)(Atomic::cmpxchg(x, addr, e)) == e;
else {
jboolean success = false;
// 不支持,则直接锁住对象
ObjectLocker ol(p, THREAD);
if (*addr == e) { *addr = x; success = true; }
return success;
}
UNSAFE_END

可以看到,如果不支持cx8,那么就需要用到ObjectLocker锁,那么什么 VM_Version::supports_cx8() 的底层实现又是什么呢?篇幅应县个,这里不做详细的分析,源代码在:在vm_version.hpp

底层则是判断CUP 是否支持 CMPXCHG8B 指令,简单来说就是CUP是否支持对寄存器的8字节的比较。在早期的CUP一般不支持,现代的CPU一般都在会支持。

AtomicReference 类

这个类主要负责对于引用类型的赋值,他的核心代码很简单就是讲原来的compareAndSwapInt 换成了 compareAndSwapObject 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class AtomicReference<V> implements java.io.Serializable {
// ... 省略
private volatile V value;
public final V getAndSet(V newValue) {
return (V)unsafe.getAndSetObject(this, valueOffset, newValue);
}
// unsafe类
public final Object getAndSetObject(Object var1, long var2, Object var4) {
Object var5;
do {
var5 = this.getObjectVolatile(var1, var2);
} while(!this.compareAndSwapObject(var1, var2, var5, var4));

return var5;
}

按照既定的惯例,我们来找到compareAndSwapObject 的native方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapObject(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jobject e_h, jobject x_h))
UnsafeWrapper("Unsafe_CompareAndSwapObject");
oop x = JNIHandles::resolve(x_h); // 新值
oop e = JNIHandles::resolve(e_h); // 预期值
oop p = JNIHandles::resolve(obj);
// 在内存中的具体位置
HeapWord* addr = (HeapWord *)index_oop_from_field_offset_long(p, offset);
// 调用了另一个方法
oop res = oopDesc::atomic_compare_exchange_oop(x, addr, e, true);
// 如果返回的res等于e,则判定满足compare条件(说明res应该为内存中的当前值),但实际上会有ABA的问题
jboolean success = (res == e);
if (success)
// success为true时,说明此时已经交换成功(调用的是最底层的cmpxchg指令)
// 每次Reference类型数据写操作时,都会产生一个Write Barrier暂时中断操作,配合垃圾收集器
update_barrier_set((void*)addr, x);
return success;
UNSAFE_END

compareAndSwapObject 方法其实比较的就是两个 Java Object 的地址,如果相等则将新的地址(Java Object)赋给该字段。

我们看一下atomic_compare_exchange_oop 在oop.inline.hpp的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
inline oop oopDesc::atomic_compare_exchange_oop(oop exchange_value,
volatile HeapWord *dest,
oop compare_value,
bool prebarrier) {
if (UseCompressedOops) {
// 如果使用了压缩普通对象指针(CompressedOops),有一个重新编解码的过程
if (prebarrier) {
update_barrier_set_pre((narrowOop*)dest, exchange_value);
}
// encode exchange and compare value from oop to T
narrowOop val = encode_heap_oop(exchange_value); // 新值
narrowOop cmp = encode_heap_oop(compare_value); // 预期值
// 这里调用了 cmpxchg 方法
narrowOop old = (narrowOop) Atomic::cmpxchg(val, (narrowOop*)dest, cmp);
return decode_heap_oop(old);
} else {
if (prebarrier) {
update_barrier_set_pre((oop*)dest, exchange_value);
}
// 这里表示 执行cmpxchg的汇编指令采用 支持
return (oop)Atomic::cmpxchg_ptr(exchange_value, (oop*)dest, compare_value);
}

// cmpxchg_ptr 的汇编指令集 和 cmpxchg 不同的是 采用了 cmpxchgq 指令
__asm__ __volatile__ (LOCK_IF_MP(%4) "cmpxchgq %1,(%3)"
: "=a" (exchange_value)
: "r" (exchange_value), "a" (compare_value), "r" (dest), "r" (mp)
: "cc", "memory");
return exchange_value;
}

AtomicStampedReference 与 AtomicMarkableReference

上面的普通的CAS操作都会有ABA的问题,其根本原因在于在CAS操作过程中,操作者不知道被操作的只中途是否发生过改变,那么解决方案就很自然,当改变一个只,就给这个值附上一个版本。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
//构造方法, 传入引用和戳
public AtomicStampedReference(V initialRef, int initialStamp)
//返回引用
public V getReference()
//返回版本戳
public int getStamp()
//如果当前引用 等于 预期值并且 当前版本戳等于预期版本戳, 将更新新的引用和新的版本戳到内存
public boolean compareAndSet(V expectedReference,
V newReference,
int expectedStamp,
int newStamp)
//如果当前引用 等于 预期引用, 将更新新的版本戳到内存
public boolean attemptStamp(V expectedReference, int newStamp)
//设置当前引用的新引用和版本戳
public void set(V newReference, int newStamp)

我们来看用法:

1
2
AtomicStampedReference<String> reference = new AtomicStampedReference<String>(str1,1);
reference.compareAndSet(str1,str2,reference.getStamp(),reference.getStamp()+1);

当我们比较时候,不仅要传递expectednew 值,还要去传递版本戳,我们来看一下他的源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class AtomicStampedReference<V> {
private static class Pair<T> {
final T reference;
final int stamp;
private Pair(T reference, int stamp) {
this.reference = reference;
this.stamp = stamp;
}
static <T> Pair<T> of(T reference, int stamp) {
return new Pair<T>(reference, stamp);
}
}
private volatile Pair<V> pair;

public boolean compareAndSet(V expectedReference,
V newReference,
int expectedStamp,
int newStamp) {
Pair<V> current = pair;
return
expectedReference == current.reference &&
expectedStamp == current.stamp &&
((newReference == current.reference &&
newStamp == current.stamp) ||
casPair(current, Pair.of(newReference, newStamp)));
}

private boolean casPair(Pair<V> cmp, Pair<V> val) {
return UNSAFE.compareAndSwapObject(this, pairOffset, cmp, val);
}

AtomicStampedReference 可以知道,引用变量中途被更改了几次。有时候,我们并不关心引用变量更改了几次,只是单纯的关心是否更改过,所以就有了 AtomicMarkableReference。

1
2
3
public AtomicMarkableReference(V initialRef, boolean initialMark) {
pair = Pair.of(initialRef, initialMark);
}

AtomicReference,AtomicStampedReference与 AtomicMarkableReference 的区别

区别
AtomicReference 通过volatile和Unsafe提供的CAS函数实现原子操作。
自旋+CAS的无锁操作保证共享变量的线程安全.value是volatile类型,这保证了:当某线程修改value的值时,其他线程看到的value的值都是最新的值,即修改之后的volatile的值通过CAS设置value
这保证了:某线程池通过CAS函数(如compareAndSet函数)设置value时,它的操作时原子性的,即线程在操作vu略时不会被中断。
但是CAS操作可能存在ABA问题
AtomicStampedReference 构造方法中initialStamp(时间戳)用来唯一标识引用变量,在构造器内部,实例化了一个Pair对象,Pair对象记录了对象引用和时间戳信息,采用int作为时间戳,实际使用的时候,要保证时间戳唯一(一般做成自增的),如果时间戳如果重复,还会出现ABA的问题。
AtomicMarkableReference AtomicStampedReference 时间戳 只有true 和false

FieldUpdater

FieldUpdater 包含了
AtomicIntegerFieldUpdater, AtomicLongFieldUpdaterAtomicReferenceFieldUpdater 这3个修改类的成员的原子类型的原理和用法相似

以 AtomicIntegerFieldUpdater 为例来分析一下特性及其源码

特性

AtomicIntegerFieldUpdater 可以对指定"类的 volatile int 类型的成员"进行原子更新。它是基于反射原理实现的。
AtomicIntegerFieldUpdater 以下几个限制

  • (1)字段必须是 volatile 类型
  • (2)字段的描述类型(修饰符public/protected/default/private)是与调用者与操作对象字段的关系一致。也就是说调用者能够直接操作对象字段,那么就可以反射进行原子操作。但是对于父类的字段,子类是不能直接操作的,尽管子类可以访问父类的字段。
  • (3)只能是实例变量,不能是类变量,也就是说不能加static关键字。
  • (4)只能是可修改变量,不能使final变量,因为final的语义就是不可修改。实际上final的语义和volatile是有冲突的,这两个关键字不能同时存在。
  • (5)对于AtomicIntegerFieldUpdater和AtomicLongFieldUpdater只能修改int/long类型的字段,不能修改其包装类型(Integer/Long)。如果要修改包装类型就需要使用AtomicReferenceFieldUpdater。

实例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class AtomicIntegerFieldUpdaterDemo {
public static void main(String[] args) {
AtomicIntegerFieldUpdater aifu = AtomicIntegerFieldUpdater.newUpdater(Student.class, "number");

Student student = new Student();

boolean compareAndSet = aifu.compareAndSet(student, 1, 2);
aifu.weakCompareAndSet(student, 1, 2);
aifu.set(student, 1);
aifu.lazySet(student, 1);
aifu.get(student);
aifu.getAndSet(student, 1);
aifu.getAndIncrement(student);
aifu.getAndDecrement(student);
aifu.getAndAdd(student, 1);
aifu.incrementAndGet(student);
aifu.decrementAndGet(student);
aifu.addAndGet(student, 1);
aifu.getAndUpdate(student, operand -> 0);
aifu.updateAndGet(student, operand -> 0);
aifu.getAndAccumulate(student, 1, (left, right) -> 0);
aifu.accumulateAndGet(student, 1, (left, right) -> 0);
}
// AtomicIntegerFieldUpdaterDemo 必须能够访问 number

public static class Student {
volatile int number;
private String name;
private int age;
}

public static class Teacher {
private int number;
private String name;
private int age;
}
}

如果 Student number 是private 修饰的,则会抛出 $Student with modifiers "private volatile" 异常

如果我们将Student 放到其他包下,则会抛出 Atomic can not access a member of class {your bean} with modifiers "volatile

源码分析

AtomicIntegerFieldUpdater 和 AtomicLongFieldUpdater 实现类有些不同
FieldUpdater 都继承与 AtomicXXFieldUpdater类,这些类提供了CAS的基本方法。

这是AtomicLongFieldUpdater

1
2
3
4
5
6
@CallerSensitive
public static <U> AtomicIntegerFieldUpdater<U> newUpdater(Class<U> tclass,
String fieldName) {
return new AtomicIntegerFieldUpdaterImpl<U>
(tclass, fieldName, Reflection.getCallerClass());
}

我们可以看到 AtomicIntegerFieldUpdater 底层用的是 AtomicIntegerFieldUpdaterImpl 实现
而 AtomicLongFieldUpdater 底层用的是 CASUpdater 或者 LockedUpdater

1
2
3
4
5
6
7
8
9

public static <U> AtomicLongFieldUpdater<U> newUpdater(Class<U> tclass,
String fieldName) {
Class<?> caller = Reflection.getCallerClass();
if (AtomicLong.VM_SUPPORTS_LONG_CAS)
return new CASUpdater<U>(tclass, fieldName, caller);
else
return new LockedUpdater<U>(tclass, fieldName, caller);
}

CASUpdaterLockedUpdater 区别就在于 是否支持 cmpxchg 源操作数为64个bit的CPU 原语言。我们从这个指令就可以看出 除了long和double类型, Java基本数据类型都是的简单读写都是原子的.

LockedUpdaterCASUpdater 的区别在于核心的方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// CASUpdater 
public final boolean compareAndSet(T obj, long expect, long update) {
accessCheck(obj);
return U.compareAndSwapLong(obj, offset, expect, update);
}
// LockedUpdater
public final boolean compareAndSet(T obj, long expect, long update) {
accessCheck(obj);
synchronized (this) {
long v = U.getLong(obj, offset);
if (v != expect)
return false;
U.putLong(obj, offset, update);
return true;
}
}

我们看到了 CASUpdater 是直接调用 compareAndSwapLong 方法 而 LockedUpdater 则是通过加锁的方式实现 CAS

参考



支付宝打赏 微信打赏

赞赏一下