Java CAS(Compare And Swap)

基础知识

CAS(Compare And Swap)比较和替换是设计并发算法时用到的一种技术。简单来说,比较和替换是使用一个期望值和一个变量的当前值进行比较,如果当前变量的值与期望的值相等,就使用一个新值替换当前变量的值。
在程序和算法中一个经常出现的模式就是Check And Act模式。先检查后操作模式发生在代码中首先检查一个变量的值,然后再基于这个值做一些操作。

简单的示例

1
2
3
4
5
6
7
8
9
10
class MyLock {
private boolean locked = false;
public boolean lock() {
if(!locked) {
locked = true;
return true;
}
return false;
}
}

如果同个线程访问同一个MyLock实例,上面的lock()将不能保证正常工作。如果一个线程A检查locked的值,然后将其设置为false,与此同时,一个线程B也在检查locked的值。因此,线程A和线程B可能都看到locked的值为false,然后两者都基于这个信息做一些操作。(会有线程安全问题)
为了在一个多线程程序中良好的工作,Check Then Act操作必须是原子的。原子就是说Check操作和Act被当做一个原子代码块执行。(放在临界区中执行,不存在并发问题,只允许一个线程执行)。
把之前的lock()synchronized关键字重构成一个原子块。

1
2
3
4
5
6
7
8
9
10
class MyLock {
private boolean locked = false;
public synchronized boolean lock() {
if(!locked) {
locked = true;
return true;
}
return false;
}
}

现在lock()是同步的。所以,在某一时刻只能有一个线程在同一个MyLock实例上执行它。原子的lock()实际上是一个Compare And Swap的例子。

原子操作

原子性

原子(atom)本意是”不能被进一步分割的最小粒子”,而原子操作(Atomic Operation)意为”不可被中断的一个或一系列操作”。在多处理器上实现原子操作就变得有点复杂。
使用AtomicBoolean类实现lock()的例子。

1
2
3
4
5
6
public static class MyLock {
private AtomicBoolean locked = new AtomicBoolean(false);
public boolean lock() {
return locked.compareAndSet(false, true);
}
}

locked变量不再是boolean类型而是AtomicBoolean。这个类中有一个compareAndSet()方法,它使用一个期望值和AtomicBoolean实例的值比较,和两者相等,则使用一个新值替换原来的值。在这个例子中,它比较locked的值和false,如果locked的值为false,则把修改为true。如果值被替换了,compareAndSet()返回true,否则,返回false。

处理器实现原子性

32位IA-32处理器使用基于对缓存加锁或总线加锁的方式来实现多处理器之间的原子操作。

处理器自动保证基本内存操作的原子性

首先处理器会自动保证基本的内存操作的原子性。处理器保证从系统内存当中读取或者写入一个字节是原子的,意思是当一个处理器读取一个字节时,其他处理器不能访问这个字节的内存地址。奔腾6和最新的处理器能自动保证单处理器对同一个缓存行里进行16/32/64位的操作是原子的,但是复杂的内存操作处理器不能自动保证其原子性,比如跨总线宽度,跨多个缓存行,跨页表的访问。但是处理器提供总线锁定和缓存锁定两个机制来保证复杂内存操作的原子性。

总线保证原子性

通过总线锁保证原子性。如果多个处理器同时对共享变量进行读改写(i++就是经典的读改写操作)操作,那么共享变量就会被多个处理器同时进行操作,这样读改写操作就不是原子的,操作完之后共享变量的值会和期望的不一致。
例子:如果i=1,进行两次i++操作,期望的结果是3,但是有可能结果是2。如下图。

有可能多个处理器同时从各自的缓存中读取变量i,分别进行加一操作,然后分别写入系统内存当中。那么想要保证读改写共享变量的操作是原子的,就必须保证CPU1读改写共享变量的时候,CPU2不能操作缓存了该共享变量内存地址的缓存。
处理器使用总线锁就是来解决这个问题的。所谓总线锁就是使用处理器提供的一个LOCK#信号,当一个处理器在总线上输出此信号时,其他处理器的请求将被阻塞住,那么该处理器可以独占使用共享内存。

缓存锁保证原子性

通过缓存锁定保证原子性。在同一时刻我们只需保证对某个内存地址的操作是原子性即可,但总线锁定把CPU和内存之间通信锁住了,这使得锁定期间,其他处理器不能操作其他内存地址的数据,所以总线锁定的开销比较大,最近的处理器在某些场合下使用缓存锁定代替总线锁定来进行优化。
频繁使用的内存会缓存在处理器的L1,L2和L3高速缓存里,那么原子操作就可以直接在处理器内部缓存中进行,并不需要声明总线锁,在奔腾6和最近的处理器中可以使用“缓存锁定”的方式来实现复杂的原子性。
所谓“缓存锁定”就是如果缓存在处理器缓存行中内存区域在LOCK操作期间被锁定,当它执行锁操作回写内存时,处理器不在总线上声明LOCK#信号,而是修改内部的内存地址,并允许它的缓存一致性机制来保证操作的原子性,因为缓存一致性机制会阻止同时修改被两个以上处理器缓存的内存区域数据,当其他处理器回写已被锁定的缓存行的数据时会起缓存行无效(另一个线程不能缓存这个数据),在上图中,当CPU1修改缓存行中的i时使用缓存锁定,那么CPU2就不能同时缓存了i的缓存行。
但是有两种情况下处理器不会使用缓存锁定。

  • 当操作的数据不能被缓存在处理器内部,或操作的数据跨多个缓存行(cache line(缓存的最小操作单位)),则处理器会调用总线锁定。
  • 有些处理器不支持缓存锁定。对于Inter486和奔腾处理器,就算锁定的内存区域在处理器的缓存行中也会调用总线锁定。

以上两个机制可以通过Inter处理器提供了很多LOCK前缀的指令来实现。比如位测试和修改指令BTSBTRBTC,交换指令XADDCMPXCHG和其他一些操作数和逻辑指令,比如ADD(加),OR(或)等,被这些指令操作的内存区域就会加锁,导致其他处理器不能同时访问它。

Java保证原子性

在Java中可以通过锁和循环CAS的方式来实现原子操作。JVM中的CAS操作正是利用了上面提到的处理器提供的CMPXCHG指令实现的。
自旋CAS实现的基本思路就是循环进行CAS操作直到成功为止,以下代码实现了一个基于CAS线程安全的计数器方法safeCount和一个非线程安全的计数器count

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
private AtomicInteger atomicI = new AtomicInteger(0);
private int i = 0;
public static void main(String[] args) {
final Counter cas = new Counter();
List<Thread> ts = new ArrayList<Thread>(600);
long start = System.currentTimeMillis();
for (int j = 0; j < 100; j++) {
Thread t = new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < 10000; i++) {
cas.count();
cas.safeCount();
}
}
});
ts.add(t);
}
for (Thread t : ts) {
t.start();
}
// 等待所有线程执行完成
for (Thread t : ts) {
try {
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(cas.i);
System.out.println(cas.atomicI.get());
System.out.println(System.currentTimeMillis() - start);
}
/**
* 使用CAS实现线程安全计数器
*/
private void safeCount() {
for (;;) {
int i = atomicI.get();
boolean suc = atomicI.compareAndSet(i, ++i);
if (suc) {
break;
}
}
}
/**
* 非线程安全计数器
*/
private void count() {
i++;
}
}

在Java并发包中有一些并发框架也使用了自旋CAS的方式来实现原子操作,比如LinkedTransferQueue类的Xfer()

CAS的3个问题

CAS虽然很高效的解决原子操作,但是CAS仍然存在三大问题。

  • ABA问题。
  • 循环时间长开销大。
  • 只能保证一个共享变量的原子操作。

    ABA问题

    因为CAS需要在操作值的时候检查下值有没有发生变化,如果没有发生变化则更新,但是如果1个值原来是A,变成了B,又变成了A,那么使用CAS进行检查时会发现它的值没有发生变化,但是实际上却变化了。ABA问题的解决思路就是使用版本号。在变量前面追加上版本号,每次变量更新的时候把版本号加1,那么A-B-A就会变成1A-2B-3A
    从Java1.5开始JDK的atomic包里提供了1个类AtomicStampedReference来解决ABA问题。这个类的compareAndSet()作用是首先检查当前引用是否等于预期引用,并且当前标志的值是否等于预期标志的值,如果全部相等,则以原子方式将该引用和该标志的值设置为给定的更新值。
    1
    2
    3
    4
    5
    6
    public boolean compareAndSet(
    V expectedReference,//预期引用
    V newReference,//更新后的引用
    int expectedStamp, //预期标志
    int newStamp //更新后的标志
    )

循环时间长开销大

自旋CAS如果长时间不成功,会给CPU带来非常大的执行开销。如果JVM能支持处理器提供的pause指令那么效率会有一定的提升,pause指令有两个作用,第一它可以延迟流水线执行指令(de-pipeline),使CPU不会消耗过多的执行资源,延迟的时间取决于具体实现的版本,在一些处理器上延迟时间是零。第二它可以避免在退出循环的时候因内存顺序冲突(memory order violation)而引起CPU流水线被清空(CPU pipeline flush),从而提高CPU的执行效率。

只能保证一个共享变量的原子操作

当对一个共享变量执行操作时,可以使用循环CAS的方式来保证原子操作,但是对多个共享变量操作时,循环CAS就无法保证操作的原子性,这个时候就可以用锁,或者有一个取巧的办法,就是把多个共享变量合并成一个共享变量来操作。比如有两个共享变量i=2j=a,合并一下ij=2a,然后用CAS来操作ij。从Java1.5开始JDK提供了AtomicReference类来保证引用对象之间的原子性,可以把多个变量放在一个对象里来进行CAS操作。

CAS的ABA问题解决方式

线程1准备用CAS将变量的值由A替换为B,在此之前,线程2将变量的值由A替换为C,又由C替换为A,然后线程1执行CAS时发现变量的值仍然为A,所以CAS成功。但实际上这时的现场已经和最初不同了(引用地址变),尽管CAS成功,但可能存在潜藏的问题。
现有一个用单向链表实现的堆栈,栈顶为A,这时线程T1已经知道A.next为B,然后希望用CAS将栈顶替换为B。

head.compareAndSet(A,B);
在T1执行上面这条指令之前,线程T2介入,将A、B出栈,再push D、C、A,此时堆栈结构如下图,而对象B此时处于游离状态。(B已经不在栈内)

此时轮到线程T1执行CAS操作,检测发现栈顶仍为A,所以CAS成功,栈顶变为B,但实际上B.next为null,所以此时的情况变为。

其中堆栈中只有B一个元素,C和D组成的链表不再存在于堆栈中,平白无故就把C、D丢掉了(B的栈内,不存在C,D)。
例如下面的代码分别用AtomicIntegerAtomicStampedReference来对初始值为100的原子整型变量进行更新,AtomicInteger会成功执行CAS操作,而加上版本戳的AtomicStampedReference对于ABA问题会执行CAS失败。
以上就是由于ABA问题带来的隐患,各种乐观锁的实现中通常都会用版本戳version来对记录或对象标记,避免并发操作带来的问题,在Java中,AtomicStampedReference<E>也实现了这个作用,它通过包装[E,Integer]的元组来对对象标记版本戳stamp,从而避免ABA问题。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
* Atomically sets the value of both the reference and stamp
* to the given update values if the
* current reference is {@code ==} to the expected reference
* and the current stamp is equal to the expected stamp.
*
* @param expectedReference the expected value of the reference
* @param newReference the new value for the reference
* @param expectedStamp the expected value of the stamp
* @param newStamp the new value for the stamp
* @return true if successful
*/
public boolean compareAndSet(V expectedReference,//预期引用
V newReference,//更新引用
int expectedStamp,//以前的版本
int newStamp)//更新版本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicStampedReference;
public class ABA {
private static AtomicInteger atomicInt = new AtomicInteger(100);
@SuppressWarnings({ "unchecked", "rawtypes" })
private static AtomicStampedReference atomicStampedRef = new AtomicStampedReference(100, 0);
public static void main(String[] args) throws InterruptedException {
Thread intT1 = new Thread(new Runnable() {
@Override
public void run() {
//1
atomicInt.compareAndSet(100, 101);
//2
atomicInt.compareAndSet(101, 100);
}
});
//3
Thread intT2 = new Thread(new Runnable() {
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
}
//4
boolean c3 = atomicInt.compareAndSet(100, 101);
System.out.println("[AtomicInteger]ABA问题:" + c3); // 交换成功true
}
});
intT1.start();
intT2.start();
intT1.join();
intT2.join();
//5
Thread refT1 = new Thread(new Runnable() {
@SuppressWarnings("unchecked")
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
}
//6
boolean c = atomicStampedRef.compareAndSet(100, 101, atomicStampedRef.getStamp(), atomicStampedRef.getStamp() + 1);

System.out.println("线程1,第1次交换后101版本:" + atomicStampedRef.getStamp());
//7
boolean c1 = atomicStampedRef.compareAndSet(101, 100, atomicStampedRef.getStamp(), atomicStampedRef.getStamp() + 1);

System.out.println("线程1,第2次交换100版本:" + (atomicStampedRef.getStamp()));
System.out.println("线程1,第1次交换:" + c);
System.out.println("线程1,第2次交换:" + c1);
}
});
Thread refT2 = new Thread(new Runnable() {
@Override
public void run() {
int stamp = atomicStampedRef.getStamp();
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
}
//8
@SuppressWarnings("unchecked")
boolean c3 = atomicStampedRef.compareAndSet(100, 101, stamp, stamp + 1);
System.out.println("线程2,第1次交换:" + c3);// false
}
});
refT1.start();
refT2.start();
}
}

标注代码分析

  1. 以前A是100,B是101,交换A=101,B=100。
  2. A=101,101=101,则交换A=100。
  3. intT1值是100,交换过后还是100。
  4. A原来的值是100,100=100,进行交换。
  5. intT1,intT2线程执行完毕之后,再执行下面代码。版本号只在当前线程内有效。
  6. 初始化100版本=0;101版本=1。第一次交换,设置101值,版本=1。
  7. 101和版本都预期的值相同,更新值=100,版本=2。
  8. 初始化=100:0,101:1。因为100值,版本=2,当前值和预期的值,不相同。交换失败。

    CAS的Uncafe学习

    Unsafe的源码:http://www.docjar.com/html/api/sun/misc/Unsafe.java.html
    Unsafe源码中的描述如下

    A collection of methods for performing low-level, unsafe operations. Although the class and all methods are public, use of this class is limited because only trusted code can obtain instances of it.

这个类是用于执行低级别、不安全操作的方法集合。尽管这个类和所有的方法都是公开的(public),但是这个类的使用仍然受限,无法在自己的Java程序中直接使用该类,因为只有授信的代码才能获得该类的实例。
从上面的描述,可以了解到该类是用来执行较低级别的操作的,比如获取某个属性在内存中的位置,不过一般人很少会有这样的需求。在AtomicInteger的源码中相关的代码如下

1
2
// setup to use Unsafe.compareAndSwapInt for updates  
private static final Unsafe unsafe = Unsafe.getUnsafe();

上面这行代码是获取Unsafe实例的。一般情况下,是拿不到该类的实例的,jdk库里面是可以随意使用的。

1
2
3
4
5
static {  
try {
valueOffset = unsafe.objectFieldOffset(AtomicInteger.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}

上面这几行代码,是用来获取AtomicInteger实例中的value属性在内存中的位置。这里使用了UnsafeobjectFieldOffset()
这个方法是一个本地方法, 该方法用来获取一个给定的静态属性的内存中位置。(内存中的偏移量,内存地址偏移量,这个值对于给定的filed是唯一的且是固定不变的)

1
public native long objectFieldOffset(Field f);

通过查看AtomicInteger源码发现,在这样几个地方使用到了这个valueOffset(偏移量)值。

1
2
3
4
5
6
7
8
9
10
11
public final void lazySet(int newValue) {  
unsafe.putOrderedInt(this, valueOffset, newValue);
}

public final boolean compareAndSet(int expect, int update) {
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}

public final boolean weakCompareAndSet(int expect, int update) {
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}

查找资料后,发现lazySet方法大多用在并发的数据结构中,用于低级别的优化。
compareAndSet这个方法多见于并发控制中,简称CAS(Compare And Swap),意思是如果valueOffset位置包含的值与expect值(预期位置值)相同,则更新valueOffset位置的值为update,并返回true,否则不更新,返回false。

例子

通过反射获取Unsafe实例,计算对象在内存中的偏移量。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
import java.lang.reflect.Field;
import sun.misc.Unsafe;
/**
* Unsafe代码测试
*
* @author Administrator
*
*/
public class UnsafeTest {
private static Unsafe unsafe;
static {
try {
// 通过反射获取rt.jar下的Unsafe类
Field field = Unsafe.class.getDeclaredField("theUnsafe");
field.setAccessible(true);
// 实例化对象
unsafe = (Unsafe) field.get(null);
} catch (Exception e) {
System.out.println("Get Unsafe instance occur error" + e);
}
}
public static void main(String[] args) throws Exception {
@SuppressWarnings("rawtypes")
Class clazz = Target.class;
Field[] fields = clazz.getDeclaredFields();
System.out.println("属性的偏移量fieldName:fieldOffset");
for (Field f : fields) {
// 获取属性偏移量,可以通过这个偏移量给属性设置
System.out.println("属性名字:" + f.getName() + ",类型:" + f.getType() + ",偏移量:" + unsafe.objectFieldOffset(f));
}
Target target = new Target();
Field intFiled = clazz.getDeclaredField("intParam");
int a = (Integer) intFiled.get(target);
System.out.println("intParam初始化值是:" + a);
// intParam的字段偏移是12, 初始化值3, 我们要改为10
System.out.println("改变成偏移量10:"+unsafe.compareAndSwapInt(target, 12, 3, 10));
int b = (Integer) intFiled.get(target);
System.out.println("intParam改变之后的值是:" + b);
// 这个时候已经改为10了,所以会返回false
System.out.println("这个时候已经改为10,不是12;所以交换失败,intParam:" + unsafe.compareAndSwapInt(target, 12, 3, 10));
System.out.println("偏移量24,值是null:" + unsafe.compareAndSwapObject(target, 24, null, "5"));
//int c = (Integer) intFiled.get(target);
//System.out.println("偏移量24,值是null,改变之后的值" + c);
}
}
class Target {
int intParam = 3;
long longParam;
String strParam;
String strParam2;
}

参考

百度百科
聊聊并发(五)原子操作的实现原理

评论

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×