CAS

ddatsh

dev #concurrent

有这种情况,看了很多文章,知道 基于硬件的 CAS命令,set值一定成功,但到底为什么成功,或者说还是不理解CAS语义和基于语义后,算法该是怎样的

CAS的语义 我认为V的值应该为A,如果是,那么将V的值更新为B,否则不修改并告诉V的值实际为多少

锁的代价

锁是做并发最简单的方式,代价也是最高的

内核态的锁,需要上下文切换

上下文切换时,cpu之前缓存的指令和数据都失效,性能损失很大

操作系统对多线程的锁进行判断,相对会滞后一些才会做出裁决,比较慢

用户态的锁虽然避免了这些问题,但是其实它们只是在没有真实的竞争时才有效

JDK1.5靠synchronized保证同步,通过使用一致的锁定协议来协调对共享状态的访问,确保无论哪个线程持有共享变量的锁,都采用独占的方式来访问这些变量

独占锁其实就是一种悲观锁,所以可以说synchronized`是悲观锁

悲观锁机制存在以下问题:

多线程竞争下,加锁、释放锁会导致比较多的上下文切换和调度延时,引起性能问题

一个线程持有锁会导致其它所有需要此锁的线程挂起

如果一个优先级高的线程等待一个优先级低的线程释放锁会导致优先级倒置,引起性能风险

与锁相比,volatile变量是一个更轻量级的同步机制,因为在使用这些变量时不会发生上下文切换和线程调度等操作,但是volatile不能解决原子性问题,因此当一个变量依赖旧值时就不能使用volatile变量。因此对于同步最终还是要回到锁机制上来。

CAS

乐观锁是一种思想,CAS就是这种思想的一种实现方式/一种有名的的无锁非阻塞实现方法

也是种乐观锁技术,多个线程用CAS同时更新同一个变量时,只有一个成功更新,其它线程都失败,但失败的线程不会被挂起,而是被告知这次竞争中失败,并可以再次尝试

基于共享数据不会被修改的假设,采用 commit-retry 的模式

同步冲突出现的机会很少时,这种假设能带来较大的性能提升

CAS 开销

CAS,比较并交换 基于 CPU指令,只有一步原子操作,在CPU内部就搞定,非常快,且避免请求os kernel裁定锁的问题,

但有cache miss的情况

CPU0 对变量执行 CAS 操作,而变量的cache line 在 CPU7 中

ABA 问题

除了 cache line 问题还有 ABA问题

CAS算法实现一个重要前提需要取出内存中某时刻的数据,而在下时刻比较并替换,那么在这个时间差类会导致数据的变化

一个线程1从内存位置V中取出A,同时另一个线程2也从内存中取出A,并且2进行了一些操作变成了B,然后2又将V位置的数据变成A,这时候线程1进行CAS操作发现内存中仍然是A,然后1操作成功

尽管线程one的CAS操作成功,但是不代表这个过程就是没有问题的

部分乐观锁的实现是通过版本号(version)的方式来解决ABA问题

乐观锁每次在执行数据的修改操作时,都会带上一个版本号,一旦版本号和数据的版本号一致就可以执行修改操作并对版本号执行+1操作,否则就执行失败

因为每次操作的版本号都会随之增加,所以不会出现ABA问题,因为版本号只会增加不会减少

java.util.concurrent 使用中也要注意ABA问题

如果链表的头在变化了两次后恢复了原值,但是不代表链表就没有变化

线程1准备用CAS将变量的值由A替换为B,在此之前,线程2将变量的值由A替换为C,又由C替换为A,然后线程1执行CAS时发现变量的值仍然为A,所以CAS成功

但实际上这时的现场已经和最初不同了,尽管CAS成功,但可能存在潜藏的问题,例如

一个单向链表实现的堆栈,栈顶为A,线程T1已经知道A.next为B,然后希望用CAS将栈顶替换为B:

head.compareAndSet(A,B);

T1执行这条指令之前,线程T2介入,将A、B出栈,再pushD、C、A,此时堆栈结构如下图,而对象B此时处于游离状态:

此时轮到线程T1执行CAS操作,检测发现栈顶仍为A,所以CAS成功,栈顶变为B,但实际上B.next为null,所以此时的情况变为:

其中堆栈中只有B一个元素,C和D组成的链表不再存在于堆栈中,平白无故就把C、D丢掉了

以上就是由于ABA问题带来的隐患,各种乐观锁的实现中通常都会用版本戳version来对记录或对象标记,避免并发操作带来的问题

Java中,AtomicStampedReference也实现了这个作用,通过包装[E,Integer]的元组来对对象标记版本戳stamp,从而避免ABA问题

下面的代码分别用AtomicInteger和AtomicStampedReference来对初始值为100的原子整型变量进行更新,AtomicInteger会成功执行CAS操作,而加上版本戳的AtomicStampedReference对于ABA问题会执行CAS失败:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
package com.niwodai.inf.notification.server.notify.sms;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicStampedReference;

public class ABA {

	private static AtomicInteger atomicInt = new AtomicInteger(100);

	private static AtomicStampedReference<Integer> atomicStampedRef =
			new AtomicStampedReference<>(100, 0);

	public static void main(String[] args) throws InterruptedException {

		Thread intT1 = new Thread(new Runnable() {

			@Override
			public void run() {

				atomicInt.compareAndSet(100, 101);
				atomicInt.compareAndSet(101, 100);
			}
		});

		Thread intT2 = new Thread(new Runnable() {

			@Override
			public void run() {

				try {
					TimeUnit.SECONDS.sleep(1);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
				boolean c3 = atomicInt.compareAndSet(100, 101);
				System.out.println(c3);        //true
			}
		});

		intT1.start();
		intT2.start();
		intT1.join();
		intT2.join();

		Thread refT1 = new Thread(new Runnable() {

			@Override
			public void run() {

				try {
					TimeUnit.SECONDS.sleep(1);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
				atomicStampedRef.compareAndSet(100, 101,
						atomicStampedRef.getStamp(), atomicStampedRef.getStamp() + 1);
				atomicStampedRef.compareAndSet(101, 100,
						atomicStampedRef.getStamp(), atomicStampedRef.getStamp() + 1);
			}
		});

		Thread refT2 = new Thread(new Runnable() {

			@Override
			public void run() {

				int stamp = atomicStampedRef.getStamp();
				System.out.println("before sleep : stamp = " + stamp);    // stamp = 0
				try {
					TimeUnit.SECONDS.sleep(2);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
				System.out.println("after sleep : stamp = " + atomicStampedRef.getStamp());//stamp = 2,refT1 change twice
				boolean c3 = atomicStampedRef.compareAndSet(100, 101, stamp, stamp + 1);
				System.out.println(c3);        //false
			}
		});

		refT1.start();
		refT2.start();
	}

}

jvm 对CAS的支持

JDK 1.5引入底层支持,int、long和对象的引用等类型上公开了CAS的操作

且把编译为底层硬件提供的最有效的方法,CPU不支持CAS,JVM使用自旋锁

如AtomicLong.incrementAndGet用了乐观锁技术,调用了sun.misc.Unsafe里的 CAS算法,用CPU指令来实现无锁自增。所以,AtomicLong.incrementAndGet的自增比用synchronized的锁效率倍增

高并发环境下优化锁或无锁(lock-free)的设计思路

服务端编程的3大性能杀手:

  1. 大量线程导致的线程切换开销
  2. 非必要的内存拷贝

高并发环境下要实现高吞吐量和线程安全,两个思路:

  1. 优化的锁实现
  2. lock-free的无锁结构

优化锁实现的例子

ConcurrentHashMap,用桶粒度的锁和锁分离机制,避免put和get中对整个map的锁定

Lock-free无锁的例子

CAS 的利用和disruptor无锁消息队列数据结构

除了尽量减少资源争用以外,还可以借鉴nginx/node.js等单线程大循环的机制,用单线程或CPU数相同的线程开辟大的队列,并发的时候任务压入队列,线程轮询然后一个个顺序执行

由于每个都采用异步I/O,没有阻塞线程。这个大队列可以使用

任务处理可以全部放在内存(多级缓存、读写分离、ConcurrentHashMap、甚至分布式缓存Redis)中进行增删改查

最后用维护定时把缓存数据同步到DB中

当然,这只是中小型系统的思路,如果是大型分布式系统会非常复杂,需要分而治理,用SOA的思路