前言
什么是并发
CPU,内存和IO设备三者之间的速度是不匹配的,更好的利用CPU资源。
为什么要并发
CPU,内存和IO设备三者之间的速度是不匹配的,为了解决这个问题,做了下面的优化:
- CPU增加了缓存,以均衡与内存的速度差异
- 操作系统增加了进程、线程,以分时复用CPU,进而均衡CPU与I/O设备的速度差异;
- 编译程序优化指令执行次序,使得缓存能够得到更加合理地利用。
- 重排序包括编译器重排序和处理器重排序。对于后者,需要插入内存屏障才可以进行解决。
最终期望可以提高程序的性能,包括响应速度和吞吐量。
可以使用Lmbench测量上下文切换的时长,使用vmstat测量上下文切换的次数,我们可以关注CS字段。
并发三大问题
优化之后,虽然性能变好,但是却引来了三大问题,让我们程序的正确性出现了问题。
可见性
可见性一般是缓存导致的。指的是一个线程对共享变量的修改,另外一个线程能够立刻看到。
如果只有一个CPU,则不存在可见性问题。
如果是多个CPU,可见性问题并不好解决。
原子性
原子性是线程切换导致的。把一个或者多个操作在CPU执行的过程中不被中断的特性称为原子性。
有序性
有序性是编译优化带来的。
如双重校验锁获取单例:
public class Singleton {
static Singleton instance;
static Singleton getInstance(){
if (instance == null) {
synchronized(Singleton.class) {
if (instance == null)
instance = new Singleton();
}
}
return instance;
}
}
我们以为的顺序:
- 分配一块内存M;
- 在内存M上初始化Singleton对象;
- 然后M的地址赋值给instance变量。
实际上的顺序:
- 分配一块内存M;
- 将M的地址赋值给instance变量;
- 最后在内存M上初始化Singleton对象。
这样就可能引发NPE问题。
Java内存模型JMM:解决可见性和有序性
前面提到:导致可见性的原因是缓存,导致有序性的原因是编译优化,那解决可见性、有序性最直接的办法就是禁用缓存和编译优化,但是这样问题虽然解决了,我们程序的性能可就堪忧了。因此,我们只能按需禁用缓存和编译优化。
因此,Java为我们提供了volatile、synchronized 和 final 三个关键字,以及六项 Happens-Before 规则。
synchronized
volatile
语义是:禁用CPU缓存,必须从内从中读取或者写入。
final
表示一个变量是常量。
Happens-Before 规则
含义:前面一个操作的结果对后续操作是可见的。
Happens-Before 约束了编译器的优化行为,虽允许编译器优化,但是要求编译器优化后一定遵守 Happens-Before 规则。
程序的顺序性规则
在一个线程中,按照程序顺序,前面的操作 Happens-Before 于后续的任意操作。
如下面的,第5行 Happens-Before 第6行。
class VolatileExample {
int x = 0;
volatile boolean v = false;
public void writer() {
x = 42;
v = true;
}
public void reader() {
if (v == true) {
// 这里x会是多少呢?
}
}
}
volatile变量规则
对一个volatile变量的写操作, Happens-Before 于后续对这个volatile变量的读操作。
传递性
这条规则是指如果A Happens-Before B,且B Happens-Before C,那么A Happens-Before C。
因此:
- “x=42” Happens-Before 写变量 “v=true” ,这是规则1的内容;
- 写变量“v=true” Happens-Before 读变量 “v=true”,这是规则2的内容 。
再根据这个传递性规则,我们得到结果:“x=42” Happens-Before 读变量“v=true”。
管程中锁的规则
对一个锁的解锁 Happens-Before 于后续对这个锁的加锁。
线程 start() 规则
指主线程A启动子线程B后,子线程B能够看到主线程在启动子线程B前的操作。
Thread B = new Thread(()->{
// 主线程调用B.start()之前
// 所有对共享变量的修改,此处皆可见
// 此例中,var==77
});
// 此处对共享变量var修改
var = 77;
// 主线程启动子线程
B.start();
线程 join() 规则
指主线程A等待子线程B完成,当子线程B完成后,主线程能够看到子线程的操作。
如果在线程A中,调用线程B的 join() 并成功返回,那么线程B中的任意操作Happens-Before 于该 join() 操作的返回。
Thread B = new Thread(()->{
// 此处对共享变量var修改
var = 66;
});
// 例如此处对共享变量修改,
// 则这个修改结果对线程B可见
// 主线程启动子线程
B.start();
B.join()
// 子线程所有对共享变量的修改
// 在主线程调用B.join()之后皆可见
// 此例中,var==66
线程中断规则interrupt
对线程interrupt()方法的调用先行发生于被中断线程的代码检测到中断事件的发生,可以通过Thread.interrupted()方法检测到是否有中断发生。
对象终结规则finalize
一个对象的初始化完成(构造函数执行结束)先行发生于它的finalize()方法的开始。
重排序问题
数据依赖性
这种关系,如果发生了重排序,程序的执行结果就会被改变。
as-if-serial语义
该语义的意思是:不管如何进行重排序,单线程情况下程序的执行结果不能被改变。因此存在数据依赖的不能重排序。
double pi = 3.14; // A
double r = 1.0; // B
doublearea=pi*r*r; //C
可见,A和B没有依赖关系,但是C和A、B都有关系,所以C必须最后,A和B可以重新排序。
锁:解决原子性问题synchronized
原子性问题的源头是线程切换。在单核时代,可以通过禁用CPU中断来避免线程切换,在多核时代,即使禁止中断也无法解决原子性问题。因此,我们需要保证同一时刻只有一个线程操作共享变量。
我们需要注意锁的范围:
- 当修饰非静态方法的时候,锁定的是当前实例对象 this。
- 当修饰静态方法的时候,锁定的是当前类的 Class 对象;
- 同步方法块,锁的是Synchonized括号里配置的对象;
上面的图是一把锁保护一个资源。
事实上,一把锁可以保护多个资源。
- 保护没有关联关系的多个资源;
- 保护有关联关系的多个资源;
我们还需要了解锁的实现原理,这个需要了解JVM的相关知识,如Java对象头。
此外,锁的升级也需要我们了解。
更多详情参考:不可不说的Java“锁”事
死锁及解决方案
粗粒度的锁浪费性能,可以使用细粒度的锁提高并行度,进行性能优化。但是可能导致死锁:
一组互相竞争资源的线程因互相等待,导致“永久”阻塞的现象。
死锁产生条件
下面4个条件同时发生才会死锁。
- 互斥,共享资源 X 和 Y 只能被一个线程占用;(不能避免)
- 占有且等待,线程 T1 已经取得共享资源 X,在等待共享资源 Y 的时候,不释放共享资源 X;
- 不可抢占,其他线程不能强行抢占线程 T1 占有的资源;
- 循环等待,线程 T1 等待线程 T2 占有的资源,线程 T2 等待线程 T1 占有的资源,就是循环等待。
死锁预防
互斥这一条不可避免。其他三条任意破坏一个即可。
破坏占有且等待
一次性申请所有资源。
class Allocator {
private List<Object> als =
new ArrayList<>();
// 一次性申请所有资源
synchronized boolean apply(
Object from, Object to){
if(als.contains(from) ||
als.contains(to)){
return false;
} else {
als.add(from);
als.add(to);
}
return true;
}
// 归还资源
synchronized void free(
Object from, Object to){
als.remove(from);
als.remove(to);
}
}
class Account {
// actr应该为单例
private Allocator actr;
private int balance;
// 转账
void transfer(Account target, int amt){
// 一次性申请转出账户和转入账户,直到成功
while(!actr.apply(this, target))
;
try{
// 锁定转出账户
synchronized(this){
// 锁定转入账户
synchronized(target){
if (this.balance > amt){
this.balance -= amt;
target.balance += amt;
}
}
}
} finally {
actr.free(this, target)
}
}
}
用“等待-通知”机制优化循环等待
在上面的代码第32行中,我们用死循环的方式。这种方式当apply耗时长,或者并发冲突量大时就不太适用。更好的方式应该是:
如果线程要求的条件(转出账本和转入账本同在文件架上)不满足,则线程阻塞自己,进入等待状态;当线程要求的条件(转出账本和转入账本同在文件架上)满足后,通知等待的线程重新执行。其中,使用线程阻塞的方式就能避免循环等待消耗CPU的问题。
总结一下:完整的等待-通知机制:线程首先获取互斥锁,当线程要求的条件不满足时,释放互斥锁,进入等待状态;当要求的条件满足时,通知等待的线程,重新获取互斥锁。
class Allocator {
private List als;
// 一次性申请所有资源
synchronized void apply(
Object from, Object to){
// 经典写法
// 这里必须用while
// 当线程被唤醒后,是从wait命令后开始执行的,
// 而执行时间点往往跟唤醒时间点不一致,所以条件变量此时不一定满足了。
// 所以通过while循环可以再验证,
// 而if条件却做不到,它只能从wait命令后开始执行,所以要用while
while(als.contains(from) ||
als.contains(to)){
try{
wait();
}catch(Exception e){
}
}
als.add(from);
als.add(to);
}
// 归还资源
synchronized void free(
Object from, Object to){
als.remove(from);
als.remove(to);
notifyAll();
}
}
破坏不可抢占条件
这个synchronized做不到,原因是 synchronized 申请资源的时候,如果申请不到,线程直接进入阻塞状态了,而线程进入阻塞状态,啥都干不了,也释放不了线程已经占有的资源。
只能使用java.util.concurrent.Lock工具类。
破坏循环等待条件
对资源进行编号。
管程Monitor
可以解决并发编程中的两大核心问题:同步和互斥。前者是指同一时刻只允许一个线程访问共享资源,后者是指多个线程之间如何通信、协作。
管程指的是管理共享变量以及对共享变量的操作过程,让他们支持并发。具体来讲是管理类的成员变量和成员方法,让这个类是线程安全的。
解决互斥:将线程不安全的队列封装起来,对外提供线程安全的操作方法,例如入队操作和出队操作。
解决同步:在管程模型里,共享变量和对共享变量的操作是被封装起来的。框的上面只有一个入口,并且在入口旁边还有一个入口等待队列。当多个线程同时试图进入管程内部时,只允许一个线程进入,其他线程则在入口等待队列中等待。
那条件变量和条件变量等待队列的作用是什么呢?其实就是解决线程同步问题。你可以结合上面提到的阻塞队列的例子加深一下理解(阻塞队列的例子,是用管程来实现线程安全的阻塞队列,这个阻塞队列和管程内部的等待队列没有关系,本文中一定要注意阻塞队列和等待队列是不同的)。
假设有个线程T1执行阻塞队列的出队操作,执行出队操作,需要注意有个前提条件,就是阻塞队列不能是空的(空队列只能出Null值,是不允许的),阻塞队列不空这个前提条件对应的就是管程里的条件变量。 如果线程T1进入管程后恰好发现阻塞队列是空的,那怎么办呢?等待啊,去哪里等呢?就去条件变量对应的等待队列里面等。此时线程T1就去“队列不空”这个条件变量的等待队列中等待。
再假设之后另外一个线程T2执行阻塞队列的入队操作,入队操作执行成功之后,“阻塞队列不空”*这个条件对于线程T1来说已经满足了,此时线程T2要通知T1,告诉它需要的条件已经满足了。当线程T1得到通知后,会从*等待队列里面出来,但是出来之后不是马上执行,而是重新进入到入口等待队列里面。
Java内置的管程方案synchronized,只有一个条件变量。
wait操作是把当前线程放入条件变量的等待队列中,notifyall。
Java线程相关知识
生命周期
通用线程生命周期
- 初始状态,指的是线程已经被创建,但是还不允许分配CPU执行。这个状态属于编程语言特有的,不过这里所谓的被创建,仅仅是在编程语言层面被创建,而在操作系统层面,真正的线程还没有创建。
- 可运行状态,指的是线程可以分配CPU执行。在这种状态下,真正的操作系统线程已经被成功创建了,所以可以分配CPU执行。
- 当有空闲的CPU时,操作系统会将其分配给一个处于可运行状态的线程,被分配到CPU的线程的状态就转换成了运行状态。
- 运行状态的线程如果调用一个阻塞的API(例如以阻塞方式读文件)或者等待某个事件(例如条件变量),那么线程的状态就会转换到休眠状态,同时释放CPU使用权,休眠状态的线程永远没有机会获得CPU使用权。当等待的事件出现了,线程就会从休眠状态转换到可运行状态。
- 线程执行完或者出现异常就会进入终止状态,终止状态的线程不会切换到其他任何状态,进入终止状态也就意味着线程的生命周期结束了。
Java线程生命周期
- NEW(初始化状态)
- RUNNABLE(可运行/运行状态)
- BLOCKED(阻塞状态)
- WAITING(无时限等待)
- TIMED_WAITING(有时限等待)
- TERMINATED(终止状态)
在操作系统层面,Java线程中的BLOCKED、WAITING、TIMED_WAITING是一种状态,即前面我们提到的休眠状态。也就是说只要Java线程处于这三种状态之一,那么这个线程就永远没有CPU的使用权。
线程数量
CPU密集型
IO密集型
并发工具类
Lock和Condition
使用
class X {
private final Lock rtl = new ReentrantLock();
int value;
public void addOne() {
// 获取锁
rtl.lock();
try {
value+=1;
} finally {
// 保证锁能释放
rtl.unlock();
}
}
}
为什么设计Lock和Condition
我们知道synchronized实现了管程,但是SDK层面又定义了Lock和Condition两个接口,前者可以解决互斥问题,后者可以解决同步问题,同样可以实现管程。那为什么又定义新的接口呢?
解决死锁问题,有一个方案是【破坏不可抢占条件】。但是synchronized并不能解决这个问题。这是因为synchronized申请资源的时候,如果申请不到,线程直接进入阻塞状态了。但是线程一旦进入阻塞,就无法释放线程已经占有的资源。但是我们期望的是:对于“不可抢占”这个条件,占用部分资源的线程进一步申请其他资源时,如果申请不到,可以主动释放它占有的资源,这样不可抢占这个条件就破坏掉了。
如果新设计一把锁,那应该怎么做呢?
- 能够响应中断。synchronized的问题是,持有锁A后,如果尝试获取锁B失败,那么线程就进入阻塞状态,一旦发生死锁,就没有任何机会来唤醒阻塞的线程。但如果阻塞状态的线程能够响应中断信号,也就是说当我们给阻塞的线程发送中断信号的时候,能够唤醒它,那它就有机会释放曾经持有的锁A。这样就破坏了不可抢占条件了。
- 支持超时。如果线程在一段时间之内没有获取到锁,不是进入阻塞状态,而是返回一个错误,那这个线程也有机会释放曾经持有的锁。这样也能破坏不可抢占条件。
- 非阻塞地获取锁。如果尝试获取锁失败,并不进入阻塞状态,而是直接返回,那这个线程也有机会释放曾经持有的锁。这样也能破坏不可抢占条件。
相应的,lock接口提供了三个接口:
// 支持中断的API
void lockInterruptibly()
throws InterruptedException;
// 支持超时的API
boolean tryLock(long time, TimeUnit unit)
throws InterruptedException;
// 支持非阻塞获取锁的API
boolean tryLock();
此外,synchronized只支持一个条件变量,而Condition可以支持多个条件变量。
可见性原理
synchronized的解锁 Happens-Before 于后续对这个锁的加锁。
那么lock呢?以上面的程序为例。利用了volatile相关的Happens-Before规则:
- 顺序性规则:对于线程T1,value+=1 Happens-Before 释放锁的操作unlock();
- volatile变量规则:由于state = 1会先读取state,所以线程T1的unlock()操作Happens-Before线程T2的lock()操作;
- 传递性规则:线程 T1的value+=1 Happens-Before 线程 T2 的 lock() 操作。
两个条件变量实现阻塞队列
一个阻塞队列,需要两个条件变量,一个是队列不空(空队列不允许出队),另一个是队列不满(队列已满不允许入队)。
public class BlockedQueue{
final Lock lock = new ReentrantLock();
// 条件变量:队列不满
final Condition notFull = lock.newCondition();
// 条件变量:队列不空
final Condition notEmpty = lock.newCondition();
// 入队
void enq(T x) {
lock.lock();
try {
while (队列已满){
// 等待队列不满
notFull.await();
}
// 省略入队操作...
//入队后,通知可出队
notEmpty.signal();
}finally {
lock.unlock();
}
}
// 出队
void deq(){
lock.lock();
try {
while (队列已空){
// 等待队列不空
notEmpty.await();
}
// 省略出队操作...
//出队后,通知可入队
notFull.signal();
}finally {
lock.unlock();
}
}
}
注意事项
synchronized不需要释放锁,lock必须要释放。
Semaphore信号量
信号量模型
在信号量模型里,计数器和等待队列对外是透明的,所以只能通过信号量模型提供的三个方法来访问。
- init():设置计数器的初始值。
- down():计数器的值减1;如果此时计数器的值小于0,则当前线程将被阻塞,否则当前线程可以继续执行。
- up():计数器的值加1;如果此时计数器的值小于或者等于0,则唤醒等待队列中的一个线程,并将其从等待队列中移除。
而且,这三个方法都是原子性的,原子性由信号量模型来实现的。
class Semaphore{
// 计数器
int count;
// 等待队列
Queue queue;
// 初始化操作
Semaphore(int c){
this.count=c;
}
//
void down(){
this.count--;
if(this.count<0){
//将当前线程插入等待队列
//阻塞当前线程
}
}
void up(){
this.count++;
if(this.count<=0) {
//移除等待队列中的某个线程T
//唤醒线程T
}
}
}
简单使用
static int count;
//初始化信号量
static final Semaphore s = new Semaphore(1);
//用信号量保证互斥
static void addOne() {
s.acquire();
try {
count+=1;
} finally {
s.release();
}
}
假设两个线程T1和T2同时访问addOne()方法,当它们同时调用acquire()的时候,由于acquire()是一个原子操作,所以只能有一个线程(假设T1)把信号量里的计数器减为0,另外一个线程(T2)则是将计数器减为-1。对于线程T1,信号量里面的计数器的值是0,大于等于0,所以线程T1会继续执行;对于线程T2,信号量里面的计数器的值是-1,小于0,按照信号量模型里对down()操作的描述,线程T2将被阻塞。所以此时只有线程T1会进入临界区执行count+=1;
。
当线程T1执行release()操作,也就是up()操作的时候,信号量里计数器的值是-1,加1之后的值是0,小于等于0,按照信号量模型里对up()操作的描述,此时等待队列中的T2将会被唤醒。于是T2在T1执行完临界区代码之后才获得了进入临界区执行的机会,从而保证了互斥性。
为什么需要信号量
上面例子,我们用Semaphore实现了互斥锁。那Semaphore还有什么功能呢?Semaphore可以允许多个线程访问一个临界区。
应用:限流器
如数据库连接池,同一时刻,允许多个线程使用连接池。对象池也是如此,指的是一次性创建出N个对象,之后所有的线程重复利用这N个对象,当然对象在被释放前,也是不允许其他线程使用的。
class ObjPool<T, R> {
final List<T> pool;
// 用信号量实现限流器
final Semaphore sem;
// 构造函数
ObjPool(int size, T t){
pool = new Vector<T>(){};
for(int i=0; i<size; i++){
pool.add(t);
}
sem = new Semaphore(size);
}
// 利用对象池的对象,调用func
R exec(Function<T,R> func) {
T t = null;
sem.acquire();
try {
t = pool.remove(0);
return func.apply(t);
} finally {
pool.add(t);
sem.release();
}
}
}
// 创建对象池
ObjPool<Long, String> pool = new ObjPool<Long, String>(10, 2);
// 通过对象池获取t,之后执行
pool.exec(t -> {
System.out.println(t);
return t.toString();
});
我们用一个List来保存对象实例,用Semaphore实现限流器。关键的代码是ObjPool里面的exec()方法,这个方法里面实现了限流的功能。在这个方法里面,我们首先调用acquire()方法(与之匹配的是在finally里面调用release()方法),假设对象池的大小是10,信号量的计数器初始化为10,那么前10个线程调用acquire()方法,都能继续执行,相当于通过了信号灯,而其他线程则会阻塞在acquire()方法上。对于通过信号灯的线程,我们为每个线程分配了一个对象 t(这个分配工作是通过pool.remove(0)实现的),分配完之后会执行一个回调函数func,而函数的参数正是前面分配的对象 t ;执行完回调函数之后,它们就会释放对象(这个释放工作是通过pool.add(t)实现的),同时调用release()方法来更新信号量的计数器。如果此时信号量里计数器的值小于等于0,那么说明有线程在等待,此时会自动唤醒等待的线程。
ReadWriteLock读写锁
适用场景
读写锁,适用于读多写少的场景。
读写锁三大原则:
- 允许多个线程同时读共享变量;
- 只允许一个线程写共享变量;
- 如果一个写线程正在执行写操作,此时禁止读线程读共享变量。
读写锁和互斥锁的区别是:前者允许多个线程同时读共享变量,后者不允许。
读写锁实现缓存
class Cache {
final Map m = new HashMap<>();
final ReadWriteLock rwl = new ReentrantReadWriteLock();
// 读锁
final Lock r = rwl.readLock();
// 写锁
final Lock w = rwl.writeLock();
// 读缓存
V get(K key) {
r.lock();
try {
return m.get(key);
}
finally {
r.unlock();
}
}
// 写缓存
V put(K key, V value) {
w.lock();
try {
return m.put(key, v);
}
finally {
w.unlock();
}
}
}
我们声明了一个Cache类,其中类型参数K代表缓存里key的类型,V代表缓存里value的类型。缓存的数据保存在Cache类内部的HashMap里面,HashMap不是线程安全的,这里我们使用读写锁ReadWriteLock 来保证其线程安全。ReadWriteLock 是一个接口,它的实现类是ReentrantReadWriteLock,通过名字你应该就能判断出来,它是支持可重入的。下面我们通过rwl创建了一把读锁和一把写锁。
StampedLock性能更高的读写锁
final StampedLock sl = new StampedLock();
// 获取/释放悲观读锁示意代码
long stamp = sl.readLock();
try {
//省略业务相关代码
} finally {
sl.unlockRead(stamp);
}
// 获取/释放写锁示意代码
long stamp = sl.writeLock();
try {
//省略业务相关代码
} finally {
sl.unlockWrite(stamp);
}
比ReadWriteLock更快的读写锁,始于Java8。
StampedLock和ReadWriteLock的区别:
- StampedLock支持3种模式:写锁,悲观读和乐观读;支持多个读;乐观读的时候支持写;
- ReadWriteLock支持2种模式:写锁,悲观读;支持多个读;读的时候不支持写;
class Point {
private int x, y;
final StampedLock sl = new StampedLock();
//计算到原点的距离
int distanceFromOrigin() {
// 乐观读
long stamp = sl.tryOptimisticRead();
// 读入局部变量,
// 读的过程数据可能被修改
int curX = x, curY = y;
//判断执行读操作期间,
//是否存在写操作,如果存在,
//则sl.validate返回false
if (!sl.validate(stamp)){
// 升级为悲观读锁
stamp = sl.readLock();
try {
curX = x;
curY = y;
} finally {
//释放悲观读锁
sl.unlockRead(stamp);
}
}
return Math.sqrt(
curX * curX + curY * curY);
}
}
局限性
StampedLock不支持重入;悲观读锁、写锁都不支持条件变量;不支持中断,会造成CPU飙升;
final StampedLock lock= new StampedLock();
Thread T1 = new Thread(()->{
// 获取写锁
lock.writeLock();
// 永远阻塞在此处,不释放写锁
LockSupport.park();
});
T1.start();
// 保证T1获取写锁
Thread.sleep(100);
Thread T2 = new Thread(()->
//阻塞在悲观读锁
lock.readLock()
);
T2.start();
// 保证T2阻塞在读锁
Thread.sleep(100);
//中断线程T2
//会导致线程T2所在CPU飙升
T2.interrupt();
T2.join();
CountDownLatch和CyclicBarrier:线程同步工具类
背景:
我们有一个对账系统:
可以使用CountDownLatch让主线程等待2个子线程。
// 创建2个线程的线程池
Executor executor = Executors.newFixedThreadPool(2);
while(存在未对账订单){
// 计数器初始化为2
CountDownLatch latch = new CountDownLatch(2);
// 查询未对账订单
executor.execute(()-> {
pos = getPOrders();
latch.countDown();
});
// 查询派送单
executor.execute(()-> {
dos = getDOrders();
latch.countDown();
});
// 等待两个查询操作结束
latch.await();
// 执行对账操作
diff = check(pos, dos);
// 差异写入差异库
save(diff);
}
前面我们实现了getPOrders()和getDOrders()的并行;事实上,对账操作也可以和查询进行并行;
这个方案的难点有2个:一个是线程T1和T2要做到步调一致,另一个是要能够通知到线程T3。
// 订单队列
Vector pos;
// 派送单队列
Vector dos;
// 执行回调的线程池
// 1.使用线程池是为了异步操作,否则回掉函数是同步调用的,也就是本次对账操作执行完才能进行下一轮的检查。
// 2.线程数量固定为1,防止了多线程并发导致的数据不一致,因为订单和派送单是两个队列,只有单线程去两个队列中取消息才不会出现消息不匹配的问题。
Executor executor = Executors.newFixedThreadPool(1);
final CyclicBarrier barrier = new CyclicBarrier(2, ()->{
executor.execute(()->check());
});
void check(){
P p = pos.remove(0);
D d = dos.remove(0);
// 执行对账操作
diff = check(p, d);
// 差异写入差异库
save(diff);
}
void checkAll(){
// 循环查询订单库
Thread T1 = new Thread(()->{
while(存在未对账订单){
// 查询订单库
pos.add(getPOrders());
// 等待
barrier.await();
}
});
T1.start();
// 循环查询运单库
Thread T2 = new Thread(()->{
while(存在未对账订单){
// 查询运单库
dos.add(getDOrders());
// 等待
barrier.await();
}
});
T2.start();
}
总结:CountDownLatch主要用来解决一个线程等待多个线程的场景,而CyclicBarrier是一组线程之间互相等待。CountDownLatch的计数器是不能循环利用的,也就是说一旦计数器减到0,再有线程调用await(),该线程会直接通过。但CyclicBarrier的计数器是可以循环利用的,而且具备自动重置的功能,一旦计数器减到0会自动重置到你设置的初始值。除此之外,CyclicBarrier还可以设置回调函数,可以说是功能丰富。
并发容器
List
List里面只有一个实现类就是CopyOnWriteArrayList。写的时候会将共享变量新复制一份出来,这样做的好处是读操作完全无锁。
Map
Map接口的两个实现是ConcurrentHashMap和ConcurrentSkipListMap,它们从应用的角度来看,主要区别在于ConcurrentHashMap的key是无序的,而ConcurrentSkipListMap的key是有序的。所以如果你需要保证key的顺序,就只能使用ConcurrentSkipListMap。
Set
Set接口的两个实现是CopyOnWriteArraySet和ConcurrentSkipListSet,使用场景可以参考前面讲述的CopyOnWriteArrayList和ConcurrentSkipListMap。
Queue
- 单端阻塞:ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue、LinkedTransferQueue、PriorityBlockingQueue和DelayQueue
- 双端阻塞:LinkedBlockingDeque
- 单端非阻塞:ConcurrentLinkedQueue
- 双端非阻塞:ConcurrentLinkedDeque
原子类Unsafe
CAS。
无锁方案的性能比较好,基本没有死锁问题,但是可能会有饥饿和活锁问题。此外,原子类只能解决单个共享变量的问题,多个共享变量的原子性问题,最好还是采用互斥锁方案。
Executor与线程池
Future:获取线程的执行结果
CompletableFuture回调机制的设计与实现
附录
CPU相关并发能力
原子操作实现原理
Java实现原子操作
- CAS:Compare And Swap
- CAS三大问题
- ABA问题
- 循环时间开销大
- 只能保证一个共享变量的原子操作
- 使用锁机制实现原子操作
参考
- Java并发编程实战
- Java并发编程的艺术
- 不可不说的Java“锁”事