文章转载自 B 站视频笔记,B 站链接:https://www.bilibili.com/video/BV1JT4y1S7K8
再谈多线程
JUC相对于Java应用层的学习难度更大,开篇推荐掌握的预备知识: JavaSE多线程部分(必备) 、操作系统、JVM(推荐) 、计算机组成原理。掌握预备知识会让你的学习更加轻松,其中,JavaSE多线程部分要求必须掌握,否则无法继续学习本教程!我们不会再去重复教学JavaSE阶段的任何知识了。
各位小伙伴一定要点击收藏按钮(收藏 = 学会)
还记得我们在JavaSE中学习的多线程吗?让我们来回顾一下:
在我们的操作系统之上,可以同时运行很多个进程,并且每个进程之间相互隔离互不干扰。我们的CPU会通过时间片轮转算法,为每一个进程分配时间片,并在时间片使用结束后切换下一个进程继续执行,通过这种方式来实现宏观上的多个程序同时运行。
由于每个进程都有一个自己的内存空间,进程之间的通信就变得非常麻烦(比如要共享某些数据)而且执行不同进程会产生上下文切换,非常耗时,那么有没有一种更好地方案呢?
后来,线程横空出世,一个进程可以有多个线程,线程是程序执行中一个单一的顺序控制流程,现在线程才是程序执行流的最小单元,各个线程之间共享程序的内存空间(也就是所在进程的内存空间),上下文切换速度也高于进程。
现在有这样一个问题:
1 2 3 4 public static void main (String[] args) { int [] arr = new int []{3 , 1 , 5 , 2 , 4 }; }
按照正常思维,我们肯定是这样:
1 2 3 4 5 6 7 8 public static void main (String[] args) { int [] arr = new int []{3 , 1 , 5 , 2 , 4 }; Arrays.sort(arr); for (int i : arr) { System.out.println(i); } }
而我们学习了多线程之后,可以换个思路来实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 public static void main (String[] args) { int [] arr = new int []{3 , 1 , 5 , 2 , 4 }; for (int i : arr) { new Thread (() -> { try { Thread.sleep(i * 1000 ); System.out.println(i); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); } }
我们接触过的很多框架都在使用多线程,比如Tomcat服务器,所有用户的请求都是通过不同的线程来进行处理的,这样我们的网站才可以同时响应多个用户的请求,要是没有多线程,可想而知服务器的处理效率会有多低。
虽然多线程能够为我们解决很多问题,但是,如何才能正确地使用多线程,如何才能将多线程的资源合理使用,这都是我们需要关心的问题。
在Java 5的时候,新增了java.util.concurrent(JUC)包,其中包括大量用于多线程编程的工具类,目的是为了更好的支持高并发任务,让开发者进行多线程编程时减少竞争条件和死锁的问题!通过使用这些工具类,我们的程序会更加合理地使用多线程。而我们这一系列视频的主角,正是JUC
。
但是我们先不着急去看这些内容,第一章,我们先来补点基础知识。
并发与并行 我们经常听到并发编程,那么这个并发代表的是什么意思呢?而与之相似的并行又是什么意思?它们之间有什么区别?
比如现在一共有三个工作需要我们去完成。
顺序执行 顺序执行其实很好理解,就是我们依次去将这些任务完成了:
实际上就是我们同一时间只能处理一个任务,所以需要前一个任务完成之后,才能继续下一个任务,依次完成所有任务。
并发执行 并发执行也是我们同一时间只能处理一个任务,但是我们可以每个任务轮着做(时间片轮转):
只要我们单次处理分配的时间足够的短,在宏观看来,就是三个任务在同时进行。
而我们Java中的线程,正是这种机制,当我们需要同时处理上百个上千个任务时,很明显CPU的数量是不可能赶得上我们的线程数的,所以说这时就要求我们的程序有良好的并发性能,来应对同一时间大量的任务处理。学习Java并发编程,能够让我们在以后的实际场景中,知道该如何应对高并发的情况。
并行执行 并行执行就突破了同一时间只能处理一个任务的限制,我们同一时间可以做多个任务:
比如我们要进行一些排序操作,就可以用到并行计算,只需要等待所有子任务完成,最后将结果汇总即可。包括分布式计算模型MapReduce,也是采用的并行计算思路。
再谈锁机制 谈到锁机制,相信各位应该并不陌生了,我们在JavaSE阶段,通过使用synchronized
关键字来实现锁,这样就能够很好地解决线程之间争抢资源的情况。那么,synchronized
底层到底是如何实现的呢?
我们知道,使用synchronized
,一定是和某个对象相关联的,比如我们要对某一段代码加锁,那么我们就需要提供一个对象来作为锁本身:
1 2 3 4 5 public static void main (String[] args) { synchronized (Main.class) { } }
我们来看看,它变成字节码之后会用到哪些指令:
其中最关键的就是monitorenter
指令了,可以看到之后也有monitorexit
与之进行匹配(注意这里有2个),monitorenter
和monitorexit
分别对应加锁和释放锁,在执行monitorenter
之前需要尝试获取锁,每个对象都有一个monitor
监视器与之对应,而这里正是去获取对象监视器的所有权,一旦monitor
所有权被某个线程持有,那么其他线程将无法获得(管程模型的一种实现)。
在代码执行完成之后,我们可以看到,一共有两个monitorexit
在等着我们,那么为什么这里会有两个呢,按理说monitorenter
和monitorexit
不应该一一对应吗,这里为什么要释放锁两次呢?
首先我们来看第一个,这里在释放锁之后,会马上进入到一个goto指令,跳转到15行,而我们的15行对应的指令就是方法的返回指令,其实正常情况下只会执行第一个monitorexit
释放锁,在释放锁之后就接着同步代码块后面的内容继续向下执行了。而第二个,其实是用来处理异常的,可以看到,它的位置是在12行,如果程序运行发生异常,那么就会执行第二个monitorexit
,并且会继续向下通过athrow
指令抛出异常,而不是直接跳转到15行正常运行下去。
实际上synchronized
使用的锁就是存储在Java对象头中的,我们知道,对象是存放在堆内存中的,而每个对象内部,都有一部分空间用于存储对象头信息,而对象头信息中,则包含了Mark Word用于存放hashCode
和对象的锁信息,在不同状态下,它存储的数据结构有一些不同。
重量级锁 在JDK6之前,synchronized
一直被称为重量级锁,monitor
依赖于底层操作系统的Lock实现,Java的线程是映射到操作系统的原生线程上,切换成本较高。而在JDK6之后,锁的实现得到了改进。我们先从最原始的重量级锁开始:
我们说了,每个对象都有一个monitor与之关联,在Java虚拟机(HotSpot)中,monitor是由ObjectMonitor实现的:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 ObjectMonitor () { _header = NULL ; _count = 0 ; _waiters = 0 , _recursions = 0 ; _object = NULL ; _owner = NULL ; _WaitSet = NULL ; _WaitSetLock = 0 ; _Responsible = NULL ; _succ = NULL ; _cxq = NULL ; FreeNext = NULL ; _EntryList = NULL ; _SpinFreq = 0 ; _SpinClock = 0 ; OwnerIsThread = 0 ; }
每个等待锁的线程都会被封装成ObjectWaiter对象,进入到如下机制:
ObjectWaiter首先会进入 Entry Set等着,当线程获取到对象的monitor
后进入 The Owner 区域并把monitor
中的owner
变量设置为当前线程,同时monitor
中的计数器count
加1,若线程调用wait()
方法,将释放当前持有的monitor
,owner
变量恢复为null
,count
自减1,同时该线程进入 WaitSet集合中等待被唤醒。若当前线程执行完毕也将释放monitor
并复位变量的值,以便其他线程进入获取对象的monitor
。
虽然这样的设计思路非常合理,但是在大多数应用上,每一个线程占用同步代码块的时间并不是很长,我们完全没有必要将竞争中的线程挂起然后又唤醒,并且现代CPU基本都是多核心运行的,我们可以采用一种新的思路来实现锁。
在JDK1.4.2时,引入了自旋锁(JDK6之后默认开启),它不会将处于等待状态的线程挂起,而是通过无限循环的方式,不断检测是否能够获取锁,由于单个线程占用锁的时间非常短,所以说循环次数不会太多,可能很快就能够拿到锁并运行,这就是自旋锁。当然,仅仅是在等待时间非常短的情况下,自旋锁的表现会很好,但是如果等待时间太长,由于循环是需要处理器继续运算的,所以这样只会浪费处理器资源,因此自旋锁的等待时间是有限制的,默认情况下为10次,如果失败,那么会进而采用重量级锁机制。
在JDK6之后,自旋锁得到了一次优化,自旋的次数限制不再是固定的,而是自适应变化的,比如在同一个锁对象上,自旋等待刚刚成功获得过锁,并且持有锁的线程正在运行,那么这次自旋也是有可能成功的,所以会允许自旋更多次。当然,如果某个锁经常都自旋失败,那么有可能会不再采用自旋策略,而是直接使用重量级锁。
轻量级锁
从JDK 1.6开始,为了减少获得锁和释放锁带来的性能消耗,就引入了轻量级锁。
轻量级锁的目标是,在无竞争情况下,减少重量级锁产生的性能消耗(并不是为了代替重量级锁,实际上就是赌一手同一时间只有一个线程在占用资源),包括系统调用引起的内核态与用户态切换、线程阻塞造成的线程切换等。它不像是重量级锁那样,需要向操作系统申请互斥量。它的运作机制如下:
在即将开始执行同步代码块中的内容时,会首先检查对象的Mark Word,查看锁对象是否被其他线程占用,如果没有任何线程占用,那么会在当前线程中所处的栈帧中建立一个名为锁记录(Lock Record)的空间,用于复制并存储对象目前的Mark Word信息(官方称为Displaced Mark Word)。
接着,虚拟机将使用CAS操作将对象的Mark Word更新为轻量级锁状态(数据结构变为指向Lock Record的指针,指向的是当前的栈帧)
CAS(Compare And Swap)是一种无锁算法(我们之前在Springboot阶段已经讲解过了),它并不会为对象加锁,而是在执行的时候,看看当前数据的值是不是我们预期的那样,如果是,那就正常进行替换,如果不是,那么就替换失败。比如有两个线程都需要修改变量i
的值,默认为10,现在一个线程要将其修改为20,另一个要修改为30,如果他们都使用CAS算法,那么并不会加锁访问i
,而是直接尝试修改i
的值,但是在修改时,需要确认i
是不是10,如果是,表示其他线程还没对其进行修改,如果不是,那么说明其他线程已经将其修改,此时不能完成修改任务,修改失败。
在CPU中,CAS操作使用的是cmpxchg
指令,能够从最底层硬件层面得到效率的提升。
如果CAS操作失败了的话,那么说明可能这时有线程已经进入这个同步代码块了,这时虚拟机会再次检查对象的Mark Word,是否指向当前线程的栈帧,如果是,说明不是其他线程,而是当前线程已经有了这个对象的锁,直接放心大胆进同步代码块即可。如果不是,那确实是被其他线程占用了。
这时,轻量级锁一开始的想法就是错的(这时有对象在竞争资源,已经赌输了),所以说只能将锁膨胀为重量级锁,按照重量级锁的操作执行(注意锁的膨胀是不可逆的)
所以,轻量级锁 -> 失败 -> 自适应自旋锁 -> 失败 -> 重量级锁
解锁过程同样采用CAS算法,如果对象的MarkWord仍然指向线程的锁记录,那么就用CAS操作把对象的MarkWord和复制到栈帧中的Displaced Mark Word进行交换。如果替换失败,说明其他线程尝试过获取该锁,在释放锁的同时,需要唤醒被挂起的线程。
偏向锁 偏向锁相比轻量级锁更纯粹,干脆就把整个同步都消除掉,不需要再进行CAS操作了。它的出现主要是得益于人们发现某些情况下某个锁频繁地被同一个线程获取,这种情况下,我们可以对轻量级锁进一步优化。
偏向锁实际上就是专门为单个线程而生的,当某个线程第一次获得锁时,如果接下来都没有其他线程获取此锁,那么持有锁的线程将不再需要进行同步操作。
可以从之前的MarkWord结构中看到,偏向锁也会通过CAS操作记录线程的ID,如果一直都是同一个线程获取此锁,那么完全没有必要在进行额外的CAS操作。当然,如果有其他线程来抢了,那么偏向锁会根据当前状态,决定是否要恢复到未锁定或是膨胀为轻量级锁。
如果我们需要使用偏向锁,可以添加-XX:+UseBiased
参数来开启。
所以,最终的锁等级为:未锁定 < 偏向锁 < 轻量级锁 < 重量级锁
值得注意的是,如果对象通过调用hashCode()
方法计算过对象的一致性哈希值,那么它是不支持偏向锁的,会直接进入到轻量级锁状态,因为Hash是需要被保存的,而偏向锁的Mark Word数据结构,无法保存Hash值;如果对象已经是偏向锁状态,再去调用hashCode()
方法,那么会直接将锁升级为重量级锁,并将哈希值存放在monitor
(有预留位置保存)中。
锁消除和锁粗化 锁消除和锁粗化都是在运行时的一些优化方案,比如我们某段代码虽然加了锁,但是在运行时根本不可能出现各个线程之间资源争夺的情况,这种情况下,完全不需要任何加锁机制,所以锁会被消除。锁粗化则是我们代码中频繁地出现互斥同步操作,比如在一个循环内部加锁,这样明显是非常消耗性能的,所以虚拟机一旦检测到这种操作,会将整个同步范围进行扩展。
JMM内存模型 注意这里提到的内存模型和我们在JVM中介绍的内存模型不在同一个层次,JVM中的内存模型是虚拟机规范对整个内存区域的规划,而Java内存模型,是在JVM内存模型之上的抽象模型,具体实现依然是基于JVM内存模型实现的,我们会在后面介绍。
Java内存模型 我们在计算机组成原理
中学习过,在我们的CPU中,一般都会有高速缓存,而它的出现,是为了解决内存的速度跟不上处理器的处理速度的问题,所以CPU内部会添加一级或多级高速缓存来提高处理器的数据获取效率,但是这样也会导致一个很明显的问题,因为现在基本都是多核心处理器,每个处理器都有一个自己的高速缓存,那么又该怎么去保证每个处理器的高速缓存内容一致呢?
为了解决缓存一致性的问题,需要各个处理器访问缓存时都遵循一些协议,在读写时要根据协议来进行操作,这类协议有MSI、MESI(Illinois Protocol)、MOSI、Synapse、Firefly及Dragon Protocol等。
而Java也采用了类似的模型来实现支持多线程的内存模型:
JMM(Java Memory Model)内存模型规定如下:
所有的变量全部存储在主内存(注意这里包括下面提到的变量,指的都是会出现竞争的变量,包括成员变量、静态变量等,而局部变量这种属于线程私有,不包括在内)
每条线程有着自己的工作内存(可以类比CPU的高速缓存)线程对变量的所有操作,必须在工作内存中进行,不能直接操作主内存中的数据。
不同线程之间的工作内存相互隔离,如果需要在线程之间传递内容,只能通过主内存完成,无法直接访问对方的工作内存。
也就是说,每一条线程如果要操作主内存中的数据,那么得先拷贝到自己的工作内存中,并对工作内存中数据的副本进行操作,操作完成之后,也需要从工作副本中将结果拷贝回主内存中,具体的操作就是Save
(保存)和Load
(加载)操作。
那么各位肯定会好奇,这个内存模型,结合之前JVM所讲的内容,具体是怎么实现的呢?
主内存:对应堆中存放对象的实例的部分。
工作内存:对应线程的虚拟机栈的部分区域,虚拟机可能会对这部分内存进行优化,将其放在CPU的寄存器或是高速缓存中。比如在访问数组时,由于数组是一段连续的内存空间,所以可以将一部分连续空间放入到CPU高速缓存中,那么之后如果我们顺序读取这个数组,那么大概率会直接缓存命中。
前面我们提到,在CPU中可能会遇到缓存不一致的问题,而Java中,也会遇到,比如下面这种情况:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 public class Main { private static int i = 0 ; public static void main (String[] args) throws InterruptedException { new Thread (() -> { for (int j = 0 ; j < 100000 ; j++) i++; System.out.println("线程1结束" ); }).start(); new Thread (() -> { for (int j = 0 ; j < 100000 ; j++) i++; System.out.println("线程2结束" ); }).start(); Thread.sleep(1000 ); System.out.println(i); } }
可以看到这里是两个线程同时对变量i
各自进行100000次自增操作,但是实际得到的结果并不是我们所期望的那样。
那么为什么会这样呢?在之前学习了JVM之后,相信各位应该已经知道,自增操作实际上并不是由一条指令完成的(注意一定不要理解为一行代码就是一个指令完成的):
包括变量i
的获取、修改、保存,都是被拆分为一个一个的操作完成的,那么这个时候就有可能出现在修改完保存之前,另一条线程也保存了,但是当前线程是毫不知情的。
所以说,我们当时在JavaSE阶段讲解这个问题的时候,是通过synchronized
关键字添加同步代码块解决的,当然,我们后面还会讲解另外的解决方案(原子类)
重排序 在编译或执行时,为了优化程序的执行效率,编译器或处理器常常会对指令进行重排序,有以下情况:
编译器重排序:Java编译器通过对Java代码语义的理解,根据优化规则对代码指令进行重排序。
机器指令级别的重排序:现代处理器很高级,能够自主判断和变更机器指令的执行顺序。
指令重排序能够在不改变结果(单线程)的情况下,优化程序的运行效率,比如:
1 2 3 4 5 public static void main (String[] args) { int a = 10 ; int b = 20 ; System.out.println(a + b); }
我们其实可以交换第一行和第二行:
1 2 3 4 5 public static void main (String[] args) { int b = 10 ; int a = 20 ; System.out.println(a + b); }
即使发生交换,但是我们程序最后的运行结果是不会变的,当然这里只通过代码的形式演示,实际上JVM在执行字节码指令时也会进行优化,可能两个指令并不会按照原有的顺序进行。
虽然单线程下指令重排确实可以起到一定程度的优化作用,但是在多线程下,似乎会导致一些问题:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public class Main { private static int a = 0 ; private static int b = 0 ; public static void main (String[] args) { new Thread (() -> { if (b == 1 ) { if (a == 0 ) { System.out.println("A" ); }else { System.out.println("B" ); } } }).start(); new Thread (() -> { a = 1 ; b = 1 ; }).start(); } }
上面这段代码,在正常情况下,按照我们的正常思维,是不可能输出A
的,因为只要b等于1,那么a肯定也是1才对,因为a是在b之前完成的赋值。但是,如果进行了重排序,那么就有可能,a和b的赋值发生交换,b先被赋值为1,而恰巧这个时候,线程1开始判定b是不是1了,这时a还没来得及被赋值为1,可能线程1就已经走到打印那里去了,所以,是有可能输出A
的。
volatile关键字 好久好久都没有认识新的关键字了,今天我们来看一个新的关键字volatile
,开始之前我们先介绍三个词语:
原子性:其实之前讲过很多次了,就是要做什么事情要么做完,要么就不做,不存在做一半的情况。
可见性:指当多个线程访问同一个变量时,一个线程修改了这个变量的值,其他线程能够立即看得到修改的值。
有序性:即程序执行的顺序按照代码的先后顺序执行。
我们之前说了,如果多线程访问同一个变量,那么这个变量会被线程拷贝到自己的工作内存中进行操作,而不是直接对主内存中的变量本体进行操作,下面这个操作看起来是一个有限循环,但是是无限的:
1 2 3 4 5 6 7 8 9 10 11 12 13 public class Main { private static int a = 0 ; public static void main (String[] args) throws InterruptedException { new Thread (() -> { while (a == 0 ); System.out.println("线程结束!" ); }).start(); Thread.sleep(1000 ); System.out.println("正在修改a的值..." ); a = 1 ; } }
实际上这就是我们之前说的,虽然我们主线程中修改了a的值,但是另一个线程并不知道a的值发生了改变,所以循环中依然是使用旧值在进行判断,因此,普通变量是不具有可见性的。
要解决这种问题,我们第一个想到的肯定是加锁,同一时间只能有一个线程使用,这样总行了吧,确实,这样的话肯定是可以解决问题的:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public class Main { private static int a = 0 ; public static void main (String[] args) throws InterruptedException { new Thread (() -> { while (a == 0 ) { synchronized (Main.class){} } System.out.println("线程结束!" ); }).start(); Thread.sleep(1000 ); System.out.println("正在修改a的值..." ); synchronized (Main.class){ a = 1 ; } } }
但是,除了硬加一把锁的方案,我们也可以使用volatile
关键字来解决,此关键字的第一个作用,就是保证变量的可见性。当写一个volatile
变量时,JMM会把该线程本地内存中的变量强制刷新到主内存中去,并且这个写会操作会导致其他线程中的volatile
变量缓存无效,这样,另一个线程修改了这个变时,当前线程会立即得知,并将工作内存中的变量更新为最新的版本。
那么我们就来试试看:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 public class Main { private static volatile int a = 0 ; public static void main (String[] args) throws InterruptedException { new Thread (() -> { while (a == 0 ); System.out.println("线程结束!" ); }).start(); Thread.sleep(1000 ); System.out.println("正在修改a的值..." ); a = 1 ; } }
结果还真的如我们所说的那样,当a发生改变时,循环立即结束。
当然,虽然说volatile
能够保证可见性,但是不能保证原子性,要解决我们上面的i++
的问题,以我们目前所学的知识,还是只能使用加锁来完成:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public class Main { private static volatile int a = 0 ; public static void main (String[] args) throws InterruptedException { Runnable r = () -> { for (int i = 0 ; i < 10000 ; i++) a++; System.out.println("任务完成!" ); }; new Thread (r).start(); new Thread (r).start(); Thread.sleep(1000 ); System.out.println(a); } }
不对啊,volatile
不是能在改变变量的时候其他线程可见吗,那为什么还是不能保证原子性呢?还是那句话,自增操作是被瓜分为了多个步骤完成的,虽然保证了可见性,但是只要手速够快,依然会出现两个线程同时写同一个值的问题(比如线程1刚刚将a的值更新为100,这时线程2可能也已经执行到更新a的值这条指令了,已经刹不住车了,所以依然会将a的值再更新为一次100)
那要是真的遇到这种情况,那么我们不可能都去写个锁吧?后面,我们会介绍原子类来专门解决这种问题。
最后一个功能就是volatile
会禁止指令重排,也就是说,如果我们操作的是一个volatile
变量,它将不会出现重排序的情况,也就解决了我们最上面的问题。那么它是怎么解决的重排序问题呢?若用volatile修饰共享变量,在编译时,会在指令序列中插入内存屏障
来禁止特定类型的处理器重排序
内存屏障(Memory Barrier)又称内存栅栏,是一个CPU指令,它的作用有两个:
保证特定操作的顺序
保证某些变量的内存可见性(volatile的内存可见性,其实就是依靠这个实现的)
由于编译器和处理器都能执行指令重排的优化,如果在指令间插入一条Memory Barrier则会告诉编译器和CPU,不管什么指令都不能和这条Memory Barrier指令重排序。
屏障类型
指令示例
说明
LoadLoad
Load1;LoadLoad;Load2
保证Load1的读取操作在Load2及后续读取操作之前执行
StoreStore
Store1;StoreStore;Store2
在Store2及其后的写操作执行前,保证Store1的写操作已刷新到主内存
LoadStore
Load1;LoadStore;Store2
在Store2及其后的写操作执行前,保证Load1的读操作已读取结束
StoreLoad
Store1;StoreLoad;Load2
保证load1的写操作已刷新到主内存之后,load2及其后的读操作才能执行
所以volatile
能够保证,之前的指令一定全部执行,之后的指令一定都没有执行,并且前面语句的结果对后面的语句可见。
最后我们来总结一下volatile
关键字的三个特性:
在之后我们的设计模式系列视频中,还会讲解单例模式下volatile
的运用。
happens-before原则 经过我们前面的讲解,相信各位已经了解了JMM内存模型以及重排序等机制带来的优点和缺点,综上,JMM提出了happens-before
(先行发生)原则,定义一些禁止编译优化的场景,来向各位程序员做一些保证,只要我们是按照原则进行编程,那么就能够保持并发编程的正确性。具体如下:
程序次序规则: 同一个线程中,按照程序的顺序,前面的操作happens-before后续的任何操作。
同一个线程内,代码的执行结果是有序的。其实就是,可能会发生指令重排,但是保证代码的执行结果一定是和按照顺序执行得到的一致,程序前面对某一个变量的修改一定对后续操作可见的,不可能会出现前面才把a修改为1,接着读a居然是修改前的结果,这也是程序运行最基本的要求。
监视器锁规则: 对一个锁的解锁操作,happens-before后续对这个锁的加锁操作。
就是无论是在单线程环境还是多线程环境,对于同一个锁来说,一个线程对这个锁解锁之后,另一个线程获取了这个锁都能看到前一个线程的操作结果。比如前一个线程将变量x
的值修改为了12
并解锁,之后另一个线程拿到了这把锁,对之前线程的操作是可见的,可以得到x
是前一个线程修改后的结果12
(所以synchronized是有happens-before规则的)
volatile变量规则: 对一个volatile变量的写操作happens-before后续对这个变量的读操作。
就是如果一个线程先去写一个volatile
变量,紧接着另一个线程去读这个变量,那么这个写操作的结果一定对读的这个变量的线程可见。
线程启动规则: 主线程A启动线程B,线程B中可以看到主线程启动B之前的操作。
在主线程A执行过程中,启动子线程B,那么线程A在启动子线程B之前对共享变量的修改结果对线程B可见。
线程加入规则: 如果线程A执行操作join()
线程B并成功返回,那么线程B中的任意操作happens-before线程Ajoin()
操作成功返回。
传递性规则: 如果A happens-before B,B happens-before C,那么A happens-before C。
那么我们来从happens-before原则的角度,来解释一下下面的程序结果:
1 2 3 4 5 6 7 8 9 10 11 public class Main { private static int a = 0 ; private static int b = 0 ; public static void main (String[] args) { a = 10 ; b = a + 1 ; new Thread (() -> { if (b > 10 ) System.out.println(a); }).start(); } }
首先我们定义以上出现的操作:
A: 将变量a
的值修改为10
B: 将变量b
的值修改为a + 1
C: 主线程启动了一个新的线程,并在新的线程中获取b
,进行判断,如果为true
那么就打印a
首先我们来分析,由于是同一个线程,并且B 是一个赋值操作且读取了A ,那么按照程序次序规则 ,A happens-before B,接着在B之后,马上执行了C,按照线程启动规则 ,在新的线程启动之前,当前线程之前的所有操作对新的线程是可见的,所以 B happens-before C,最后根据传递性规则 ,由于A happens-before B,B happens-before C,所以A happens-before C,因此在新的线程中会输出a
修改后的结果10
。
多线程编程核心 在前面,我们了解了多线程的底层运作机制,我们终于知道,原来多线程环境下存在着如此之多的问题。在JDK5之前,我们只能选择synchronized
关键字来实现锁,而JDK5之后,由于volatile
关键字得到了升级(具体功能就是上一章所描述的),所以并发框架包便出现了,相比传统的synchronized
关键字,我们对于锁的实现,有了更多的选择。
Doug Lea — JUC并发包的作者
如果IT的历史,是以人为主体串接起来的话,那么肯定少不了Doug Lea。这个鼻梁挂着眼镜,留着德王威廉二世的胡子,脸上永远挂着谦逊腼腆笑容,服务于纽约州立大学Oswego分校计算机科学系的老大爷。
说他是这个世界上对Java影响力最大的一个人,一点也不为过。因为两次Java历史上的大变革,他都间接或直接的扮演了举足轻重的角色。2004年所推出的Tiger。Tiger广纳了15项JSRs(Java Specification Requests)的语法及标准,其中一项便是JSR-166。JSR-166是来自于Doug编写的util.concurrent包。
那么,从这章开始,就让我们来感受一下,JUC为我们带来了什么。
锁框架 在JDK 5之后,并发包中新增了Lock接口(以及相关实现类)用来实现锁功能,Lock接口提供了与synchronized关键字类似的同步功能,但需要在使用时手动获取锁和释放锁。
Lock和Condition接口 使用并发包中的锁和我们传统的synchronized
锁不太一样,这里的锁我们可以认为是一把真正意义上的锁,每个锁都是一个对应的锁对象,我只需要向锁对象获取锁或是释放锁即可。我们首先来看看,此接口中定义了什么:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 public interface Lock { void lock () ; void lockInterruptibly () throws InterruptedException; boolean tryLock () ; boolean tryLock (long time, TimeUnit unit) throws InterruptedException; void unlock () ; Condition newCondition () ; }
这里我们可以演示一下,如何使用Lock类来进行加锁和释放锁操作:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public class Main { private static int i = 0 ; public static void main (String[] args) throws InterruptedException { Lock testLock = new ReentrantLock (); Runnable action = () -> { for (int j = 0 ; j < 100000 ; j++) { testLock.lock(); i++; testLock.unlock(); } }; new Thread (action).start(); new Thread (action).start(); Thread.sleep(1000 ); System.out.println(i); } }
可以看到,和我们之前使用synchronized
相比,我们这里是真正在操作一个”锁”对象,当我们需要加锁时,只需要调用lock()
方法,而需要释放锁时,只需要调用unlock()
方法。程序运行的最终结果和使用synchronized
锁是一样的。
那么,我们如何像传统的加锁那样,调用对象的wait()
和notify()
方法呢,并发包提供了Condition接口:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 public interface Condition { void await () throws InterruptedException; void awaitUninterruptibly () ; long awaitNanos (long nanosTimeout) throws InterruptedException; boolean await (long time, TimeUnit unit) throws InterruptedException; boolean awaitUntil (Date deadline) throws InterruptedException; void signal () ; void signalAll () ; }
这里我们通过一个简单的例子来演示一下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public static void main (String[] args) throws InterruptedException { Lock testLock = new ReentrantLock (); Condition condition = testLock.newCondition(); new Thread (() -> { testLock.lock(); System.out.println("线程1进入等待状态!" ); try { condition.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("线程1等待结束!" ); testLock.unlock(); }).start(); Thread.sleep(100 ); new Thread (() -> { testLock.lock(); System.out.println("线程2开始唤醒其他等待线程" ); condition.signal(); System.out.println("线程2结束" ); testLock.unlock(); }).start(); }
可以发现,Condition对象使用方法和传统的对象使用差别不是很大。
思考: 下面这种情况跟上面有什么不同?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 public static void main (String[] args) throws InterruptedException { Lock testLock = new ReentrantLock (); new Thread (() -> { testLock.lock(); System.out.println("线程1进入等待状态!" ); try { testLock.newCondition().await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("线程1等待结束!" ); testLock.unlock(); }).start(); Thread.sleep(100 ); new Thread (() -> { testLock.lock(); System.out.println("线程2开始唤醒其他等待线程" ); testLock.newCondition().signal(); System.out.println("线程2结束" ); testLock.unlock(); }).start(); }
通过分析可以得到,在调用newCondition()
后,会生成一个新的Condition对象,并且同一把锁内是可以存在多个Condition对象的(实际上原始的锁机制等待队列只能有一个,而这里可以创建很多个Condition来实现多等待队列),而上面的例子中,实际上使用的是不同的Condition对象,只有对同一个Condition对象进行等待和唤醒操作才会有效,而不同的Condition对象是分开计算的。
最后我们再来讲解一下时间单位,这是一个枚举类,也是位于java.util.concurrent
包下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 public enum TimeUnit { NANOSECONDS { public long toNanos (long d) { return d; } public long toMicros (long d) { return d/(C1/C0); } public long toMillis (long d) { return d/(C2/C0); } public long toSeconds (long d) { return d/(C3/C0); } public long toMinutes (long d) { return d/(C4/C0); } public long toHours (long d) { return d/(C5/C0); } public long toDays (long d) { return d/(C6/C0); } public long convert (long d, TimeUnit u) { return u.toNanos(d); } int excessNanos (long d, long m) { return (int )(d - (m*C2)); } },
可以看到时间单位有很多的,比如DAY
、SECONDS
、MINUTES
等,我们可以直接将其作为时间单位,比如我们要让一个线程等待3秒钟,可以像下面这样编写:
1 2 3 4 5 6 7 8 9 10 11 12 public static void main (String[] args) throws InterruptedException { Lock testLock = new ReentrantLock (); new Thread (() -> { testLock.lock(); try { System.out.println("等待是否未超时:" +testLock.newCondition().await(1 , TimeUnit.SECONDS)); } catch (InterruptedException e) { e.printStackTrace(); } testLock.unlock(); }).start(); }
当然,Lock类的tryLock方法也是支持使用时间单位的,各位可以自行进行测试。TimeUnit除了可以作为时间单位表示以外,还可以在不同单位之间相互转换:
1 2 3 4 public static void main (String[] args) throws InterruptedException { System.out.println("60秒 = " +TimeUnit.SECONDS.toMinutes(60 ) +"分钟" ); System.out.println("365天 = " +TimeUnit.DAYS.toSeconds(365 ) +" 秒" ); }
也可以更加便捷地使用对象的wait()
方法:
1 2 3 4 5 6 7 public static void main (String[] args) throws InterruptedException { synchronized (Main.class) { System.out.println("开始等待" ); TimeUnit.SECONDS.timedWait(Main.class, 3 ); System.out.println("等待结束" ); } }
我们也可以直接使用它来进行休眠操作:
1 2 3 public static void main (String[] args) throws InterruptedException { TimeUnit.SECONDS.sleep(1 ); }
可重入锁 前面,我们讲解了锁框架的两个核心接口,那么我们接着来看看锁接口的具体实现类,我们前面用到了ReentrantLock,它其实是锁的一种,叫做可重入锁,那么这个可重入代表的是什么意思呢?简单来说,就是同一个线程,可以反复进行加锁操作:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public static void main (String[] args) throws InterruptedException { ReentrantLock lock = new ReentrantLock (); lock.lock(); lock.lock(); new Thread (() -> { System.out.println("线程2想要获取锁" ); lock.lock(); System.out.println("线程2成功获取到锁" ); }).start(); lock.unlock(); System.out.println("线程1释放了一次锁" ); TimeUnit.SECONDS.sleep(1 ); lock.unlock(); System.out.println("线程1再次释放了一次锁" ); }
可以看到,主线程连续进行了两次加锁操作(此操作是不会被阻塞的),在当前线程持有锁的情况下继续加锁不会被阻塞,并且,加锁几次,就必须要解锁几次,否则此线程依旧持有锁。我们可以使用getHoldCount()
方法查看当前线程的加锁次数:
1 2 3 4 5 6 7 8 9 10 11 12 public static void main (String[] args) throws InterruptedException { ReentrantLock lock = new ReentrantLock (); lock.lock(); lock.lock(); System.out.println("当前加锁次数:" +lock.getHoldCount()+",是否被锁:" +lock.isLocked()); TimeUnit.SECONDS.sleep(1 ); lock.unlock(); System.out.println("当前加锁次数:" +lock.getHoldCount()+",是否被锁:" +lock.isLocked()); TimeUnit.SECONDS.sleep(1 ); lock.unlock(); System.out.println("当前加锁次数:" +lock.getHoldCount()+",是否被锁:" +lock.isLocked()); }
可以看到,当锁不再被任何线程持有时,值为0
,并且通过isLocked()
方法查询结果为false
。
实际上,如果存在线程持有当前的锁,那么其他线程在获取锁时,是会暂时进入到等待队列的,我们可以通过getQueueLength()
方法获取等待中线程数量的预估值:
1 2 3 4 5 6 7 8 9 10 11 12 public static void main (String[] args) throws InterruptedException { ReentrantLock lock = new ReentrantLock (); lock.lock(); Thread t1 = new Thread (lock::lock), t2 = new Thread (lock::lock);; t1.start(); t2.start(); TimeUnit.SECONDS.sleep(1 ); System.out.println("当前等待锁释放的线程数:" +lock.getQueueLength()); System.out.println("线程1是否在等待队列中:" +lock.hasQueuedThread(t1)); System.out.println("线程2是否在等待队列中:" +lock.hasQueuedThread(t2)); System.out.println("当前线程是否在等待队列中:" +lock.hasQueuedThread(Thread.currentThread())); }
我们可以通过hasQueuedThread()
方法来判断某个线程是否正在等待获取锁状态。
同样的,Condition也可以进行判断:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public static void main (String[] args) throws InterruptedException { ReentrantLock lock = new ReentrantLock (); Condition condition = lock.newCondition(); new Thread (() -> { lock.lock(); try { condition.await(); } catch (InterruptedException e) { e.printStackTrace(); } lock.unlock(); }).start(); TimeUnit.SECONDS.sleep(1 ); lock.lock(); System.out.println("当前Condition的等待线程数:" +lock.getWaitQueueLength(condition)); condition.signal(); System.out.println("当前Condition的等待线程数:" +lock.getWaitQueueLength(condition)); lock.unlock(); }
通过使用getWaitQueueLength()
方法能够查看同一个Condition目前有多少线程处于等待状态。
公平锁与非公平锁 前面我们了解了如果线程之间争抢同一把锁,会暂时进入到等待队列中,那么多个线程获得锁的顺序是不是一定是根据线程调用lock()
方法时间来定的呢,我们可以看到,ReentrantLock
的构造方法中,是这样写的:
1 2 3 public ReentrantLock () { sync = new NonfairSync (); }
其实锁分为公平锁和非公平锁,默认我们创建出来的ReentrantLock是采用的非公平锁作为底层锁机制。那么什么是公平锁什么又是非公平锁呢?
公平锁:多个线程按照申请锁的顺序去获得锁,线程会直接进入队列去排队,永远都是队列的第一位才能得到锁。
非公平锁:多个线程去获取锁的时候,会直接去尝试获取,获取不到,再去进入等待队列,如果能获取到,就直接获取到锁。
简单来说,公平锁不让插队,都老老实实排着;非公平锁让插队,但是排队的人让不让你插队就是另一回事了。
我们可以来测试一下公平锁和非公平锁的表现情况:
1 2 3 public ReentrantLock (boolean fair) { sync = fair ? new FairSync () : new NonfairSync (); }
这里我们选择使用第二个构造方法,可以选择是否为公平锁实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 public static void main (String[] args) throws InterruptedException { ReentrantLock lock = new ReentrantLock (false ); Runnable action = () -> { System.out.println("线程 " +Thread.currentThread().getName()+" 开始获取锁..." ); lock.lock(); System.out.println("线程 " +Thread.currentThread().getName()+" 成功获取锁!" ); lock.unlock(); }; for (int i = 0 ; i < 10 ; i++) { new Thread (action, "T" +i).start(); } }
这里我们只需要对比将在1秒后开始获取锁...
和成功获取锁!
的顺序是否一致即可,如果是一致,那说明所有的线程都是按顺序排队获取的锁,如果不是,那说明肯定是有线程插队了。
运行结果可以发现,在公平模式下,确实是按照顺序进行的,而在非公平模式下,一般会出现这种情况:线程刚开始获取锁马上就能抢到,并且此时之前早就开始的线程还在等待状态,很明显的插队行为。
那么,接着下一个问题,公平锁在任何情况下都一定是公平的吗?有关这个问题,我们会留到队列同步器中再进行讨论。
读写锁 除了可重入锁之外,还有一种类型的锁叫做读写锁,当然它并不是专门用作读写操作的锁,它和可重入锁不同的地方在于,可重入锁是一种排他锁,当一个线程得到锁之后,另一个线程必须等待其释放锁,否则一律不允许获取到锁。而读写锁在同一时间,是可以让多个线程获取到锁的,它其实就是针对于读写场景而出现的。
读写锁维护了一个读锁和一个写锁,这两个锁的机制是不同的。
读锁:在没有任何线程占用写锁的情况下,同一时间可以有多个线程加读锁。
写锁:在没有任何线程占用读锁的情况下,同一时间只能有一个线程加写锁。
读写锁也有一个专门的接口:
1 2 3 4 5 6 7 public interface ReadWriteLock { Lock readLock () ; Lock writeLock () ; }
此接口有一个实现类ReentrantReadWriteLock(实现的是ReadWriteLock接口,不是Lock接口,它本身并不是锁),注意我们操作ReentrantReadWriteLock时,不能直接上锁,而是需要获取读锁或是写锁,再进行锁操作:
1 2 3 4 5 public static void main (String[] args) throws InterruptedException { ReentrantReadWriteLock lock = new ReentrantReadWriteLock (); lock.readLock().lock(); new Thread (lock.readLock()::lock).start(); }
这里我们对读锁加锁,可以看到可以多个线程同时对读锁加锁。
1 2 3 4 5 public static void main (String[] args) throws InterruptedException { ReentrantReadWriteLock lock = new ReentrantReadWriteLock (); lock.readLock().lock(); new Thread (lock.writeLock()::lock).start(); }
有读锁状态下无法加写锁,反之亦然:
1 2 3 4 5 public static void main (String[] args) throws InterruptedException { ReentrantReadWriteLock lock = new ReentrantReadWriteLock (); lock.writeLock().lock(); new Thread (lock.readLock()::lock).start(); }
并且,ReentrantReadWriteLock不仅具有读写锁的功能,还保留了可重入锁和公平/非公平机制,比如同一个线程可以重复为写锁加锁,并且必须全部解锁才真正释放锁:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 public static void main (String[] args) throws InterruptedException { ReentrantReadWriteLock lock = new ReentrantReadWriteLock (); lock.writeLock().lock(); lock.writeLock().lock(); new Thread (() -> { lock.writeLock().lock(); System.out.println("成功获取到写锁!" ); }).start(); System.out.println("释放第一层锁!" ); lock.writeLock().unlock(); TimeUnit.SECONDS.sleep(1 ); System.out.println("释放第二层锁!" ); lock.writeLock().unlock(); }
通过之前的例子来验证公平和非公平:
1 2 3 4 5 6 7 8 9 10 11 12 13 public static void main (String[] args) throws InterruptedException { ReentrantReadWriteLock lock = new ReentrantReadWriteLock (true ); Runnable action = () -> { System.out.println("线程 " +Thread.currentThread().getName()+" 将在1秒后开始获取锁..." ); lock.writeLock().lock(); System.out.println("线程 " +Thread.currentThread().getName()+" 成功获取锁!" ); lock.writeLock().unlock(); }; for (int i = 0 ; i < 10 ; i++) { new Thread (action, "T" +i).start(); } }
可以看到,结果是一致的。
锁降级和锁升级 锁降级指的是写锁降级为读锁。当一个线程持有写锁的情况下,虽然其他线程不能加读锁,但是线程自己是可以加读锁的:
1 2 3 4 5 6 public static void main (String[] args) throws InterruptedException { ReentrantReadWriteLock lock = new ReentrantReadWriteLock (); lock.writeLock().lock(); lock.readLock().lock(); System.out.println("成功加读锁!" ); }
那么,如果我们在同时加了写锁和读锁的情况下,释放写锁,是否其他的线程就可以一起加读锁了呢?
1 2 3 4 5 6 7 8 9 10 11 12 public static void main (String[] args) throws InterruptedException { ReentrantReadWriteLock lock = new ReentrantReadWriteLock (); lock.writeLock().lock(); lock.readLock().lock(); new Thread (() -> { System.out.println("开始加读锁!" ); lock.readLock().lock(); System.out.println("读锁添加成功!" ); }).start(); TimeUnit.SECONDS.sleep(1 ); lock.writeLock().unlock(); }
可以看到,一旦写锁被释放,那么主线程就只剩下读锁了,因为读锁可以被多个线程共享,所以这时第二个线程也添加了读锁。而这种操作,就被称之为”锁降级”(注意不是先释放写锁再加读锁,而是持有写锁的情况下申请读锁再释放写锁)
注意在仅持有读锁的情况下去申请写锁,属于”锁升级”,ReentrantReadWriteLock是不支持的:
1 2 3 4 5 6 public static void main (String[] args) throws InterruptedException { ReentrantReadWriteLock lock = new ReentrantReadWriteLock (); lock.readLock().lock(); lock.writeLock().lock(); System.out.println("所升级成功!" ); }
可以看到线程直接卡在加写锁的那一句了。
队列同步器AQS 注意: 难度巨大,如果对锁的使用不是很熟悉建议之后再来看!
前面我们了解了可重入锁和读写锁,那么它们的底层实现原理到底是什么样的呢?又是大家看到就想跳过的套娃解析环节。
比如我们执行了ReentrantLock的lock()
方法,那它的内部是怎么在执行的呢?
1 2 3 public void lock () { sync.lock(); }
可以看到,它的内部实际上啥都没做,而是交给了Sync对象在进行,并且,不只是这个方法,其他的很多方法都是依靠Sync对象在进行:
1 2 3 public void unlock () { sync.release(1 ); }
那么这个Sync对象是干什么的呢?可以看到,公平锁和非公平锁都是继承自Sync,而Sync是继承自AbstractQueuedSynchronizer,简称队列同步器:
1 2 3 4 5 6 abstract static class Sync extends AbstractQueuedSynchronizer { }static final class NonfairSync extends Sync {}static final class FairSync extends Sync {}
所以,要了解它的底层到底是如何进行操作的,还得看队列同步器,我们就先从这里下手吧!
底层实现 AbstractQueuedSynchronizer(下面称为AQS)是实现锁机制的基础,它的内部封装了包括锁的获取、释放、以及等待队列。
一个锁(排他锁为例)的基本功能就是获取锁、释放锁、当锁被占用时,其他线程来争抢会进入等待队列,AQS已经将这些基本的功能封装完成了,其中等待队列是核心内容,等待队列是由双向链表数据结构实现的,每个等待状态下的线程都可以被封装进结点中并放入双向链表中,而对于双向链表是以队列的形式进行操作的,它像这样:
AQS中有一个head
字段和一个tail
字段分别记录双向链表的头结点和尾结点,而之后的一系列操作都是围绕此队列来进行的。我们先来了解一下每个结点都包含了哪些内容:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 static final class Node { static final Node SHARED = new Node (); static final Node EXCLUSIVE = null ; static final int CANCELLED = 1 ; static final int SIGNAL = -1 ; static final int CONDITION = -2 ; static final int PROPAGATE = -3 ; volatile int waitStatus; volatile Node prev; volatile Node next; volatile Thread thread; Node nextWaiter; final boolean isShared () { return nextWaiter == SHARED; } final Node predecessor () throws NullPointerException { Node p = prev; if (p == null ) throw new NullPointerException (); else return p; } Node() { } Node(Thread thread, Node mode) { this .nextWaiter = mode; this .thread = thread; } Node(Thread thread, int waitStatus) { this .waitStatus = waitStatus; this .thread = thread; } }
在一开始的时候,head
和tail
都是null
,state
为默认值0
:
1 2 3 4 5 private transient volatile Node head;private transient volatile Node tail;private volatile int state;
不用担心双向链表不会进行初始化,初始化是在实际使用时才开始的,先不管,我们接着来看其他的初始化内容:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 private static final Unsafe unsafe = Unsafe.getUnsafe();private static final long stateOffset; private static final long headOffset; private static final long tailOffset;private static final long waitStatusOffset;private static final long nextOffset;static { try { stateOffset = unsafe.objectFieldOffset (AbstractQueuedSynchronizer.class.getDeclaredField("state" )); headOffset = unsafe.objectFieldOffset (AbstractQueuedSynchronizer.class.getDeclaredField("head" )); tailOffset = unsafe.objectFieldOffset (AbstractQueuedSynchronizer.class.getDeclaredField("tail" )); waitStatusOffset = unsafe.objectFieldOffset (Node.class.getDeclaredField("waitStatus" )); nextOffset = unsafe.objectFieldOffset (Node.class.getDeclaredField("next" )); } catch (Exception ex) { throw new Error (ex); } }private final boolean compareAndSetHead (Node update) { return unsafe.compareAndSwapObject(this , headOffset, null , update); }private final boolean compareAndSetTail (Node expect, Node update) {private static final boolean compareAndSetWaitStatus (Node node, int expect, int update) {private static final boolean compareAndSetNext (Node node, Node expect, Node update) {
可以发现,队列同步器由于要使用到CAS算法,所以,直接使用了Unsafe工具类,Unsafe类中提供了CAS操作的方法(Java无法实现,底层由C++实现)所有对AQS类中成员字段的修改,都有对应的CAS操作封装。
现在我们大致了解了一下它的底层运作机制,我们接着来看这个类是如何进行使用的,它提供了一些可重写的方法(根据不同的锁类型和机制,可以自由定制规则,并且为独占式和非独占式锁都提供了对应的方法),以及一些已经写好的模板方法(模板方法会调用这些可重写的方法),使用此类只需要将可重写的方法进行重写,并调用提供的模板方法,从而实现锁功能(学习过设计模式会比较好理解一些)
我们首先来看可重写方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 protected boolean tryAcquire (int arg) { throw new UnsupportedOperationException (); }protected boolean tryRelease (int arg) { throw new UnsupportedOperationException (); }protected int tryAcquireShared (int arg) { throw new UnsupportedOperationException (); }protected boolean tryReleaseShared (int arg) { throw new UnsupportedOperationException (); }protected boolean isHeldExclusively () { throw new UnsupportedOperationException (); }
可以看到,这些需要重写的方法默认是直接抛出UnsupportedOperationException
,也就是说根据不同的锁类型,我们需要去实现对应的方法,我们可以来看一下ReentrantLock(此类是全局独占式的)中的公平锁是如何借助AQS实现的:
1 2 3 4 5 6 7 8 9 10 11 static final class FairSync extends Sync { private static final long serialVersionUID = -3000897897090466540L ; final void lock () { acquire(1 ); } ... }
我们先看看加锁操作干了什么事情,这里直接调用了AQS提供的模板方法acquire()
,我们来看看它在AQS类中的实现细节:
1 2 3 4 5 6 @ReservedStackAccess public final void acquire (int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
首先会调用tryAcquire()
方法(这里是由FairSync类实现的),如果尝试加独占锁失败(返回false了)说明可能这个时候有其他线程持有了此独占锁,所以当前线程得先等着,那么会调用addWaiter()
方法将线程加入等待队列中:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 private Node addWaiter (Node mode) { Node node = new Node (Thread.currentThread(), mode); Node pred = tail; if (pred != null ) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }private Node enq (final Node node) { for (;;) { Node t = tail; if (t == null ) { if (compareAndSetHead(new Node ())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
在了解了addWaiter()
方法会将节点加入等待队列之后,我们接着来看,addWaiter()
会返回已经加入的节点,acquireQueued()
在得到返回的节点时,也会进入自旋状态,等待唤醒(也就是开始进入到拿锁的环节了):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 @ReservedStackAccess final boolean acquireQueued (final Node node, int arg) { boolean failed = true ; try { boolean interrupted = false ; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null ; failed = false ; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true ; } } finally { if (failed) cancelAcquire(node); } }private final boolean parkAndCheckInterrupt () { LockSupport.park(this ); return Thread.interrupted(); }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 private static boolean shouldParkAfterFailedAcquire (Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) return true ; if (ws > 0 ) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0 ); pred.next = node; } else { compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false ; }
所以,acquire()
中的if条件如果为true,那么只有一种情况,就是等待过程中被中断了,其他任何情况下都是成功获取到独占锁,所以当等待过程被中断时,会调用selfInterrupt()
方法:
1 2 3 static void selfInterrupt () { Thread.currentThread().interrupt(); }
这里就是直接向当前线程发送中断信号了。
上面提到了LockSupport类,它是一个工具类,我们也可以来玩一下这个park
和unpark
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 public static void main (String[] args) throws InterruptedException { Thread t = Thread.currentThread(); new Thread (() -> { try { TimeUnit.SECONDS.sleep(1 ); System.out.println("主线程可以继续运行了!" ); LockSupport.unpark(t); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); System.out.println("主线程被挂起!" ); LockSupport.park(); System.out.println("主线程继续运行!" ); }
这里我们就把公平锁的lock()
方法实现讲解完毕了(让我猜猜,已经晕了对吧,越是到源码越考验个人的基础知识掌握,基础不牢地动山摇)接着我们来看公平锁的tryAcquire()
方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 static final class FairSync extends Sync { @ReservedStackAccess protected final boolean tryAcquire (int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0 ) { if (!hasQueuedPredecessors() && compareAndSetState(0 , acquires)) { setExclusiveOwnerThread(current); return true ; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0 ) throw new Error ("Maximum lock count exceeded" ); setState(nextc); return true ; } return false ; } }
在了解了公平锁的实现之后,是不是感觉有点恍然大悟的感觉,虽然整个过程非常复杂,但是只要理清思路,还是比较简单的。
加锁过程已经OK,我们接着来看,它的解锁过程,unlock()
方法是在AQS中实现的:
1 2 3 public void unlock () { sync.release(1 ); }
1 2 3 4 5 6 7 8 9 10 @ReservedStackAccess public final boolean release (int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0 ) unparkSuccessor(h); return true ; } return false ; }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 private void unparkSuccessor (Node node) { int ws = node.waitStatus; if (ws < 0 ) compareAndSetWaitStatus(node, ws, 0 ); Node s = node.next; if (s == null || s.waitStatus > 0 ) { s = null ; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0 ) s = t; } if (s != null ) LockSupport.unpark(s.thread); }
那么我们来看看tryRelease()
方法是怎么实现的,具体实现在Sync中:
1 2 3 4 5 6 7 8 9 10 11 12 13 @ReservedStackAccess protected final boolean tryRelease (int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException (); boolean free = false ; if (c == 0 ) { free = true ; setExclusiveOwnerThread(null ); } setState(c); return free; }
综上,我们来画一个完整的流程图:
这里我们只讲解了公平锁,有关非公平锁和读写锁,还请各位观众根据我们之前的思路,自行解读。
公平锁一定公平吗? 前面我们讲解了公平锁的实现原理,那么,我们尝试分析一下,在并发的情况下,公平锁一定公平吗?
我们再次来回顾一下tryAcquire()
方法的实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 @ReservedStackAccess protected final boolean tryAcquire (int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0 ) { if (!hasQueuedPredecessors() && compareAndSetState(0 , acquires)) { setExclusiveOwnerThread(current); return true ; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0 ) throw new Error ("Maximum lock count exceeded" ); setState(nextc); return true ; } return false ; }
所以hasQueuedPredecessors()
这个环节容不得半点闪失,否则会直接破坏掉公平性,假如现在出现了这样的情况:
线程1已经持有锁了,这时线程2来争抢这把锁,走到hasQueuedPredecessors()
,判断出为 false
,线程2继续运行,然后线程2肯定获取锁失败(因为锁这时是被线程1占有的),因此就进入到等待队列中:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 private Node enq (final Node node) { for (;;) { Node t = tail; if (t == null ) { if (compareAndSetHead(new Node ())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }private Node addWaiter (Node mode) { Node node = new Node (Thread.currentThread(), mode); Node pred = tail; if (pred != null ) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }
而碰巧不巧,这个时候线程3也来抢锁了,按照正常流程走到了hasQueuedPredecessors()
方法,而在此方法中:
1 2 3 4 5 6 7 8 public final boolean hasQueuedPredecessors () { Node t = tail; Node h = head; Node s; return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); }
因此,线程3这时就紧接着准备开始CAS操作了,又碰巧,这时线程1释放锁了,现在的情况就是,线程3直接开始CAS判断,而线程2还在插入节点状态,结果可想而知,居然是线程3先拿到了锁,这显然是违背了公平锁的公平机制。
一张图就是:
因此公不公平全看hasQueuedPredecessors()
,而此方法只有在等待队列中存在节点时才能保证不会出现问题。所以公平锁,只有在等待队列存在节点时,才是真正公平的。
Condition实现原理 通过前面的学习,我们知道Condition类实际上就是用于代替传统对象的wait/notify操作的,同样可以实现等待/通知模式,并且同一把锁下可以创建多个Condition对象。那么我们接着来看看,它又是如何实现的呢,我们先从单个Condition对象进行分析:
在AQS中,Condition有一个实现类ConditionObject,而这里也是使用了链表实现了条件队列:
1 2 3 4 5 6 7 8 public class ConditionObject implements Condition , java.io.Serializable { private static final long serialVersionUID = 1173984872572414699L ; private transient Node firstWaiter; private transient Node lastWaiter;
这里是直接使用了AQS中的Node类,但是使用的是Node类中的nextWaiter字段连接节点,并且Node的status为CONDITION:
我们知道,当一个线程调用await()
方法时,会进入等待状态,直到其他线程调用signal()
方法将其唤醒,而这里的条件队列,正是用于存储这些处于等待状态的线程。
我们先来看看最关键的await()
方法是如何实现的,为了防止一会绕晕,在开始之前,我们先明确此方法的目标:
只有已经持有锁的线程才可以使用此方法
当调用此方法后,会直接释放锁,无论加了多少次锁
只有其他线程调用signal()
或是被中断时才会唤醒等待中的线程
被唤醒后,需要等待其他线程释放锁,拿到锁之后才可以继续执行,并且会恢复到之前的状态(await之前加了几层锁唤醒后依然是几层锁)
好了,差不多可以上源码了:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 public final void await () throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException (); Node node = addConditionWaiter(); int savedState = fullyRelease(node); int interruptMode = 0 ; while (!isOnSyncQueue(node)) { LockSupport.park(this ); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0 ) break ; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null ) unlinkCancelledWaiters(); if (interruptMode != 0 ) reportInterruptAfterWait(interruptMode); }
实际上await()
方法比较中规中矩,大部分操作也在我们的意料之中,那么我们接着来看signal()
方法是如何实现的,同样的,为了防止各位绕晕,先明确signal的目标:
只有持有锁的线程才能唤醒锁所属的Condition等待的线程
优先唤醒条件队列中的第一个,如果唤醒过程中出现问题,接着找往下找,直到找到一个可以唤醒的
唤醒操作本质上是将条件队列中的结点直接丢进AQS等待队列中,让其参与到锁的竞争中
拿到锁之后,线程才能恢复运行
好了,上源码:
1 2 3 4 5 6 7 public final void signal () { if (!isHeldExclusively()) throw new IllegalMonitorStateException (); Node first = firstWaiter; if (first != null ) doSignal(first); }
1 2 3 4 5 6 7 8 private void doSignal (Node first) { do { if ( (firstWaiter = first.nextWaiter) == null ) lastWaiter = null ; first.nextWaiter = null ; } while (!transferForSignal(first) && (first = firstWaiter) != null ); }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 final boolean transferForSignal (Node node) { if (!compareAndSetWaitStatus(node, Node.CONDITION, 0 )) return false ; Node p = enq(node); int ws = p.waitStatus; if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true ; }
其实最让人不理解的就是倒数第二行,明明上面都正常进入到AQS等待队列了,应该是可以开始走正常流程了,那么这里为什么还要提前来一次unpark呢?
这里其实是为了进行优化而编写,直接unpark会有两种情况:
如果插入结点前,AQS等待队列的队尾节点就已经被取消,则满足wc > 0
如果插入node后,AQS内部等待队列的队尾节点已经稳定,满足tail.waitStatus == 0,但在执行ws > 0之后!compareAndSetWaitStatus(p, ws, Node.SIGNAL)之前被取消,则CAS也会失败,满足compareAndSetWaitStatus(p, ws, Node.SIGNAL) == false
如果这里被提前unpark,那么在await()
方法中将可以被直接唤醒,并跳出while循环,直接开始争抢锁,因为前一个等待结点是被取消的状态,没有必要再等它了。
所以,大致流程下:
只要把整个流程理清楚,还是很好理解的。
自行实现锁类 既然前面了解了那么多AQS的功能,那么我就仿照着这些锁类来实现一个简单的锁:
要求:同一时间只能有一个线程持有锁,不要求可重入(反复加锁无视即可)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 public class Main { public static void main (String[] args) throws InterruptedException { } private static class MyLock implements Lock { private static class Sync extends AbstractQueuedSynchronizer { @Override protected boolean tryAcquire (int arg) { if (isHeldExclusively()) return true ; if (compareAndSetState(0 , arg)){ setExclusiveOwnerThread(Thread.currentThread()); return true ; } return false ; } @Override protected boolean tryRelease (int arg) { if (getState() == 0 ) throw new IllegalMonitorStateException (); if (isHeldExclusively()){ setExclusiveOwnerThread(null ); setState(0 ); return true ; } return false ; } @Override protected boolean isHeldExclusively () { return getExclusiveOwnerThread() == Thread.currentThread(); } protected Condition newCondition () { return new ConditionObject (); } } private final Sync sync = new Sync (); @Override public void lock () { sync.acquire(1 ); } @Override public void lockInterruptibly () throws InterruptedException { sync.acquireInterruptibly(1 ); } @Override public boolean tryLock () { return sync.tryAcquire(1 ); } @Override public boolean tryLock (long time, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1 , unit.toNanos(time)); } @Override public void unlock () { sync.release(1 ); } @Override public Condition newCondition () { return sync.newCondition(); } } }
到这里,我们对应队列同步器AQS的讲解就先到此为止了,当然,AQS的全部机制并非仅仅只有我们讲解的内容,一些我们没有提到的内容,还请各位观众自行探索,会有满满的成就感哦~
原子类 前面我们讲解了锁框架的使用和实现原理,虽然比较复杂,但是收获还是很多的(主要是观摩大佬写的代码)这一部分我们就来讲一点轻松的。
前面我们说到,如果要保证i++
的原子性,那么我们的唯一选择就是加锁,那么,除了加锁之外,还有没有其他更好的解决方法呢?JUC为我们提供了原子类,底层采用CAS算法,它是一种用法简单、性能高效、线程安全地更新变量的方式。
所有的原子类都位于java.util.concurrent.atomic
包下。
原子类介绍 常用基本数据类,有对应的原子类封装:
AtomicInteger:原子更新int
AtomicLong:原子更新long
AtomicBoolean:原子更新boolean
那么,原子类和普通的基本类在使用上有没有什么区别呢?我们先来看正常情况下使用一个基本类型:
1 2 3 4 5 6 public class Main { public static void main (String[] args) { int i = 1 ; System.out.println(i++); } }
现在我们使用int类型对应的原子类,要实现同样的代码该如何编写:
1 2 3 4 5 6 public class Main { public static void main (String[] args) { AtomicInteger i = new AtomicInteger (1 ); System.out.println(i.getAndIncrement()); } }
我们可以将int数值封装到此类中(注意必须调用构造方法,它不像Integer那样有装箱机制),并且通过调用此类提供的方法来获取或是对封装的int值进行自增,乍一看,这不就是基本类型包装类嘛,有啥高级的。确实,还真有包装类那味,但是它可不仅仅是简单的包装,它的自增操作是具有原子性的:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 public class Main { private static AtomicInteger i = new AtomicInteger (0 ); public static void main (String[] args) throws InterruptedException { Runnable r = () -> { for (int j = 0 ; j < 100000 ; j++) i.getAndIncrement(); System.out.println("自增完成!" ); }; new Thread (r).start(); new Thread (r).start(); TimeUnit.SECONDS.sleep(1 ); System.out.println(i.get()); } }
同样是直接进行自增操作,我们发现,使用原子类是可以保证自增操作原子性的,就跟我们前面加锁一样。怎么会这么神奇?我们来看看它的底层是如何实现的,直接从构造方法点进去:
1 2 3 4 5 6 7 8 private volatile int value;public AtomicInteger (int initialValue) { value = initialValue; }public AtomicInteger () { }
可以看到,它的底层是比较简单的,其实本质上就是封装了一个volatile
类型的int值,这样能够保证可见性,在CAS操作的时候不会出现问题。
1 2 3 4 5 6 7 8 9 private static final Unsafe unsafe = Unsafe.getUnsafe();private static final long valueOffset;static { try { valueOffset = unsafe.objectFieldOffset (AtomicInteger.class.getDeclaredField("value" )); } catch (Exception ex) { throw new Error (ex); } }
可以看到最上面是和AQS采用了类似的机制,因为要使用CAS算法更新value的值,所以得先计算出value字段在对象中的偏移地址,CAS直接修改对应位置的内存即可(可见Unsafe类的作用巨大,很多的底层操作都要靠它来完成)
接着我们来看自增操作是怎么在运行的:
1 2 3 public final int getAndIncrement () { return unsafe.getAndAddInt(this , valueOffset, 1 ); }
可以看到这里调用了unsafe.getAndAddInt()
,套娃时间到,我们接着看看Unsafe里面写了什么:
1 2 3 4 5 6 7 8 9 public final int getAndAddInt (Object o, long offset, int delta) { int v; do { v = getIntVolatile(o, offset); } while (!compareAndSwapInt(o, offset, v, v + delta)); return v; }
可以看到这是一个do-while
循环,那么这个循环在做一个什么事情呢?感觉就和我们之前讲解的AQS队列中的机制差不多,也是采用自旋形式,来不断进行CAS操作,直到成功。
可见,原子类底层也是采用了CAS算法来保证的原子性,包括getAndSet
、getAndAdd
等方法都是这样。原子类也直接提供了CAS操作方法,我们可以直接使用:
1 2 3 4 5 6 public static void main (String[] args) throws InterruptedException { AtomicInteger integer = new AtomicInteger (10 ); System.out.println(integer.compareAndSet(30 , 20 )); System.out.println(integer.compareAndSet(10 , 20 )); System.out.println(integer); }
如果想以普通变量的方式来设定值,那么可以使用lazySet()
方法,这样就不采用volatile
的立即可见机制了。
1 2 AtomicInteger integer = new AtomicInteger (1 ); integer.lazySet(2 );
除了基本类有原子类以外,基本类型的数组类型也有原子类:
AtomicIntegerArray:原子更新int数组
AtomicLongArray:原子更新long数组
AtomicReferenceArray:原子更新引用数组
其实原子数组和原子类型一样的,不过我们可以对数组内的元素进行原子操作:
1 2 3 4 5 6 7 8 9 10 11 public static void main (String[] args) throws InterruptedException { AtomicIntegerArray array = new AtomicIntegerArray (new int []{0 , 4 , 1 , 3 , 5 }); Runnable r = () -> { for (int i = 0 ; i < 100000 ; i++) array.getAndAdd(0 , 1 ); }; new Thread (r).start(); new Thread (r).start(); TimeUnit.SECONDS.sleep(1 ); System.out.println(array.get(0 )); }
在JDK8之后,新增了DoubleAdder
和LongAdder
,在高并发情况下,LongAdder
的性能比AtomicLong
的性能更好,主要体现在自增上,它的大致原理如下:在低并发情况下,和AtomicLong
是一样的,对value值进行CAS操作,但是出现高并发的情况时,AtomicLong
会进行大量的循环操作来保证同步,而LongAdder
会将对value值的CAS操作分散为对数组cells
中多个元素的CAS操作(内部维护一个Cell[] as数组,每个Cell里面有一个初始值为0的long型变量,在高并发时会进行分散CAS,就是不同的线程可以对数组中不同的元素进行CAS自增,这样就避免了所有线程都对同一个值进行CAS),只需要最后再将结果加起来即可。
使用如下:
1 2 3 4 5 6 7 8 9 10 11 public static void main (String[] args) throws InterruptedException { LongAdder adder = new LongAdder (); Runnable r = () -> { for (int i = 0 ; i < 100000 ; i++) adder.add(1 ); }; for (int i = 0 ; i < 100 ; i++) new Thread (r).start(); TimeUnit.SECONDS.sleep(1 ); System.out.println(adder.sum()); }
由于底层源码比较复杂,这里就不做讲解了。两者的性能对比(这里用到了CountDownLatch,建议学完之后再来看):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 public class Main { public static void main (String[] args) throws InterruptedException { System.out.println("使用AtomicLong的时间消耗:" +test2()+"ms" ); System.out.println("使用LongAdder的时间消耗:" +test1()+"ms" ); } private static long test1 () throws InterruptedException { CountDownLatch latch = new CountDownLatch (100 ); LongAdder adder = new LongAdder (); long timeStart = System.currentTimeMillis(); Runnable r = () -> { for (int i = 0 ; i < 100000 ; i++) adder.add(1 ); latch.countDown(); }; for (int i = 0 ; i < 100 ; i++) new Thread (r).start(); latch.await(); return System.currentTimeMillis() - timeStart; } private static long test2 () throws InterruptedException { CountDownLatch latch = new CountDownLatch (100 ); AtomicLong atomicLong = new AtomicLong (); long timeStart = System.currentTimeMillis(); Runnable r = () -> { for (int i = 0 ; i < 100000 ; i++) atomicLong.incrementAndGet(); latch.countDown(); }; for (int i = 0 ; i < 100 ; i++) new Thread (r).start(); latch.await(); return System.currentTimeMillis() - timeStart; } }
除了对基本数据类型支持原子操作外,对于引用类型,也是可以实现原子操作的:
1 2 3 4 5 6 7 public static void main (String[] args) throws InterruptedException { String a = "Hello" ; String b = "World" ; AtomicReference<String> reference = new AtomicReference <>(a); reference.compareAndSet(a, b); System.out.println(reference.get()); }
JUC还提供了字段原子更新器,可以对类中的某个指定字段进行原子操作(注意字段必须添加volatile关键字):
1 2 3 4 5 6 7 8 9 10 11 12 public class Main { public static void main (String[] args) throws InterruptedException { Student student = new Student (); AtomicIntegerFieldUpdater<Student> fieldUpdater = AtomicIntegerFieldUpdater.newUpdater(Student.class, "age" ); System.out.println(fieldUpdater.incrementAndGet(student)); } public static class Student { volatile int age; } }
了解了这么多原子类,是不是感觉要实现保证原子性的工作更加轻松了?
ABA问题及解决方案 我们来想象一下这种场景:
线程1和线程2同时开始对a
的值进行CAS修改,但是线程1的速度比较快,将a的值修改为2之后紧接着又修改回1,这时线程2才开始进行判断,发现a的值是1,所以CAS操作成功。
很明显,这里的1已经不是一开始的那个1了,而是被重新赋值的1,这也是CAS操作存在的问题(无锁虽好,但是问题多多),它只会机械地比较当前值是不是预期值,但是并不会关心当前值是否被修改过,这种问题称之为ABA
问题。
那么如何解决这种ABA
问题呢,JUC提供了带版本号的引用类型,只要每次操作都记录一下版本号,并且版本号不会重复,那么就可以解决ABA问题了:
1 2 3 4 5 6 7 public static void main (String[] args) throws InterruptedException { String a = "Hello" ; String b = "World" ; AtomicStampedReference<String> reference = new AtomicStampedReference <>(a, 1 ); reference.attemptStamp(a, 2 ); System.out.println(reference.compareAndSet(a, b, 2 , 3 )); }
至此,有关原子类的讲解就到这里。
并发容器 简单的讲完了,又该讲难一点的了。
注意: 本版块的重点在于探究并发容器是如何利用锁机制和算法实现各种丰富功能的,我们会忽略一些常规功能的实现细节(比如链表如何插入元素删除元素),而更关注并发容器应对并发场景算法上的实现(比如在多线程环境下的插入操作是按照什么规则进行的)
在单线程模式下,集合类提供的容器可以说是非常方便了,几乎我们每个项目中都能或多或少的用到它们,我们在JavaSE阶段,为各位讲解了各个集合类的实现原理,我们了解了链表、顺序表、哈希表等数据结构,那么,在多线程环境下,这些数据结构还能正常工作吗?
传统容器线程安全吗 我们来测试一下,100个线程同时向ArrayList中添加元素会怎么样:
1 2 3 4 5 6 7 8 9 10 11 12 13 public class Main { public static void main (String[] args) { List<String> list = new ArrayList <>(); Runnable r = () -> { for (int i = 0 ; i < 100 ; i++) list.add("lbwnb" ); }; for (int i = 0 ; i < 100 ; i++) new Thread (r).start(); TimeUnit.SECONDS.sleep(1 ); System.out.println(list.size()); } }
不出意外的话,肯定是会报错的:
1 2 3 4 5 6 7 8 9 Exception in thread "Thread-0" java.lang.ArrayIndexOutOfBoundsException: 73 at java.util.ArrayList.add(ArrayList.java:465) at com.test.Main.lambda$main$0(Main.java:13) at java.lang.Thread.run(Thread.java:750) Exception in thread "Thread-19" java.lang.ArrayIndexOutOfBoundsException: 1851 at java.util.ArrayList.add(ArrayList.java:465) at com.test.Main.lambda$main$0(Main.java:13) at java.lang.Thread.run(Thread.java:750) 9773
那么我们来看看报的什么错,从栈追踪信息可以看出,是add方法出现了问题:
1 2 3 4 5 public boolean add (E e) { ensureCapacityInternal(size + 1 ); elementData[size++] = e; return true ; }
也就是说,同一时间其他线程也在疯狂向数组中添加元素,那么这个时候有可能在ensureCapacityInternal
(确认容量足够)执行之后,elementData[size++] = e;
执行之前,其他线程插入了元素,导致size的值超出了数组容量。这些在单线程的情况下不可能发生的问题,在多线程下就慢慢出现了。
我们再来看看比较常用的HashMap呢?
1 2 3 4 5 6 7 8 9 10 11 12 public static void main (String[] args) throws InterruptedException { Map<Integer, String> map = new HashMap <>(); for (int i = 0 ; i < 100 ; i++) { int finalI = i; new Thread (() -> { for (int j = 0 ; j < 100 ; j++) map.put(finalI * 1000 + j, "lbwnb" ); }).start(); } TimeUnit.SECONDS.sleep(2 ); System.out.println(map.size()); }
经过测试发现,虽然没有报错,但是最后的结果并不是我们期望的那样,实际上它还有可能导致Entry对象出现环状数据结构,引起死循环。
所以,在多线程环境下,要安全地使用集合类,我们得找找解决方案了。
并发容器介绍 怎么才能解决并发情况下的容器问题呢?我们首先想到的肯定是给方法前面加个synchronzed
关键字,这样总不会抢了吧,在之前我们可以使用Vector或是Hashtable来解决,但是它们的效率实在是太低了,完全依靠锁来解决问题,因此现在已经很少再使它们了,这里也不会再去进行讲解。
JUC提供了专用于并发场景下的容器,比如我们刚刚使用的ArrayList,在多线程环境下是没办法使用的,我们可以将其替换为JUC提供的多线程专用集合类:
1 2 3 4 5 6 7 8 9 10 11 public static void main (String[] args) throws InterruptedException { List<String> list = new CopyOnWriteArrayList <>(); Runnable r = () -> { for (int i = 0 ; i < 100 ; i++) list.add("lbwnb" ); }; for (int i = 0 ; i < 100 ; i++) new Thread (r).start(); TimeUnit.SECONDS.sleep(1 ); System.out.println(list.size()); }
我们发现,使用了CopyOnWriteArrayList
之后,再没出现过上面的问题。
那么它是如何实现的呢,我们先来看看它是如何进行add()
操作的:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 public boolean add (E e) { final ReentrantLock lock = this .lock; lock.lock(); try { Object[] elements = getArray(); int len = elements.length; Object[] newElements = Arrays.copyOf(elements, len + 1 ); newElements[len] = e; setArray(newElements); return true ; } finally { lock.unlock(); } }
可以看到添加操作是直接上锁,并且会先拷贝一份当前存放元素的数组,然后对数组进行修改,再将此数组替换(CopyOnWrite)接着我们来看读操作:
1 2 3 public E get (int index) { return get(getArray(), index); }
因此,CopyOnWriteArrayList
对于读操作不加锁,而对于写操作是加锁的,类似于我们前面讲解的读写锁机制,这样就可以保证不丢失读性能的情况下,写操作不会出现问题。
接着我们来看对于HashMap的并发容器ConcurrentHashMap
:
1 2 3 4 5 6 7 8 9 10 11 12 public static void main (String[] args) throws InterruptedException { Map<Integer, String> map = new ConcurrentHashMap <>(); for (int i = 0 ; i < 100 ; i++) { int finalI = i; new Thread (() -> { for (int j = 0 ; j < 100 ; j++) map.put(finalI * 100 + j, "lbwnb" ); }).start(); } TimeUnit.SECONDS.sleep(1 ); System.out.println(map.size()); }
可以看到这里的ConcurrentHashMap就没有出现之前HashMap的问题了。因为线程之间会争抢同一把锁,我们之前在讲解LongAdder的时候学习到了一种压力分散思想,既然每个线程都想抢锁,那我就干脆多搞几把锁,让你们每个人都能拿到,这样就不会存在等待的问题了,而JDK7之前,ConcurrentHashMap的原理也比较类似,它将所有数据分为一段一段地存储,先分很多段出来,每一段都给一把锁,当一个线程占锁访问时,只会占用其中一把锁,也就是仅仅锁了一小段数据,而其他段的数据依然可以被其他线程正常访问。
这里我们重点讲解JDK8之后它是怎么实现的,它采用了CAS算法配合锁机制实现,我们先来回顾一下JDK8下的HashMap是什么样的结构:
HashMap就是利用了哈希表,哈希表的本质其实就是一个用于存放后续节点的头结点的数组,数组里面的每一个元素都是一个头结点(也可以说就是一个链表),当要新插入一个数据时,会先计算该数据的哈希值,找到数组下标,然后创建一个新的节点,添加到对应的链表后面。当链表的长度达到8时,会自动将链表转换为红黑树,这样能使得原有的查询效率大幅度降低!当使用红黑树之后,我们就可以利用二分搜索的思想,快速地去寻找我们想要的结果,而不是像链表一样挨个去看。
又是基础不牢地动山摇环节,由于ConcurrentHashMap的源码比较复杂,所以我们先从最简单的构造方法开始下手:
我们发现,它的构造方法和HashMap的构造方法有很大的出入,但是大体的结构和HashMap是差不多的,也是维护了一个哈希表,并且哈希表中存放的是链表或是红黑树,所以我们直接来看put()
操作是如何实现的,只要看明白这个,基本上就懂了:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 public V put (K key, V value) { return putVal(key, value, false ); }final V putVal (K key, V value, boolean onlyIfAbsent) { if (key == null || value == null ) throw new NullPointerException (); int hash = spread(key.hashCode()); int binCount = 0 ; for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; if (tab == null || (n = tab.length) == 0 ) tab = initTable(); else if ((f = tabAt(tab, i = (n - 1 ) & hash)) == null ) { if (casTabAt(tab, i, null , new Node <K,V>(hash, key, value, null ))) break ; } else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); else { V oldVal = null ; synchronized (f) { if (tabAt(tab, i) == f) { if (fh >= 0 ) { ...实现细节略 } else if (f instanceof TreeBin) { ...实现细节略 } } } if (binCount != 0 ) { if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal != null ) return oldVal; break ; } } } addCount(1L , binCount); return null ; }
怎么样,是不是感觉看着挺复杂,其实也还好,总结一下就是:
我们接着来看看get()
操作:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public V get (Object key) { Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek; int h = spread(key.hashCode()); if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1 ) & h)) != null ) { if ((eh = e.hash) == h) { if ((ek = e.key) == key || (ek != null && key.equals(ek))) return e.val; } else if (eh < 0 ) return (p = e.find(h, key)) != null ? p.val : null ; while ((e = e.next) != null ) { if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek)))) return e.val; } } return null ; }
综上,ConcurrentHashMap的put操作,实际上是对哈希表上的所有头结点元素分别加锁,理论上来说哈希表的长度很大程度上决定了ConcurrentHashMap在同一时间能够处理的线程数量,这也是为什么treeifyBin()
会优先考虑为哈希表进行扩容的原因。显然,这种加锁方式比JDK7的分段锁机制性能更好。
其实这里也只是简单地介绍了一下它的运行机制,ConcurrentHashMap真正的难点在于扩容和迁移操作,我们主要了解的是他的并发执行机制,有关它的其他实现细节,这里暂时不进行讲解。
阻塞队列 除了我们常用的容器类之外,JUC还提供了各种各样的阻塞队列,用于不同的工作场景。
阻塞队列本身也是队列,但是它是适用于多线程环境下的,基于ReentrantLock实现的,它的接口定义如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 public interface BlockingQueue <E> extends Queue <E> { boolean add (E e) ; boolean offer (E e) ; void put (E e) throws InterruptedException; boolean offer (E e, long timeout, TimeUnit unit) throws InterruptedException; E take () throws InterruptedException; E poll (long timeout, TimeUnit unit) throws InterruptedException; int remainingCapacity () ; boolean remove (Object o) ; public boolean contains (Object o) ; int drainTo (Collection<? super E> c) ; int drainTo (Collection<? super E> c, int maxElements) ;
比如现在有一个容量为3的阻塞队列,这个时候一个线程put
向其添加了三个元素,第二个线程接着put
向其添加三个元素,那么这个时候由于容量已满,会直接被阻塞,而这时第三个线程从队列中取走2个元素,线程二停止阻塞,先丢两个进去,还有一个还是进不去,所以说继续阻塞。
利用阻塞队列,我们可以轻松地实现消费者和生产者模式,还记得我们在JavaSE中的实战吗?
所谓的生产者消费者模型,是通过一个容器来解决生产者和消费者的强耦合问题。通俗的讲,就是生产者在不断的生产,消费者也在不断的消费,可是消费者消费的产品是生产者生产的,这就必然存在一个中间容器,我们可以把这个容器想象成是一个货架,当货架空的时候,生产者要生产产品,此时消费者在等待生产者往货架上生产产品,而当货架有货物的时候,消费者可以从货架上拿走商品,生产者此时等待货架出现空位,进而补货,这样不断的循环。
通过多线程编程,来模拟一个餐厅的2个厨师和3个顾客,假设厨师炒出一个菜的时间为3秒,顾客吃掉菜品的时间为4秒,窗口上只能放一个菜。
我们来看看,使用阻塞队列如何实现,这里我们就使用ArrayBlockingQueue
实现类:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 public class Main { public static void main (String[] args) throws InterruptedException { BlockingQueue<Object> queue = new ArrayBlockingQueue <>(1 ); Runnable supplier = () -> { while (true ){ try { String name = Thread.currentThread().getName(); System.err.println(time()+"生产者 " +name+" 正在准备餐品..." ); TimeUnit.SECONDS.sleep(3 ); System.err.println(time()+"生产者 " +name+" 已出餐!" ); queue.put(new Object ()); } catch (InterruptedException e) { e.printStackTrace(); break ; } } }; Runnable consumer = () -> { while (true ){ try { String name = Thread.currentThread().getName(); System.out.println(time()+"消费者 " +name+" 正在等待出餐..." ); queue.take(); System.out.println(time()+"消费者 " +name+" 取到了餐品。" ); TimeUnit.SECONDS.sleep(4 ); System.out.println(time()+"消费者 " +name+" 已经将饭菜吃完了!" ); } catch (InterruptedException e) { e.printStackTrace(); break ; } } }; for (int i = 0 ; i < 2 ; i++) new Thread (supplier, "Supplier-" +i).start(); for (int i = 0 ; i < 3 ; i++) new Thread (consumer, "Consumer-" +i).start(); } private static String time () { SimpleDateFormat format = new SimpleDateFormat ("HH:mm:ss" ); return "[" +format.format(new Date ()) + "] " ; } }
可以看到,阻塞队列在多线程环境下的作用是非常明显的,算上ArrayBlockingQueue,一共有三种常用的阻塞队列:
ArrayBlockingQueue:有界带缓冲阻塞队列(就是队列是有容量限制的,装满了肯定是不能再装的,只能阻塞,数组实现)
SynchronousQueue:无缓冲阻塞队列(相当于没有容量的ArrayBlockingQueue,因此只有阻塞的情况)
LinkedBlockingQueue:无界带缓冲阻塞队列(没有容量限制,也可以限制容量,也会阻塞,链表实现)
这里我们以ArrayBlockingQueue为例进行源码解读,我们先来看看构造方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 final ReentrantLock lock;private final Condition notEmpty;private final Condition notFull;public ArrayBlockingQueue (int capacity, boolean fair) { if (capacity <= 0 ) throw new IllegalArgumentException (); this .items = new Object [capacity]; lock = new ReentrantLock (fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); }
接着我们来看put
和offer
方法是如何实现的:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 public boolean offer (E e) { checkNotNull(e); final ReentrantLock lock = this .lock; lock.lock(); try { if (count == items.length) return false ; else { enqueue(e); return true ; } } finally { lock.unlock(); } }public void put (E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this .lock; lock.lockInterruptibly(); try { while (count == items.length) notFull.await(); enqueue(e); } finally { lock.unlock(); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 private E dequeue () { final Object[] items = this .items; @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; items[takeIndex] = null ; if (++takeIndex == items.length) takeIndex = 0 ; count--; if (itrs != null ) itrs.elementDequeued(); notFull.signal(); return x; }
接着我们来看出队操作:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 public E poll () { final ReentrantLock lock = this .lock; lock.lock(); try { return (count == 0 ) ? null : dequeue(); } finally { lock.unlock(); } }public E take () throws InterruptedException { final ReentrantLock lock = this .lock; lock.lockInterruptibly(); try { while (count == 0 ) notEmpty.await(); return dequeue(); } finally { lock.unlock(); } }
1 2 3 4 5 6 7 8 9 10 private void enqueue (E x) { final Object[] items = this .items; items[putIndex] = x; if (++putIndex == items.length) putIndex = 0 ; count++; notEmpty.signal(); }
可见,如果各位对锁的使用非常熟悉的话,那么在阅读这些源码的时候,就会非常轻松了。
接着我们来看一个比较特殊的队列SynchronousQueue,它没有任何容量,也就是说正常情况下出队必须和入队操作成对出现,我们先来看它的内部,可以看到内部有一个抽象类Transferer,它定义了一个transfer
方法:
1 2 3 4 5 6 7 8 9 10 11 12 abstract static class Transferer <E> { abstract E transfer (E e, boolean timed, long nanos) ; }
乍一看,有点迷惑,难不成还要靠这玩意去实现put和take操作吗?实际上它是直接以生产者消费者模式进行的,由于不需要依靠任何容器结构来暂时存放数据,所以我们可以直接通过transfer
方法来对生产者和消费者之间的数据进行传递。
比如一个线程put一个新的元素进入,这时如果没有其他线程调用take方法获取元素,那么会持续被阻塞,直到有线程取出元素,而transfer
正是需要等生产者消费者双方都到齐了才能进行交接工作,单独只有其中一方都需要进行等待。
1 2 3 4 5 6 7 public void put (E e) throws InterruptedException { if (e == null ) throw new NullPointerException (); if (transferer.transfer(e, false , 0 ) == null ) { Thread.interrupted(); throw new InterruptedException (); } }
它在公平和非公平模式下,有两个实现,这里我们来看公平模式下的SynchronousQueue是如何实现的:
1 2 3 4 5 6 7 8 9 10 11 12 static final class TransferQueue <E> extends Transferer <E> { transient volatile QNode head; transient volatile QNode tail; static final class QNode { volatile QNode next; volatile Object item; volatile Thread waiter; final boolean isData;
公平模式下,Transferer的实现是TransferQueue,是以先进先出的规则的进行的,内部有一个QNode类来保存等待的线程。
好了,我们直接上transfer()
方法的实现(这里再次提醒各位,多线程环境下的源码分析和单线程的分析不同,我们需要时刻关注当前代码块的加锁状态,如果没有加锁,一定要具有多线程可能会同时运行的意识,这个意识在以后你自己处理多线程问题伴随着你,才能保证你的思路在多线程环境下是正确的):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 E transfer (E e, boolean timed, long nanos) { QNode s = null ; boolean isData = (e != null ); for (;;) { QNode t = tail; QNode h = head; if (t == null || h == null ) continue ; if (h == t || t.isData == isData) { QNode tn = t.next; if (t != tail) continue ; if (tn != null ) { advanceTail(t, tn); continue ; } if (timed && nanos <= 0 ) return null ; if (s == null ) s = new QNode (e, isData); if (!t.casNext(null , s)) continue ; advanceTail(t, s); Object x = awaitFulfill(s, e, timed, nanos); if (x == s) { clean(t, s); return null ; } if (!s.isOffList()) { advanceHead(t, s); if (x != null ) s.item = s; s.waiter = null ; } return (x != null ) ? (E)x : e; } else { QNode m = h.next; if (t != tail || m == null || h != head) continue ; Object x = m.item; if (isData == (x != null ) || x == m || !m.casItem(x, e)) { advanceHead(h, m); continue ; } advanceHead(h, m); LockSupport.unpark(m.waiter); return (x != null ) ? (E)x : e; } } }
所以,总结为以下流程:
对于非公平模式下的SynchronousQueue,则是采用的栈结构来存储等待节点,但是思路也是与这里的一致,需要等待并进行匹配操作,各位如果感兴趣可以继续了解一下非公平模式下的SynchronousQueue实现。
在JDK7的时候,基于SynchronousQueue产生了一个更强大的TransferQueue,它保留了SynchronousQueue的匹配交接机制,并且与等待队列进行融合。
我们知道,SynchronousQueue并没有使用锁,而是采用CAS操作保证生产者与消费者的协调,但是它没有容量,而LinkedBlockingQueue虽然是有容量且无界的,但是内部基本都是基于锁实现的,性能并不是很好,这时,我们就可以将它们各自的优点单独拿出来,揉在一起,就成了性能更高的LinkedTransferQueue
1 2 3 4 5 6 public static void main (String[] args) throws InterruptedException { LinkedTransferQueue<String> queue = new LinkedTransferQueue <>(); queue.put("1" ); queue.put("2" ); queue.forEach(System.out::println); }
相比 SynchronousQueue
,它多了一个可以存储的队列,我们依然可以像阻塞队列那样获取队列中所有元素的值,简单来说,LinkedTransferQueue
其实就是一个多了存储队列的SynchronousQueue
。
接着我们来了解一些其他的队列:
PriorityBlockingQueue - 是一个支持优先级的阻塞队列,元素的获取顺序按优先级决定。
DelayQueue - 它能够实现延迟获取元素,同样支持优先级。
我们先来看优先级阻塞队列:
1 2 3 4 5 6 7 8 9 10 11 public static void main (String[] args) throws InterruptedException { PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue <>(10 , Integer::compare); queue.add(3 ); queue.add(1 ); queue.add(2 ); System.out.println(queue); System.out.println(queue.poll()); System.out.println(queue.poll()); System.out.println(queue.poll()); }
我们的重点是DelayQueue,它能实现延时出队,也就是说当一个元素插入后,如果没有超过一定时间,那么是无法让此元素出队的。
1 2 public class DelayQueue <E extends Delayed > extends AbstractQueue <E> implements BlockingQueue <E> {
可以看到此类只接受Delayed的实现类作为元素:
1 2 3 4 5 public interface Delayed extends Comparable <Delayed> { long getDelay (TimeUnit unit) ; }
这里我们手动实现一个:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 private static class Test implements Delayed { private final long time; private final int priority; private final long startTime; private final String data; private Test (long time, int priority, String data) { this .time = TimeUnit.SECONDS.toMillis(time); this .priority = priority; this .startTime = System.currentTimeMillis(); this .data = data; } @Override public long getDelay (TimeUnit unit) { long leftTime = time - (System.currentTimeMillis() - startTime); return unit.convert(leftTime, TimeUnit.MILLISECONDS); } @Override public int compareTo (Delayed o) { if (o instanceof Test) return priority - ((Test) o).priority; return 0 ; } @Override public String toString () { return data; } }
接着我们在主方法中尝试使用:
1 2 3 4 5 6 7 8 public static void main (String[] args) throws InterruptedException { DelayQueue<Test> queue = new DelayQueue <>(); queue.add(new Test (1 , 2 , "2号" )); queue.add(new Test (3 , 1 , "1号" )); System.out.println(queue.take()); System.out.println(queue.take()); }
我们来研究一下DelayQueue是如何实现的,首先来看add()
方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 public boolean add (E e) { return offer(e); }public boolean offer (E e) { final ReentrantLock lock = this .lock; lock.lock(); try { q.offer(e); if (q.peek() == e) { leader = null ; available.signal(); } return true ; } finally { lock.unlock(); } }public void put (E e) { offer(e); }
可以看到无论是哪种入队操作,都会加锁进行,属于常规操作。我们接着来看take()
方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 public E take () throws InterruptedException { final ReentrantLock lock = this .lock; lock.lockInterruptibly(); try { for (;;) { E first = q.peek(); if (first == null ) available.await(); else { long delay = first.getDelay(NANOSECONDS); if (delay <= 0 ) return q.poll(); first = null ; if (leader != null ) available.await(); else { Thread thisThread = Thread.currentThread(); leader = thisThread; try { available.awaitNanos(delay); } finally { if (leader == thisThread) leader = null ; } } } } } finally { if (leader == null && q.peek() != null ) available.signal(); lock.unlock(); } }
到此,有关并发容器的讲解就到这里。
下一章我们会继续讲解线程池以及并发工具类。
并发编程进阶 欢迎来到JUC学习的最后一章,王炸当然是放在最后了。
线程池 在我们的程序中,多多少少都会用到多线程技术,而我们以往都是使用Thread类来创建一个新的线程:
1 2 3 4 public static void main (String[] args) { Thread t = new Thread (() -> System.out.println("Hello World!" )); t.start(); }
利用多线程,我们的程序可以更加合理地使用CPU多核心资源,在同一时间完成更多的工作。但是,如果我们的程序频繁地创建线程,由于线程的创建和销毁也需要占用系统资源,因此这样会降低我们整个程序的性能,那么怎么做,才能更高效地使用多线程呢?
我们其实可以将已创建的线程复用,利用池化技术,就像数据库连接池一样,我们也可以创建很多个线程,然后反复地使用这些线程,而不对它们进行销毁。
虽然听起来这个想法比较新颖,但是实际上线程池早已利用到各个地方,比如我们的Tomcat服务器,要在同一时间接受和处理大量的请求,那么就必须要在短时间内创建大量的线程,结束后又进行销毁,这显然会导致很大的开销,因此这种情况下使用线程池显然是更好的解决方案。
由于线程池可以反复利用已有线程执行多线程操作,所以它一般是有容量限制的,当所有的线程都处于工作状态时,那么新的多线程请求会被阻塞,直到有一个线程空闲出来为止,实际上这里就会用到我们之前讲解的阻塞队列。
所以我们可以暂时得到下面一个样子:
当然,JUC提供的线程池肯定没有这么简单,接下来就让我们深入进行了解。
线程池的使用 我们可以直接创建一个新的线程池对象,它已经提前帮助我们实现好了线程的调度机制,我们先来看它的构造方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public ThreadPoolExecutor (int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0 ) throw new IllegalArgumentException (); if (workQueue == null || threadFactory == null || handler == null ) throw new NullPointerException (); this .acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this .corePoolSize = corePoolSize; this .maximumPoolSize = maximumPoolSize; this .workQueue = workQueue; this .keepAliveTime = unit.toNanos(keepAliveTime); this .threadFactory = threadFactory; this .handler = handler; }
参数稍微有一点多,这里我们依次进行讲解:
corePoolSize:核心线程池大小 ,我们每向线程池提交一个多线程任务时,都会创建一个新的核心线程
,无论是否存在其他空闲线程,直到到达核心线程池大小为止,之后会尝试复用线程资源。当然也可以在一开始就全部初始化好,调用 prestartAllCoreThreads()
即可。
maximumPoolSize:最大线程池大小 ,当目前线程池中所有的线程都处于运行状态,并且这时来了新的多线程任务,如果当前线程池中线程数量小于最大线程池大小,那么会继续创建新的非核心线程
运行,直到最大大小。
keepAliveTime:线程最大空闲时间 ,当一个非核心线程
空闲超过一定时间,会自动销毁。
unit:线程最大空闲时间的时间单位
workQueue:线程等待队列 ,当线程池中确实无法分配线程执行任务的时候,就会将任务暂时存到等待队列中,直到有线程资源可用为止,这里可以使用我们上一章学到的阻塞队列。
threadFactory:线程创建工厂 ,我们可以干涉线程池中线程的创建过程,进行自定义。
handler:拒绝策略 ,当等待队列和线程池都没有空间了,真的不能再来新的任务时,来了个新的多线程任务,那么只能拒绝了,这时就会根据当前设定的拒绝策略进行处理。
最为重要的就是线程池大小的限定了,这个也是很有学问的,合理地分配大小会使得线程池的执行效率事半功倍:
首先我们可以分析一下,线程池执行任务的特性,是CPU 密集型还是 IO 密集型
CPU密集型: 主要是执行计算任务,响应时间很快,CPU一直在运行,这种任务CPU的利用率很高,那么线程数应该是根据 CPU 核心数来决定,CPU 核心数 = 最大同时执行线程数,以 i5-9400F 处理器为例,CPU 核心数为 6,那么最多就能同时执行 6 个线程。
IO密集型: 主要是进行 IO 操作,因为执行 IO 操作的时间比较较长,比如从硬盘读取数据之类的,CPU就得等着IO操作,很容易出现空闲状态,导致 CPU 的利用率不高,这种情况下可以适当增加线程池的大小,让更多的线程可以一起进行IO操作,一般可以配置为CPU核心数的2倍。
这里我们手动创建一个新的线程池看看效果:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 public static void main (String[] args) throws InterruptedException { ThreadPoolExecutor executor = new ThreadPoolExecutor (2 , 4 , 3 , TimeUnit.SECONDS, new ArrayBlockingQueue <>(2 )); for (int i = 0 ; i < 6 ; i++) { int finalI = i; executor.execute(() -> { try { System.out.println(Thread.currentThread().getName()+" 开始执行!(" + finalI); TimeUnit.SECONDS.sleep(1 ); System.out.println(Thread.currentThread().getName()+" 已结束!(" +finalI); } catch (InterruptedException e) { e.printStackTrace(); } }); } TimeUnit.SECONDS.sleep(1 ); System.out.println("线程池中线程数量:" +executor.getPoolSize()); TimeUnit.SECONDS.sleep(5 ); System.out.println("线程池中线程数量:" +executor.getPoolSize()); executor.shutdownNow(); }
这里我们创建了一个核心容量为2,最大容量为4,等待队列长度为2,空闲时间为3秒的线程池,现在我们向其中执行6个任务,每个任务都会进行1秒钟休眠,那么当线程池中4个线程都被占用时,还有两个线程就只能进入到等待队列中了,当线程池中4个线程完成后,等待队列中的两个任务才能开始执行。并且在等待5秒后,超过了线程池的最大空闲时间,非核心线程
被回收了,所以线程池中只有2个线程存在。
那么要是等待队列设定为没有容量的SynchronousQueue呢,这个时候会发生什么?
1 2 3 4 5 6 7 8 9 pool-1 -thread-1 开始执行!(0 pool-1 -thread-4 开始执行!(3 pool-1 -thread-3 开始执行!(2 pool-1 -thread-2 开始执行!(1 Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task com.test.Main$$Lambda$1 /1283928880 @682a0b20 rejected from java.util.concurrent.ThreadPoolExecutor@3d075dc0[Running, pool size = 4 , active threads = 4 , queued tasks = 0 , completed tasks = 0 ] at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063 ) at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830 ) at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379 ) at com.test.Main.main(Main.java:15 )
可以看到,前4个任务都可以正常执行,但是到第五个任务时,直接抛出了异常,这其实就是因为等待队列的容量为0,相当于没有容量,那么这个时候,就只能拒绝任务了,拒绝的操作会根据拒绝策略决定。
线程池的拒绝策略默认有以下几个:
AbortPolicy(默认):像上面一样,直接抛异常。
CallerRunsPolicy:直接让提交任务的线程运行这个任务,比如在主线程向线程池提交了任务,那么就直接由主线程执行。
DiscardOldestPolicy:丢弃队列中最近的一个任务,替换为当前任务。
DiscardPolicy:什么也不用做。
这里我们进行一下测试:
1 2 3 4 5 6 public static void main (String[] args) throws InterruptedException { ThreadPoolExecutor executor = new ThreadPoolExecutor (2 , 4 , 3 , TimeUnit.SECONDS, new SynchronousQueue <>(), new ThreadPoolExecutor .CallerRunsPolicy());
CallerRunsPolicy策略是谁提交的谁自己执行,所以:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 pool-1 -thread-1 开始执行!(0 pool-1 -thread-2 开始执行!(1 main 开始执行!(4 pool-1 -thread-4 开始执行!(3 pool-1 -thread-3 开始执行!(2 pool-1 -thread-3 已结束!(2 pool-1 -thread-2 已结束!(1 pool-1 -thread-1 已结束!(0 main 已结束!(4 pool-1 -thread-4 已结束!(3 pool-1 -thread-1 开始执行!(5 pool-1 -thread-1 已结束!(5 线程池中线程数量:4 线程池中线程数量:2
可以看到,当队列塞不下时,直接在主线程运行任务,运行完之后再继续向下执行。
我们吧策略修改为DiscardOldestPolicy试试看:
1 2 3 4 5 6 public static void main (String[] args) throws InterruptedException { ThreadPoolExecutor executor = new ThreadPoolExecutor (2 , 4 , 3 , TimeUnit.SECONDS, new ArrayBlockingQueue <>(1 ), new ThreadPoolExecutor .DiscardOldestPolicy());
它会移除等待队列中的最近的一个任务,所以可以看到有一个任务实际上是被抛弃了的:
1 2 3 4 5 6 7 8 9 10 11 12 pool-1-thread-1 开始执行!(0 pool-1-thread-4 开始执行!(4 pool-1-thread-3 开始执行!(3 pool-1-thread-2 开始执行!(1 pool-1-thread-1 已结束!(0 pool-1-thread-4 已结束!(4 pool-1-thread-1 开始执行!(5 线程池中线程数量:4 pool-1-thread-3 已结束!(3 pool-1-thread-2 已结束!(1 pool-1-thread-1 已结束!(5 线程池中线程数量:2
比较有意思的是,如果选择没有容量的SynchronousQueue作为等待队列会爆栈:
1 2 3 4 5 6 7 8 9 10 11 12 pool-1 -thread-1 开始执行!(0 pool-1 -thread-3 开始执行!(2 pool-1 -thread-2 开始执行!(1 pool-1 -thread-4 开始执行!(3 Exception in thread "main" java.lang.StackOverflowError at java.util.concurrent.SynchronousQueue.offer(SynchronousQueue.java:912 ) at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1371 ) ... pool-1 -thread-1 已结束!(0 pool-1 -thread-2 已结束!(1 pool-1 -thread-4 已结束!(3 pool-1 -thread-3 已结束!(2
这是为什么呢?我们来看看这个拒绝策略的源码:
1 2 3 4 5 6 7 8 9 10 public static class DiscardOldestPolicy implements RejectedExecutionHandler { public DiscardOldestPolicy () { } public void rejectedExecution (Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { e.getQueue().poll(); e.execute(r); } } }
可以看到,它会先对等待队列进行出队操作,但是由于SynchronousQueue压根没容量,所有这个操作毫无意义,然后就会递归执行execute
方法,而进入之后,又发现没有容量不能插入,于是又重复上面的操作,这样就会无限的递归下去,最后就爆栈了。
当然,除了使用官方提供的4种策略之外,我们还可以使用自定义的策略:
1 2 3 4 5 6 7 8 9 public static void main (String[] args) throws InterruptedException { ThreadPoolExecutor executor = new ThreadPoolExecutor (2 , 4 , 3 , TimeUnit.SECONDS, new SynchronousQueue <>(), (r, executor1) -> { System.out.println("哎呀,线程池和等待队列都满了,你自己耗子尾汁吧" ); r.run(); });
接着我们来看线程创建工厂,我们可以自己决定如何创建新的线程:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public static void main (String[] args) throws InterruptedException { ThreadPoolExecutor executor = new ThreadPoolExecutor (2 , 4 , 3 , TimeUnit.SECONDS, new SynchronousQueue <>(), new ThreadFactory () { int counter = 0 ; @Override public Thread newThread (Runnable r) { return new Thread (r, "我的自定义线程-" +counter++); } }); for (int i = 0 ; i < 4 ; i++) { executor.execute(() -> System.out.println(Thread.currentThread().getName()+" 开始执行!" )); } }
这里传入的Runnable对象就是我们提交的任务,可以看到需要我们返回一个Thread对象,其实就是线程池创建线程的过程,而如何创建这个对象,以及它的一些属性,就都由我们来决定。
各位有没有想过这样一个情况,如果我们的任务在运行过程中出现异常了,那么是不是会导致线程池中的线程被销毁呢?
1 2 3 4 5 6 7 8 9 10 11 12 public static void main (String[] args) throws InterruptedException { ThreadPoolExecutor executor = new ThreadPoolExecutor (1 , 1 , 0 , TimeUnit.MILLISECONDS, new LinkedBlockingDeque <>()); executor.execute(() -> { System.out.println(Thread.currentThread().getName()); throw new RuntimeException ("我是异常!" ); }); TimeUnit.SECONDS.sleep(1 ); executor.execute(() -> { System.out.println(Thread.currentThread().getName()); }); }
可以看到,出现异常之后,再次提交新的任务,执行的线程是一个新的线程了。
除了我们自己创建线程池之外,官方也提供了很多的线程池定义,我们可以使用Executors
工具类来快速创建线程池:
1 2 3 public static void main (String[] args) throws InterruptedException { ExecutorService executor = Executors.newFixedThreadPool(2 ); }
可以看到它的内部实现为:
1 2 3 4 5 public static ExecutorService newFixedThreadPool (int nThreads) { return new ThreadPoolExecutor (nThreads, nThreads, 0L , TimeUnit.MILLISECONDS, new LinkedBlockingQueue <Runnable>()); }
这里直接将最大线程和核心线程数量设定为一样的,并且等待时间为0,因为压根不需要,并且采用的是一个无界的LinkedBlockingQueue作为等待队列。
使用newSingleThreadExecutor来创建只有一个线程的线程池:
1 2 3 4 public static void main (String[] args) throws InterruptedException { ExecutorService executor = Executors.newSingleThreadExecutor(); }
原理如下:
1 2 3 4 5 6 public static ExecutorService newSingleThreadExecutor () { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor (1 , 1 , 0L , TimeUnit.MILLISECONDS, new LinkedBlockingQueue <Runnable>())); }
可以看到这里并不是直接创建的一个ThreadPoolExecutor对象,而是套了一层FinalizableDelegatedExecutorService,那么这个又是什么东西呢?
1 2 3 4 5 6 7 8 9 static class FinalizableDelegatedExecutorService extends DelegatedExecutorService { FinalizableDelegatedExecutorService(ExecutorService executor) { super (executor); } protected void finalize () { super .shutdown(); } }
1 2 3 4 5 6 static class DelegatedExecutorService extends AbstractExecutorService { private final ExecutorService e; DelegatedExecutorService(ExecutorService executor) { e = executor; } public void execute (Runnable command) { e.execute(command); } public void shutdown () { e.shutdown(); } public List<Runnable> shutdownNow () { return e.shutdownNow(); }
所以,下面两种写法的区别在于:
1 2 3 4 public static void main (String[] args) throws InterruptedException { ExecutorService executor1 = Executors.newSingleThreadExecutor(); ExecutorService executor2 = Executors.newFixedThreadPool(1 ); }
前者实际上是被代理了,我们没办法直接修改前者的相关属性,显然使用前者创建只有一个线程的线程池更加专业和安全(可以防止属性被修改)一些。
最后我们来看newCachedThreadPool
方法:
1 2 3 4 public static void main (String[] args) throws InterruptedException { ExecutorService executor = Executors.newCachedThreadPool(); }
我们来看看它的实现:
1 2 3 4 5 public static ExecutorService newCachedThreadPool () { return new ThreadPoolExecutor (0 , Integer.MAX_VALUE, 60L , TimeUnit.SECONDS, new SynchronousQueue <Runnable>()); }
可以看到,核心线程数为0,那么也就是说所有的线程都是非核心线程
,也就是说线程空闲时间超过1秒钟,一律销毁。但是它的最大容量是Integer.MAX_VALUE
,也就是说,它可以无限制地增长下去,所以这玩意一定要慎用。
执行带返回值的任务 一个多线程任务不仅仅可以是void无返回值任务,比如我们现在需要执行一个任务,但是我们需要在任务执行之后得到一个结果,这个时候怎么办呢?
这里我们就可以使用到Future了,它可以返回任务的计算结果,我们可以通过它来获取任务的结果以及任务当前是否完成:
1 2 3 4 5 6 public static void main (String[] args) throws InterruptedException, ExecutionException { ExecutorService executor = Executors.newSingleThreadExecutor(); Future<String> future = executor.submit(() -> "我是字符串!" ); System.out.println(future.get()); executor.shutdown(); }
当然结果也可以一开始就定义好,然后等待Runnable执行完之后再返回:
1 2 3 4 5 6 7 8 9 10 11 12 public static void main (String[] args) throws InterruptedException, ExecutionException { ExecutorService executor = Executors.newSingleThreadExecutor(); Future<String> future = executor.submit(() -> { try { TimeUnit.SECONDS.sleep(3 ); } catch (InterruptedException e) { e.printStackTrace(); } }, "我是字符串!" ); System.out.println(future.get()); executor.shutdown(); }
还可以通过传入FutureTask对象的方式:
1 2 3 4 5 6 7 public static void main (String[] args) throws ExecutionException, InterruptedException { ExecutorService service = Executors.newSingleThreadExecutor(); FutureTask<String> task = new FutureTask <>(() -> "我是字符串!" ); service.submit(task); System.out.println(task.get()); executor.shutdown(); }
我们可以还通过Future对象获取当前任务的一些状态:
1 2 3 4 5 6 7 8 public static void main (String[] args) throws ExecutionException, InterruptedException { ExecutorService executor = Executors.newSingleThreadExecutor(); Future<String> future = executor.submit(() -> "都看到这里了,不赏UP主一个一键三连吗?" ); System.out.println(future.get()); System.out.println("任务是否执行完成:" +future.isDone()); System.out.println("任务是否被取消:" +future.isCancelled()); executor.shutdown(); }
我们来试试看在任务执行途中取消任务:
1 2 3 4 5 6 7 8 9 10 public static void main (String[] args) throws ExecutionException, InterruptedException { ExecutorService executor = Executors.newSingleThreadExecutor(); Future<String> future = executor.submit(() -> { TimeUnit.SECONDS.sleep(10 ); return "这次一定!" ; }); System.out.println(future.cancel(true )); System.out.println(future.isCancelled()); executor.shutdown(); }
执行定时任务 既然线程池怎么强大,那么线程池能不能执行定时任务呢?我们之前如果需要执行一个定时任务,那么肯定会用到Timer和TimerTask,但是它只会创建一个线程处理我们的定时任务,无法实现多线程调度,并且它无法处理异常情况一旦抛出未捕获异常那么会直接终止,显然我们需要一个更加强大的定时器。
JDK5之后,我们可以使用ScheduledThreadPoolExecutor来提交定时任务,它继承自ThreadPoolExecutor,并且所有的构造方法都必须要求最大线程池容量为Integer.MAX_VALUE,并且都是采用的DelayedWorkQueue作为等待队列。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public ScheduledThreadPoolExecutor (int corePoolSize) { super (corePoolSize, Integer.MAX_VALUE, 0 , NANOSECONDS, new DelayedWorkQueue ()); }public ScheduledThreadPoolExecutor (int corePoolSize, ThreadFactory threadFactory) { super (corePoolSize, Integer.MAX_VALUE, 0 , NANOSECONDS, new DelayedWorkQueue (), threadFactory); }public ScheduledThreadPoolExecutor (int corePoolSize, RejectedExecutionHandler handler) { super (corePoolSize, Integer.MAX_VALUE, 0 , NANOSECONDS, new DelayedWorkQueue (), handler); }public ScheduledThreadPoolExecutor (int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler) { super (corePoolSize, Integer.MAX_VALUE, 0 , NANOSECONDS, new DelayedWorkQueue (), threadFactory, handler); }
我们来测试一下它的方法,这个方法可以提交一个延时任务,只有到达指定时间之后才会开始:
1 2 3 4 5 6 7 8 public static void main (String[] args) throws ExecutionException, InterruptedException { ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor (1 ); executor.schedule(() -> System.out.println("HelloWorld!" ), 3 , TimeUnit.SECONDS); executor.shutdown(); }
我们也可以像之前一样,传入一个Callable对象,用于接收返回值:
1 2 3 4 5 6 7 8 public static void main (String[] args) throws ExecutionException, InterruptedException { ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor (2 ); ScheduledFuture<String> future = executor.schedule(() -> "????" , 3 , TimeUnit.SECONDS); System.out.println("任务剩余等待时间:" +future.getDelay(TimeUnit.MILLISECONDS) / 1000.0 + "s" ); System.out.println("任务执行结果:" +future.get()); executor.shutdown(); }
可以看到schedule
方法返回了一个ScheduledFuture对象,和Future一样,它也支持返回值的获取、包括对任务的取消同时还支持获取剩余等待时间。
那么如果我们希望按照一定的频率不断执行任务呢?
1 2 3 4 5 6 public static void main (String[] args) throws ExecutionException, InterruptedException { ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor (2 ); executor.scheduleAtFixedRate(() -> System.out.println("Hello World!" ), 3 , 1 , TimeUnit.SECONDS); }
Executors也为我们预置了newScheduledThreadPool方法用于创建线程池:
1 2 3 4 public static void main (String[] args) throws ExecutionException, InterruptedException { ScheduledExecutorService service = Executors.newScheduledThreadPool(1 ); service.schedule(() -> System.out.println("Hello World!" ), 1 , TimeUnit.SECONDS); }
线程池实现原理 前面我们了解了线程池的使用,那么接着我们来看看它的详细实现过程,结构稍微有点复杂,坐稳,发车了。
这里需要首先介绍一下ctl变量:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 private final AtomicInteger ctl = new AtomicInteger (ctlOf(RUNNING, 0 ));private static final int COUNT_BITS = Integer.SIZE - 3 ; private static final int CAPACITY = (1 << COUNT_BITS) - 1 ; private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS; private static int runStateOf (int c) { return c & ~CAPACITY; } private static int workerCountOf (int c) { return c & CAPACITY; }private static int ctlOf (int rs, int wc) { return rs | wc; }
我们先从最简单的入手,看看在调用execute
方法之后,线程池会做些什么:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 private final BlockingQueue<Runnable> workQueue;public void execute (Runnable command) { if (command == null ) throw new NullPointerException (); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true )) return ; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0 ) addWorker(null , false ); } else if (!addWorker(command, false )) reject(command); }
是不是感觉思路还挺清晰的,我们接着来看addWorker
是怎么创建和执行任务的,又是一大堆代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 private boolean addWorker (Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false ; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false ; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); if (runStateOf(c) != rs) continue retry; } } boolean workerStarted = false ; boolean workerAdded = false ; Worker w = null ; try { w = new Worker (firstTask); final Thread t = w.thread; if (t != null ) { final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null )) { if (t.isAlive()) throw new IllegalThreadStateException (); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true ; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true ; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
接着我们来看Worker类是如何实现的,它继承自AbstractQueuedSynchronizer,时隔两章,居然再次遇到AQS,那也就是说,它本身就是一把锁:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 private final class Worker extends AbstractQueuedSynchronizer implements Runnable { final Thread thread; Runnable firstTask; volatile long completedTasks; Worker(Runnable firstTask) { setState(-1 ); this .firstTask = firstTask; this .thread = getThreadFactory().newThread(this ); } public void run () { runWorker(this ); } protected boolean isHeldExclusively () { return getState() != 0 ; } ... }
最后我们来看看一个Worker到底是怎么在进行任务的:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 final void runWorker (Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null ; w.unlock(); boolean completedAbruptly = true ; try { while (task != null || (task = getTask()) != null ) { w.lock(); if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null ; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error (x); } finally { afterExecute(task, thrown); } } finally { task = null ; w.completedTasks++; w.unlock(); } } completedAbruptly = false ; } finally { processWorkerExit(w, completedAbruptly); } }
那么它是怎么从阻塞队列里面获取任务的呢:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 private Runnable getTask () { boolean timedOut = false ; for (;;) { int c = ctl.get(); int rs = runStateOf(c); if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null ; } int wc = workerCountOf(c); boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null ; continue ; } try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null ) return r; timedOut = true ; } catch (InterruptedException retry) { timedOut = false ; } } }
虽然我们的源码解读越来越深,但是只要各位的思路不断,依然是可以继续往下看的。到此,有关execute()
方法的源码解读,就先到这里。
接着我们来看当线程池关闭时会做什么事情:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public void shutdown () { final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(SHUTDOWN); interruptIdleWorkers(); onShutdown(); } finally { mainLock.unlock(); } tryTerminate(); }
1 2 3 4 5 6 7 8 private void advanceRunState (int targetState) { for (;;) { int c = ctl.get(); if (runStateAtLeast(c, targetState) || ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c)))) break ; } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 private void interruptIdleWorkers (boolean onlyOne) { final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { for (Worker w : workers) { Thread t = w.thread; if (!t.isInterrupted() && w.tryLock()) { try { t.interrupt(); } catch (SecurityException ignore) { } finally { w.unlock(); } } if (onlyOne) break ; } } finally { mainLock.unlock(); } }
而shutdownNow()
方法也差不多,但是这里会更直接一些:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public List<Runnable> shutdownNow () { List<Runnable> tasks; final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(STOP); interruptWorkers(); tasks = drainQueue(); } finally { mainLock.unlock(); } tryTerminate(); return tasks; }
1 2 3 4 5 6 7 8 9 10 private void interruptWorkers () { final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { for (Worker w : workers) w.interruptIfStarted(); } finally { mainLock.unlock(); } }
最后的最后,我们再来看看tryTerminate()
是怎么完完全全终止掉一个线程池的:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 final void tryTerminate () { for (;;) { int c = ctl.get(); if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) return ; if (workerCountOf(c) != 0 ) { interruptIdleWorkers(ONLY_ONE); return ; } final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { if (ctl.compareAndSet(c, ctlOf(TIDYING, 0 ))) { try { terminated(); } finally { ctl.set(ctlOf(TERMINATED, 0 )); termination.signalAll(); } return ; } } finally { mainLock.unlock(); } } }
OK,有关线程池的实现原理,我们就暂时先介绍到这里,关于更高级的定时任务线程池,这里就不做讲解了。
并发工具类 计数器锁 CountDownLatch 多任务同步神器。它允许一个或多个线程,等待其他线程完成工作,比如现在我们有这样的一个需求:
有20个计算任务,我们需要先将这些任务的结果全部计算出来,每个任务的执行时间未知
当所有任务结束之后,立即整合统计最终结果
要实现这个需求,那么有一个很麻烦的地方,我们不知道任务到底什么时候执行完毕,那么可否将最终统计延迟一定时间进行呢?但是最终统计无论延迟多久进行,要么不能保证所有任务都完成,要么可能所有任务都完成了而这里还在等。
所以说,我们需要一个能够实现子任务同步的工具。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 public static void main (String[] args) throws InterruptedException { CountDownLatch latch = new CountDownLatch (20 ); for (int i = 0 ; i < 20 ; i++) { int finalI = i; new Thread (() -> { try { Thread.sleep((long ) (2000 * new Random ().nextDouble())); System.out.println("子任务" + finalI +"执行完成!" ); } catch (InterruptedException e) { e.printStackTrace(); } latch.countDown(); }).start(); } latch.await(); System.out.println("所有子任务都完成!任务完成!!!" ); }
我们在调用await()
方法之后,实际上就是一个等待计数器衰减为0的过程,而进行自减操作则由各个子线程来完成,当子线程完成工作后,那么就将计数器-1,所有的子线程完成之后,计数器为0,结束等待。
那么它是如何实现的呢?实现 原理非常简单:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 public class CountDownLatch { private static final class Sync extends AbstractQueuedSynchronizer { Sync(int count) { setState(count); } int getCount () { return getState(); } protected int tryAcquireShared (int acquires) { return (getState() == 0 ) ? 1 : -1 ; } protected boolean tryReleaseShared (int releases) { for (;;) { int c = getState(); if (c == 0 ) return false ; int nextc = c-1 ; if (compareAndSetState(c, nextc)) return nextc == 0 ; } } } private final Sync sync; public CountDownLatch (int count) { if (count < 0 ) throw new IllegalArgumentException ("count < 0" ); this .sync = new Sync (count); } public void await () throws InterruptedException { sync.acquireSharedInterruptibly(1 ); } public boolean await (long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1 , unit.toNanos(timeout)); } public void countDown () { sync.releaseShared(1 ); } public long getCount () { return sync.getCount(); } public String toString () { return super .toString() + "[Count = " + sync.getCount() + "]" ; } }
在深入讲解之前,我们先大致了解一下CountDownLatch的基本实现思路:
利用共享锁实现
在一开始的时候就是已经上了count层锁的状态,也就是state = count
await()
就是加共享锁,但是必须state
为0
才能加锁成功,否则按照AQS的机制,会进入等待队列阻塞,加锁成功后结束阻塞
countDown()
就是解1
层锁,也就是靠这个方法一点一点把state
的值减到0
由于我们前面只对独占锁进行了讲解,没有对共享锁进行讲解,这里还是稍微提一下它:
1 2 3 4 public final void acquireShared (int arg) { if (tryAcquireShared(arg) < 0 ) doAcquireShared(arg); }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 private void doAcquireShared (int arg) { final Node node = addWaiter(Node.SHARED); boolean failed = true ; try { boolean interrupted = false ; for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0 ) { setHeadAndPropagate(node, r); p.next = null ; if (interrupted) selfInterrupt(); failed = false ; return ; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true ; } } finally { if (failed) cancelAcquire(node); } }
1 2 3 4 5 6 7 8 9 10 11 12 private void setHeadAndPropagate (Node node, int propagate) { Node h = head; setHead(node); if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0 ) { Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); } }
我们接着来看,它的countdown过程:
1 2 3 4 5 6 7 8 public final boolean releaseShared (int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true ; } return false ; }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 private void doReleaseShared () { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0 )) continue ; unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0 , Node.PROPAGATE)) continue ; } if (h == head) break ; } }
可能看完之后还是有点乱,我们再来理一下:
共享锁是线程共享的,同一时刻能有多个线程拥有共享锁。
如果一个线程刚获取了共享锁,那么在其之后等待的线程也很有可能能够获取到锁,所以得传播下去继续尝试唤醒后面的结点,不像独占锁,独占的压根不需要考虑这些。
如果一个线程刚释放了锁,不管是独占锁还是共享锁,都需要唤醒后续等待结点的线程。
回到CountDownLatch,再结合整个AQS共享锁的实现机制,进行一次完整的推导,看明白还是比较简单的。
循环屏障 CyclicBarrier 好比一场游戏,我们必须等待房间内人数足够之后才能开始,并且游戏开始之后玩家需要同时进入游戏以保证公平性。
假如现在游戏房间内一共5人,但是游戏开始需要10人,所以我们必须等待剩下5人到来之后才能开始游戏,并且保证游戏开始时所有玩家都是同时进入,那么怎么实现这个功能呢?我们可以使用CyclicBarrier,翻译过来就是循环屏障,那么这个屏障正式为了解决这个问题而出现的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 public static void main (String[] args) { CyclicBarrier barrier = new CyclicBarrier (10 , () -> System.out.println("飞机马上就要起飞了,各位特种兵请准备!" )); for (int i = 0 ; i < 10 ; i++) { int finalI = i; new Thread (() -> { try { Thread.sleep((long ) (2000 * new Random ().nextDouble())); System.out.println("玩家 " + finalI +" 进入房间进行等待... (" +barrier.getNumberWaiting()+"/10)" ); barrier.await(); System.out.println("玩家 " + finalI +" 进入游戏!" ); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } }).start(); } }
可以看到,循环屏障会不断阻挡线程,直到被阻挡的线程足够多时,才能一起冲破屏障,并且在冲破屏障时,我们也可以做一些其他的任务。这和人多力量大的道理是差不多的,当人足够多时方能冲破阻碍,到达美好的明天。当然,屏障由于是可循环的,所以它在被冲破后,会重新开始计数,继续阻挡后续的线程:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 public static void main (String[] args) { CyclicBarrier barrier = new CyclicBarrier (5 ); for (int i = 0 ; i < 10 ; i++) { int finalI = i; new Thread (() -> { try { Thread.sleep((long ) (2000 * new Random ().nextDouble())); System.out.println("玩家 " + finalI +" 进入房间进行等待... (" +barrier.getNumberWaiting()+"/5)" ); barrier.await(); System.out.println("玩家 " + finalI +" 进入游戏!" ); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } }).start(); } }
可以看到,通过使用循环屏障,我们可以对线程进行一波一波地放行,每一波都放行5个线程,当然除了自动重置之外,我们也可以调用reset()
方法来手动进行重置操作,同样会重新计数:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public static void main (String[] args) throws InterruptedException { CyclicBarrier barrier = new CyclicBarrier (5 ); for (int i = 0 ; i < 3 ; i++) new Thread (() -> { try { barrier.await(); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } }).start(); Thread.sleep(500 ); System.out.println("当前屏障前的等待线程数:" +barrier.getNumberWaiting()); barrier.reset(); System.out.println("重置后屏障前的等待线程数:" +barrier.getNumberWaiting()); }
可以看到,在调用reset()
之后,处于等待状态下的线程,全部被中断并且抛出BrokenBarrierException异常,循环屏障等待线程数归零。那么要是处于等待状态下的线程被中断了呢?屏障的线程等待数量会不会自动减少?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 public static void main (String[] args) throws InterruptedException { CyclicBarrier barrier = new CyclicBarrier (10 ); Runnable r = () -> { try { barrier.await(); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } }; Thread t = new Thread (r); t.start(); t.interrupt(); new Thread (r).start(); }
可以看到,当await()
状态下的线程被中断,那么屏障会直接变成损坏状态,一旦屏障损坏,那么这一轮就无法再做任何等待操作了。也就是说,本来大家计划一起合力冲破屏障,结果有一个人摆烂中途退出了,那么所有人的努力都前功尽弃,这一轮的屏障也不可能再被冲破了(所以CyclicBarrier告诉我们,不要做那个害群之马,要相信你的团队,不然没有好果汁吃),只能进行reset()
重置操作进行重置才能恢复正常。
乍一看,怎么感觉和之前讲的CountDownLatch有点像,好了,这里就得区分一下了,千万别搞混:
CountDownLatch:
它只能使用一次,是一个一次性的工具
它是一个或多个线程用于等待其他线程完成的同步工具
CyclicBarrier
它可以反复使用,允许自动或手动重置计数
它是让一定数量的线程在同一时间开始运行的同步工具
我们接着来看循环屏障的实现细节:
public class CyclicBarrier { private static class Generation { boolean broken = false ; } private final ReentrantLock lock = new ReentrantLock (); private final Condition trip = lock.newCondition(); private final int parties; private final Runnable barrierCommand; private Generation generation = new Generation (); private int count; public CyclicBarrier (int parties, Runnable barrierAction) { if (parties <= 0 ) throw new IllegalArgumentException (); this .parties = parties; this .count = parties; this .barrierCommand = barrierAction; } public CyclicBarrier (int parties) { this (parties, null ); } private void nextGeneration () { trip.signalAll(); count = parties; generation = new Generation (); } private void breakBarrier () { generation.broken = true ; count = parties; trip.signalAll(); } public int await () throws InterruptedException, BrokenBarrierException { try { return dowait(false , 0L ); } catch (TimeoutException toe) { throw new Error (toe); } } public int await (long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException { return dowait(true , unit.toNanos(timeout)); } private int dowait (boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this .lock; lock.lock(); try { final Generation g = generation; if (g.broken) throw new BrokenBarrierException (); if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException (); } int index = --count; if (index == 0 ) { boolean ranAction = false ; try { final Runnable command = barrierCommand; if (command != null ) command.run(); ranAction = true ; nextGeneration(); return 0 ; } finally { if (!ranAction) breakBarrier(); } } for (;;) { try { if (!timed) trip.await(); else if (nanos > 0L ) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { Thread.currentThread().interrupt(); } } if (g.broken) throw new BrokenBarrierException (); if (g != generation) return index; if (timed && nanos <= 0L ) { breakBarrier(); throw new TimeoutException (); } } } finally { lock.unlock(); } } public int getParties () { return parties; } public boolean isBroken () { final ReentrantLock lock = this .lock; lock.lock(); try { return generation.broken; } finally { lock.unlock(); } } public void reset () { final ReentrantLock lock = this .lock; lock.lock(); try { breakBarrier(); nextGeneration(); } finally { lock.unlock(); } } public int getNumberWaiting () { final ReentrantLock lock = this .lock; lock.lock(); try { return parties - count; } finally { lock.unlock(); } } }
看完了CyclicBarrier的源码之后,是不是感觉比CountDownLatch更简单一些?
信号量 Semaphore 还记得我们在《操作系统》中学习的信号量机制吗?它在解决进程之间的同步问题中起着非常大的作用。
信号量(Semaphore),有时被称为信号灯,是在多线程环境下使用的一种设施,是可以用来保证两个或多个关键代码段不被并发调用。在进入一个关键代码段之前,线程必须获取一个信号量;一旦该关键代码段完成了,那么该线程必须释放信号量。其它想进入该关键代码段的线程必须等待直到第一个线程释放信号量。
通过使用信号量,我们可以决定某个资源同一时间能够被访问的最大线程数,它相当于对某个资源的访问进行了流量控制。简单来说,它就是一个可以被N个线程占用的排它锁(因此也支持公平和非公平模式),我们可以在最开始设定Semaphore的许可证数量,每个线程都可以获得1个或n个许可证,当许可证耗尽或不足以供其他线程获取时,其他线程将被阻塞。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 public static void main (String[] args) throws ExecutionException, InterruptedException { Semaphore semaphore = new Semaphore (2 ); for (int i = 0 ; i < 3 ; i++) { new Thread (() -> { try { semaphore.acquire(); System.out.println("许可证申请成功!" ); semaphore.release(); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public static void main (String[] args) throws ExecutionException, InterruptedException { Semaphore semaphore = new Semaphore (3 ); for (int i = 0 ; i < 2 ; i++) new Thread (() -> { try { semaphore.acquire(2 ); System.out.println("许可证申请成功!" ); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); }
我们也可以通过Semaphore获取一些常规信息:
1 2 3 4 5 6 7 8 9 public static void main (String[] args) throws InterruptedException { Semaphore semaphore = new Semaphore (3 ); for (int i = 0 ; i < 5 ; i++) new Thread (semaphore::acquireUninterruptibly).start(); Thread.sleep(500 ); System.out.println("剩余许可证数量:" +semaphore.availablePermits()); System.out.println("是否存在线程等待许可证:" +(semaphore.hasQueuedThreads() ? "是" : "否" )); System.out.println("等待许可证线程数量:" +semaphore.getQueueLength()); }
我们可以手动回收掉所有的许可证:
1 2 3 4 5 6 public static void main (String[] args) throws InterruptedException { Semaphore semaphore = new Semaphore (3 ); new Thread (semaphore::acquireUninterruptibly).start(); Thread.sleep(500 ); System.out.println("收回剩余许可数量:" +semaphore.drainPermits()); }
这里我们模拟一下,比如现在有10个线程同时进行任务,任务要求是执行某个方法,但是这个方法最多同时只能由5个线程执行,这里我们使用信号量就非常合适。
数据交换 Exchanger 线程之间的数据传递也可以这么简单。
使用Exchanger,它能够实现线程之间的数据交换:
1 2 3 4 5 6 7 8 9 10 11 public static void main (String[] args) throws InterruptedException { Exchanger<String> exchanger = new Exchanger <>(); new Thread (() -> { try { System.out.println("收到主线程传递的交换数据:" +exchanger.exchange("AAAA" )); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); System.out.println("收到子线程传递的交换数据:" +exchanger.exchange("BBBB" )); }
在调用exchange
方法后,当前线程会等待其他线程调用同一个exchanger对象的exchange
方法,当另一个线程也调用之后,方法会返回对方线程传入的参数。
可见功能还是比较简单的。
Fork/Join框架 在JDK7时,出现了一个新的框架用于并行执行任务,它的目的是为了把大型任务拆分为多个小任务,最后汇总多个小任务的结果,得到整大任务的结果,并且这些小任务都是同时在进行,大大提高运算效率。Fork就是拆分,Join就是合并。
我们来演示一下实际的情况,比如一个算式:18x7+36x8+9x77+8x53,可以拆分为四个小任务:18x7、36x8、9x77、8x53,最后我们只需要将这四个任务的结果加起来,就是我们原本算式的结果了,有点归并排序的味道。
它不仅仅只是拆分任务并使用多线程,而且还可以利用工作窃取算法,提高线程的利用率。
工作窃取算法: 是指某个线程从其他队列里窃取任务来执行。一个大任务分割为若干个互不依赖的子任务,为了减少线程间的竞争,把这些子任务分别放到不同的队列里,并为每个队列创建一个单独的线程来执行队列里的任务,线程和队列一一对应。但是有的线程会先把自己队列里的任务干完,而其他线程对应的队列里还有任务待处理。干完活的线程与其等着,不如帮其他线程干活,于是它就去其他线程的队列里窃取一个任务来执行。
现在我们来看看如何使用它,这里以计算1-1000的和为例,我们可以将其拆分为8个小段的数相加,比如1-125、126-250… ,最后再汇总即可,它也是依靠线程池来实现的:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 public class Main { public static void main (String[] args) throws InterruptedException, ExecutionException { ForkJoinPool pool = new ForkJoinPool (); System.out.println(pool.submit(new SubTask (1 , 1000 )).get()); } private static class SubTask extends RecursiveTask <Integer> { private final int start; private final int end; public SubTask (int start, int end) { this .start = start; this .end = end; } @Override protected Integer compute () { if (end - start > 125 ) { SubTask subTask1 = new SubTask (start, (end + start) / 2 ); subTask1.fork(); SubTask subTask2 = new SubTask ((end + start) / 2 + 1 , end); subTask2.fork(); return subTask1.join() + subTask2.join(); } else { System.out.println(Thread.currentThread().getName()+" 开始计算 " +start+"-" +end+" 的值!" ); int res = 0 ; for (int i = start; i <= end; i++) { res += i; } return res; } } } }
1 2 3 4 5 6 7 8 9 ForkJoinPool-1-worker-2 开始计算 1-125 的值! ForkJoinPool-1-worker-2 开始计算 126-250 的值! ForkJoinPool-1-worker-0 开始计算 376-500 的值! ForkJoinPool-1-worker-6 开始计算 751-875 的值! ForkJoinPool-1-worker-3 开始计算 626-750 的值! ForkJoinPool-1-worker-5 开始计算 501-625 的值! ForkJoinPool-1-worker-4 开始计算 251-375 的值! ForkJoinPool-1-worker-7 开始计算 876-1000 的值! 500500
可以看到,结果非常正确,但是整个计算任务实际上是拆分为了8个子任务同时完成的,结合多线程,原本的单线程任务,在多线程的加持下速度成倍提升。
包括Arrays工具类提供的并行排序也是利用了ForkJoinPool来实现:
1 2 3 4 5 6 7 8 9 10 11 public static void parallelSort (byte [] a) { int n = a.length, p, g; if (n <= MIN_ARRAY_SORT_GRAN || (p = ForkJoinPool.getCommonPoolParallelism()) == 1 ) DualPivotQuicksort.sort(a, 0 , n - 1 ); else new ArraysParallelSortHelpers .FJByte.Sorter (null , a, new byte [n], 0 , n, 0 , ((g = n / (p << 2 )) <= MIN_ARRAY_SORT_GRAN) ? MIN_ARRAY_SORT_GRAN : g).invoke(); }
并行排序的性能在多核心CPU环境下,肯定是优于普通排序的,并且排序规模越大优势越显著。
至此,并发编程篇完结。