一文学习JUC

目录

前言

什么是并发

CPU,内存和IO设备三者之间的速度是不匹配的,更好的利用CPU资源。

为什么要并发

CPU,内存和IO设备三者之间的速度是不匹配的,为了解决这个问题,做了下面的优化:

  • CPU增加了缓存,以均衡与内存的速度差异
  • 操作系统增加了进程、线程,以分时复用CPU,进而均衡CPU与I/O设备的速度差异;
  • 编译程序优化指令执行次序,使得缓存能够得到更加合理地利用。
    • 重排序包括编译器重排序和处理器重排序。对于后者,需要插入内存屏障才可以进行解决。

最终期望可以提高程序的性能,包括响应速度和吞吐量。

可以使用Lmbench测量上下文切换的时长,使用vmstat测量上下文切换的次数,我们可以关注CS字段。

并发三大问题

优化之后,虽然性能变好,但是却引来了三大问题,让我们程序的正确性出现了问题。

可见性

可见性一般是缓存导致的。指的是一个线程对共享变量的修改,另外一个线程能够立刻看到。

如果只有一个CPU,则不存在可见性问题。

img

如果是多个CPU,可见性问题并不好解决。

img

原子性

原子性是线程切换导致的。把一个或者多个操作在CPU执行的过程中不被中断的特性称为原子性

img

img

有序性

有序性是编译优化带来的。

如双重校验锁获取单例:

public class Singleton {
  static Singleton instance;
  static Singleton getInstance(){
    if (instance == null) {
      synchronized(Singleton.class) {
        if (instance == null)
          instance = new Singleton();
        }
    }
    return instance;
  }
}

我们以为的顺序:

  1. 分配一块内存M;
  2. 在内存M上初始化Singleton对象;
  3. 然后M的地址赋值给instance变量。

实际上的顺序:

  1. 分配一块内存M;
  2. 将M的地址赋值给instance变量;
  3. 最后在内存M上初始化Singleton对象。

这样就可能引发NPE问题。

img

Java内存模型JMM:解决可见性和有序性

前面提到:导致可见性的原因是缓存,导致有序性的原因是编译优化,那解决可见性、有序性最直接的办法就是禁用缓存和编译优化,但是这样问题虽然解决了,我们程序的性能可就堪忧了。因此,我们只能按需禁用缓存和编译优化。

因此,Java为我们提供了volatilesynchronizedfinal 三个关键字,以及六项 Happens-Before 规则

synchronized

volatile

语义是:禁用CPU缓存,必须从内从中读取或者写入。

final

表示一个变量是常量。

Happens-Before 规则

含义:前面一个操作的结果对后续操作是可见的

Happens-Before 约束了编译器的优化行为,虽允许编译器优化,但是要求编译器优化后一定遵守 Happens-Before 规则。

程序的顺序性规则

在一个线程中,按照程序顺序,前面的操作 Happens-Before 于后续的任意操作。

如下面的,第5行 Happens-Before 第6行。

class VolatileExample {
  int x = 0;
  volatile boolean v = false;
  public void writer() {
    x = 42;
    v = true;
  }
  public void reader() {
    if (v == true) {
      // 这里x会是多少呢?
    }
  }
}

volatile变量规则

对一个volatile变量的写操作, Happens-Before 于后续对这个volatile变量的读操作。

传递性

这条规则是指如果A Happens-Before B,且B Happens-Before C,那么A Happens-Before C。

img

因此:

  1. “x=42” Happens-Before 写变量 “v=true” ,这是规则1的内容;
  2. 写变量“v=true” Happens-Before 读变量 “v=true”,这是规则2的内容 。

再根据这个传递性规则,我们得到结果:“x=42” Happens-Before 读变量“v=true”。

管程中锁的规则

对一个锁的解锁 Happens-Before 于后续对这个锁的加锁。

线程 start() 规则

指主线程A启动子线程B后,子线程B能够看到主线程在启动子线程B前的操作。

Thread B = new Thread(()->{
  // 主线程调用B.start()之前
  // 所有对共享变量的修改,此处皆可见
  // 此例中,var==77
});
// 此处对共享变量var修改
var = 77;
// 主线程启动子线程
B.start();

线程 join() 规则

指主线程A等待子线程B完成,当子线程B完成后,主线程能够看到子线程的操作。

如果在线程A中,调用线程B的 join() 并成功返回,那么线程B中的任意操作Happens-Before 于该 join() 操作的返回。

Thread B = new Thread(()->{
  // 此处对共享变量var修改
  var = 66;
});
// 例如此处对共享变量修改,
// 则这个修改结果对线程B可见
// 主线程启动子线程
B.start();
B.join()
// 子线程所有对共享变量的修改
// 在主线程调用B.join()之后皆可见
// 此例中,var==66

线程中断规则interrupt

对线程interrupt()方法的调用先行发生于被中断线程的代码检测到中断事件的发生,可以通过Thread.interrupted()方法检测到是否有中断发生。

对象终结规则finalize

一个对象的初始化完成(构造函数执行结束)先行发生于它的finalize()方法的开始。

重排序问题

数据依赖性

image-20220308225710487

这种关系,如果发生了重排序,程序的执行结果就会被改变。

as-if-serial语义

该语义的意思是:不管如何进行重排序,单线程情况下程序的执行结果不能被改变。因此存在数据依赖的不能重排序。

double pi = 3.14; // A 
double r = 1.0; // B 
doublearea=pi*r*r; //C

可见,A和B没有依赖关系,但是C和A、B都有关系,所以C必须最后,A和B可以重新排序。

锁:解决原子性问题synchronized

原子性问题的源头是线程切换。在单核时代,可以通过禁用CPU中断来避免线程切换,在多核时代,即使禁止中断也无法解决原子性问题。因此,我们需要保证同一时刻只有一个线程操作共享变量。

img

我们需要注意锁的范围:

  • 当修饰非静态方法的时候,锁定的是当前实例对象 this。
  • 当修饰静态方法的时候,锁定的是当前类的 Class 对象;
  • 同步方法块,锁的是Synchonized括号里配置的对象;

上面的图是一把锁保护一个资源。

事实上,一把锁可以保护多个资源。

  • 保护没有关联关系的多个资源;
  • 保护有关联关系的多个资源;

我们还需要了解锁的实现原理,这个需要了解JVM的相关知识,如Java对象头。

此外,锁的升级也需要我们了解。

img

更多详情参考:不可不说的Java“锁”事

死锁及解决方案

img

粗粒度的锁浪费性能,可以使用细粒度的锁提高并行度,进行性能优化。但是可能导致死锁:

一组互相竞争资源的线程因互相等待,导致“永久”阻塞的现象。

img

死锁产生条件

下面4个条件同时发生才会死锁。

  • 互斥,共享资源 X 和 Y 只能被一个线程占用;(不能避免)
  • 占有且等待,线程 T1 已经取得共享资源 X,在等待共享资源 Y 的时候,不释放共享资源 X;
  • 不可抢占,其他线程不能强行抢占线程 T1 占有的资源;
  • 循环等待,线程 T1 等待线程 T2 占有的资源,线程 T2 等待线程 T1 占有的资源,就是循环等待。

死锁预防

互斥这一条不可避免。其他三条任意破坏一个即可。

破坏占有且等待

一次性申请所有资源。

img


class Allocator {
  private List<Object> als =
    new ArrayList<>();
  // 一次性申请所有资源
  synchronized boolean apply(
    Object from, Object to){
    if(als.contains(from) ||
         als.contains(to)){
      return false;  
    } else {
      als.add(from);
      als.add(to);  
    }
    return true;
  }
  // 归还资源
  synchronized void free(
    Object from, Object to){
    als.remove(from);
    als.remove(to);
  }
}

class Account {
  // actr应该为单例
  private Allocator actr;
  private int balance;
  // 转账
  void transfer(Account target, int amt){
    // 一次性申请转出账户和转入账户,直到成功
    while(!actr.apply(this, target))
      
    try{
      // 锁定转出账户
      synchronized(this){              
        // 锁定转入账户
        synchronized(target){           
          if (this.balance > amt){
            this.balance -= amt;
            target.balance += amt;
          }
        }
      }
    } finally {
      actr.free(this, target)
    }
  } 
}

用“等待-通知”机制优化循环等待

在上面的代码第32行中,我们用死循环的方式。这种方式当apply耗时长,或者并发冲突量大时就不太适用。更好的方式应该是:

如果线程要求的条件(转出账本和转入账本同在文件架上)不满足,则线程阻塞自己,进入等待状态;当线程要求的条件(转出账本和转入账本同在文件架上)满足后,通知等待的线程重新执行。其中,使用线程阻塞的方式就能避免循环等待消耗CPU的问题。

总结一下:完整的等待-通知机制:线程首先获取互斥锁,当线程要求的条件不满足时,释放互斥锁,进入等待状态;当要求的条件满足时,通知等待的线程,重新获取互斥锁

class Allocator {
  private List als;
  // 一次性申请所有资源
  synchronized void apply(
    Object from, Object to){
    // 经典写法
    // 这里必须用while
    // 当线程被唤醒后,是从wait命令后开始执行的,
    // 而执行时间点往往跟唤醒时间点不一致,所以条件变量此时不一定满足了。
    // 所以通过while循环可以再验证,
    // 而if条件却做不到,它只能从wait命令后开始执行,所以要用while
    while(als.contains(from) ||
         als.contains(to)){
      try{
        wait();
      }catch(Exception e){
      }   
    } 
    als.add(from);
    als.add(to);  
  }
  // 归还资源
  synchronized void free(
    Object from, Object to){
    als.remove(from);
    als.remove(to);
    notifyAll();
  }
}

破坏不可抢占条件

这个synchronized做不到,原因是 synchronized 申请资源的时候,如果申请不到,线程直接进入阻塞状态了,而线程进入阻塞状态,啥都干不了,也释放不了线程已经占有的资源。

只能使用java.util.concurrent.Lock工具类。

破坏循环等待条件

对资源进行编号。

管程Monitor

可以解决并发编程中的两大核心问题:同步和互斥。前者是指同一时刻只允许一个线程访问共享资源,后者是指多个线程之间如何通信、协作。

管程指的是管理共享变量以及对共享变量的操作过程,让他们支持并发。具体来讲是管理类的成员变量和成员方法,让这个类是线程安全的。

解决互斥:将线程不安全的队列封装起来,对外提供线程安全的操作方法,例如入队操作和出队操作。

img

解决同步:在管程模型里,共享变量和对共享变量的操作是被封装起来的。框的上面只有一个入口,并且在入口旁边还有一个入口等待队列。当多个线程同时试图进入管程内部时,只允许一个线程进入,其他线程则在入口等待队列中等待。

img

条件变量条件变量等待队列的作用是什么呢?其实就是解决线程同步问题。你可以结合上面提到的阻塞队列的例子加深一下理解(阻塞队列的例子,是用管程来实现线程安全的阻塞队列,这个阻塞队列和管程内部的等待队列没有关系,本文中一定要注意阻塞队列和等待队列是不同的)。

假设有个线程T1执行阻塞队列的出队操作,执行出队操作,需要注意有个前提条件,就是阻塞队列不能是空的(空队列只能出Null值,是不允许的),阻塞队列不空这个前提条件对应的就是管程里的条件变量。 如果线程T1进入管程后恰好发现阻塞队列是空的,那怎么办呢?等待啊,去哪里等呢?就去条件变量对应的等待队列里面等。此时线程T1就去“队列不空”这个条件变量的等待队列中等待。

再假设之后另外一个线程T2执行阻塞队列的入队操作,入队操作执行成功之后,“阻塞队列不空”*这个条件对于线程T1来说已经满足了,此时线程T2要通知T1,告诉它需要的条件已经满足了。当线程T1得到通知后,会从*等待队列里面出来,但是出来之后不是马上执行,而是重新进入到入口等待队列里面。

Java内置的管程方案synchronized,只有一个条件变量。

img

wait操作是把当前线程放入条件变量的等待队列中,notifyall。

Java线程相关知识

生命周期

通用线程生命周期

img

  1. 初始状态,指的是线程已经被创建,但是还不允许分配CPU执行。这个状态属于编程语言特有的,不过这里所谓的被创建,仅仅是在编程语言层面被创建,而在操作系统层面,真正的线程还没有创建。
  2. 可运行状态,指的是线程可以分配CPU执行。在这种状态下,真正的操作系统线程已经被成功创建了,所以可以分配CPU执行。
  3. 当有空闲的CPU时,操作系统会将其分配给一个处于可运行状态的线程,被分配到CPU的线程的状态就转换成了运行状态
  4. 运行状态的线程如果调用一个阻塞的API(例如以阻塞方式读文件)或者等待某个事件(例如条件变量),那么线程的状态就会转换到休眠状态,同时释放CPU使用权,休眠状态的线程永远没有机会获得CPU使用权。当等待的事件出现了,线程就会从休眠状态转换到可运行状态。
  5. 线程执行完或者出现异常就会进入终止状态,终止状态的线程不会切换到其他任何状态,进入终止状态也就意味着线程的生命周期结束了。

Java线程生命周期

  1. NEW(初始化状态)
  2. RUNNABLE(可运行/运行状态)
  3. BLOCKED(阻塞状态)
  4. WAITING(无时限等待)
  5. TIMED_WAITING(有时限等待)
  6. TERMINATED(终止状态)

在操作系统层面,Java线程中的BLOCKED、WAITING、TIMED_WAITING是一种状态,即前面我们提到的休眠状态。也就是说只要Java线程处于这三种状态之一,那么这个线程就永远没有CPU的使用权

img

线程数量

CPU密集型

IO密集型

并发工具类

Lock和Condition

使用

class X {
  private final Lock rtl = new ReentrantLock();
  int value;
  public void addOne() {
    // 获取锁
    rtl.lock();  
    try {
      value+=1;
    } finally {
      // 保证锁能释放
      rtl.unlock();
    }
  }
}

为什么设计Lock和Condition

我们知道synchronized实现了管程,但是SDK层面又定义了Lock和Condition两个接口,前者可以解决互斥问题,后者可以解决同步问题,同样可以实现管程。那为什么又定义新的接口呢?

解决死锁问题,有一个方案是【破坏不可抢占条件】。但是synchronized并不能解决这个问题。这是因为synchronized申请资源的时候,如果申请不到,线程直接进入阻塞状态了。但是线程一旦进入阻塞,就无法释放线程已经占有的资源。但是我们期望的是:对于“不可抢占”这个条件,占用部分资源的线程进一步申请其他资源时,如果申请不到,可以主动释放它占有的资源,这样不可抢占这个条件就破坏掉了。

如果新设计一把锁,那应该怎么做呢?

  1. 能够响应中断。synchronized的问题是,持有锁A后,如果尝试获取锁B失败,那么线程就进入阻塞状态,一旦发生死锁,就没有任何机会来唤醒阻塞的线程。但如果阻塞状态的线程能够响应中断信号,也就是说当我们给阻塞的线程发送中断信号的时候,能够唤醒它,那它就有机会释放曾经持有的锁A。这样就破坏了不可抢占条件了。
  2. 支持超时。如果线程在一段时间之内没有获取到锁,不是进入阻塞状态,而是返回一个错误,那这个线程也有机会释放曾经持有的锁。这样也能破坏不可抢占条件。
  3. 非阻塞地获取锁。如果尝试获取锁失败,并不进入阻塞状态,而是直接返回,那这个线程也有机会释放曾经持有的锁。这样也能破坏不可抢占条件。

相应的,lock接口提供了三个接口:

// 支持中断的API
void lockInterruptibly() 
  throws InterruptedException;
// 支持超时的API
boolean tryLock(long time, TimeUnit unit) 
  throws InterruptedException;
// 支持非阻塞获取锁的API
boolean tryLock();

此外,synchronized只支持一个条件变量,而Condition可以支持多个条件变量。

可见性原理

synchronized的解锁 Happens-Before 于后续对这个锁的加锁。

那么lock呢?以上面的程序为例。利用了volatile相关的Happens-Before规则:

  1. 顺序性规则:对于线程T1,value+=1 Happens-Before 释放锁的操作unlock();
  2. volatile变量规则:由于state = 1会先读取state,所以线程T1的unlock()操作Happens-Before线程T2的lock()操作;
  3. 传递性规则:线程 T1的value+=1 Happens-Before 线程 T2 的 lock() 操作。

两个条件变量实现阻塞队列

一个阻塞队列,需要两个条件变量,一个是队列不空(空队列不允许出队),另一个是队列不满(队列已满不允许入队)。

public class BlockedQueue{
  final Lock lock = new ReentrantLock();
  // 条件变量:队列不满  
  final Condition notFull = lock.newCondition();
  // 条件变量:队列不空  
  final Condition notEmpty = lock.newCondition();

  // 入队
  void enq(T x) {
    lock.lock();
    try {
      while (队列已满){
        // 等待队列不满
        notFull.await();
      }  
      // 省略入队操作...
      //入队后,通知可出队
      notEmpty.signal();
    }finally {
      lock.unlock();
    }
  }
  // 出队
  void deq(){
    lock.lock();
    try {
      while (队列已空){
        // 等待队列不空
        notEmpty.await();
      }  
      // 省略出队操作...
      //出队后,通知可入队
      notFull.signal();
    }finally {
      lock.unlock();
    }  
  }
}

注意事项

synchronized不需要释放锁,lock必须要释放。

Semaphore信号量

信号量模型

在信号量模型里,计数器和等待队列对外是透明的,所以只能通过信号量模型提供的三个方法来访问。

img

  • init():设置计数器的初始值。
  • down():计数器的值减1;如果此时计数器的值小于0,则当前线程将被阻塞,否则当前线程可以继续执行。
  • up():计数器的值加1;如果此时计数器的值小于或者等于0,则唤醒等待队列中的一个线程,并将其从等待队列中移除。

而且,这三个方法都是原子性的,原子性由信号量模型来实现的。

class Semaphore{
  // 计数器
  int count;
  // 等待队列
  Queue queue;
  // 初始化操作
  Semaphore(int c){
    this.count=c;
  }
  // 
  void down(){
    this.count--;
    if(this.count<0){
      //将当前线程插入等待队列
      //阻塞当前线程
    }
  }
  void up(){
    this.count++;
    if(this.count<=0) {
      //移除等待队列中的某个线程T
      //唤醒线程T
    }
  }
}

简单使用

static int count;
//初始化信号量
static final Semaphore s = new Semaphore(1);
//用信号量保证互斥    
static void addOne() {
  s.acquire();
  try {
    count+=1;
  } finally {
    s.release();
  }
}

假设两个线程T1和T2同时访问addOne()方法,当它们同时调用acquire()的时候,由于acquire()是一个原子操作,所以只能有一个线程(假设T1)把信号量里的计数器减为0,另外一个线程(T2)则是将计数器减为-1。对于线程T1,信号量里面的计数器的值是0,大于等于0,所以线程T1会继续执行;对于线程T2,信号量里面的计数器的值是-1,小于0,按照信号量模型里对down()操作的描述,线程T2将被阻塞。所以此时只有线程T1会进入临界区执行count+=1;

当线程T1执行release()操作,也就是up()操作的时候,信号量里计数器的值是-1,加1之后的值是0,小于等于0,按照信号量模型里对up()操作的描述,此时等待队列中的T2将会被唤醒。于是T2在T1执行完临界区代码之后才获得了进入临界区执行的机会,从而保证了互斥性。

为什么需要信号量

上面例子,我们用Semaphore实现了互斥锁。那Semaphore还有什么功能呢?Semaphore可以允许多个线程访问一个临界区

应用:限流器

如数据库连接池,同一时刻,允许多个线程使用连接池。对象池也是如此,指的是一次性创建出N个对象,之后所有的线程重复利用这N个对象,当然对象在被释放前,也是不允许其他线程使用的。

class ObjPool<T, R> {

  final List<T> pool;

  // 用信号量实现限流器
  final Semaphore sem;

  // 构造函数
  ObjPool(int size, T t){
    pool = new Vector<T>(){};
    for(int i=0; i<size; i++){
      pool.add(t);
    }
    sem = new Semaphore(size);
  }

  // 利用对象池的对象,调用func
  R exec(Function<T,R> func) {
    T t = null;
    sem.acquire();
    try {
      t = pool.remove(0);
      return func.apply(t);
    } finally {
      pool.add(t);
      sem.release();
    }
  }
}

// 创建对象池

ObjPool<Long, String> pool = new ObjPool<Long, String>(10, 2);
// 通过对象池获取t,之后执行  
pool.exec(t -> {
    System.out.println(t);
    return t.toString();
});

我们用一个List来保存对象实例,用Semaphore实现限流器。关键的代码是ObjPool里面的exec()方法,这个方法里面实现了限流的功能。在这个方法里面,我们首先调用acquire()方法(与之匹配的是在finally里面调用release()方法),假设对象池的大小是10,信号量的计数器初始化为10,那么前10个线程调用acquire()方法,都能继续执行,相当于通过了信号灯,而其他线程则会阻塞在acquire()方法上。对于通过信号灯的线程,我们为每个线程分配了一个对象 t(这个分配工作是通过pool.remove(0)实现的),分配完之后会执行一个回调函数func,而函数的参数正是前面分配的对象 t ;执行完回调函数之后,它们就会释放对象(这个释放工作是通过pool.add(t)实现的),同时调用release()方法来更新信号量的计数器。如果此时信号量里计数器的值小于等于0,那么说明有线程在等待,此时会自动唤醒等待的线程。

ReadWriteLock读写锁

适用场景

读写锁,适用于读多写少的场景。

读写锁三大原则:

  1. 允许多个线程同时读共享变量;
  2. 只允许一个线程写共享变量;
  3. 如果一个写线程正在执行写操作,此时禁止读线程读共享变量。

读写锁和互斥锁的区别是:前者允许多个线程同时读共享变量,后者不允许。

读写锁实现缓存

class Cache {
  final Map m = new HashMap<>();
  final ReadWriteLock rwl = new ReentrantReadWriteLock();
  // 读锁
  final Lock r = rwl.readLock();
  // 写锁
  final Lock w = rwl.writeLock();
  // 读缓存
  V get(K key) {
    r.lock();
    try { 
      return m.get(key); 
    }
    finally { 
      r.unlock(); 
    }
  }
  // 写缓存
  V put(K key, V value) {
    w.lock();
    try { 
      return m.put(key, v);
    }
    finally { 
      w.unlock(); 
    }
  }
}

我们声明了一个Cache类,其中类型参数K代表缓存里key的类型,V代表缓存里value的类型。缓存的数据保存在Cache类内部的HashMap里面,HashMap不是线程安全的,这里我们使用读写锁ReadWriteLock 来保证其线程安全。ReadWriteLock 是一个接口,它的实现类是ReentrantReadWriteLock,通过名字你应该就能判断出来,它是支持可重入的。下面我们通过rwl创建了一把读锁和一把写锁。

StampedLock性能更高的读写锁

final StampedLock sl = new StampedLock();
  
// 获取/释放悲观读锁示意代码
long stamp = sl.readLock();
try {
  //省略业务相关代码
} finally {
  sl.unlockRead(stamp);
}

// 获取/释放写锁示意代码
long stamp = sl.writeLock();
try {
  //省略业务相关代码
} finally {
  sl.unlockWrite(stamp);
}

比ReadWriteLock更快的读写锁,始于Java8。

StampedLock和ReadWriteLock的区别:

  • StampedLock支持3种模式:写锁,悲观读和乐观读;支持多个读;乐观读的时候支持写;
  • ReadWriteLock支持2种模式:写锁,悲观读;支持多个读;读的时候不支持写;
class Point {
  private int x, y;
  final StampedLock sl = new StampedLock();
  //计算到原点的距离  
  int distanceFromOrigin() {
    // 乐观读
    long stamp = sl.tryOptimisticRead();
    // 读入局部变量,
    // 读的过程数据可能被修改
    int curX = x, curY = y;
    //判断执行读操作期间,
    //是否存在写操作,如果存在,
    //则sl.validate返回false
    if (!sl.validate(stamp)){
      // 升级为悲观读锁
      stamp = sl.readLock();
      try {
        curX = x;
        curY = y;
      } finally {
        //释放悲观读锁
        sl.unlockRead(stamp);
      }
    }
    return Math.sqrt(
      curX * curX + curY * curY);
  }
}

局限性

StampedLock不支持重入;悲观读锁、写锁都不支持条件变量;不支持中断,会造成CPU飙升;

final StampedLock lock= new StampedLock();
Thread T1 = new Thread(()->{
  // 获取写锁
  lock.writeLock();
  // 永远阻塞在此处,不释放写锁
  LockSupport.park();
});
T1.start();
// 保证T1获取写锁
Thread.sleep(100);
Thread T2 = new Thread(()->
  //阻塞在悲观读锁
  lock.readLock()
);
T2.start();
// 保证T2阻塞在读锁
Thread.sleep(100);
//中断线程T2
//会导致线程T2所在CPU飙升
T2.interrupt();
T2.join();

CountDownLatch和CyclicBarrier:线程同步工具类

背景:

我们有一个对账系统:

img

可以使用CountDownLatch让主线程等待2个子线程。

// 创建2个线程的线程池
Executor executor = Executors.newFixedThreadPool(2);
while(存在未对账订单){
  // 计数器初始化为2
  CountDownLatch latch = new CountDownLatch(2);
  // 查询未对账订单
  executor.execute(()-> {
    pos = getPOrders();
    latch.countDown();
  });
  // 查询派送单
  executor.execute(()-> {
    dos = getDOrders();
    latch.countDown();
  });
  
  // 等待两个查询操作结束
  latch.await();
  
  // 执行对账操作
  diff = check(pos, dos);
  // 差异写入差异库
  save(diff);
}

前面我们实现了getPOrders()和getDOrders()的并行;事实上,对账操作也可以和查询进行并行;

img

这个方案的难点有2个:一个是线程T1和T2要做到步调一致,另一个是要能够通知到线程T3。

// 订单队列
Vector pos;
// 派送单队列
Vector dos;
// 执行回调的线程池
// 1.使用线程池是为了异步操作,否则回掉函数是同步调用的,也就是本次对账操作执行完才能进行下一轮的检查。
// 2.线程数量固定为1,防止了多线程并发导致的数据不一致,因为订单和派送单是两个队列,只有单线程去两个队列中取消息才不会出现消息不匹配的问题。
Executor executor = Executors.newFixedThreadPool(1);
final CyclicBarrier barrier = new CyclicBarrier(2, ()->{
    executor.execute(()->check());
  });
void check(){
  P p = pos.remove(0);
  D d = dos.remove(0);
  // 执行对账操作
  diff = check(p, d);
  // 差异写入差异库
  save(diff);
}
void checkAll(){
  // 循环查询订单库
  Thread T1 = new Thread(()->{
    while(存在未对账订单){
      // 查询订单库
      pos.add(getPOrders());
      // 等待
      barrier.await();
    }
  });
  T1.start();  
  // 循环查询运单库
  Thread T2 = new Thread(()->{
    while(存在未对账订单){
      // 查询运单库
      dos.add(getDOrders());
      // 等待
      barrier.await();
    }
  });
  T2.start();
}

总结:CountDownLatch主要用来解决一个线程等待多个线程的场景,而CyclicBarrier是一组线程之间互相等待。CountDownLatch的计数器是不能循环利用的,也就是说一旦计数器减到0,再有线程调用await(),该线程会直接通过。但CyclicBarrier的计数器是可以循环利用的,而且具备自动重置的功能,一旦计数器减到0会自动重置到你设置的初始值。除此之外,CyclicBarrier还可以设置回调函数,可以说是功能丰富。

并发容器

img

List

List里面只有一个实现类就是CopyOnWriteArrayList。写的时候会将共享变量新复制一份出来,这样做的好处是读操作完全无锁。

Map

Map接口的两个实现是ConcurrentHashMap和ConcurrentSkipListMap,它们从应用的角度来看,主要区别在于ConcurrentHashMap的key是无序的,而ConcurrentSkipListMap的key是有序的。所以如果你需要保证key的顺序,就只能使用ConcurrentSkipListMap。

img

Set

Set接口的两个实现是CopyOnWriteArraySet和ConcurrentSkipListSet,使用场景可以参考前面讲述的CopyOnWriteArrayList和ConcurrentSkipListMap。

Queue

  • 单端阻塞:ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue、LinkedTransferQueue、PriorityBlockingQueue和DelayQueue
  • 双端阻塞:LinkedBlockingDeque
  • 单端非阻塞:ConcurrentLinkedQueue
  • 双端非阻塞:ConcurrentLinkedDeque

原子类Unsafe

CAS。

img

无锁方案的性能比较好,基本没有死锁问题,但是可能会有饥饿和活锁问题。此外,原子类只能解决单个共享变量的问题,多个共享变量的原子性问题,最好还是采用互斥锁方案。

Java魔法类:Unsafe应用解析

Executor与线程池

Java线程池实现原理及其在美团业务中的实践

Future:获取线程的执行结果

CompletableFuture回调机制的设计与实现

附录

CPU相关并发能力

image-20220307230748609

原子操作实现原理

image-20220307232347611

Java实现原子操作

  1. CAS:Compare And Swap
  2. CAS三大问题
    1. ABA问题
    2. 循环时间开销大
    3. 只能保证一个共享变量的原子操作
  3. 使用锁机制实现原子操作

参考

打赏一个呗

取消

感谢您的支持,我会继续努力的!

扫码支持
扫码支持
扫码打赏,你说多少就多少

打开支付宝扫一扫,即可进行扫码打赏哦