目录

CAS - Compare and Swap

Compare and Swap 即比较并替换,实现并发算法时常用到的一种技术,Doug lea大神在java同步器中大量使用了CAS技术,鬼斧神工的实现了多线程执行的安全性。

CAS 不通过JVM,直接利用java本地方 JNI(Java Native Interface为JAVA本地调用),直接调用CPU 的cmpxchg(是汇编指令)指令。

利用CPU的CAS指令,同时借助JNI来完成Java的非阻塞算法,实现原子操作。其它原子操作都是利用类似的特性完成的

整个java.util.concurrent都是建立在CAS之上的,因此对比synchronized阻塞算法,J.U.C在性能上有了很大的提升。

CAS是项乐观锁技术,当多个线程尝试使用CAS同时更新同一个变量时,只有其中一个线程能更新变量的值,而其它线程都失败,失败的线程并不会被挂起,而是被告知这次竞争中失败,并可以再次尝试。

CAS的思想很简单:三个参数,一个当前内存值V、旧的预期值A、即将更新的值B,当且仅当预期值A和内存值V相同时,将内存值修改为B并返回true,否则什么都不做,并返回false。

JDK自带的CAS方案,在CAS中,比较和替换是一组原子操作,不会被外部打断,且在性能上更占有优势。

在J.U.C包中利用CAS实现类有很多,可以说是支撑起整个concurrency包的实现,在Lock实现中会有CAS改变state变量,在atomic包中的实现类也几乎都是用CAS实现

使用锁时,线程获取锁是一种悲观锁策略,即假设每一次执行临界区代码都会产生冲突,所以当前线程获取到锁的时候同时也会阻塞其他线程获取该锁。而CAS操作(又称为无锁操作)是一种乐观锁策略,它假设所有线程访问共享资源的时候不会出现冲突,既然不会出现冲突自然而然就不会阻塞其他线程的操作。因此,线程就不会出现阻塞停顿的状态。那么,如果出现冲突了怎么办?无锁操作是使用**CAS(compare and swap)**又叫做比较交换来鉴别线程是否出现冲突,出现冲突就重试当前操作直到没有冲突为止。

元老级的synchronized(未优化前)最主要的问题是:在存在线程竞争的情况下会出现线程阻塞和唤醒锁带来的性能问题,因为这是一种互斥同步(阻塞同步)。而CAS并不是武断的间线程挂起,当CAS操作失败后会进行一定的尝试,而非进行耗时的挂起唤醒的操作,因此也叫做非阻塞同步。

线程安全

众所周知,Java是多线程的。但是,Java对多线程的支持其实是一把双刃剑。一旦涉及到多个线程操作共享资源的情况时,处理不好就可能产生线程安全问题。线程安全性可能是非常复杂的,在没有充足的同步的情况下,多个线程中的操作执行顺序是不可预测的。 Java里面进行多线程通信的主要方式就是共享内存的方式,共享内存主要的关注点有两个:可见性和有序性。加上复合操作的原子性,我们可以认为Java的线程安全性问题主要关注点有3个:可见性、有序性和原子性。 Java内存模型(JMM)解决了可见性和有序性的问题,而锁解决了原子性的问题。这里不再详细介绍JMM及锁的其他相关知识。但是我们要讨论一个问题,那就是锁到底是不是有利无弊的?

锁存在的问题

Java在JDK1.5之前都是靠synchronized关键字保证同步的,这种通过使用一致的锁定协议来协调对共享状态的访问,可以确保无论哪个线程持有共享变量的锁,都采用独占的方式来访问这些变量。独占锁其实就是一种悲观锁,所以可以说synchronized是悲观锁。 悲观锁机制存在以下问题: 在多线程竞争下,加锁、释放锁会导致比较多的上下文切换和调度延时,引起性能问题。 一个线程持有锁会导致其它所有需要此锁的线程挂起。 如果一个优先级高的线程等待一个优先级低的线程释放锁会导致优先级倒置,引起性能风险。 而另一个更加有效的锁就是乐观锁。所谓乐观锁就是,每次不加锁而是假设没有冲突而去完成某项操作,如果因为冲突失败就重试,直到成功为止。 与锁相比,volatile变量是一个更轻量级的同步机制,因为在使用这些变量时不会发生上下文切换和线程调度等操作,但是volatile不能解决原子性问题,因此当一个变量依赖旧值时就不能使用volatile变量。因此对于同步最终还是要回到锁机制上来。

乐观锁

乐观锁( Optimistic Locking)其实是一种思想。相对悲观锁而言,乐观锁假设认为数据一般情况下不会造成冲突,所以在数据进行提交更新的时候,才会正式对数据的冲突与否进行检测,如果发现冲突了,则让返回用户错误的信息,让用户决定如何去做。 上面提到的乐观锁的概念中其实已经阐述了他的具体实现细节:主要就是两个步骤:冲突检测和数据更新。其实现方式有一种比较典型的就是Compare and Swap(CAS)。

CAS是项乐观锁技术,当多个线程尝试使用CAS同时更新同一个变量时,只有其中一个线程能更新变量的值,而其它线程都失败,失败的线程并不会被挂起,而是被告知这次竞争中失败,并可以再次尝试。 CAS 操作包含三个操作数 —— 内存位置(V)、预期原值(A)和新值(B)。如果内存位置的值与预期原值相匹配,那么处理器会自动将该位置值更新为新值。否则,处理器不做任何操作。无论哪种情况,它都会在 CAS 指令之前返回该位置的值。(在 CAS 的一些特殊情况下将仅返回 CAS 是否成功,而不提取当前值。)CAS 有效地说明了“我认为位置 V 应该包含值 A;如果包含该值,则将 B 放到这个位置;否则,不要更改该位置,只告诉我这个位置现在的值即可。”这其实和乐观锁的冲突检查+数据更新的原理是一样的。 这里再强调一下,乐观锁是一种思想。CAS是这种思想的一种实现方式。

Java 对 CAS 的支持

在JDK1.5 中新增java.util.concurrent(J.U.C)就是建立在CAS之上的。相对于对于synchronized这种阻塞算法,CAS是非阻塞算法的一种常见实现。所以J.U.C在性能上有了很大的提升。

AtomicInteger

以AtomicInteger的实现为例,分析一下CAS是如何实现的。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
public class AtomicInteger extends Number implements java.io.Serializable {
    // setup to use Unsafe.compareAndSwapInt for updates
    private static final Unsafe unsafe = Unsafe.getUnsafe();
    private static final long valueOffset;
 
    static {
        try {
            valueOffset = unsafe.objectFieldOffset
                (AtomicInteger.class.getDeclaredField("value"));
        } catch (Exception ex) { throw new Error(ex); }
    }
 
    private volatile int value;
    public final int get() {return value;}    
    
}
  1. Unsafe,是CAS的核心类,由于Java方法无法直接访问底层系统,需要通过本地(native)方法来访问,Unsafe相当于一个后门,基于该类可以直接操作特定内存的数据。
  2. 变量valueOffset,表示该变量值在内存中的偏移地址,因为Unsafe就是根据内存偏移地址获取数据的。
  3. 变量value用volatile修饰,保证了多线程之间的内存可见性。

AtomicInteger如何实现并发下的累加操作:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
public final int getAndAdd(int delta) {    
    return unsafe.getAndAddInt(this, valueOffset, delta);
}
 
//unsafe.getAndAddInt
public final int getAndAddInt(Object var1, long var2, int var4) {
    int var5;
    do {
        var5 = this.getIntVolatile(var1, var2);
    } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
    return var5;
}

假设线程A和线程B同时执行getAndAdd操作(分别跑在不同CPU上):

  1. AtomicInteger里面的value原始值为3,即主内存中AtomicInteger的value为3,根据Java内存模型,线程A和线程B各自持有一份value的副本,值为3。
  2. 线程A通过getIntVolatile(var1, var2)拿到value值3,这时线程A被挂起。
  3. 线程B也通过getIntVolatile(var1, var2)方法获取到value值3,运气好,线程B没有被挂起,并执行compareAndSwapInt方法比较内存值也为3,成功修改内存值为2。
  4. 这时线程A恢复,执行compareAndSwapInt方法比较,发现自己手里的值(3)和内存的值(2)不一致,说明该值已经被其它线程提前修改过了,那只能重新来一遍了。
  5. 重新获取value值,因为变量value被volatile修饰,所以其它线程对它的修改,线程A总是能够看到,线程A继续执行compareAndSwapInt进行比较替换,直到成功。

整个过程中,利用CAS保证了对于value的修改的并发安全,继续深入看看Unsafe类中的compareAndSwapInt方法实现。

1
public final native boolean compareAndSwapInt(Object paramObject, long paramLong, int paramInt1, int paramInt2);

Unsafe类中的compareAndSwapInt,是一个本地方法,该方法的实现位于unsafe.cpp中

1
2
3
4
5
6
UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapInt(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint e, jint x))
  UnsafeWrapper("Unsafe_CompareAndSwapInt");
  oop p = JNIHandles::resolve(obj);
  jint* addr = (jint *) index_oop_from_field_offset_long(p, offset);
  return (jint)(Atomic::cmpxchg(x, addr, e)) == e;
UNSAFE_END
  1. 先想办法拿到变量value在内存中的地址。
  2. 通过Atomic::cmpxchg实现比较替换,其中参数x是即将更新的值,参数e是原内存的值。

CAS缺点

1. ABA问题 因为CAS会检查旧值有没有变化,这里存在这样一个有意思的问题。比如一个旧值A变为了成B,然后再变成A,刚好在做CAS时检查发现旧值并没有变化依然为A,但是实际上的确发生了变化。解决方案可以沿袭数据库中常用的乐观锁方式,添加一个版本号可以解决。原来的变化路径A->B->A就变成了1A->2B->3C。java这么优秀的语言,当然在java 1.5后的atomic包中提供了 AtomicStampedReference 来解决ABA问题,解决思路就是这样的。

2. 自旋时间过长

使用CAS时非阻塞同步,也就是说不会将线程挂起,会自旋(无非就是一个死循环)进行下一次尝试,如果这里自旋时间过长对性能是很大的消耗。如果JVM能支持处理器提供的pause指令,那么在效率上会有一定的提升。

3. 只能保证一个共享变量的原子操作

当对一个共享变量执行操作时CAS能保证其原子性,如果对多个共享变量进行操作,CAS就不能保证其原子性。有一个解决方案是利用对象整合多个共享变量,即一个类中的成员变量就是这几个共享变量。然后将这个对象做CAS操作就可以保证其原子性。atomic 中提供了 AtomicReference 来保证引用对象之间的原子性。

CAS存在一个很明显的问题,即ABA问题。

问题:如果变量V初次读取的时候是A,并且在准备赋值的时候检查到它仍然是A,那能说明它的值没有被其他线程修改过了吗?

如果在这段期间曾经被改成B,然后又改回A,那CAS操作就会误认为它从来没有被修改过。针对这种情况,java并发包中提供了一个带有标记的原子引用类AtomicStampedReference,它可以通过控制变量值的版本来保证CAS的正确性。

原理:乐观锁+version

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
public class ABAQuestion
{
    private static AtomicInteger atomicInt = new AtomicInteger(100);
    private static AtomicStampedReference<Integer> atomicStampedRef = new AtomicStampedReference<Integer>(100,0);

    public static void main(String[] args) throws InterruptedException
    {
        Thread thread1 = new Thread(new Runnable(){
            @Override
            public void run()
            {
                atomicInt.compareAndSet(100, 101);
                atomicInt.compareAndSet(101, 100);
            }
        });

        Thread thread2 = new Thread(new Runnable(){
            @Override
            public void run()
            {
                try
                {
                    TimeUnit.SECONDS.sleep(1);
                }
                catch (InterruptedException e)
                {
                    e.printStackTrace();
                }
                boolean c3 = atomicInt.compareAndSet(100, 101);
                System.out.println(c3);
            }
        });

        thread1.start();
        thread2.start();
        thread1.join();
        thread2.join();

        Thread thread3 = new Thread(new Runnable(){
            @Override
            public void run()
            {
                try
                {
                    TimeUnit.SECONDS.sleep(1);
                }
                catch (InterruptedException e)
                {
                    e.printStackTrace();
                }
                atomicStampedRef.compareAndSet(100, 101, atomicStampedRef.getStamp(), atomicStampedRef.getStamp()+1);
                atomicStampedRef.compareAndSet(101, 100, atomicStampedRef.getStamp(), atomicStampedRef.getStamp()+1);
            }
        });

        Thread thread4 = new Thread(new Runnable(){
            @Override
            public void run()
            {
                int stamp = atomicStampedRef.getStamp();
                try
                {
                    TimeUnit.SECONDS.sleep(2);
                }
                catch (InterruptedException e)
                {
                    e.printStackTrace();
                }
                boolean c3 = atomicStampedRef.compareAndSet(100, 101, stamp, stamp+1);
                System.out.println(c3);
            }
        });
        thread3.start();
        thread4.start();
    }
}

输出结果:true false