Dionysun

愚钝的人

笨鸟先飞,勤能补拙

G1

G1

G1也是属于并发分代收集器。

堆区域:

  • 年轻代区域
  • Eden区域 - 新分配的对象 - TLAB
  • Survivor区域 - 年轻代GC存活后但不需要晋升的对象 - RSet
  • 老年代区域
  • 晋升到老年代的对象
  • 直接分配到老年代的大对象,占用多个区域的对象,通常是大于Region的一半 image.png

G1的Collector可以分为两个大部分:

  • 全局并发标记 (global concurrent marking)
  • 拷贝存活对象(evacuation)

其分代收集模式又有,区别在于选定的CSet:

  • Young GC
  • Mixed GC
  • Full GC 前两者都是标记-复制作为回收算法

其中的几个关键技术有:

  • 停顿预测模型
  • TLAB
  • RSet
  • SATB & SATB MarkQueue & Write barrier
  • Safe Point

全局并发标记

全局并发标记是基于SATB形式的并发标记,具体来说,其分为下面几个阶段:

  1. 初始标记(Initial Marking): STW 扫描GC roots集合,标记所有从根集合可直接到达的对象并将它们的字段压入扫描栈 (marking stack)中等待后续扫描。G1使用外部的bitmap来记录mark信息,而不是用对象头的mark word里的mark bit。在分代式G1模式中,初始标记阶段借用Young GC的暂停,即没有额外、单独的STW。并且在Mixed GC中,还会根据Youn GC扫描后的Survivor的RSet作为根更新待扫描的GC Roots,将指向的对象同样压入扫描栈。
  2. 并发标记(Concurrent marking): Concurrent 不断从扫描栈取出引用递归扫描整个堆里的对象图。每扫描到一个对象就会对其标记,并将其字段压入扫描栈。重复扫描过程直到扫描栈清空。过程中还会扫描SATB write barrier所记录下的引用。
  3. 最终标记(final marking,在实现中也叫remarking): STW 在完成并发标记后,每个Java线程还会有一些剩下的SATB write barrier记录的引用尚未处理。这个阶段就负责把剩下的引用处理完。同时这个阶段也进行弱引用处理(reference processing)。注意这个暂停与CMS的remark有一个本质上的区别,那就是这个暂停只需要扫描SATB buffer,而CMS的remark需要重新扫描mod-union table里的dirty card外加整个根集合,而此时整个young gen(不管对象死活)都会被当作根集合的一部分,因而CMS remark有可能会非常慢
  4. 清理(cleanup): STW 清点和重置标记状态。这个阶段有点像mark-sweep中的sweep阶段,不过不是在堆上sweep实际对象,而是在marking bitmap里统计每个region被标记为活的对象有多少。这个阶段如果发现完全没有活对象的region就会将其整体回收到可分配region列表中。

拷贝存活对象

Evacuation阶段是全暂停的。它负责把一部分region里的活对象拷贝到空region里去,然后回收原本的region的空间。
Evacuation阶段可以自由选择任意多个region来独立收集构成收集集合(collection set,简称CSet),靠per-region remembered set(简称RSet)实现。这是regional garbage collector的特征。
在选定CSet后,evacuation其实就跟ParallelScavenge的young GC的算法类似,采用并行copying(或者叫scavenging)算法把CSet里每个region里的活对象拷贝到新的region里,整个过程完全暂停。从这个意义上说,G1的evacuation跟传统的mark-compact算法的compaction完全不同:前者会自己从根集合遍历对象图来判定对象的生死,不需要依赖global concurrent marking的结果,有就用,没有拉倒;而后者则依赖于之前的mark阶段对对象生死的判定。

注意,该部分也与初始标记阶段相同,复用了Young GC的代码。

TLAB

TLAB的核心思想在于优化各个线程从堆内存中新建对象的过程,通过估算每轮GC内每个线程使用的内存大小,提前分配好内存给线程,提高分配效率,避免由于撞针分配及CAS操作带来的性能损失,称之为TLAB(Thread Local Allocate Buffer)。注意,TLAB是线程私有的。

JVM 对象堆内存分配流程简述

具体来说,当分配一个对象堆内存空间时(new Object()),CollectedHeap首先检查是否启用了TLAB,如果启用了,则会尝试TLAB分配 - 如果当前线程的 TLAB 大小足够,那么从线程当前的 TLAB 中分配;如果不够,但是当前 TLAB 剩余空间小于最大浪费空间限制,则从堆上(一般是 Eden 区) 重新申请一个新的 TLAB 进行分配。否则,直接在 TLAB 外进行分配。TLAB 外的分配策略,不同的 GC 算法不同。例如G1:

  1. 对于超过1/2 region大小的对象,直接分配在Humongous区域
  2. 根据当前用户线程的状态进行region下标的分配

G1 与 TLAB 的关联

new新对象 -> TLAB分配 -> TLAB不可容纳 -> TLAB外进行分配 -> 从Eden区获取新的TLAB -> Eden区不够 -> 判断回收时间,是触发Young GC还是将空闲分区加入Eden区再次分配

GC回收 -> 将Eden Region变为free region -> 更改相应TLAB参数,重新分配 -> 循环进行 new 新对象 draw.io.drawio.png

TLAB的生命周期

TLAB 是线程私有的,线程初始化的时候,会创建并初始化 TLAB。同时,在 GC 扫描对象发生之后,线程第一次尝试分配对象的时候,也会创建并初始化 TLAB,即再分配。 TLAB 生命周期停止(TLAB 声明周期停止不代表内存被回收,只是代表这个 TLAB 不再被这个线程私有管理,即可以通过EMA计算大小后再分配)在:

  • 当前 TLAB 不够分配,并且剩余空间小于最大浪费空间限制,那么这个 TLAB 会被退回 Eden,重新申请一个新的
  • 发生 GC 的时候,TLAB 被回收。

==最大浪费空间可以看作是可容忍的internal fragmentation大小,即小于这个数值的内存碎片,是可以被容忍的==

直接从堆上分配

直接从堆上分配是最慢的分配方式。一种情况就是,如果当前 TLAB 剩余空间大于当前最大浪费空间限制,直接在堆上分配。并且,还会增加当前最大浪费空间限制,每次有这样的分配就会增加 TLABWasteIncrement 的大小,这样在一定次数的直接堆上分配之后,当前最大浪费空间限制一直增大会导致当前 TLAB 剩余空间小于当前最大浪费空间限制,从而申请新的 TLAB 进行分配。

返还TLAB给Eden区时,填充Dummy Object的必要性

当GC扫描时,此时的TLAB要返还给Eden区,即堆区域。

具体而言,填充Dummy Object实际发生在GC之前,以G1举例,此时要先确保堆内存是可解析的,即将所有线程的TLAB填充Dummy Object后,返还给堆,核心在于更快速的扫描堆上的对象,以及采样一些东西利于接下来TLAB大小的计算。

考虑不填充dummy object,此时堆内存中对于已经存在的对象不会存在影响,但是对于未使用的部分,GC线程并不知道其中是否会有对象,就会逐字节的扫描,影响效率。

填充Dummy Object是以对象头中的Mark word的一个int[]数组决定的,因此不能超过int[]数组的最大大小

TLAB的再分配

通过EMA采样(Exponential Moving Averange, 指数平均数),得到新的TLAB大小期望值,该期望值与Eden区的总大小有关,在[#G1 与 TLAB 的关联](#G1 与 TLAB 的关联 “wikilink”)中,概述了这个Eden区的大小的变化过程

具体来说,EMA算法的核心在于最小权重,即最小权重越大,变化得越快,受历史数据影响越小

每个线程 TLAB 初始大小 = Eden区大小 / (线程单个 GC 轮次内最多从 Eden 申请多少次 TLAB * 当前 GC 分配线程个数 EMA)

GC 后,重新计算 TLAB 大小 = Eden区大小 / (线程单个 GC 轮次内最多从 Eden 申请多少次 TLAB * 当前 GC 分配线程个数 EMA)

RSet - Remember Set

G1将堆内存划分为大小相等的region,新创建的对象都是放在新生代的Eden区。为了加速Initial Marking阶段中的GC Roots根扫描阶段,引入了RSet这一概念,具体来说,RSet存储了Region间的引用关系,主要是记录了如下两种:

  • Old Region -> Young Region
  • Old Region -> Old Region

内部数据结构

Card Table 卡表

卡表就是将Region进一步划分为若干个物理连续的512Byte的Card Page,这样每个Region就都有一个Card Table来映射,并且整个堆空间也有一个Global Card Table

Sparse PRT 稀疏哈希表

image.png

此方法内存开销较大,进一步缩减,得到细粒度PRT

细粒度 PRT

通过一个bit位来表示卡表 image.png

粗粒度位图

再度优化细粒度PRT的内存,每个bit位表示一个Region

image.png

Refine线程

Refine线程的核心功能在于:

  • 处理新生代分区的抽样 - 更新Young Heap Region的数目
  • 使G1满足GC的预测停顿时间-XX:MaxGCPauseMillis
  • 管理RSet
  • 更新RSet
  • 将G1中更新的引用关系从DCQS - Dirty Card Queue Set 中更新到RSet中
  • 每个线程都有一个私有的DCQ,而DCQS是全局静态变量
  • 并发、异步处理

SATB & SATB MarkQueue & Write barrier

SATB,SnapShot-At-The-Beginning,是维护并发GC的正确性的一个手段,G1 GC并发理论基础就是SATB。

抽象来说,G1 GC认为在一次GC开始的时候是活的对象就被认为是活的(GC Roots搜索),此时的对象图形成一个逻辑快照(SnapShot),然后在GC过程中新分配的对象都当作是活的,其他不可到达的对象就是死的。通过每个Region记录着的Top-At-Mark-Start,TAMS,指针,分别为prevTAMSnextTAMS,在TAMS以上的对象就是新分配的。但是在并发GC中,Collector和Mutator线程都在进行,如果collector并发mark的过程中mutator覆盖了某些引用字段的值而collector还没mark到那里,那collector就得不到完整的snapshot了,因此,引入了SATB Write Barrier来解决这个问题。

SATB Write Barrier

Write barrier是对"对引用类型字段赋值"这个动作的环切,也就是说赋值的前后都在barrier覆盖的范畴内。在赋值前的部分的write barrier叫做pre-write barrier,在赋值后的则叫做post-write barrier。

前面提到SATB要维持"在GC开始时活的对象"的状态这个逻辑snapshot。除了从root出发把整个对象图mark下来之外,其实只需要用pre-write barrier把每次引用关系变化时旧的引用值记下来就好了。这样,等concurrent marker到达某个对象时,这个对象的所有引用类型字段的变化全都有记录在案,就不会漏掉任何在snapshot里活的对象。当然,很可能有对象在snapshot中是活的,但随着并发GC的进行它可能本来已经死了,但SATB还是会让它活过这次GC。

SATB Mark Queue

为了尽量减少write barrier对mutator性能的影响,G1将一部分原本要在barrier里做的事情挪到别的线程上并发执行。
实现这种分离的方式就是通过logging形式的write barrier:mutator只在barrier里把要做的事情的信息记(log)到一个队列里,然后另外的线程从队列里取出信息批量完成剩余的动作。

以SATB write barrier为例,每个Java线程有一个独立的、定长的SATBMarkQueue,mutator在barrier里只把old_value压入该队列中。一个队列满了之后,它就会被加到全局的SATB队列集合SATBMarkQueueSet里等待处理,然后给对应的Java线程换一个新的、干净的队列继续执行下去。

并发标记(concurrent marker)会定期检查全局SATB队列集合的大小。当全局集合中队列数量超过一定阈值后,concurrent marker就会处理集合里的所有队列:把队列里记录的每个oop都标记上,并将其引用字段压到标记栈(marking stack)上等后面做进一步标记。

这个队列与DCQ的区别在于,前者是Refine线程处理,用于在堆的日常运维中追踪被修改的内存区域,优化垃圾收集的效率,SATB mark queue是mutator线程协助处理,用于记录并发标记阶段开始时对象的引用状态,确保标记的完整性和一致性。并且两者处理的数据类型和GC过程中的角色不一样:SATB Mark Queue 处理对象引用,特别是在堆修改前的状态;Dirty Card Queue 处理的是堆内存中的区域(card),这些区域在被修改时被标记为脏;SATB Mark Queue 在并发标记阶段发挥作用,帮助实现堆状态的一致性快照;Dirty Card Queue 在整个垃圾收集过程中都可能被使用,用于标记和处理那些自上次垃圾收集以来发生变化的堆区域。

选取CSet的子模式

Young GC

选定所有young gen里的region。通过控制young gen的region个数来控制young GC的开销(Refine线程所做的事)

  • 触发时机:

    • 新的对象创建会放入Eden区
    • Eden区满、G1会根据停顿预测模型-计算当前Eden区GC大概耗时多久
    • 如果回收时间远 < -XX:MaxGCPauseMills,则分配空闲分区加入Eden 区存放
    • 如果回收时间接近-XX:MaxGCPauseMills,则触发一次Young GC
  • 年轻代初始占总堆5%,随着空闲分区加入而增加,最多不超过60%

  • Young GC 会回收all新生代分区 - Eden区和Survivor 区

  • Young GC 会STW(stop the world),暂停所有用户线程

  • GC后重新调整新生代Region数目,每次GC回收Region数目不固定

  • 回收过程:

    • 扫描根 - GC Roots
    • 更新RSet - Refine线程
    • 处理RSet - 扫描RSet
    • 复制对象 - Evacuation
    • 处理引用 - 重构RSet及卡表

Mixed GC

选定所有young gen里的region,外加根据global concurrent marking统计得出收集收益高的若干old gen region。在用户指定的开销目标范围内尽可能选择收益高的old gen region。

  • 回收过程
    • 复用 Young GC 的 Initial Marking,并根据Survivor区中的RSet再次根扫描,压入扫描栈
    • 并发标记阶段 - 从扫描栈与SATB MarkQueue中递归进行标记
    • 再标记阶段 - STW,完全处理SATB MarkQueue
    • 清理阶段 - 确认回收对象
    • 复制对象阶段 - 复用 Young GC 中的复制代码 ## 分代式G1

分代式G1的正常工作流程就是在young GC与mixed GC之间视情况切换,背后定期做做全局并发标记。Initial marking默认搭在young GC上执行;当全局并发标记正在工作时,G1不会选择做mixed GC,反之如果有mixed GC正在进行中G1也不会启动initial marking。 在正常工作流程中没有full GC的概念,old gen的收集全靠mixed GC来完成。

如果mixed GC实在无法跟上程序分配内存的速度,导致old gen填满无法继续进行mixed GC,就会切换到G1之外的serial old GC来收集整个GC heap(注意,包括young、old、perm)。这才是真正的full GC。Full GC之所以叫full就是要收集整个堆,只选择old gen的部分region算不上full GC。进入这种状态的G1就跟-XX:+UseSerialGC的full GC一样(背后的核心代码是两者共用的)。
顺带一提,G1 GC的System.gc()默认还是full GC,也就是serial old GC。只有加上 -XX:+ExplicitGCInvokesConcurrent 时G1才会用自身的并发GC来执行System.gc()——此时System.gc()的作用是强行启动一次global concurrent marking;一般情况下暂停中只会做initial marking然后就返回了,接下来的concurrent marking还是照常并发执行。

Safe Point

正如 Safe Point 名称的寓意一样,Safe Point 是一个线程可以安全停留在这里的代码点。当我们需要进行 GC 操作的时候,JVM 可以让所有线程在 Safe Point 处停留下来,等到所有线程都停在 Safe Point 处时,就可以进行内存引用分析,从而确定哪些对象是存活的、哪些对象是不存活的。

为什么让大家更加场景化地理解 Safe Point 这个概念,可以设想如下场景:

  1. 当需要 GC 时,需要知道哪些对象还被使用,或者已经不被使用可以回收了,这样就需要每个线程的对象使用情况。
  2. 对于偏向锁(Biased Lock),在高并发时想要解除偏置,需要线程状态还有获取锁的线程的精确信息。
  3. 对方法进行即时编译优化(OSR 栈上替换),或者反优化(bailout 栈上反优化),这需要线程究竟运行到方法的哪里的信息。

对于上面这些操作,都需要知道现场的各种信息,例如寄存器有什么内容,堆使用情况等等。在做这些操作的时候,线程需要暂停,等到这些操作完成才行,否则会有并发问题,这就需要 Safe Point 的存在。

因此,我们可以将 Safe Point 理解成代码执行过程中的一些特殊位置,当线程执行到这个位置时,线程可以暂停。 Safe Point 处保存了其他位置没有的一些当前线程信息,可以提供给其他线程读取,这些信息包括:线程上下文信息,对象的内部指针等。

而 Stop the World 就是所有线程同时进入 Safe Point 并停留在那里,等待 JVM 进行内存分析扫描,接着进行内存垃圾回收的时间。

为啥需要 Safe Point

前面我们说到,Safe Point 其实就是一个代码的特殊位置,在这个位置时线程可以暂停下来。而当我们进行 GC 的时候,所有线程都要进入到 Safe Point 处,才可以进行内存的分析及垃圾回收。根据这个过程,其实我们可以看到:Safe Point 其实就是栅栏的作用,让所有线程停下来,否则如果所有线程都在运行的话,JVM 无法进行对象引用的分析,那么也无法进行垃圾回收了。

此外,另一个重要的 Java 线程特性 —— interrupted 也是根据 Safe Point 实现的。当我们在代码里写入 Thread.interrupt() 时,只有线程运行到 Safe Point 处时才知道是否发生了 interrupted。因此,Safe Point 也承担了存储线程通信的功能。

GC 日志打印

  • 打印基本GC信息
    • -XX:+PrintGCDetails -XX:PrintGCDateStamps
  • 打印对象分布 - 根据Age
    • -XX:+PrintTenuringDistribution
  • GC后打印堆数据
    • -XX:+PrintHeapAtGC
  • 打印STW时间
    • -XX:+PrintGCApplicationStoppedTime
  • 打印 Safe Point 信息
    • -XX:+PringSafepointStatistics -XX:PrintSafepointStatisticsCount=1
  • 打印 Reference 处理信息
    • -XX:+PrintReferenceGC
  • 日志分割
    • -Xloggc:/path/tp/gc.log - GC日志输出的文件路径
    • -XX:UseGCLogFileRotation - 开启日志文件分割
  • 时间戳命名文件
    • -XX:PrintGCDetails -XX:+PrintGCDataStamps -Xloggc:/path/to/gc-%t.log

JVM

走进JVM

编译JDK8

使用 WSL2 + Docker进行编译测试

  • 配置Docker内的ssh,开放端口
  • 更新源时忽略ssl验证[options]
  • 修改三个报错信息
    • 描述符
    • 时间
    • 已不支持的头文件

JVM启动流程

JavaMain

该函数是Main入口点,位于jdk/src/share/bin/java.c

JLI_Launch

LoadJavaVM()JVMInit()ContinueInNewThread0()都由不同的系统实现,只定义了头文件(动态加载jvm.so这个共享库,并把jvm.so中的相关函数导出并且初始化)

JLI_Launch

JNI调用本地方法

javac -h out/production/jni_test src/com/test/Main.java

.dylib/.dll 动态链接库

static{ System.load(.dll) }

JVM内存管理

内存区域划分

MemoryArea

程序计数器(线程独立)

与8086CPU中PC寄存器类似,指向当前线程所执行字节码的行号

虚拟机栈(线程独立)

当每个方法被执行的时候,JVM都会同步创建一个栈帧,其中包含当前方法的一些信息,比如局部变量表,操作数栈,动态链接,方法出口等。

局部变量表

方法中的局部变量

操作数栈

执行字节码时使用到的栈结构

运行时常量池

在当前方法中如果需要调用其他方法的时候,能够从运行时常量池中找到对应的符号引用,然后在==类加载的解析阶段==替换为直接引用,调用对应的方法,即为动态链接。

在JDK8之后,运行时常量池存储于metaSpace中,字符串常量池除外,依旧在Heap中。

本地方法栈

与虚拟机栈类似,但是供JNI使用

是整个Java应用程序共享的区域,此区域的职责就是存放和管理对象和数组,是垃圾回收机制的主要作用区域

方法区 (JDK8 - Metaspace)

该区域也是整个Java应用程序共享的区域,它存储所有的类信息、常量、静态变量、动态编译缓存等数据,大体分为,类信息表与运行时常量池,两个部分。

类信息表中存放的是当前应用程序加载的所有类信息,包括类的版本、字段、方法、接口等信息,同时会将编译时生成的常量池数据全部存放到运行时常量池中。当然,常量也并不是只能从类信息中获取,在程序运行时,也有可能会有新的常量进入到常量池。

  • String -> 常量池
  • .intern() Heap与常量池的关系
  • Integer与int在jvm的存储差异
    • 前者存储在Heap中,是一个对象,后者直接存储在StackFrame中,是一个实际的数值

申请堆外内存

Unsafe.allocateMemory()

垃圾回收机制

对象存活判定算法

引用计数法

存在互相引用问题

可达性分析算法

可以被选为GC Roots的条件如下:

  • 在虚拟机栈的栈帧中的局部变量表中的指向GC堆里的对象的引用;当前所有正在被调用的方法的引用类型的参数/局部变量/临时值
  • 虚拟机内部需要用到的对象,例如类加载器的引用,等
  • JNI handles
  • 所有当前被加载的Java类
  • 使用类的静态成员变量对对象的引用
  • 常量池中对对象的引用(.intern())
  • 被添加了锁的对象的引用(synchronized)

是对象的引用作为gc root 而不是被引用的对象

[!NOTE] Title 在Java虚拟机(JVM)的垃圾回收(GC)过程中,当考虑局部变量表中的元素作为GC Roots时,是对象的引用而非被引用的对象本身,作为GC Roots。这个区分是重要的,因为它影响了垃圾回收器如何确定对象的可达性。

解释 局部变量表中的引用: 在每个活动线程的栈帧中,局部变量表存储着各种类型的数据,包括各种基本数据类型的值和对象引用。 对于对象类型,局部变量表存储的是指向堆中对象的引用(也就是对象的内存地址),而不是对象本身。 这些引用作为起点(GC Roots)被用于在垃圾回收过程中的可达性分析。 为什么是引用而不是对象: GC Roots的概念是用来标识垃圾收集算法的起点。这些起点本身必须是明确的、易于识别的,且在栈上直接可访问的元素。 局部变量表中的引用直接存在于栈帧中,JVM可以快速访问这些引用,并使用它们来查找实际的对象实例。 如果对象实例本身位于堆中,就不能直接作为GC Roots。堆中的对象的存活与否是需要通过引用的可达性来判断的。

最终判定

Object#finalize()方法 ObjectFinalize

垃圾回收算法

分代收集机制

JVM将堆内存区域划分为新生代老年代永久代(JDK8后由metaSpace代替)

通过设置JVM不同的垃圾收集器,提供不同的具体实现

Minor GC

次要垃圾回收,主要进行新生代区域的垃圾收集。触发条件:新生代的Eden区容量已满时。

Major GC

主要垃圾回收,主要进行老年代的垃圾收集。

FullGC

完全垃圾回收,对整个Java堆内存和方法区进行垃圾回收。 触发条件:

  • 每次晋升到老年代的对象平均大小大于老年代剩余空间
  • Minor GC后存活的对象超过了老年代剩余空间
  • 永久代内存不足(JDK8之前)
  • 手动调用System.gc()方法

空间分配担保

若在一次GC后,新生代Eden区存在的大量对象,超出了Survivor区的容量,这时候就需要使用该机制,将Survivor区无法容纳的对象直接送到老年代,而老年代也存在无法容纳的情况,这时候就会调用Full GC进行大规模垃圾回收,尝试腾出空间,否则直接抛出OOM错误。 Space

标记-清除算法

algo1

标记-复制算法

algo2

标记-整理算法

algo3

垃圾收集器实现

Serial收集器

新生代收集算法采用标记-复制,老年代采用标记-整理 Serial

ParNew收集器

相当于Serial收集器的多线程版本,除了GC线程支持多线程以外没有大区别

Parallel Scavenge/Parallel Old收集器

新生代收集算法采用标记-复制,老年代采用标记-整理,ParNew收集器不同的是,它会自动衡量一个吞吐量,并根据吞吐量来决定每次垃圾回收的时间,这种自适应机制,能够很好地权衡当前机器的性能,根据性能选择最优方案。

CMS收集器

该收集器可以并发执行,主要采用标记-清除算法 CMS CMS垃圾回收分为4个阶段:

  • 初始标记(需要暂停用户线程):这个阶段的主要任务仅仅只是标记出GC Roots能直接关联到的对象,速度比较快,不用担心会停顿太长时间。
  • 并发标记:从GC Roots的直接关联对象开始遍历整个对象图的过程,这个过程耗时较长但是不需要停顿用户线程,可以与垃圾收集线程一起并发运行。
  • 重新标记(需要暂停用户线程):由于并发标记阶段可能某些用户线程会导致标记产生变得,因此这里需要再次暂停所有线程进行并行标记,这个时间会比初始标记时间长一丢丢。
  • 并发清除:最后就可以直接将所有标记好的无用对象进行删除,因为这些对象程序中也用不到了,所以可以与用户线程并发运行。

虽然它的优点非常之大,但是缺点也是显而易见的,标记清除算法会产生大量的内存碎片,导致可用连续空间逐渐变少,长期这样下来,会有更高的概率触发Full GC,并且在与用户线程并发执行的情况下,也会占用一部分的系统资源,导致用户线程的运行速度一定程度上减慢。

Garbage First(G1)收集器

G1收集器绕过了Minor GC、Major GC、Full GC,将整个Java堆划分为2048个大小相同的独立Region块,每一个Region都可以根据需要,自由决定扮演哪个角色(Eden、Survivor和老年代),收集器会根据对应的角色采用不同的回收策略。此外,G1收集器还存在一个Humongous区域,它专门用于存放大对象(一般认为大小超过了Region容量一半的对象为大对象)这样,新生代、老年代在物理上,不再是一个连续的内存区域,而是到处分布的。 G1-1 G1-2 G1的回收过程与CMS大体类似:

  • 初始标记(暂停用户线程):仅仅只是标记一下GC Roots能直接关联到的对象,并且修改TAMS指针的值,让下一阶段用户线程并发运行时,能正确地在可用的Region中分配新对象。这个阶段需要停顿线程,但耗时很短,而且是借用进行Minor GC的时候同步完成的,所以G1收集器在这个阶段实际并没有额外的停顿。
  • 并发标记:从GC Root开始对堆中对象进行可达性分析,递归扫描整个堆里的对象图,找出要回收的对象,这阶段耗时较长,但可与用户程序并发执行。
  • 最终标记(暂停用户线程):对用户线程做一个短暂的暂停,用于处理并发标记阶段漏标的那部分对象。
  • 筛选回收:负责更新Region的统计数据,对各个Region的回收价值和成本进行排序,根据用户所期望的停顿时间来制定回收计划,可以自由选择任意多个Region构成回收集,然后把决定回收的那一部分Region的存活对象复制到空的Region中,再清理掉整个旧Region的全部空间。这里的操作涉及存活对象的移动,是必须暂停用户线程,由多个收集器线程并行完成的。

元空间

其他引用类型

类与类加载

类文件结构

类文件信息

  • 存储顺序
    • 魔数
    • 版本号
    • 常量池 - 字面量和符号引用 具体内容可翻看书籍与笔记,注意主要存储信息都是以字节为单位

字节码指令

ASM字节码编程

类加载机制

类加载过程

要加载一个类,一定是出于某种目的的,比如我们要运行我们的Java程序,那么就必须要加载主类才能运行主类中的主方法,又或是我们需要加载数据库驱动,那么可以通过反射来将对应的数据库驱动类进行加载。

所以,一般在这些情况下,如果类没有被加载,那么会被自动加载:

  • 使用new关键字创建对象时
  • 使用某个类的静态成员(包括方法和字段)的时候(当然,final类型的静态字段有可能在编译的时候被放到了当前类的常量池中,这种情况下是不会触发自动加载的)
  • 使用反射对类信息进行获取的时候(之前的数据库驱动就是这样的)
  • 加载一个类的子类时
  • 加载接口的实现类,且接口带有default的方法默认实现时 ClassLoading 加载过程详见itbaima笔记

一些需要注意的点:

  • static final string类型的字符串会被JVM优化到字符串常量池中,不会加载对应的类
  • 数组类型在创建时不会导致类加载,但是数组中的对象创建时,就会导致类加载
  • class文件,metaspace,heap中类对象,gc roots,不同的类加载器
  • 类字面量,String aString.class之间的差异
  • 静态方法是在自动生成的<clinit>方法中执行

类加载器

不同的Class<?>对象的GC ROOTs可以为不同的类加载器,于是同一.class文件加载的类也可以不属于同一个metaSpace中的类信息,具体见JavaSE中的反射与双亲委派机制。

  • Object#getClass()也和BootstrapClassLoader一样是JNI方法
  • 区分Object.class和Object().getClass()方法的区别,前者是类字面量,后者是方法,但都是获取Heap中的对应的Class<?>对象,其指向metaSpace区的类信息

JVM - OOM错误

Spring 主要源码分析

Bean的生命周期为:BeanFactory初始化 - Bean注册 - 实例化 - 属性注入 - 初始化 - 后处理

Bean的注册

  1. 扫描:Spring通过配置(XML配置或Java配置)或自动扫描(@ComponentScan)来发现应用中定义的Bean。对于自动扫描,Spring会在指定的包路径下查找标注了@Component@Service@Repository@Controller等注解的类
  2. 解析:一旦Bean被发现,Spring将解析Bean的定义信息,包括Bean的作用域(如单例、原型)、生命周期回调(如@PostConstruct@PreDestroy注解方法)、依赖注入的需求(通过@Autowired@Resource等注解标记)等 —— BeanDefinition
  3. 注册:Spring将Bean的定义信息注册到BeanDefinitionRegistry中。这是一个重要步骤,因为注册后的Bean定义将被用于后续的Bean实例化和依赖注入过程。此时,Bean还没有被实例化。

BeanDefinition

BeanDefinition

Spring在初始化过程中,先收集所有bean的元数据信息并注册,bean的元数据描述为接口BeanDefinition,该接口定义了你能想到的一切有关bean的属性信息

BeanDefinition衍生出一系列实现类

  • AbstractBeanDefinition: 如同其他Spring类,大部分BeanDefinition接口的逻辑都由该抽象类实现
  • GenericBeanDefinition: 是一站式、用户可见的bean definition;可见的bean definition意味着可以在该bean definition上定义post-processor来对bean进行操作
  • RootBeanDefinition: 当bean definition存在父子关系的时候,RootBeanDefinition用来承载父元数据的角色(也可独立存在),同时它也作为一个可合并的bean definition使用,在Spring初始化阶段,所有的bean definition均会被(向父级)合并为RootBeanDefinition,子bean definition(GenericBeanDefinition/ChildBeanDefinition)中的定义会覆盖其父bean definition(由parentName指定)的定义
  • AnnotatedBeanDefinition: 用来定义注解Bean Definition

BeanDefinitionHolder只是简单捆绑了BeanDefinition、bean-name、bean-alias,用于注册BeanDefinition及别名alias ^BeanDefinitionHolder

BeanRegistry

Bean的注册逻辑分为两步,一为BeanDefinition的注册,二为别名的注册

  • BeanDefinition注册的定义在BeanDefinitionRegistry#registerBeanDefinition,其实现使用一个Map<String, BeanDefinition> 来保存bean-name和BeanDefinition的关系
  • 别名的注册定义在AliasRegistry#registerAlias,其实现同样使用一个Map<String, String> 来保存别名alias-name和bean-name(或另一个别名alias-name)的关系

注意Bean的注册时机,通常应该在应用上下文的刷新过程之前进行(onRefresh())。一旦上下文被刷新,对Bean定义的任何修改可能不会被识别,或者可能会导致不一致的状态

Bean的实例化

BigMap

BeanFactory

BeanFactory 几个核心接口:

  • AliasRegistry bean别名注册和管理
  • BeanDefinitionRegistry bean元数据注册和管理
  • SingletonBeanRegistry 单例bean注册和管理
  • BeanFactory bean工厂,提供各种bean的获取及判断方法

通过上述的类依赖图,对于Bean的实例化,核心实现是在DefaultListableBeanFactory

DefaultListableBeanFactory - AbstractBeanFactory

bean的实例化过程发生在getBean调用阶段(对于singleton则发生在首次调用阶段),getBean的实现方法众多,我们追根溯源,找到最通用的方法AbstractBeanFactory#doGetBean

doGetBean

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
// org.springframework.beans.factory.support.AbstractBeanFactory
protected <T> T doGetBean(final String name, @Nullable final Class<T> requiredType,
        @Nullable final Object[] args, boolean typeCheckOnly) throws BeansException {

    // 1. 获取真正的beanName
    final String beanName = transformedBeanName(name);
    Object bean;

    // 2. 尝试获取(提前曝光的)singleton bean实例(为了解决循环依赖)
    Object sharedInstance = getSingleton(beanName);
    
    // 3. 如果存在
    if (sharedInstance != null && args == null) { ... }
    
    // 4. 如果不存在
    else { ... }
    
    // 5. 尝试类型转换
    if (requiredType != null && !requiredType.isInstance(bean)) { ... }
    
    return (T) bean;
}
doGetBean
Bean Name的转换

在使用bean-name获取bean的时候,除了可以使用原始bean-name之外,还可以使用alias别名等,bean-name的转换则是将传入的’bean-name’一层层转为最原始的bean-name

  • 函数canonicalName的作用则是利用别名注册aliasMap,将别名alias转为原始bean-name
  • 函数transformedBeanName比较特殊,其是将FactoryBean的bean-name前缀 ‘&’ 去除 bean-name-transform
尝试获取单例

拿到原始的bean-name之后,便可以实例化bean或者直接获取已经实例化的singleton-bean

在获取singleton-bean的时候一般存在三种情况:1. 还未实例化(或者不是单例);2. 已经实例化;3. 正在实例化;

  • 对于 “1. 还未实例化” ,返回null即可,后续进行实例化动作
  • 对于 “2. 已经实例化”,直接返回实例化的singleton-bean
  • 对于 “3. 正在实例化”,会存在循环依赖问题

Spring中对于singleton-bean,有一个sharedInstance的概念,在调用getSingleton函数时,返回的不一定是完全实例化的singleton-bean,有可能是一个中间状态(创建完成,但未进行属性依赖注入及其他后处理逻辑),这种中间状态会通过getSingleton函数提前曝光出来,目的是为了解决循环依赖

因此,Spring通过提供三层缓存来解决循环依赖问题,并且可以通过这种机制实现诸多的PostProcessor增强Bean,例如AOP

  • singletonObjects 缓存已经实例化完成的singleton-bean
  • earlySingletonObjects 缓存正在实例化的、提前曝光的singleton-bean,用于处理循环依赖

  • singletonFactories 缓存用于生成earlySingletonObject的 ObjectFactory

ObjectFactory,定义了一个用于创建、生成对象实例的工厂方法

1
2
3
4
@FunctionalInterface
public interface ObjectFactory<T> {
    T getObject() throws BeansException;
}

因此getSingleton的逻辑如下: getSingleton

NOTE: 在提前暴露实体中,将相应的ObjectFactory放入了singletonFactories

FactoryBean的处理(sharedInstance存在的逻辑)

==sharedInstance不一定是我们所需要的bean实例==

例如,我们在定义Bean的时候可以通过实现FactoryBean接口来定制bean实例化的逻辑(实现FactoryBean),通过注册FactoryBean类型的Bean,实例化后的原始实例类型同样为FactoryBean,但我们需要的是通过FactoryBean#getObject方法得到的实例,这需要针对FactoryBean做一些处理,即AbstractBeanFactory#getObjectForBeanInstance

Get the object for the given bean instance, either the bean instance itself or its created object in case of a FactoryBean. Now we have the bean instance, which may be a normal bean or a FactoryBean. If it’s a FactoryBean, we use it to create a bean instance.

该函数要实现的逻辑比较简单,如果sharedInstance是 FactoryBean,则使用getObject方法创建真正的实例

getObjectForBeanInstance是一个通用函数,并不只针对通过getSingleton得到的sharedInstance,任何通过缓存或者创建得到的 rawInstance,都需要经过getObjectForBeanInstance处理,拿到真正需要的 beanInstance

1
2
3
4
5
6
7
8
/**
 * @param beanInstance  sharedInstance / rawInstance,可能为FactoryBean
 * @param name            传入的未做转换的 bean name
 * @param beanName        对name做过转换后的原始 canonical bean name
 * @param mbd            合并后的RootBeanDefinition,下文会介绍
 */
protected Object getObjectForBeanInstance(
    Object beanInstance, String name, String beanName, RootBeanDefinition mbd)
getObjectBeanInstance
getObjectForBeanInstance

在这个判断逻辑中,如果入参name以’&‘开头则直接返回,这里兼容了一种情况,如果需要获取/注入FactoryBean而不是getObject生成的实例,则需要在bean-name/alias-name前加入'&'

对于singleton,FactoryBean#getObject的结果会被缓存到factoryBeanObjectCache,对于缓存中不存在或者不是singleton的情况,会通过FactoryBean#getObject生成 ^factorybeangetobject

FactoryBeanRegistrySupport#getObjectFromFactoryBean
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
protected Object getObjectFromFactoryBean(FactoryBean<?> factory, String beanName, boolean shouldPostProcess) {  
    if (factory.isSingleton() && this.containsSingleton(beanName)) {  
        synchronized(this.getSingletonMutex()) {  
            Object object = this.factoryBeanObjectCache.get(beanName);  
            if (object == null) {  
                object = this.doGetObjectFromFactoryBean(factory, beanName);  
                Object alreadyThere = this.factoryBeanObjectCache.get(beanName);  
                if (alreadyThere != null) {  
                    object = alreadyThere;  
                } else {  
                    if (shouldPostProcess) {  
                        if (this.isSingletonCurrentlyInCreation(beanName)) {  
                            return object;  
                        }  
  
                        this.beforeSingletonCreation(beanName);  
  
                        try {  
                            object = this.postProcessObjectFromFactoryBean(object, beanName);  
                        } catch (Throwable var14) {  
                            throw new BeanCreationException(beanName, "Post-processing of FactoryBean's singleton object failed", var14);  
                        } finally {  
                            this.afterSingletonCreation(beanName);  
                        }  
                    }  
  
                    if (this.containsSingleton(beanName)) {  
                        this.factoryBeanObjectCache.put(beanName, object);  
                    }  
                }  
            }  
  
            return object;  
        }  
    } else {  
        Object object = this.doGetObjectFromFactoryBean(factory, beanName);  
        if (shouldPostProcess) {  
            try {  
                object = this.postProcessObjectFromFactoryBean(object, beanName);  
            } catch (Throwable var17) {  
                throw new BeanCreationException(beanName, "Post-processing of FactoryBean's object failed", var17);  
            }  
        }  
  
        return object;  
    }  
}

对于Singleton:

  • 首先从缓存中尝试获取,如获取失败,调用[doGetObjectFromFactoryBean](#FactoryBeanRegistrySupport doGetObjectFromFactoryBean “wikilink”),其中内核是调用FactoryBean#getObject()方法
  • 对于需要后处理的Bean,首先判断是否处于正在创建状态(isSingletonCurrentlyInCreation),并且通过this.beforeSingletonCreate() this.afterSingletonCreation()将实际的BeanPostProcessor过程保护
  • 对于BeanPostProcessor,调用this.postProcessObjectFromFactoryBean,其具体实现在[AbstractAutowireCapableBeanFactory#applyBeanPostProcessorAfterInitialization](#AbstractAutowireCapableBeanFactory applyBeanPostProcessorAfterInitialization “wikilink”)
FactoryBeanRegistrySupport#doGetObjectFromFactoryBean
FactoryBeanRegistrySupport_doGetObjectFromFactoryBean
AbstractAutowireCapableBeanFactory#applyBeanPostProcessorAfterInitialization

postProcessAfterInitialization函数可以对现有bean instance做进一步的处理,甚至可以返回新的bean instance,这就为bean的增强提供了一个非常方便的扩展方式

加载Bean实例 (sharedInstance不存在的逻辑)
createBeanInstance

Bean的加载/创建分为三大部分

  1. 将BeanDefinition合并为RootBeanDefinition,类似类继承,子BeanDefinition属性会覆盖父BeanDefinition
  2. 依次加载所依赖的bean,对于有依赖的情况,优先递归加载依赖的bean
  3. 按照不同的bean类型,根据BeanDefinition的定义进行加载/创建
BeanDefinition合并 (RootBeanDefinition)

AbstractBeanFactory#getMergedLocalBeanDefinition中执行核心逻辑

加载dependes-On beans
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
String[] dependsOn = mbd.getDependsOn();
if (dependsOn != null) {
    // 遍历所有的依赖
    for (String dep : dependsOn) {
        // 检测循环依赖
        if (isDependent(beanName, dep)) { /* throw exception */ }
        // 注册依赖关系
        registerDependentBean(dep, beanName);
        // 递归getBean,加载依赖bean
        try { getBean(dep); }
        catch (NoSuchBeanDefinitionException ex) { /* throw exception */ }
    }
}

该过程中涉及两个中间态

  • dependentBeanMap 存储哪些bean依赖了我(哪些bean里注入了我) 如果 beanB -> beanA, beanC -> beanA,key为beanA,value为[beanB, beanC]
  • dependenciesForBeanMap 存储我依赖了哪些bean(我注入了哪些bean) 如果 beanA -> beanB, beanA -> beanC,key为beanA,value为[beanB, beanC]
加载singleton bean实例
1
2
3
4
5
6
7
8
if (mbd.isSingleton()) {
    sharedInstance = getSingleton(beanName, () -> {
        // singletonFactory - ObjectFactory
        try { return createBean(beanName, mbd, args); }
        catch (BeansException ex) {    destroySingleton(beanName);    throw ex; }
    });
    bean = getObjectForBeanInstance(sharedInstance, name, beanName, mbd);
}

其中核心为createBeangetObjectForBeanInstance

  • createBean 根据BeanDefinition的内容,创建/初始化 bean instance
  • #getObjectBeanInstance 主要处理FactoryBean

createBean被包装在lambda(singletonFactory),重写ObjectFactory#getObject(),作为[getSingleton](#DefaultSingletonBeanRegistry getSingleton(String, ObjectFactory) “wikilink”)的参数

DefaultSingletonBeanRegistry#getSingleton(String, ObjectFactory)
createSingletonBean

同样的,会先在缓存中查找该singleton,如果不存在,创建的核心逻辑在于[createBean](#AbstractAutowireCapableBeanFactory createBean “wikilink”)

AbstractAutowireCapableBeanFactory#createBean
createBean
  1. resolveBeanClass 这一步骤用于锁定bean class,在没有显示指定beanClass的情况下,使用className加载beanClass
  2. 验证method overrides ==在BeanDefinitionReader 中有提到过lookup-method及replace-method,该步骤是为了确认以上两种配置中的method是否存在==
  3. 执行InstantiationAwareBeanPostProcessor前处理器(postProcessBeforeInstantiation) 如果这个步骤中生成了"代理"bean instance,则会有一个短路操作,直接返回该bean instance而不再执行doCreate,其中的核心逻辑为调用this.applyBeanPostProcessorsBeforeInstantiation() ^fcb215
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
try {
    // Give BeanPostProcessors a chance to return a proxy instead of the target bean instance.
    Object bean = resolveBeforeInstantiation(beanName, mbdToUse);
    if (bean != null) {
          // 如果这里生成了代理的bean instance会直接返回
        return bean;
    }
} cache (Throwable ex) { // throw exception }

try {
  // 创建bean instance
  Object beanInstance = doCreateBean(beanName, mbdToUse, args);
  // ...
}
  1. doCreateBean (AbstractAutowireCapableBeanFactory) 真正bean的创建及初始化过程在此处实现
doCreateBean
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
protected Object doCreateBean(String beanName, RootBeanDefinition mbd, @Nullable Object[] args) throws BeanCreationException {  
    BeanWrapper instanceWrapper = null;  
    if (mbd.isSingleton()) {  
        instanceWrapper = (BeanWrapper)this.factoryBeanInstanceCache.remove(beanName);  
    }  
  
    if (instanceWrapper == null) {  
        instanceWrapper = this.createBeanInstance(beanName, mbd, args);  
    }  
  
    Object bean = instanceWrapper.getWrappedInstance();  
    Class<?> beanType = instanceWrapper.getWrappedClass();  
    if (beanType != NullBean.class) {  
        mbd.resolvedTargetType = beanType;  
    }  
  
    synchronized(mbd.postProcessingLock) {  
        if (!mbd.postProcessed) {  
            try {  
                this.applyMergedBeanDefinitionPostProcessors(mbd, beanType, beanName);  
            } catch (Throwable var17) {  
                throw new BeanCreationException(mbd.getResourceDescription(), beanName, "Post-processing of merged bean definition failed", var17);  
            }  
  
            mbd.markAsPostProcessed();  
        }  
    }  
  
    boolean earlySingletonExposure = mbd.isSingleton() && this.allowCircularReferences && this.isSingletonCurrentlyInCreation(beanName);  
    if (earlySingletonExposure) {  
        if (this.logger.isTraceEnabled()) {  
            this.logger.trace("Eagerly caching bean '" + beanName + "' to allow for resolving potential circular references");  
        }  
  
        this.addSingletonFactory(beanName, () -> {  
            return this.getEarlyBeanReference(beanName, mbd, bean);  
        });  
    }  
  
    Object exposedObject = bean;  
  
    try {  
        this.populateBean(beanName, mbd, instanceWrapper);  
        exposedObject = this.initializeBean(beanName, exposedObject, mbd);  
    } catch (Throwable var18) {  
        if (var18 instanceof BeanCreationException bce) {  
            if (beanName.equals(bce.getBeanName())) {  
                throw bce;  
            }  
        }  
  
        throw new BeanCreationException(mbd.getResourceDescription(), beanName, var18.getMessage(), var18);  
    }  
  
    if (earlySingletonExposure) {  
        Object earlySingletonReference = this.getSingleton(beanName, false);  
        if (earlySingletonReference != null) {  
            if (exposedObject == bean) {  
                exposedObject = earlySingletonReference;  
            } else if (!this.allowRawInjectionDespiteWrapping && this.hasDependentBean(beanName)) {  
                String[] dependentBeans = this.getDependentBeans(beanName);  
                Set<String> actualDependentBeans = new LinkedHashSet(dependentBeans.length);  
                String[] var12 = dependentBeans;  
                int var13 = dependentBeans.length;  
  
                for(int var14 = 0; var14 < var13; ++var14) {  
                    String dependentBean = var12[var14];  
                    if (!this.removeSingletonIfCreatedForTypeCheckOnly(dependentBean)) {  
                        actualDependentBeans.add(dependentBean);  
                    }  
                }  
  
                if (!actualDependentBeans.isEmpty()) {  
                    throw new BeanCurrentlyInCreationException(beanName, "Bean with name '" + beanName + "' has been injected into other beans [" + StringUtils.collectionToCommaDelimitedString(actualDependentBeans) + "] in its raw version as part of a circular reference, but has eventually been wrapped. This means that said other beans do not use the final version of the bean. This is often the result of over-eager type matching - consider using 'getBeanNamesForType' with the 'allowEagerInit' flag turned off, for example.");  
                }  
            }  
        }  
    }  
  
    try {  
        this.registerDisposableBeanIfNecessary(beanName, bean, mbd);  
        return exposedObject;  
    } catch (BeanDefinitionValidationException var16) {  
        throw new BeanCreationException(mbd.getResourceDescription(), beanName, "Invalid destruction signature", var16);  
    }  
}

可以将该流程细分为如下:

  1. [创建Bean实体](#创建Bean实体 AbstractAutowireCapableBeanFactory createBeanInstance “wikilink”)
  2. [BeanDefinition后处理](#BeanDefinition后处理 - AbstractAutowireCapableBeanFactory applyMergedBeanDefinitionPostProcessors “wikilink”)
  3. 提前暴露实体
  4. [属性注入](#属性注入 - AbstractAutowireCapableBeanFactory populateBean “wikilink”)
  5. [初始化](#初始化 - AbstractAutowireCapableBeanFactory initializeBean “wikilink”)
  6. [注册Disposable](#注册Disposable - AbstractBeanFactory registerDisposableBeanIfNecessary “wikilink”)
创建Bean实体 - AbstractAutowireCapableBeanFactory#createBeanInstance
createBeanInstance_2
  1. instanceSupplier 从上面的流程图可以看出,创建bean实体不一定会使用到构造函数,可以使用Supplier的方式
  2. factory method 工厂模式 @Configuration + @Bean的实现方式就是factory-bean + factory-method [对应的参数获取](#ConstructorResolver resolvePreparedArguments “wikilink”)
  3. 有参构造函数 AbstractAutowireCapableBeanFactory#autowireConstructor -> [ConstructorResolver#autowireConstructor](#**ConstructorResolver autowireConstructor** “wikilink”)
  4. 无参构造函数 与有参构造创建过程一致,除了不需要参数的依赖注入,使用默认无参构造函数进行实例化
ConstructorResolver#resolvePreparedArguments

使用指定(类)bean的(静态)方法创建bean实体的逻辑在ConstructorResolver#instantiate(String, RootBeanDefinition, Object, Method, args),而真正的逻辑在SimpleInstantiationStrategy#instantiate(RootBeanDefinition, String, BeanFactory, Object, Method, Object…),其核心的执行逻辑非常简单,有了方法factoryMethod(factoryBean)及入参args,便可以调用该方法创建bean实体

1
Object result = factoryMethod.invoke(factoryBean, args);

factoryBean可以通过beanFactory.getBean获取到(正是当前在讲的逻辑),factoryMethod可以通过反射获取到,而入参args就从ConstructorResolver#resolvePreparedArguments中获取,即是Spring中依赖注入的核心实现

该函数的作用是将BeanDefinition中定义的入参转换为需要的参数(==将BeanDefinitionReader中封装的对象转换==)

resolvePreparedArguments

More in blogs

ConstructorResolver#autowireConstructor

同样的,调用ConstructorResolver#resolvePreparedArguments进行参数的解析和转换(参数的依赖注入),然后调用 [ConstructorResolver#instantiate](#ConstructorResolver instantiate “wikilink”) 来创建Bean实例

ConstructorResolver#instantiate

内部并没有统一利用反射技术直接使用构造函数创建,而是通过InstantiationStrategy.instantiate进行创建

Instantiate
  • 没有设置override-method时,直接使用构造函数创建
  • 设置了override-method时,使用cglib技术构造代理类,并代理override方法

Spring默认的实例化策略为CglibSubclassingInstantiationStrategy

BeanDefinition后处理 - AbstractAutowireCapableBeanFactory#applyMergedBeanDefinitionPostProcessors

在属性注入之前提供一次机会来对BeanDefinition进行处理,内部执行所有注册MergedBeanDefinitionPostProcessorpostProcessMergedBeanDefinition方法

[!hint] MergedBeanDefinitionPostProcessor MergedBeanDefinitionPostProcessor 是一个特定类型的 BeanPostProcessorMergedBeanDefinitionPostProcessorpostProcessMergedBeanDefinition 方法允许在实例化bean之后但在设置bean属性之前,对bean的定义(BeanDefinition)进行后处理。这个阶段是用于修改或增强bean定义的,例如,可以解析注解并相应地修改 BeanDefinition 的属性。

对于MergedBeanDefinitionPostProcessor的实现类AutowiredAnnotationBeanPostProcessor,其内部方法AutowiredAnnotationBeanPostProcessor#buildAutowiringMetadata 实现了两个注解类的解析 @Value 及 @Autowired ,找到注解修饰的Filed或者Method并缓存,具体逻辑在[属性注入](#属性注入 - AbstractAutowireCapableBeanFactory populateBean “wikilink”) ^autowiredAnnotationBeanPostProcessor1

提前暴露实体

通过将AbstractAutowireCapableBeanFactory#getEarlyBeanReference封装为ObjectFactory,调用DefaultSingletonBeanRegistry#addSingletonFactory,将该ObjectFactory缓存在DefaultSingletonBeanRegistry.singletonFactories中,在getBean逻辑中的getSingleton会执行ObjectFactory将singleton提前暴露 ==此处即为何时添加ObjectFactory进入singletonFactories中,解决循环依赖==

此时暴露的singleton-bean仅完成了bean的实例化,属性注入、初始化等逻辑均暂未执行

属性注入 - AbstractAutowireCapableBeanFactory#populateBean

在[创建Bean实体](#创建Bean实体 - AbstractAutowireCapableBeanFactory createBeanInstance “wikilink”)中介绍了factory method方式及有参构造函数方式的参数注入逻辑,除此之外还有一种注入便是属性注入

populateBean

流程中出现了两次InstantiationAwareBeanPostProcessor,在第一次出现中调用的postProcessorAfterInstantiation也与前面的InstantiationAwareBeanPostProcessor.postProcessBeforeInstantiation相同,拥有短路操作:如果该步骤生成了"代理"bean instance,直接返回该bean instance而不再执行后续的doCreate;如果有任意一个InstantiationAwareBeanPostProcessor的postProcessAfterInstantiation方法返回false,则会跳出属性注入的逻辑,官方对此的解释如下

Give any InstantiationAwareBeanPostProcessors the opportunity to modify the state of the bean before properties are set. This can be used, for example, to support styles of field injection.

autowireByNameautowireByType方法作为"候补"补充BeanDefinition的propertyValues

PropertyValue中记录了需要注入的属性信息及需要注入的属性值,那BeanDefinition的propertyValues都来自哪里?xml中的bean配置、自定义的BeanDefinition等

通过注解修饰的属性(方法)通过InstantiationAwareBeanPostProcessor#postProcessProperties进行注入 -> ==AutowiredAnnotationBeanPostProcessor#postProcessProperties & CommonAnnotationBeanPostProcessor#postProcessProperties==

最后,通过AbstractAutowireCapableBeanFactory#applyPropertyValues 将PropertyValue中记录的需要注入的属性,已经依赖的类型(String、RuntimeBeanReference、等),根据不同的类型解析依赖的bean并设置到对应的属性上(==此过程与DefaultListableBeanFactory#doResolveDependency相似==)

初始化 - AbstractAutowireCapableBeanFactory#initializeBean

以上,完成了bean实例的创建和属性注入,之后还有一些初始化的方法,比如各种AwaresetXxx是如何调用的、@PostConstruct是怎么调用的?

initializeBean
注册Disposable - AbstractBeanFactory#registerDisposableBeanIfNecessary

至此,终于完成了bean实例的创建、属性注入以及之后的初始化,此后便可以开始使用了

在使用Spring的过程中经常还会碰到设置销毁逻辑的情况,如数据库连接池、线程池等等,在Spring销毁bean的时候还需要做一些处理,类似于C++中的析构

在bean的创建逻辑中,最后一个步骤则是注册bean的销毁逻辑(DisposableBean)

销毁逻辑的注册有几个条件

  1. 非prototype(singleton或者注册的scope)
  2. 非NullBean
  3. 指定了destroy-method(如xml中指定或者BeanDefinition中直接设置)或者存在**@PreDestroy** 注解的方法(CommonAnnotationBeanPostProcessor.requiresDestruction
1
if (!mbd.isPrototype() && requiresDestruction(bean, mbd))

满足以上条件的bean会被封装为DisposableBeanAdapter,并注册在DefaultSingletonBeanRegistry.disposableBeans

加载prototype bean实例
1
2
3
4
5
6
7
8
9
else if (mbd.isPrototype()) {
    Object prototypeInstance = null;
    try {
        beforePrototypeCreation(beanName);
        prototypeInstance = createBean(beanName, mbd, args);
    }
    finally { afterPrototypeCreation(beanName);    }
    bean = getObjectForBeanInstance(prototypeInstance, name, beanName, mbd);
}

prototype bean的创建与singleton bean类似,只是不会缓存创建完成的bean

加载其他scope bean实例

scope,即作用域,或者可以理解为生命周期

上文介绍了singleton-bean及prototype-bean的创建过程,严格意义上讲以上两种都是一种特殊的scope-bean,分别对应ConfigurableBeanFactory#SCOPE_SINGLETON及ConfigurableBeanFactory#SCOPE_PROTOTYPE,前者作用域为整个IOC容器,也可理解为单例,后者作用域为所注入的bean,每次注入(每次触发getBean)都会重新生成

Spring中还提供很多其他的scope,如WebApplicationContext#SCOPE_REQUEST或WebApplicationContext#SCOPE_SESSION,前者作用域为一次web request,后者作用域为一个web session周期

自定义scope的bean实例创建过程与singleton bean的创建过程十分相似,需要实现Scope的get方法(org.springframework.beans.factory.config.Scope#get)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
else {
    String scopeName = mbd.getScope();
    final Scope scope = this.scopes.get(scopeName);
    if (scope == null) { /* throw exception */ }
    try {
        Object scopedInstance = scope.get(beanName, () -> {
            beforePrototypeCreation(beanName);
            // createBean被封装在Scope#get函数的lambda参数ObjectFactory中
            try { return createBean(beanName, mbd, args); }
            finally { afterPrototypeCreation(beanName); }
        });
        bean = getObjectForBeanInstance(scopedInstance, name, beanName, mbd);
    } catch (IllegalStateException ex) { /* throw exception */}
}

Scope接口除了get方法之外,还有一个remove方法,前者用于定义bean的初始化逻辑,后者用于定义bean的销毁逻辑

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
public interface Scope {
  /**
   * Return the object with the given name from the underlying scope
   */
  Object get(String name, ObjectFactory<?> objectFactory);
  
   /**
   * Remove the object with the given name from the underlying scope.
   */
  Object remove(String name);
}

WebApplicationContext#SCOPE_SESSION对应的Scope实现见org.springframework.web.context.request.SessionScope

WebApplicationContext#SCOPE_REQUEST对应的Scope实现见org.springframework.web.context.request.RequestScope

以上两种Scope实现都较为简单,前者将初始化的bean存储在request attribute中,后者将初始化的bean存储在http session中

尝试类型转换

以上,完成了bean的创建、属性的注入、dispose逻辑的注册,但获得的bean类型与实际需要的类型可能依然不相符,在最终交付bean之前(getBean)还需要进行一次类型转换,使用PropertyEditor进行类型转换,将bean转换为真正需要的类型后,便完成了整个getBean的使命

Bean销毁过程

bean的创建过程始于DefaultListableBeanFactory#getBean,销毁过程则终于ConfigurableApplicationContext#close,跟踪下去,具体的逻辑在DefaultSingletonBeanRegistry#destroySingletons

  1. DefaultSingletonBeanRegistry.disposableBeans 需要注册销毁逻辑的bean会被封装为DisposableBeanAdapter并缓存在此处
  2. DefaultSingletonBeanRegistry.dependentBeanMap 对于存在依赖注入关系的bean,会将bean的依赖关系缓存在此处(dependentBeanMap: 哪些bean依赖了我; dependenciesForBeanMap: 我依赖了哪些bean)
destory

从上图中可以看出,bean的销毁顺序与创建顺序正好相反,如果有 beanA –dependsOn–> beanB –> beanC ,创建(getBean)时一定是beanC -> beanB -> beanA,销毁时一定是 beanA -> beanB -> beanC,以此避免因为依赖关系造成的一些异常情况

循环依赖

earlySingletonObject是用来解决循环依赖的问题,具体时机是在实例化完后属性注入之前,会提前将当前的bean实体暴露出来,以防止在属性注入过程中所注入的bean又依赖当前的bean造成的类似"死锁"的状态

但是存在以下情况,Spring依旧会陷入循环依赖死锁:

  • 显式设置dependsOn的循环依赖
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
@DependsOn("beanB")
@Component
public class BeanA {}

@DependsOn("beanC")
@Component
public class BeanB {}

@DependsOn("beanA")
@Component
public class BeanC {}
  • 构造函数循环依赖
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
@Component
public class BeanA {
    public BeanA(BeanB beanB) {
    }
}

@Component
public class BeanB {
    public BeanB(BeanC beanC) {
    }
}

@Component
public class BeanC {
    public BeanC(BeanA beanA) {
    }
}
  • factory-method循环依赖
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
@Bean
public BeanA beanA(BeanB beanB) {
    return new BeanA();
}

@Bean
public BeanB beanB(BeanC beanC) {
    return new BeanB();
}

@Bean
public BeanC beanC(BeanA beanA) {
    return new BeanC();
}
  • 上述三种依赖混合

只要一个循环依赖中的所有bean,其依赖关系都需要在创建bean实例之前进行解决,此循环依赖则一定无解

要打破无解的循环依赖,在构成循环依赖的一个环中,只需要保证其中至少一个Bean的依赖在该Bean创建且暴露earlySingleton之后处理即可,即在属性注入阶段进行属性依赖的处理

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
@Component
public class BeanA {
    @Autowired
    private BeanB beanB;
}

@Component
public class BeanB {
    public BeanB(BeanC beanC) {
    }
}

@Bean
public BeanC beanC(BeanA beanA) {
    return new BeanC();
}

以"bean创建且暴露earlySingleton"为节点,在此之前处理依赖的有instance supplier parameterfactory method parameterconstructor parameter、等,在此之后处理的依赖有 class propertysetter parameter

ApplicationContext

BeanFactory实现了IoC的基础能力,而ApplicationContextBeanFactory的子类,除了继承IoC的基础能力外

  • 支持国际化 (MessageSource)
  • 支持资源访问 (ResourcePatternResolver)
  • 事件机制 (ApplicationEventPublisher)
  • 默认初始化所有Singleton
  • 提供扩展能力
ApplicationContext

无论何种功能的ApplicationContext,在做完基本的初始化后均会调用AbstractApplicationContext#Refresh

AbstractApplicationContext#Refresh

refresh

准备上下文 - AbstractApplicationContext#prepareRefresh

该部分主要实现对上下文的准备工作,其主要涉及到两个接口AbstractApplicationContext#initPropertySourcesConfigurablePropertyResolver#validateRequiredProperties,前者由子类实现,用于初始化PropertySource;后者用于对必要属性进行验证

1
2
3
4
5
6
7
public class MyClasspathXmlApplicationContext extends ClassPathXmlApplicationContext {
    @Override
    protected void initPropertySources() {
        super.initPropertySources();
        getEnvironment().setRequiredProperties("runtimeEnv");
    }
}

重写initPropertySources方法,并添加runtimeEnv为必须的环境变量属性,如此在系统启动的时候便会进行检测,对于不存在任何一个必要环境变量的情况均会抛出异常终止启动

加载BeanFactory - AbstractApplicationContext#obtainFreshBeanFactory

该函数内部实现比较简单,重点在refreshBeanFactory,该函数同样由子类实现

对于AbstractRefreshableApplicationContext,refreshBeanFactory基本步骤为

  1. 创建BeanFactory (DefaultListableBeanFactory)
  2. 设置BeanFactory
  3. 加载BeanDefinition

在第3步中,AbstractXmlApplicationContext的实现则是对xml配置文件的解析及加载;AnnotationConfigWebApplicationContext的实现则是对class文件的扫描并加载,以及其他基于AbstractRefreshableApplicationContext的ApplicationContext实现

对于GenericApplicationContext,BeanFactory的创建及BeanDefinition的加载在refresh调用之前早已完成,refreshBeanFactory的实现则是对BeanFactory加载状态的简单校验

AbstractRefreshableApplicationContext & GenericApplicationContext

AbstractRefreshableApplicationContext

对于继承自 AbstractRefreshableApplicationContext 的上下文,例如 ClassPathXmlApplicationContextAnnotationConfigApplicationContext,它们通过覆盖 refreshBeanFactory() 方法来实现具体的 BeanDefinition 加载逻辑。这些上下文类型专门用于从外部资源(如 XML 文件、Java 配置类等)加载配置信息,并将这些配置信息解析为一组 BeanDefinition,然后注册到内部的 BeanFactory 中。这个过程通常发生在上下文的 refresh() 方法调用过程中(我们正在讨论的),这个方法不仅负责加载和注册 BeanDefinition,还包括初始化单例bean、处理别名定义、注册BeanPostProcessor等一系列容器启动时的活动。

[!QUOTE] refresh()关键步骤 ^configurerRelated

  1. 创建 BeanFactoryAbstractRefreshableApplicationContext 首先会创建一个新的 BeanFactory 实例,这通常是一个 DefaultListableBeanFactory 实例。这个 BeanFactory 实现了 BeanDefinitionRegistry 接口,使得它能够注册 BeanDefinition。
  2. ==加载 BeanDefinition:接着,上下文会调用特定的方法(例如,对于基于 XML 的配置,会使用 XmlBeanDefinitionReader;对于基于注解的配置,会使用 AnnotatedBeanDefinitionReaderClassPathBeanDefinitionScanner)来加载 BeanDefinition。这些 Reader 和 Scanner 实现了 BeanDefinitionRegistry 接口的 registerBeanDefinition 方法来实际完成注册工作。==
  3. 刷新 BeanFactory:加载完所有 BeanDefinition 后,AbstractRefreshableApplicationContext 会对 BeanFactory 进行刷新,这涉及到预实例化单例、注册 BeanPostProcessor、初始化剩余的非懒加载单例等一系列操作。
  4. 发布事件:在整个容器刷新过程中,还会发布各种应用事件,如 ContextRefreshedEvent,允许应用中的其他组件对这些事件作出响应。

通过上述步骤,AbstractRefreshableApplicationContext 完成了 BeanDefinition 的加载、注册以及整个 Spring 容器的初始化和刷新工作。在这个过程中,BeanDefinitionRegistry 接口扮演了 BeanDefinition 注册的关键角色

GenericApplicationContext

GenericApplicationContext 直接实现了 BeanDefinitionRegistry 接口,使得它可以在运行时动态注册 BeanDefinition。与 AbstractRefreshableApplicationContext 的子类不同,GenericApplicationContext 并不专门依赖于外部资源来加载 BeanDefinition。相反,它提供了一套程序化的接口,允许开发者直接在代码中通过调用 registerBeanDefinition(String beanName, BeanDefinition beanDefinition) 方法来注册 BeanDefinition。这种方式使得 GenericApplicationContext 非常灵活,适用于那些需要在运行时动态调整 Spring 配置的场景。

关系和区别
  • 加载方式的区别AbstractRefreshableApplicationContext 的子类通常通过解析配置资源(XML、注解等)来加载 BeanDefinition,而 GenericApplicationContext 允许以编程方式直接注册 BeanDefinition。
  • 使用场景的区别AbstractRefreshableApplicationContext 的子类适合于静态配置资源的场景,其中配置信息在应用启动时已经确定。GenericApplicationContext 更适合于动态配置的场景,比如基于条件的 BeanDefinition 注册或运行时的配置调整。
  • 刷新容器的能力:虽然两者都可以通过 refresh() 方法来刷新应用上下文,但 AbstractRefreshableApplicationContext 的子类通常在设计时就考虑了完整的容器刷新流程(包括重新加载配置资源),而 GenericApplicationContext 刷新主要是为了应用新注册的 BeanDefinition。==前者会重置BeanFactory而后者不会==

填充部分扩展 - AbstractApplicationContext#prepareBeanFactory

该函数执行以下逻辑

  1. 设置BeanFactory的ClassLoader
  2. 注册默认BeanExpressionResolver,用于依赖注入时SpEL的支持
  3. 注册默认PropertyEditor,用于依赖注入时对参数的解析转换
  4. 注册几个特殊Aware的处理逻辑
  5. 注册AspectJ相关的几个处理器,用于AOP的支持
  6. 注册几个特殊的BeanDefinition

==2-3 的核心逻辑在于解析依赖的值,DefaultListableBenFactory#doResolveDependency==

注册几个特殊Aware的处理逻辑

在Bean实例化、注入依赖之后会对Bean进行[最后的初始化](#初始化 - AbstractAutowireCapableBeanFactory initializeBean “wikilink”),调用相应的setter方法分别针对BeanNameAwareBeanClassLoaderAwareBeanFactoryAware进行处理

在该函数中,会注册几个特殊的BeanPostProcessor

1
beanFactory.addBeanPostProcessor(new ApplicationContextAwareProcessor(this));

其实现了postProcessBeforeInitialization方法,内部调用ApplicationContextAwareProcessor#invokeAwareInterfaces针对另外的几类Aware进行了处理

除此之外,Spring会将上述几类Aware设置为ignoreDependencyInterface,这意味着以上几类Bean的注入只能通过Aware的方式而不能通过其他属性依赖注入的方式(属性注入、函数参数注入等)

注册特殊的Bean

在使用Spring时,是否有过直接注入BeanFactory亦或是ResourceLoader,这些bean正是在这里被Spring注册进去的,除以上外Spring还注入了

  • BeanFactory
  • ResourceLoader
  • ApplicationEventPublisher
  • ApplicationContext
  • Environment
  • systemProperties - Environment#.getSystemProperties:Map<String, Object>
  • systemEnvironment - Environment#.getSystemEnvironment:Map<String, Object>

    AbstractApplicationContext#refresh#postProcessBeanFactory()

对于不同的实现类,注册相应的BeanPostProcessor,例如ServletWebServerApplicationContext

激活BeanFactoryPostProcessor - AbstractApplicationContext#invokeBeanFactoryPostProcessors

其内部实现在PostProcessorRegistrationDelegate#invokeBeanFactoryPostProcessors

BeanFactoryPostProcessor的定义非常简单,其postProcessBeanFactory方法允许在bean实例化前对BeanFactory做一些额外的设置

1
2
3
4
5
6
7
8
9
public interface BeanFactoryPostProcessor {
    /**
     * Modify the application context's internal bean factory after its standard
     * initialization. All bean definitions will have been loaded, but no beans
     * will have been instantiated yet. This allows for overriding or adding
     * properties even to eager-initializing beans.
     */
    void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException;
}

核心逻辑如下

invokeBeanFactoryPostProcessors

其中涉及两种类型,BeanDefinitionRegistryPostProcessorBeanFactoryPostProcessor,前者为后者的子类,BeanDefinitionRegistryPostProcessors提供了额外的接口postProcessBeanDefinitionRegistry,用于更加方便地动态地注册额外的BeanDefinition (registryProcessor.postProcessBeanDefinitionRegistry(registry)),如读取配置文件(json、properties、yml)并解析(或者任何其他的形式),并通过该接口注册相应的BeanDefinition,基于Spring Boot Starter的很多框架均使用该方式进行bean的注册

以上流程图可以看出,优先执行BeanDefinitionRegistryPostProcessor#postProcessBeanDefinitionRegistry,再执行BeanFactoryPostProcessor#postProcessBeanFactory,各自内部优先执行PriorityOrdered实现,再执行Ordered实现,最后执行无任何排序的实现

注册BeanPostProcessor - AbstractApplicationContext#registerBeanPostProcessors

其内部实现在PostProcessorRegistrationDelegate#registerBeanPostProcessors b

BeanPostProcessor

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public interface BeanPostProcessor {
    /**
     * Apply this BeanPostProcessor to the given new bean instance before any bean
     * initialization callbacks (like InitializingBean's afterPropertiesSet
     * or a custom init-method). 
     * The returned bean instance may be a wrapper around the original.
     */
    @Nullable
    default Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
        return bean;
    }

    /**
     * Apply this BeanPostProcessor to the given new bean instance after any bean
     * initialization callbacks (like InitializingBean's afterPropertiesSet
     * or a custom init-method).
     * The returned bean instance may be a wrapper around the original.
     */
    @Nullable
    default Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        return bean;
    }
}
  1. postProcessBeforeInitialization方法在调用bean的init-method之前执行
  2. postProcessAfterInitialization方法在调用bean的init-method之后执行
  3. 任何一个方法可对现有bean实例做进一步的修改
  4. 任何一个方法可返回新的bean实例,用来替代现有的bean实例

第四点即是AOP生成当前Bean代理的方法

InstantiationAwareBeanPostProcessor

该接口继承自BeanPostProcessor,其同样有两个方法,一个在创建bean实例之前调用([createBean](#AbstractAutowireCapableBeanFactory createBean “wikilink”)),一个在创建bean实例之后、属性注入之前调用([属性注入](#属性注入 - AbstractAutowireCapableBeanFactory populateBean “wikilink”))

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
public interface InstantiationAwareBeanPostProcessor extends BeanPostProcessor {  
    @Nullable  
    default Object postProcessBeforeInstantiation(Class<?> beanClass, String beanName) throws BeansException {  
        return null;  
    }  
  
    default boolean postProcessAfterInstantiation(Object bean, String beanName) throws BeansException {  
        return true;  
    }  
  
    @Nullable  
    default PropertyValues postProcessProperties(PropertyValues pvs, Object bean, String beanName) throws BeansException {  
        return pvs;  
    }  
}

AbstractApplicationContext#registerBeanPostProcessors,其内部逻辑与BeanFactoryPostProcessor的注册逻辑类似:

  1. 找到所有BeanPostProcessor并实例化
  2. 按照实现的Ordered接口分别放入priorityOrderedPostProcessors、orderedPostProcessors、nonOrderedPostProcessors并各自排序
  3. 如果实现了MergedBeanDefinitionPostProcessor则放入internalPostProcessors并排序
  4. 按顺序依次注册priorityOrderedPostProcessors、orderedPostProcessors、nonOrderedPostProcessors
  5. 最后注册internalPostProcessors

MergedBeanDefinitionPostProcessor其有一个接口postProcessMergedBeanDefinition,在bean实例化完成后属性注入之前被调用,可以用来对当前的BeanDefinition做进一步的修改,如增加PropertyValue等,实现特殊的属性依赖注入,参考[BeanDefinition后处理](#BeanDefinition后处理 - AbstractAutowireCapableBeanFactory applyMergedBeanDefinitionPostProcessors “wikilink”)与[属性注入](#属性注入 - AbstractAutowireCapableBeanFactory populateBean “wikilink”)

初始化MessageSource - AbstractApplicationContext#initMessageSource

Spring的MessageSource提供了国际化能力,在开发者未注册MessageSource的情况下Spring会提供一个默认的DelegatingMessageSource

初始化ApplicationEventMulticaster - AbstractApplicationContext#initApplicationEventMulticaster

Spring提供了一套事件(ApplicationEvent)的发布&订阅机制,开发者可自定义事件(继承ApplicationEvent),注册事件监听器来订阅消费事件(实现ApplicationListener 或使用@EventListener 注解),并使用ApplicationEventPublisher(直接依赖注入或者使用ApplicationEventPublisherAware)发送事件,使用示例可参考https://www.baeldung.com/spri…

其实ApplicationContext实现了ApplicationEventPublisher,跟踪其publishEvent方法会发现,最终调用了AbstractApplicationContext#applicationEventMulticaster.multicastEvent,开发者可以自行注册一个ApplicationEventMulticaster,如果没有Spring会提供一个默认的SimpleApplicationEventMulticaster

SimpleApplicationEventMulticaster#multicastEvent的逻辑比较简单,会根据事件的类型找到可以处理的所有ApplicationListener,依次调用它们的onApplicationEvent方法消费事件

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
@Override
public void multicastEvent(final ApplicationEvent event, @Nullable ResolvableType eventType) {
    ResolvableType type = (eventType != null ? eventType : resolveDefaultEventType(event));
    Executor executor = getTaskExecutor();
    for (ApplicationListener<?> listener : getApplicationListeners(event, type)) {
        if (executor != null) {
      // 设置了executor,则异步执行
            executor.execute(() -> invokeListener(listener, event));
        }
        else {
      // 否则同步执行
            invokeListener(listener, event);
        }
    }
}

默认情况下会同步、顺序的调用listeners的onApplicationEvent方法,只有设置了executor才会异步调用,不过这样的控制粒度比较粗,要么全部同步消费要么全部异步消费,比较细粒度的控制事件的消费有几种常用方法

  1. 使用@Async注解,独立控制某一listener异步消费(https://www.baeldung.com/spri…
  2. 自行编码,将onApplicationEvent逻辑放在线程中执行
  3. 注册自定义的ApplicationEventMulticaster,内部实现自己的同步、异步Event处理逻辑

注册ApplicationListener - AbstractApplicationContext#registerListeners

这里的逻辑比较简单

  1. 在BeanFactory中找到ApplicationListener类型的bean并实例化
  2. 调用ApplicationEventMulticaster#addApplicationListenerBean方法将ApplicationListeners注册进去

初始化所有非Lazy Bean - AbstractApplicationContext#finishBeanFactoryInitialization

对于Singleton Bean而言,实例化发生在首次getBean,但你是否有疑惑,我们只是注册了众多Singleton Bean,但在Spring初始化完成后所有的Singleton Bean(Lazy Bean除外)均已经完成实例化

回到AbstractApplicationContext#finishBeanFactoryInitialization,该函数会实现几个逻辑

  1. 如果自定义了ConversionService(另一种注入类型转换的方式)类型bean且bean-name为conversionService,则将其注册到BeanFactory中
  2. 如果BeanFactory中不存在EmbeddedValueResolverPropertyResourceConfigurer会注册一个PlaceholderResolvingStringValueResolver到BeanFactory中),则会注册一个默认的StringValueResolver用来处理 ${ ... }类型的值(Environment#resolvePlaceholders
  3. 找到所有非Lazy的Singleton BeanDefinition进行实例化(getBean
    1. 如果是FactoryBean,则在bean name前加上’&’,并实例化该FactoryBean,随后实例化真实的bean
    2. 如果不是FactoryBean,则直接实例化该bean
  4. 执行SmartInitializingSingleton实现类的afterSingletonsInstantiated方法

Refresh的后续动作 - AbstractApplicationContext#finishRefresh

除了一些中间状态需要清理外,还有两件比较特殊的地方

LifecycleProcessor - AbstractApplicationContext#initLifecycleProcessor

Spring提供了LifecycleProcessor用于监听BeanFactory的refresh及close,在BeanFactory的各阶段会调用LifecycleProcessoronFreshonClose方法

开发者可以自行注册LifecycleProcessor类型的bean,bean-name必须为"lifecycleProcessor",否则Spring会提供一个默认的DefaultLifecycleProcessor

之后则会触发LifecycleProcessoronFresh方法

除此之外,还可以监听ContextRefreshedEventContextClosedEvent消息

refresh事件

在BeanFactory初始化完成后,则会发出ContextRefreshedEvent事件

BeanFactory的销毁 - AbstractApplicationContext#registerShutdownHook

该函数用来注册BeanFactory的销毁逻辑

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
public void registerShutdownHook() {  
    if (this.shutdownHook == null) {  
        this.shutdownHook = new Thread("SpringContextShutdownHook") {  
            public void run() {  
                synchronized(AbstractApplicationContext.this.startupShutdownMonitor) {  
                    AbstractApplicationContext.this.doClose();  
                }  
            }  
        };  
        Runtime.getRuntime().addShutdownHook(this.shutdownHook);  
    }  
  
}

其直接使用了java的addShutdownHook函数,在jvm进程正常退出的时候触发

AbstractApplicationContext#doClose函数定义了BeanFactory具体的销毁过程

  1. 发出ContextClosedEvent事件
  2. 触发LifecycleProcessoronClose方法
  3. 销毁bean,细节参考Bean销毁过程
  4. 由子类实现的AbstractApplicationContext#closeBeanFactoryAbstractApplicationContext#onClose方法

ASIDE

  • BeanDefinition的加载在[AbstractApplicationContext#obtainFreshBeanFactory](#加载BeanFactory - AbstractApplicationContext obtainFreshBeanFactory “wikilink”)中实现
  • TODO
    • #{ ... }类型值的解析由StandardBeanExpressionResolve实现
    • ${ ... }类型值的解析由PlaceholderResolvingStringValueResolver实现
    • Spring提供了众多默认的PropertyEditor,若需要自定义PropertyEditor可以通过注册CustomEditorConfigurer实现
    • Spring提供了众多Aware,若需要自定义Aware可以通过BeanPostProcessor实现
    • BeanFactoryPostProcessor用于在实例化bean之前对BeanFactory做额外的动作 如,PropertyResourceConfigurer用来将PlaceholderResolvingStringValueResolver注册到BeanFactory的embeddedValueResolvers中
  • [BeanDefinitionRegistryPostProcessor](#激活BeanFactoryPostProcessor - AbstractApplicationContext invokeBeanFactoryPostProcessors “wikilink”)用于在实例化bean之前(动态)注册额外的BeanDefinition ^fa1ce8
  • BeanPostProcessor用于在调用bean的init-method前后,对实例化完成的bean做一些额外的干预 如,CommonAnnotationBeanPostProcessor用来处理@PostConstructor,AbstractAdvisingBeanPostProcessor用来实现AOP

ApplicationContext具体实现类 - AnnotationConfigApplicationContext

1
2
3
4
5
public AnnotationConfigApplicationContext(Class<?>... componentClasses) { 
    this(); //1. 首先会调用自己的无参构造 
    register(componentClasses); //2. 然后注册我们传入的配置类 
    refresh(); //3. 最后进行刷新操作(关键) 
}

无参构造

1
2
3
4
5
6
7
8
public AnnotationConfigApplicationContext() {
        StartupStep createAnnotatedBeanDefReader = this.getApplicationStartup().start("spring.context.annotated-bean-reader.create");
      //创建AnnotatedBeanDefinitionReader对象,用于后续处理 @Bean 注解
        this.reader = new AnnotatedBeanDefinitionReader(this);
        createAnnotatedBeanDefReader.end();
      //创建ClassPathBeanDefinitionScanner对象,用于扫描类路径上的Bean
        this.scanner = new ClassPathBeanDefinitionScanner(this);
}

AnnotatedBeanDefinitionReader

1
2
3
4
5
6
7
8
public AnnotatedBeanDefinitionReader(BeanDefinitionRegistry registry, Environment environment) {
        Assert.notNull(registry, "BeanDefinitionRegistry must not be null");
        Assert.notNull(environment, "Environment must not be null");
        this.registry = registry;
        this.conditionEvaluator = new ConditionEvaluator(registry, environment, null);
      //这里注册了注解处理配置相关的后置处理器
        AnnotationConfigUtils.registerAnnotationConfigProcessors(this.registry);
}

这里会将ConfigurationClassPostProcessor后置处理器加入到BeanFactory中,它继承自BeanFactoryPostProcessor,也就是说一会会在BeanFactory初始化完成之后进行后置处理,同时这里也会注册一个AutowiredAnnotationBeanPostProcessor后置处理器到BeanFactory,它继承自BeanPostProcessor,用于处理后续生成的Bean对象,其实看名字就知道,这玩意就是为了处理@Autowire、@Value这种注解,用于自动注入

注册传入的配置类 - register

1
2
3
4
5
6
7
8
9
@Override
public void register(Class<?>... componentClasses) {
        Assert.notEmpty(componentClasses, "At least one component class must be specified");
        StartupStep registerComponentClass = this.getApplicationStartup().start("spring.context.component-classes.register")
                .tag("classes", () -> Arrays.toString(componentClasses));
      //使用我们上面创建的Reader注册配置类
        this.reader.register(componentClasses);
        registerComponentClass.end();
}

==TODO==

  • ☒ Spring AOP
  • ☐ 注解运行逻辑
  • ☒ AnnotationConfigApplicationContext - 与 配置类的关系 - 具体例子
  • ☐ BeanDefinitionReader和BeanDefinitionRegistry
  • ☐ 完善调用链图

    配置类的注册 - ConfigurationClassPostProcessor

ConfigurationClassPostProcessor继承自BeanDefinitionRegistryPostProcessor -> BeanFactoryPostProcessor,这个后置处理器是Spring中提供的,这是专门用于处理配置类的后置处理器,其中ImportBeanDefinitionRegistrar,还有ImportSelector都是靠它来处理

ConfigurationClassPostProcessor#postProcessBeanDefinitionRegistry

内部调用 processConfigBeanDefinitions(BeanDefinitionRegistry) 方法

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
public void processConfigBeanDefinitions(BeanDefinitionRegistry registry) {
    // 将Spring认为可能是配置类的候选类加入candidates,例如@Configuration、@Component
    // @ComponentScan、@Import,以及通过实现ImportSelector或ImportBeanDefinitionRegistrar间接引入的配置
    List<BeanDefinitionHolder> configCandidates = new ArrayList<>();
    // 直接取出所有已注册Bean的名称
    String[] candidateNames = registry.getBeanDefinitionNames();
    for (String beanName : candidateNames) {
       // 依次拿到对应的Bean定义,然后进行判断
       BeanDefinition beanDef = registry.getBeanDefinition(beanName);
       if (beanDef.getAttribute(ConfigurationClassUtils.CONFIGURATION_CLASS_ATTRIBUTE) != null) {
          ...
       }
       // 检查一个Bean定义是否符合作为配置类的条件,即使它没有直接使用@Configuration注解
       else if (ConfigurationClassUtils.checkConfigurationClassCandidate(beanDef, this.metadataReaderFactory)) {
          configCandidates.add(new BeanDefinitionHolder(beanDef, beanName));
       }
    }
    // 如果一个打了 @Configuration 的类都没发现,直接返回
    if (configCandidates.isEmpty()) {
       return;
    }
    // 对所有的配置类依据 @Order 进行排序
    configCandidates.sort((bd1, bd2) -> {
       int i1 = ConfigurationClassUtils.getOrder(bd1.getBeanDefinition());
       int i2 = ConfigurationClassUtils.getOrder(bd2.getBeanDefinition());
       return Integer.compare(i1, i2);
    });
    ...
    // 这里使用do-while语句依次解析所有的配置类
    ConfigurationClassParser parser = new ConfigurationClassParser(
          this.metadataReaderFactory, this.problemReporter, this.environment,
          this.resourceLoader, this.componentScanBeanNameGenerator, registry);
    Set<BeanDefinitionHolder> candidates = new LinkedHashSet<>(configCandidates);
    Set<ConfigurationClass> alreadyParsed = new HashSet<>(configCandidates.size());
    do {
       StartupStep processConfig = this.applicationStartup.start("spring.context.config-classes.parse");
       //这里就会通过Parser解析配置类中大部分内容,包括我们之前遇到的@Import注解
             parser.parse(candidates);
             parser.validate();
       //解析完成后读取到所有的配置类
       Set<ConfigurationClass> configClasses = new LinkedHashSet<>(parser.getConfigurationClasses());
             configClasses.removeAll(alreadyParsed);
       ... 
       //将上面读取的配置类加载为Bean
       this.reader.loadBeanDefinitions(configClasses);
       ...
    }
    while (!candidates.isEmpty());
    ...
}

ConfigurationClassParser#parse(candidates)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public void parse(Set<BeanDefinitionHolder> configCandidates) {
    for (BeanDefinitionHolder holder : configCandidates) {
        BeanDefinition bd = holder.getBeanDefinition();
        try {
            if (bd instanceof AnnotatedBeanDefinition annotatedBeanDef) {
                parse(annotatedBeanDef, holder.getBeanName());
            }
            else if (bd instanceof AbstractBeanDefinition abstractBeanDef && abstractBeanDef.hasBeanClass()) {
                parse(abstractBeanDef.getBeanClass(), holder.getBeanName());
            }
            else {
                parse(bd.getBeanClassName(), holder.getBeanName());
            }
        }
        catch (BeanDefinitionStoreException ex) {
            throw ex;
        }
        catch (Throwable ex) {
            throw new BeanDefinitionStoreException(
                    "Failed to parse configuration class [" + bd.getBeanClassName() + "]", ex);
        }
    }

    this.deferredImportSelectorHandler.process();
}

内部遍历candidates中的每一个BeanDefinitionHolder,调用parse的多态方法,最终调用ConfigurationClassParser#processConfigurationClass,最后调用deferredImportSelectorHandler.process()处理DeferredImportSelector相关的Bean注册 ^processConfigurationClass

首先判断条件注释,即处理@Conditional相关注解

然后将不同来源的配置类源信息通过asSourceClass进行封装,交给最核心的调用[doProcessConfigurationClass](#ConfigurationClassParser doProcessConfigurationClass “wikilink”)

将配置类ConfigurationClass实例化为SourceClass。这样做的目的是为了让后续的处理逻辑能够通过SourceClass访问到配置类中定义的所有相关信息(比如注解信息,Meta-info),并进行相应的处理。例如,通过SourceClass可以读取配置类上的@ComponentScan注解,并执行组件扫描;读取@Import注解,并处理导入的配置类或组件;读取@Bean方法,并注册对应的Bean定义等。

ConfigurationClassParser#doProcessConfigurationClass

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
@Nullable
protected final SourceClass doProcessConfigurationClass(
        ConfigurationClass configClass, SourceClass sourceClass, Predicate<String> filter)
        throws IOException {

    if (configClass.getMetadata().isAnnotated(Component.class.getName())) {
        // Recursively process any member (nested) classes first
        processMemberClasses(configClass, sourceClass, filter);
    }

    // Process any @PropertySource annotations
    for (AnnotationAttributes propertySource : AnnotationConfigUtils.attributesForRepeatable(
            sourceClass.getMetadata(), org.springframework.context.annotation.PropertySource.class,
            PropertySources.class, true)) {
        if (this.propertySourceRegistry != null) {
            this.propertySourceRegistry.processPropertySource(propertySource);
        }
        else {
            logger.info("Ignoring @PropertySource annotation on [" + sourceClass.getMetadata().getClassName() +
                    "]. Reason: Environment must implement ConfigurableEnvironment");
        }
    }

    // Search for locally declared @ComponentScan annotations first.
    Set<AnnotationAttributes> componentScans = AnnotationConfigUtils.attributesForRepeatable(
            sourceClass.getMetadata(), ComponentScan.class, ComponentScans.class,
            MergedAnnotation::isDirectlyPresent);

    // Fall back to searching for @ComponentScan meta-annotations (which indirectly
    // includes locally declared composed annotations).
    if (componentScans.isEmpty()) {
        componentScans = AnnotationConfigUtils.attributesForRepeatable(sourceClass.getMetadata(),
                ComponentScan.class, ComponentScans.class, MergedAnnotation::isMetaPresent);
    }

    if (!componentScans.isEmpty()) {
        List<Condition> registerBeanConditions = collectRegisterBeanConditions(configClass);
        if (!registerBeanConditions.isEmpty()) {
            throw new ApplicationContextException(
                    "Component scan could not be used with conditions in REGISTER_BEAN phase: " + registerBeanConditions);
        }
        for (AnnotationAttributes componentScan : componentScans) {
            // The config class is annotated with @ComponentScan -> perform the scan immediately
            Set<BeanDefinitionHolder> scannedBeanDefinitions =
                    this.componentScanParser.parse(componentScan, sourceClass.getMetadata().getClassName());
            // Check the set of scanned definitions for any further config classes and parse recursively if needed
            for (BeanDefinitionHolder holder : scannedBeanDefinitions) {
                BeanDefinition bdCand = holder.getBeanDefinition().getOriginatingBeanDefinition();
                if (bdCand == null) {
                    bdCand = holder.getBeanDefinition();
                }
                if (ConfigurationClassUtils.checkConfigurationClassCandidate(bdCand, this.metadataReaderFactory)) {
                    parse(bdCand.getBeanClassName(), holder.getBeanName());
                }
            }
        }
    }

    // Process any @Import annotations
    processImports(configClass, sourceClass, getImports(sourceClass), filter, true);

    // Process any @ImportResource annotations
    AnnotationAttributes importResource =
            AnnotationConfigUtils.attributesFor(sourceClass.getMetadata(), ImportResource.class);
    if (importResource != null) {
        String[] resources = importResource.getStringArray("locations");
        Class<? extends BeanDefinitionReader> readerClass = importResource.getClass("reader");
        for (String resource : resources) {
            String resolvedResource = this.environment.resolveRequiredPlaceholders(resource);
            configClass.addImportedResource(resolvedResource, readerClass);
        }
    }

    // Process individual @Bean methods
    Set<MethodMetadata> beanMethods = retrieveBeanMethodMetadata(sourceClass);
    for (MethodMetadata methodMetadata : beanMethods) {
        configClass.addBeanMethod(new BeanMethod(methodMetadata, configClass));
    }

    // Process default methods on interfaces
    processInterfaces(configClass, sourceClass);

    // Process superclass, if any
    if (sourceClass.getMetadata().hasSuperClass()) {
        String superclass = sourceClass.getMetadata().getSuperClassName();
        if (superclass != null && !superclass.startsWith("java")) {
            boolean superclassKnown = this.knownSuperclasses.containsKey(superclass);
            this.knownSuperclasses.add(superclass, configClass);
            if (!superclassKnown) {
                // Superclass found, return its annotation metadata and recurse
                return sourceClass.getSuperClass();
            }
        }
    }

    // No superclass -> processing is complete
    return null;
}

该函数依次解决如下问题:

  • 处理@Component注解
  • 处理@PropertySource和@PropertySources注解
  • 处理@ComponentScan和@ComponentScans
  • 处理@Import注解
  • 处理@ImportResource注解
  • 处理@Bean注解的方法
  • 处理接口上的默认方法和超类

其中的核心是处理@Import注解,通过调用 [ConfigurationClassParser#processImports](#ConfigurationClassParser processImports “wikilink”)

ConfigurationClassParser#processImports

注意其第三个入参Collection<SourceClass> importCandidates,它是通过调用getImports(sourceClass)方法,从给定的sourceClass中提取所有@Import注解指定的类,如果sourceClass是普通的配置类,直接通过isEmpty()返回

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
private void processImports(ConfigurationClass configClass, SourceClass currentSourceClass,
        Collection<SourceClass> importCandidates, Predicate<String> filter, boolean checkForCircularImports) {

    if (importCandidates.isEmpty()) {
        return;
    }

    if (checkForCircularImports && isChainedImportOnStack(configClass)) {
        this.problemReporter.error(new CircularImportProblem(configClass, this.importStack));
    }
    else {
        this.importStack.push(configClass);
        try {
            for (SourceClass candidate : importCandidates) {
                if (candidate.isAssignable(ImportSelector.class)) {
                    // Candidate class is an ImportSelector -> delegate to it to determine imports
                    Class<?> candidateClass = candidate.loadClass();
                    ImportSelector selector = ParserStrategyUtils.instantiateClass(candidateClass, ImportSelector.class,
                            this.environment, this.resourceLoader, this.registry);
                    Predicate<String> selectorFilter = selector.getExclusionFilter();
                    if (selectorFilter != null) {
                        filter = filter.or(selectorFilter);
                    }
                    if (selector instanceof DeferredImportSelector deferredImportSelector) {
                        this.deferredImportSelectorHandler.handle(configClass, deferredImportSelector);
                    }
                    else {
                        String[] importClassNames = selector.selectImports(currentSourceClass.getMetadata());
                        Collection<SourceClass> importSourceClasses = asSourceClasses(importClassNames, filter);
                        processImports(configClass, currentSourceClass, importSourceClasses, filter, false);
                    }
                }
                else if (candidate.isAssignable(ImportBeanDefinitionRegistrar.class)) {
                    // Candidate class is an ImportBeanDefinitionRegistrar ->
                    // delegate to it to register additional bean definitions
                    Class<?> candidateClass = candidate.loadClass();
                    ImportBeanDefinitionRegistrar registrar =
                            ParserStrategyUtils.instantiateClass(candidateClass, ImportBeanDefinitionRegistrar.class,
                                    this.environment, this.resourceLoader, this.registry);
                    configClass.addImportBeanDefinitionRegistrar(registrar, currentSourceClass.getMetadata());
                }
                else {
                    // Candidate class not an ImportSelector or ImportBeanDefinitionRegistrar ->
                    // process it as an @Configuration class
                    this.importStack.registerImport(
                            currentSourceClass.getMetadata(), candidate.getMetadata().getClassName());
                    processConfigurationClass(candidate.asConfigClass(configClass), filter);
                }
            }
        }
        catch (BeanDefinitionStoreException ex) {
            throw ex;
        }
        catch (Throwable ex) {
            throw new BeanDefinitionStoreException(
                    "Failed to process import candidates for configuration class [" +
                    configClass.getMetadata().getClassName() + "]: " + ex.getMessage(), ex);
        }
        finally {
            this.importStack.pop();
        }
    }
}

代码遍历每一个@Import注解指定的候选类,根据不同类型进行处理

  • ImportSelector实现
  • ImportSelector
  • DeferredImportSelector
  • ImportBeanDefinitionRegistar实现
  • 普通的配置类
针对ImportSelector

通过selector.selectImports()asSourceClasses()方法将需要导入的类重新封装为SourceClass,递归调用processImports

针对DeferredImportSelector

通过调用ConfigurationClassParser的内部类DeferredImportSelectorHandler#handle()方法,将其封装为DeferredImportSelectorHolder ,加入待处理的List - deferredImportSelectors

ConfigurationClassParser#parse[处理完所有候选配置类后](#ConfigurationClassParser parse(candidates) “wikilink”),调用DeferredImportSelectorHandler#process()方法,该方法将加入deferredImportSelectors中的所有DeferredImportSelectorHolder执行内部类的DeferredImportSelectorGroupingHandler#register方法,得到包装好的、已经分组完毕的DeferredImportSelectorGrouping,然后调用DeferredImportSelectorGroupingHandler#processGroupImports(),处理组内所有的延迟导入 (DeferredImportSelector)

DeferredImportSelectorGroupingHandler#register
1
2
3
4
5
6
7
8
9
void register(DeferredImportSelectorHolder deferredImport) {
            Class<? extends Group> group = deferredImport.getImportSelector().getImportGroup();
            DeferredImportSelectorGrouping grouping = this.groupings.computeIfAbsent(
                    (group != null ? group : deferredImport),
                    key -> new DeferredImportSelectorGrouping(createGroup(group)));
            grouping.add(deferredImport);
            this.configurationClasses.put(deferredImport.getConfigurationClass().getMetadata(),
                    deferredImport.getConfigurationClass());
        }
  • 首先尝试获取DeferredImportSelector指定的导入组 (ImportGroup),如果没有指定特定的导入组,则使用DeferredImportSelector本身作为组的Key
  • 尝试从一个名为groupings的映射中获取或创建一个与导入组对应的DeferredImportSelectorGrouping对象。如果映射中尚未存在与当前组对应的分组,那么将创建一个新的分组,并将其加入到映射中
    • 注意,此处的Group逻辑是将DeferredImportSelector.Group这个内部接口包装到ConfigurationClassParser.DeferredImportSelectorGourping这个内部类中,其内部维护了一个DeferredImportSelector.Group对象和List<DeferredImportSelectorHolder>对象
  • 调用DeferredImportSelectGrouping#add(DeferredImportSelectorHolder),将DeferredImportSelectorHolder加入内部类维护的Grouping中 (静态类)
  • 最后,代码将当前DeferredImportSelectorHolder对应的配置类(ConfigurationClass)及其元数据添加到一个名为configurationClasses的映射中。这确保了后续能够快速访问到与特定DeferredImportSelector相关联的配置类
DeferredImportSelectorGroupingHandler#processGroupImports
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
void processGroupImports() {
    for (DeferredImportSelectorGrouping grouping : this.groupings.values()) {
        Predicate<String> filter = grouping.getCandidateFilter();
        grouping.getImports().forEach(entry -> {
            ConfigurationClass configurationClass = this.configurationClasses.get(entry.getMetadata());
            try {
                processImports(configurationClass, asSourceClass(configurationClass, filter),
                        Collections.singleton(asSourceClass(entry.getImportClassName(), filter)),
                        filter, false);
            }
            catch (BeanDefinitionStoreException ex) {
                throw ex;
            }
            catch (Throwable ex) {
                throw new BeanDefinitionStoreException(
                        "Failed to process import candidates for configuration class [" +
                                configurationClass.getMetadata().getClassName() + "]", ex);
            }
        });
    }
}
  • 遍历保存在Groups - DeferredImportSelectorGroupingHandler中的 DeferredImportSelectorGroup对象,调用 DeferredImportSelectorGroup#getImports()方法
  • DeferredImportSelectorGroup#getImports()方法调用DeferredImportSelectorGroup中维护的真实的Group - DeferredImportSelector.Group#process方法,然后返回含有meta-infoEntry
  • 使用内部维护的Map(在register中put),根据Entry.meta-info得到对应的ConfigurationClass ,调用ConfigurationClassParser#processImports,和[前面](#ConfigurationClassParser processImports “wikilink”)一样递归调用进行处理

所以根据以上分析,DeferredImportSelector最终的处理逻辑在于DeferredImportSelector.Group#process() ^db8805

针对ImportBeanDefinitionRegistar
1
2
3
4
5
6
7
8
public interface ImportBeanDefinitionRegistrar {  
    default void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry, BeanNameGenerator importBeanNameGenerator) {  
        this.registerBeanDefinitions(importingClassMetadata, registry);  
    }  
  
    default void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {  
    }  
}

调用ConfigurationClass#addImportBeanDefinitionRegistrar方法,将对应的实例加入configClass对应的Collection类中,后续在[loadBeanDefinitions](#ConfigurationClassBeanDefinitionReader loadBeanDefinitions “wikilink”)中调用其registerBeanDefinitions,注册相应的BeanDefinition

针对普通配置类

不使用特殊机制,直接递归调用processConfigurationClass

ConfigurationClassParser#getConfigurationClasses

返回从前面得到的所有待配置的配置类

ConfigurationClassBeanDefinitionReader#loadBeanDefinitions

^98f726

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public void loadBeanDefinitions(Set<ConfigurationClass> configurationModel) {
        TrackedConditionEvaluator trackedConditionEvaluator = new TrackedConditionEvaluator();
        for (ConfigurationClass configClass : configurationModel) {
            loadBeanDefinitionsForConfigurationClass(configClass, trackedConditionEvaluator);
        }
    }

/**
 * Read a particular {@link ConfigurationClass}, registering bean definitions
 * for the class itself and all of its {@link Bean} methods.
 */
private void loadBeanDefinitionsForConfigurationClass(
        ConfigurationClass configClass, TrackedConditionEvaluator trackedConditionEvaluator) {

    if (trackedConditionEvaluator.shouldSkip(configClass)) {
        String beanName = configClass.getBeanName();
        if (StringUtils.hasLength(beanName) && this.registry.containsBeanDefinition(beanName)) {
            this.registry.removeBeanDefinition(beanName);
        }
        this.importRegistry.removeImportingClass(configClass.getMetadata().getClassName());
        return;
    }

    if (configClass.isImported()) {
        registerBeanDefinitionForImportedConfigurationClass(configClass);
    }
    for (BeanMethod beanMethod : configClass.getBeanMethods()) {
        loadBeanDefinitionsForBeanMethod(beanMethod);
    }

    loadBeanDefinitionsFromImportedResources(configClass.getImportedResources());
    loadBeanDefinitionsFromRegistrars(configClass.getImportBeanDefinitionRegistrars());
}

通过遍历每一个配置类,调用loadBeanDefinitionsForConfigurationClass方法 ^f6a27a

  • registerBeanDefinitionForImportedConfigurationClass(configClass) 注册配置类自身
  • loadBeanDefinitionsForBeanMethod(beanMethod) 注册@Bean注解标识的方法
  • loadBeanDefinitionsFromImportedResources(configClass.getImportedResources()); 注册@ImportResource引入的XML配置文件中读取的bean定义
  • loadBeanDefinitionsFromRegistrars(configClass.getImportBeanDefinitionRegistrars()); 注册configClass中经过解析后保存的所有ImportBeanDefinitionRegistrar,注册对应的BeanDefinition

AOP

AOP的实现类是AnnotationAwareAspectJAutoProxyCreator,其是BeanPostProcessor的实现类,具体来说,是InstantiationAwareBeanPostProcessor的实现类,在实例化Bean过程中,通过调用BeanPostProcessor中的实例化前处理器进行短路,得到相应的代理Bean

@EnableAspectJAutoProxy

1
2
3
4
5
6
7
8
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import({AspectJAutoProxyRegistrar.class})
public @interface EnableAspectJAutoProxy {
    boolean proxyTargetClass() default false;
    boolean exposeProxy() default false;
}

这个注解使用@Import导入了AspectJAutoProxyRegistrar,其是ImportBeanDefinitionRegistrar的实现类,会在处理配置类相应@Import机制的时候将AnnotationAwareAspectJAutoProxyCreator实现类注册到容器中,即注册到BeanDefinition中,实现相应的实例化前处理器功能 (InstantiationAwareBeanPostProcessor)

多线程基础

多线程基础

并发与并行

并发执行

bingfa 同一时间只能处理一个任务,每个任务轮着做(时间片轮转),只要我们单次处理分配的时间足够的短,在宏观看来,就是三个任务在同时进行。 而我们Java中的线程,正是这种机制,当我们需要同时处理上百个上千个任务时,很明显CPU的数量是不可能赶得上我们的线程数的,所以说这时就要求我们的程序有良好的并发性能,来应对同一时间大量的任务处理

并行执行

bingxing 突破了同一时间只能处理一个任务的限制,同一时间可以做多个任务,比如分布式计算模型MapReduce

锁机制

使用synchronized关键字来实现锁,其一定是和某个对象关联的,即提供一个对象来作为锁本身,究其根本在于每个对象的对象头信息中的Mark Word

在将synchronized实现锁的代码变成字节码后,我们发现,其调用了monitorentermonitorexit指令,分别对应加锁和释放锁,且其通过两次monitorexit来实现异常处理monitor

对于Mark Word,其在不同状态下,它存储的数据结构不同: objectHead ^markword

重量级锁

在JDK6之前,synchronized被称为重量级锁,monitor依赖于底层操作系统的Lock实现,Java的线程是映射到操作系统的原生线程上,切换成本较高

每一个对象都有一个monitor相关联,在JVM中,monitor是由ObjetMonitor实现的:

ObjectMonitor() {
    _header       = NULL;
    _count        = 0; //记录个数
    _waiters      = 0,
    _recursions   = 0;
    _object       = NULL;
    _owner        = NULL;
    _WaitSet      = NULL; //处于wait状态的线程,会被加入到_WaitSet
    _WaitSetLock  = 0 ;
    _Responsible  = NULL ;
    _succ         = NULL ;
    _cxq          = NULL ;
    FreeNext      = NULL ;
    _EntryList    = NULL ; //处于等待锁block状态的线程,会被加入到该列表
    _SpinFreq     = 0 ;
    _SpinClock    = 0 ;
    OwnerIsThread = 0 ;
}

而每个等待锁的线程都会被封装成ObjectWaiter对象,进入如下机制: synchronized1 ObjectWaiter首先会进入 Entry Set等待,当线程获取到对象的monitor后进入 The Owner 区域并把monitor中的owner变量设置为当前线程,同时monitor中的计数器count加1,若线程调用wait()方法,将释放当前持有的monitorowner变量恢复为nullcount自减1,同时该线程进入 WaitSet集合中等待被唤醒。若当前线程执行完毕也将释放monitor并复位变量的值,以便其他线程进入获取对象的monitor

但是在大多数应用上,每一个线程占用同步代码块的时间并不是很长,完全没有必要将竞争中的线程挂起然后又唤醒,并且现代CPU基本都是多核心运行的,因此引入了自旋锁 synchronized2

对于自旋锁,它不会将处于等待状态的线程挂起,而是通过无限循环的方式不断检测是否能获取锁,并且在等待时间太长的情况下,为了避免CPU继续运算循环浪费资源,会升级为重量级锁机制,自旋的次数限制是可以自适应变化的,比如在同一个锁对象上,自旋等待刚刚成功获得过锁,并且持有锁的线程正在运行,那么这次自旋也是有可能成功的,所以会允许自旋更多次。当然,如果某个锁经常都自旋失败,那么有可能会不再采用自旋策略,而是直接使用重量级锁

轻量级锁

轻量级锁的目标是,在无竞争的情况下减少重量级锁的性能消耗(赌一手同一时间只有一个线程在占用资源),不向操作系统申请互斥量等

在即将开始执行同步代码块中的内容时,会首先检查对象的Mark Word,查看锁对象是否被其他线程占用,如果没有任何线程占用,那么会在当前线程中所处的栈帧中建立一个名为锁记录(Lock Record)的空间,用于复制并存储对象目前的Mark Word信息(官方称为Displaced Mark Word), 接着,虚拟机将使用CAS操作将对象的Mark Word更新为轻量级锁状态(数据结构变为指向Lock Record的指针,指向的是当前的栈帧)

如果CAS操作失败了的话,那么说明可能这时有线程已经进入这个同步代码块了,这时虚拟机会再次检查对象的Mark Word,是否指向当前线程的栈帧,如果是,说明不是其他线程,而是当前线程已经有了这个对象的锁,直接放心大胆进同步代码块即可。如果不是,那确实是被其他线程占用了。 这时,轻量级锁一开始的想法就是错的(这时有对象在竞争资源,已经赌输了),所以说只能将锁膨胀为重量级锁,按照重量级锁的操作执行(注意锁的膨胀是不可逆的) lightLock

解锁过程同样采取CAS算法,如果对象的MarkWord仍然指向线程的锁记录,那么就用CAS操作把对象的MarkWord和复制到栈帧中的Displaced Mark Word进行交换。如果替换失败,说明其他线程尝试过获取该锁,在释放锁的同时,需要唤醒被挂起的线程。

总体来说,流程为:轻量级锁 -> 失败 -> 自适应自旋锁 -> 失败 -> 重量级锁

[!NOTE] 无锁机制 在并发执行过程中,Double-Check、自旋等待+CAS修改是在不获取重量锁,即OS相关的线程操作时,保证原子性和正确性的重要手段,在AQS中也是如此

偏向锁

偏向锁比轻量级锁更纯粹,实际上是专门为单个线程而生的,当某个线程第一次获得锁时,如果接下来都没有其他线程获取此锁,那么持有锁的线程将不再需要进行同步操作。可以从之前的Mark Word结构中看到,偏向锁也会通过CAS操作记录线程的ID,如果一直都是同一个线程获取此锁,那么完全没有必要在进行额外的CAS操作。当然,如果有其他线程来抢了,那么偏向锁会根据当前状态,决定是否要恢复到未锁定或是膨胀为轻量级锁。

所以,最终的锁等级为:未锁定 < 偏向锁 < 轻量级锁 < 重量级锁

值得注意的是,如果对象通过调用hashCode()方法计算过对象的一致性哈希值,那么它是不支持偏向锁的,会直接进入到轻量级锁状态,因为Hash是需要被保存的,而偏向锁的Mark Word数据结构,无法保存Hash值;如果对象已经是偏向锁状态,再去调用hashCode()方法,那么会直接将锁升级为重量级锁,并将哈希值存放在monitor(有预留位置保存)中。 biasLock

锁消除和锁粗化

锁消除和锁粗化都是在运行时的一些优化方案,比如我们某段代码虽然加了锁,但是在运行时根本不可能出现各个线程之间资源争夺的情况,这种情况下,完全不需要任何加锁机制,所以锁会被消除。锁粗化则是我们代码中频繁地出现互斥同步操作,比如在一个循环内部加锁,这样明显是非常消耗性能的,所以虚拟机一旦检测到这种操作,会将整个同步范围进行扩展

JMM内存模型

JVM中的内存模型是虚拟机规范对整个内存区域的规划,而Java内存模型,是在JVM内存模型之上的抽象模型,具体实现依然是基于JVM内存模型实现的

Java内存模型

Java中采取了与OS中相似的高速缓存与主内存的解决方式,通过SaveLoad操作实现缓存一致性协议 JMM

JMM(Java Memory Model)内存模型规定如下:

  • 所有的变量全部存储在主内存(注意这里包括下面提到的变量,指的都是会出现竞争的变量,包括成员变量、静态变量等,而局部变量这种属于线程私有,不包括在内)
  • 每条线程有着自己的工作内存(可以类比CPU的高速缓存)线程对变量的所有操作,必须在工作内存中进行,不能直接操作主内存中的数据。
  • 不同线程之间的工作内存相互隔离,如果需要在线程之间传递内容,只能通过主内存完成,无法直接访问对方的工作内存。

也就是说,每一条线程如果要操作主内存中的数据,那么得先拷贝到自己的工作内存中,并对工作内存中数据的副本进行操作,操作完成之后,也需要从工作副本中将结果拷贝回主内存中,具体的操作就是Save(保存)和Load(加载)操作。

结合JVM中的内存规划,有:

  • 主内存:对应堆中存放对象的实例的部分。
  • 工作内存:对应线程的虚拟机栈的部分区域,虚拟机可能会对这部分内存进行优化,将其放在CPU的寄存器或是高速缓存中。比如在访问数组时,由于数组是一段连续的内存空间,所以可以将一部分连续空间放入到CPU高速缓存中,那么之后如果我们顺序读取这个数组,那么大概率会直接缓存命中。

重排序

在编译或执行时,为了优化程序的执行效率,编译器或处理器常常会对指令进行重排序,有以下情况:

  1. 编译器重排序:Java编译器通过对Java代码语义的理解,根据优化规则对代码指令进行重排序。
  2. 机器指令级别的重排序:现代处理器很高级,能够自主判断和变更机器指令的执行顺序。

在多线程情况下就会有抢占和顺序的问题

volatile关键字

首先引入三个概念:

  • 原子性:就是要做什么事情要么做完,要么就不做,不存在做一半的情况。
  • 可见性:指当多个线程访问同一个变量时,一个线程修改了这个变量的值,其他线程能够立即看得到修改的值。
  • 有序性:即程序执行的顺序按照代码的先后顺序执行。

一个代码案例:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
public class Main {
    private static int a = 0;
    public static void main(String[] args) throws InterruptedException {
        new Thread(() -> {
            while (a == 0);
            System.out.println("线程结束!");
        }).start();

        Thread.sleep(1000);
        System.out.println("正在修改a的值...");
        a = 1;   //很明显,按照我们的逻辑来说,a的值被修改那么另一个线程将不再循环
    }
}

在该案例中,虽然主线程中修改了a的值,但是另一个线程并不知道a的值发生了改变,所以循环中依然是使用旧值在进行判断,因此,普通变量是不具有可见性的。

一种情况是对于该代码块进行synchronized加锁,因为其也符合#Happens-Before原则,会添加相应的[#内存屏障(Memory Barriers)](#内存屏障(Memory Barriers) “wikilink”)保证可见性

另一种解决方法是使用volatile关键字。此关键字的第一个作用,就是保证变量的可见性。当写一个volatile变量时,JMM会把该线程本地内存中的变量强制刷新到主内存中去,并且这个写会操作会导致其他线程中的volatile变量缓存无效,这样,另一个线程修改了这个变量时,当前线程会立即得知,并将工作内存中的变量更新为最新的版本;但是该关键字无法解决原子性问题,因为在底层字节码实现中是拆分为多个CPU指令执行的。该关键字的第二个作用就是禁止指令重排,保证有序性,具体来说,是通过在指令序列中插入内存屏障(Memory Barriers)来禁止特定类型的处理器重排序。

我们可以认为,volatilesynchronized以相同的方式解决了数据可见性,指令有序性,但是后者解决了原子性,代价为获取锁的额外开销

[!NOTE] volatile结合内存屏障实现的可见性 在 Java 中,对 volatile 变量的读操作确保了所有写入该变量的操作对其他线程可见。这是通过在读操作后加入 LoadLoadLoadStore 内存屏障来实现的,这些屏障确保对 volatile 变量的读取不会从缓存中获取过时的数据。下面是这两个内存屏障工作机制的具体解释:

LoadLoad 屏障

  • 作用: LoadLoad 屏障放在 volatile 读操作之后,确保所有后续的读操作(包括对 volatile 和非 volatile 变量的读取)必须在读取 volatile 变量之后执行。这样的排序确保了对 volatile 变量的读取操作完成后,任何依赖于该变量的读取都能观察到其最新值。
  • 实现: 在处理器层面,这个屏障防止处理器将后续的读指令重新排序到 volatile 读之前,从而保证了内存操作的正确顺序。

LoadStore 屏障

  • 作用: LoadStore 屏障确保在读取 volatile 变量之后的任何写操作必须等到 volatile 变量读取完成后才能执行。这保证了任何基于 volatile 变量读取结果的写操作都不会过早地执行,从而维护了读写之间的依赖关系。
  • 实现: 这个屏障阻止处理器将读取 volatile 变量后的写操作提前到读操作之前,确保了依赖于 volatile 变量的状态的写操作正确地观察到了 volatile 读取的最新结果。

保证不读取过时的数据

当线程进行 volatile 变量的读取时,LoadLoadLoadStore 屏障一起工作,确保了以下几点:

  • CPU 在执行读操作前必须先确认任何可能的写操作已同步到主内存,这通常涉及到刷新或检查本地缓存行的状态,确保它们与主内存保持一致。
  • 如果本地缓存行被标记为无效(因为其他处理器已经修改了对应于 volatile 变量的内存地址),则处理器必须从主内存重新加载数据,而不是使用缓存中的过时数据。 这些内存屏障的合作最终保证了 volatile 读操作总是从主内存中获取最新数据,而不是从可能包含过时数据的本地缓存中读取。这样的机制是 volatile 变量能够作为轻量级同步机制提供内存可见性和有序性保证的关键。

内存屏障(Memory Barriers)

内存屏障(Memory Barrier)又称内存栅栏,是一个CPU指令,它的作用有两个:

  1. 保证特定操作的顺序 由于编译器和处理器都能执行指令重排的优化,如果在指令间插入一条Memory Barrier则会告诉编译器和CPU,不管什么指令都不能和这条Memory Barrier指令重排序。
  2. 保证某些变量的内存可见性(volatile的内存可见性,其实就是依靠这个实现的) Memory Barrier的另外一个作用是强制刷出各种CPU的缓存数据,因此任何CPU上的线程都能读取到这些数据的最新版本。通常涉及到刷新一些本地处理器缓存中的值到主内存,或者无效化缓存项,迫使后续的读操作从主内存中重新加载数据。

内存屏障主要分为以下几类:

  1. LoadLoad:确保之前的读取操作完成后,后续的读取操作才能进行。
  2. StoreStore:确保之前的写入操作完成后,后续的写入操作才能进行。
  3. LoadStore:确保之前的读取操作完成后,后续的写入操作才能进行。
  4. StoreLoad:确保之前的写入操作完成后,后续的读取操作才能进行。这是最强的内存屏障,确保所有之前的存储都对之后的读取可见。

在Java中,volatilesynchronized关键字在某种程度上用来实现类似内存屏障的功能,尽管它们的主要目的和使用场景有所不同。 #### volatile

在Java中,volatile变量的读写会插入内存屏障指令,保证了volatile变量的可见性和部分顺序性。当一个字段被声明为volatile,编译器和运行时都会在访问该字段时插入内存屏障,确保不会有指令重排序发生,使得一个线程写入的值对其他线程立即可见。

  • 写入volatile变量相当于插入一个StoreStore屏障后跟一个StoreLoad屏障。
  • 读取volatile变量会插入一个LoadLoad屏障后跟一个LoadStore屏障。

synchronized

synchronized关键字在Java中用于实现线程间的互斥和内存一致性。当进入一个synchronized块时,会在开始处插入一个LoadLoadLoadStore屏障,确保之前的操作不会与进入的synchronized块重排序。退出synchronized块时,会插入一个StoreStoreStoreLoad屏障,确保synchronized块内的所有变化对接下来将要执行的操作可见。

对于==Java并发编程图册==中的例子(volatile读-写内存语义),是由于volatile关键字用在flag变量上产生的Happens-Before关系,即在volatile变量的写操作到该变量的读操作之间建立了一个内存可见的桥梁 ### Happens-Before原则

JMM提出了happens-before(先行发生)原则,定义一些禁止编译优化的场景,来向各位程序员做一些保证,只要我们是按照原则进行编程,那么就能够保持并发编程的正确性。

其基本定义为:在 Java 内存模型中,如果一个操作 A happens-before 另一个操作 B,那么 A 的结果对 B 是可见的,并且 A 的执行顺序在 B 之前。这种关系有助于解决多线程环境中的可见性问题和指令重排问题

常见的几种典型情况:

  • 程序顺序规则:同一个线程中,按照程序的顺序,前面的操作happens-before后续的任何操作。
  • 同一个线程内,代码的执行结果是有序的。其实就是,可能会发生指令重排,但是保证代码的执行结果一定是和按照顺序执行得到的一致,程序前面对某一个变量的修改一定对后续操作可见的,不可能会出现前面才把a修改为1,接着读a居然是修改前的结果,这也是程序运行最基本的要求
  • 监视器锁规则(Monitor - synchronized):对一个锁的解锁 happens-before 随后对这个锁的加锁
  • 就是无论是在单线程环境还是多线程环境,对于同一个锁来说,一个线程对这个锁解锁之后,另一个线程获取了这个锁都能看到前一个线程的操作结果。比如前一个线程将变量x的值修改为了12并解锁,之后另一个线程拿到了这把锁,对之前线程的操作是可见的,可以得到x是前一个线程修改后的结果12(所以synchronized是有happens-before规则的)
  • volatile 变量规则:对一个 volatile 字段的写操作 happens-before 任何后续对这个 volatile 字段的读操作
  • 就是如果一个线程先去写一个volatile变量,紧接着另一个线程去读这个变量,那么这个写操作的结果一定对读的这个变量的线程可见,并且会刷新其余的变量,例如书中例子,但是是隐式的
  • 线程启动规则:从线程 A 启动线程 B 的动作 happens-before 线程 B 中的任何动作。
  • 在主线程A执行过程中,启动子线程B,那么线程A在启动子线程B之前对共享变量的修改结果对线程B可见
  • 线程终止规则:线程 A 的所有操作 happens-before 其他线程检测到线程 A 已经终止的动作(通过 join 或其他方式)
  • 传递性规则: 如果A happens-before B,B happens-before C,那么A happens-before C。

多线程核心 (JUC)

锁框架

在JDK 5之后,并发包中新增了Lock接口(以及相关实现类)用来实现锁功能,Lock接口提供了与synchronized关键字类似的同步功能,但需要在使用时手动获取锁和释放锁。

Lock和Condition接口

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
public interface Lock {
      //获取锁,拿不到锁会阻塞,等待其他线程释放锁,获取到锁后返回
    void lock();
      //同上,但是等待过程中会响应中断
    void lockInterruptibly() throws InterruptedException;
      //尝试获取锁,但是不会阻塞,如果能获取到会返回true,不能返回false
    boolean tryLock();
      //尝试获取锁,但是可以限定超时时间,如果超出时间还没拿到锁返回false,否则返回true,可以响应中断
    boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
      //释放锁
    void unlock();
      //暂时可以理解为替代传统的Object的wait()、notify()等操作的工具
    Condition newCondition();
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
public interface Condition {
      //与调用锁对象的wait方法一样,会进入到等待状态,但是这里需要调用Condition的signal或signalAll方法进行唤醒(感觉就是和普通对象的wait和notify是对应的)同时,等待状态下是可以响应中断的
      void await() throws InterruptedException;
      //同上,但不响应中断(看名字都能猜到)
      void awaitUninterruptibly();
      //等待指定时间,如果在指定时间(纳秒)内被唤醒,会返回剩余时间,如果超时,会返回0或负数,可以响应中断
      long awaitNanos(long nanosTimeout) throws InterruptedException;
      //等待指定时间(可以指定时间单位),如果等待时间内被唤醒,返回true,否则返回false,可以响应中断
      boolean await(long time, TimeUnit unit) throws InterruptedException;
      //可以指定一个明确的时间点,如果在时间点之前被唤醒,返回true,否则返回false,可以响应中断
      boolean awaitUntil(Date deadline) throws InterruptedException;
      //唤醒一个处于等待状态的线程,注意还得获得锁才能接着运行
      void signal();
      //同上,但是是唤醒所有等待线程
      void signalAll();
}

在使用Condition时,await()的线程需要先获取锁,signal()的线程也需要获取锁,且唤醒后也需要再次获取锁才能继续运行,且不同的Condition对象有不同的等待队列源码实现,因此无法跨对象唤醒。

可重入锁(ReentrantLock)

常见API使用 #### 公平锁与非公平锁

如果线程之间争抢同一把锁,会暂时进入到等待队列中,根据策略会产生不同的效果:

  • 公平锁:多个线程按照申请锁的顺序去获得锁,线程会直接进入队列去排队,永远都是队列的第一位才能得到锁。
  • 非公平锁:多个线程去获取锁的时候,会直接去尝试获取,获取不到,再去进入等待队列,如果能获取到,就直接获取到锁。

由AQS源码可知,公平锁不一定是公平的,直到线程进入等待队列后,才能保证公平机制 ### 读写锁(ReadWriteLock)

1
2
3
4
5
6
public interface ReadWriteLock {
    //获取读锁
    Lock readLock();
      //获取写锁
    Lock writeLock();
}

和可重入锁不同的地方在于,可重入锁是一种排他锁,当一个线程得到锁之后,另一个线程必须等待其释放锁,否则一律不允许获取到锁。而读写锁在同一时间,是可以让多个线程获取到锁的,它其实就是针对于读写场景而出现的。

读写锁维护了一个读锁和一个写锁,这两个锁的机制是不同的。

  • 读锁:在没有任何线程占用写锁的情况下,同一时间可以有多个线程加读锁。
  • 写锁:在没有任何线程占用读锁的情况下,同一时间只能有一个线程加写锁。

锁降级和锁升级

锁降级指的是写锁降级为读锁。当一个线程持有写锁的情况下,虽然其他线程不能加读锁,但是线程自己是可以加读锁的,在同时加了写锁和读锁的情况下,释放写锁,其他的线程就可以一起加读锁,这种操作,就被称之为"锁降级"(注意不是先释放写锁再加读锁,而是持有写锁的情况下申请读锁再释放写锁)

注意在仅持有读锁的情况下去申请写锁,属于"锁升级",ReentrantReadWriteLock是不支持的 ### 队列同步器AQS源码分析

ReentrantLock的公平锁策略入手,解析AQS源码。 ReentrantLock#lock()方法调用的是其内部类Sync中的sync#lock()方法,而Sync类是继承自AbstractQueuedSynchronizer(AQS),调用AQS的内置方法。

  • ☐ ==整理AQS中的各类变量,数据结构,与方法之间的调用关系,以图的方式复习总结==
  • ☐ Unsafe类的CAS #### AQS底层实现(JDK17)

AQS内部封装了包括锁的获取、释放、等待队列等。一个锁(排他锁为例)的基本功能就是获取锁、释放锁、当锁被占用时,其他线程来争抢会进入等待队列,AQS已经将这些基本的功能封装完成,接下来会依次介绍AQS中的核心部分

AQS中的基础变量
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
static final int WAITING   = 1;          // must be 1  
static final int CANCELLED = 0x80000000; // must be negative  
static final int COND      = 2;          // in a condition wait

/**  
 * Head of the wait queue, lazily initialized. 
 */
private transient volatile Node head;  
/**  
 * Tail of the wait queue. After initialization, modified only via casTail. */ private transient volatile Node tail;  
  
/**  
 * The synchronization state. */ 
private volatile int state;

AQS中采取Dummy Node的方式维护等待队列双链表,且是lazily initialized,即在初始化AQS时不会创建相应的等待队列Node ##### AQS中的等待队列(核心数据结构)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
abstract static class Node {  
    volatile Node prev;       // initially attached via casTail  
    volatile Node next;       // visibly nonnull when signallable  
    Thread waiter;            // visibly nonnull when enqueued  
    volatile int status;      // written by owner, atomic bit ops by others  
  
    // methods for atomic operations    
    final boolean casPrev(Node c, Node v) {  // for cleanQueue  
        return U.weakCompareAndSetReference(this, PREV, c, v);  
    }  
    final boolean casNext(Node c, Node v) {  // for cleanQueue  
        return U.weakCompareAndSetReference(this, NEXT, c, v);  
    }  
    final int getAndUnsetStatus(int v) {     // for signalling  
        return U.getAndBitwiseAndInt(this, STATUS, ~v);  
    }  
    final void setPrevRelaxed(Node p) {      // for off-queue assignment  
        U.putReference(this, PREV, p);  
    }  
    final void setStatusRelaxed(int s) {     // for off-queue assignment  
        U.putInt(this, STATUS, s);  
    }  
    final void clearStatus() {               // for reducing unneeded signals  
        U.putIntOpaque(this, STATUS, 0);  
    }  
  
    private static final long STATUS  
        = U.objectFieldOffset(Node.class, "status");  
    private static final long NEXT  
        = U.objectFieldOffset(Node.class, "next");  
    private static final long PREV  
        = U.objectFieldOffset(Node.class, "prev");  
}
BigMap
Sync#lock()
1
2
3
4
final void lock() {  
    if (!initialTryLock())  
        acquire(1);  
}
[Fair|Unfair]Sync#initialTryLock()
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
/**  
 * Acquires only if reentrant or queue is empty. */
final boolean initialTryLock() {  
    Thread current = Thread.currentThread();  
    int c = getState();  
    if (c == 0) {  
        if (!hasQueuedThreads() && compareAndSetState(0, 1)) {  
            setExclusiveOwnerThread(current);  
            return true;  
        }  
    } else if (getExclusiveOwnerThread() == current) {  
        if (++c < 0) // overflow  
            throw new Error("Maximum lock count exceeded");  
        setState(c);  
        return true;  
    }  
    return false;  
}

该函数流程简单,首先获取当前AQS的状态State,若等于0则表示当前没有任何进程获得锁,然后通过hasQueuedThreads()方法判断当前等待队列是否有其余线程,并且CAS原子地设置状态为1,获取排他锁,注意,由于该方法本身没有加事实意义上的锁,因此在任意时刻状态都可能会变化([hasQueuedThreads()](#AQS hasQueuedThreads() “wikilink”)注释写道),因此在获取state==0后再次检验队列,可以看作是一种recheck机制;若不等于0则判断是否是当前进程已经持有锁,且没有溢出。

AQS#hasQueuedThreads()
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
/**  
 * Queries whether any threads are waiting to acquire. Note that * because cancellations due to interrupts and timeouts may occur * at any time, a {@code true} return does not guarantee that any  
 * other thread will ever acquire. * * @return {@code true} if there may be other threads waiting to acquire  
 */
 public final boolean hasQueuedThreads() {  
    for (Node p = tail, h = head; p != h && p != null; p = p.prev)  
        if (p.status >= 0)  
            return true;  
    return false;  
}
AQS#acquire(int arg)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
/**  
 * Acquires in exclusive mode, ignoring interrupts.  Implemented * by invoking at least once {@link #tryAcquire},  
 * returning on success.  Otherwise the thread is queued, possibly * repeatedly blocking and unblocking, invoking {@link  
 * #tryAcquire} until success.  This method can be used  
 * to implement method {@link Lock#lock}.  
 * * @param arg the acquire argument.  This value is conveyed to  
 *        {@link #tryAcquire} but is otherwise uninterpreted and  
 *        can represent anything you like. */
public final void acquire(int arg) {  
    if (!tryAcquire(arg))  
        acquire(null, arg, false, false, false, 0L);  
}

该方法调用AQS中待实现类实现的tryAcquire()方法,以自定义的方式尝试获取一次锁,若获取失败,则调用AQS#acquire(Node node, int arg, boolean shared, boolean interruptible, boolean timed, long time)方法

FairSync#tryAcquire(int acquires)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
/**  
 * Acquires only if thread is first waiter or empty */
protected final boolean tryAcquire(int acquires) {  
    if (getState() == 0 && !hasQueuedPredecessors() &&  
        compareAndSetState(0, acquires)) {  
        setExclusiveOwnerThread(Thread.currentThread());  
        return true;  
    }  
    return false;  
}

同样的,该函数的大体逻辑与initialTryLock()相似,前者是在锁为空或者等待队列为空时获取锁,该函数是在等待队列为空或者该线程为等待队列中的第一位时,进行锁的获取(公平锁,且同样使用CAS原子地设置)

AQS#hasQueuedPredecessors()
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/**  
 * Queries whether any threads have been waiting to acquire longer * than the current thread. * * <p>An invocation of this method is equivalent to (but may be  
 * more efficient than): * <pre> {@code  
 * getFirstQueuedThread() != Thread.currentThread()  
 *   && hasQueuedThreads()}</pre>  
 *  
 * <p>Note that because cancellations due to interrupts and  
 * timeouts may occur at any time, a {@code true} return does not  
 * guarantee that some other thread will acquire before the current * thread.  Likewise, it is possible for another thread to win a * race to enqueue after this method has returned {@code false},  
 * due to the queue being empty. * * <p>This method is designed to be used by a fair synchronizer to  
 * avoid <a href="AbstractQueuedSynchronizer.html#barging">barging</a>.  
 * Such a synchronizer's {@link #tryAcquire} method should return  
 * {@code false}, and its {@link #tryAcquireShared} method should  
 * return a negative value, if this method returns {@code true}  
 * (unless this is a reentrant acquire).  For example, the {@code  
 * tryAcquire} method for a fair, reentrant, exclusive mode  
 * synchronizer might look like this: * * <pre> {@code  
 * protected boolean tryAcquire(int arg) {  
 *   if (isHeldExclusively()) { *     // A reentrant acquire; increment hold count *     return true; *   } else if (hasQueuedPredecessors()) { *     return false; *   } else { *     // try to acquire normally *   } * }}</pre>  
 *  
 * @return {@code true} if there is a queued thread preceding the  
 *         current thread, and {@code false} if the current thread  
 *         is at the head of the queue or the queue is empty * @since 1.7  
 */
public final boolean hasQueuedPredecessors() {  
    Thread first = null; Node h, s;  
    if ((h = head) != null && ((s = h.next) == null ||  
                               (first = s.waiter) == null ||  
                               s.prev == null))  
        first = getFirstQueuedThread(); // retry via getFirstQueuedThread  
    return first != null && first != Thread.currentThread();  
}

正如注释所写,其等价于hasQueuedThreads()然后判断是否为First,证明大体逻辑的正确性。

考虑以下场景:当前线程调用此方法,并且返回false,此时另外的线程(没有入队)开始抢占锁,在当前线程CAS修改state之前先获取到了锁,此时当前线程无法获取锁,即使此方法返回了false,并且大体逻辑是在公平锁语境下。因此只有已经进入队列的线程才能保证公平性。

回到该函数,如果判断头节点不为空,则等待队列可能拥有元素,并且在头节点的下一个节点为空,或者头节点的下一个节点的等待线程为空,或者头节点的下一个线程的prev字段为空,证明等待队列处于一个不一致的情况,或者是过渡状态(节点正在添加或移除),会显式调用getFirstQueuedThread()方法可靠地获取队列中的第一个线程,否则在判断的过程中就会将fisrt字段赋值(短路操作)。最后判断first是否是当前线程即可。

[!NOTE] GPT解释 Detailed Explanation of hasQueuedPredecessors(): This method provides a way to determine if the calling thread should wait in line or attempt to acquire the lock directly, based on whether there are other threads ahead of it in the queue.

  1. Checking the Queue: The method starts by initializing Thread first to null and declaring Node h and Node s. It then checks if the head of the queue (h) is not null. If the head exists, it proceeds to check the next node (s = h.next).
  2. Evaluating Conditions: If the head’s next node (s) is null, or s.waiter (the thread in the s node) is null, or s.prev (the link back to the head) is null, it implies a possibility of inconsistency in the queue or that the queue might be transitioning states (e.g., nodes being added or removed). In such cases, it uses getFirstQueuedThread() to reliably get the first thread in the queue and reassess the situation. This call is more robust but potentially less efficient, hence used as a fallback. 3.Return Logic: Finally, the method returns true if first (the first queued thread determined either directly or through the fallback) is not null and is not the current thread. This means there is at least one thread that has been waiting longer than the current thread. If first is null or it is the current thread, it returns false, indicating either the queue is empty or the current thread is at the head of the queue.
AQS#acquire(Node node, int arg, boolean shared, boolean interruptible, boolean timed, long time)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
final int acquire(Node node, int arg, boolean shared,
                      boolean interruptible, boolean timed, long time) {
        Thread current = Thread.currentThread();
        byte spins = 0, postSpins = 0;   // retries upon unpark of first thread
        boolean interrupted = false, first = false;
        Node pred = null;               // predecessor of node when enqueued

        /*
         * Repeatedly:
         *  Check if node now first
         *    if so, ensure head stable, else ensure valid predecessor
         *  if node is first or not yet enqueued, try acquiring
         *  else if queue is not initialized, do so by attaching new header node
         *     resort to spinwait on OOME trying to create node
         *  else if node not yet created, create it
         *     resort to spinwait on OOME trying to create node
         *  else if not yet enqueued, try once to enqueue
         *  else if woken from park, retry (up to postSpins times)
         *  else if WAITING status not set, set and retry
         *  else park and clear WAITING status, and check cancellation
         */

        for (;;) {
            if (!first && (pred = (node == null) ? null : node.prev) != null &&
                !(first = (head == pred))) {
                if (pred.status < 0) {
                    cleanQueue();           // predecessor cancelled
                    continue;
                } else if (pred.prev == null) {
                    Thread.onSpinWait();    // ensure serialization
                    continue;
                }
            }
            if (first || pred == null) {
                boolean acquired;
                try {
                    if (shared)
                        acquired = (tryAcquireShared(arg) >= 0);
                    else
                        acquired = tryAcquire(arg);
                } catch (Throwable ex) {
                    cancelAcquire(node, interrupted, false);
                    throw ex;
                }
                if (acquired) {
                    if (first) {
                        node.prev = null;
                        head = node;
                        pred.next = null;
                        node.waiter = null;
                        if (shared)
                            signalNextIfShared(node);
                        if (interrupted)
                            current.interrupt();
                    }
                    return 1;
                }
            }
            Node t;
            if ((t = tail) == null) {           // initialize queue
                if (tryInitializeHead() == null)
                    return acquireOnOOME(shared, arg);
            } else if (node == null) {          // allocate; retry before enqueue
                try {
                    node = (shared) ? new SharedNode() : new ExclusiveNode();
                } catch (OutOfMemoryError oome) {
                    return acquireOnOOME(shared, arg);
                }
            } else if (pred == null) {          // try to enqueue
                node.waiter = current;
                node.setPrevRelaxed(t);         // avoid unnecessary fence
                if (!casTail(t, node))
                    node.setPrevRelaxed(null);  // back out
                else
                    t.next = node;
            } else if (first && spins != 0) {
                --spins;                        // reduce unfairness on rewaits
                Thread.onSpinWait();
            } else if (node.status == 0) {
                node.status = WAITING;          // enable signal and recheck
            } else {
                long nanos;
                spins = postSpins = (byte)((postSpins << 1) | 1);
                if (!timed)
                    LockSupport.park(this);
                else if ((nanos = time - System.nanoTime()) > 0L)
                    LockSupport.parkNanos(this, nanos);
                else
                    break;
                node.clearStatus();
                if ((interrupted |= Thread.interrupted()) && interruptible)
                    break;
            }
        }
        return cancelAcquire(node, interrupted, interruptible);
    }

这是AQS获取锁的核心代码,所有暴露的acquire方法都会调用这个方法,其通过自旋+CAS的方式进行无锁并发,只有在成功获取、超时、中断的情况下会退出自旋。

针对当前节点不是first的情况,首先尝试获取当前节点的前驱,若前驱不为空,则再次判断当前节点是否为first并赋值,若再次判断后不为first,则继续判断:如果前驱为CANCELLED,调用cleanQueue()尝试清理状态为CANCELLED的节点,优化等待队列,帮助保持头节点的稳定性,清除操作后,Head节点的下一个节点将指向一个有效的、未取消的节点,从而使得锁的获取更加顺畅,并调用Thread.onSpinWait()确保前驱节点的有效性。

若当前节点为first或者尚未入队,再次调用实现类的tryAcquire()方法尝试获取锁,如果成功获取,且当前节点为first,则调整等待队列的head,并且在共享锁模式下尝试唤醒其余可能唤醒的节点,处理中断。

如果当前tail为null,证明等待队列尚未初始化,调用tryInitializeHead()初始化等待队列(证明AQS的等待队列为lazy initialize)

如果当前队列已经初始化,但是当前节点为null,则根据模式创建相应的节点。

如果当前队列已经初始化,且节点已经初始化,但尚未入队(pred==null),尝试进行入队,将节点的waiter更改为当前线程,CAS尝试修改tail

如果当前节点是first并且已经被unparked,减少自旋值增加公平性。

如果当前节点的状态是0(node.status == 0),将其状态设置为WAITING并进行recheck。在recheck时,其status虽然被设定为WAITING,但如果当前node为first且成功获取锁,说明有其他线程unlock并且signalNext将其status设定为0,依旧保证status为0的情况下才能获取锁。[signalNext()中的判断条件可以解释](#AQS signalNext() “wikilink”)

否则,将该线程挂起park,在唤醒后将其的status设定为0,并处理中断

[!NOTE] GPT对于Status的解释 Understanding Status Management

  1. Role of WAITING Status:
>   +  When a node (representing a thread in a queue) is set to WAITING, it typically indicates that the thread is actively waiting and should remain parked until explicitly signalled. The WAITING status is used to manage thread wake-up correctly and to avoid lost wake-ups.
  1. Role of Status Zero (0):
>   + A status of 0 generally indicates that the node is not in any particular waiting or signal state. This can mean several things depending on the context:
    >       + The thread is not currently waiting.
    >       + The thread has been woken up and is about to retry acquiring the lock.
    >       + The thread has completed its operation and is being cleaned up.
  1. Acquiring the Lock with Status Zero:
>   + Setting the status to zero does not by itself grant the lock to the node. Instead, it signifies that the node is in a state eligible to attempt to acquire the lock. When a thread (node) attempts to acquire the lock, having a status of zero implies that it is neither parked nor scheduled to be parked. This status allows it to enter the lock acquisition logic without being delayed by unnecessary waits.
  1. Transition from WAITING to Zero:
  • The transition from WAITING to 0 typically occurs when:
    • The node is signalled (either by LockSupport.unpark() or similar mechanisms) that it should wake up and retry acquiring the lock.
    • The thread successfully acquires the lock and subsequently clears its status to indicate it is no longer waiting.
    • The thread is aborting its wait, possibly due to a timeout or an interrupt, and needs to clear its status as part of cleanup operations.

Practical Implication

  • In a Blocking Scenario (Park):
  • While the node is WAITING, the thread is typically parked (LockSupport.park()) and will remain so until it is unparked or otherwise signalled. The WAITING status helps ensure that the node remains correctly identified as being in need of a wake-up signal.
  • In a Lock Acquisition Scenario:
  • A node may attempt to acquire the lock regardless of its initial status (either 0 or transitioning from WAITING). If the lock acquisition is successful, any status related to waiting is irrelevant post-acquisition; thus, clearing the status to 0 is often an administrative or cleanup action, preparing the node for potential reuse or ensuring it does not remain marked as waiting unnecessarily.
AQS#cleanQueue()
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
/**
 * Possibly repeatedly traverses from tail, unsplicing cancelled
 * nodes until none are found. Unparks nodes that may have been
 * relinked to be next eligible acquirer.
 */
private void cleanQueue() {
    for (;;) {                               // restart point
        for (Node q = tail, s = null, p, n;;) { // (p, q, s) triples
            if (q == null || (p = q.prev) == null)
                return;                      // end of list
            if (s == null ? tail != q : (s.prev != q || s.status < 0))
                break;                       // inconsistent
            if (q.status < 0) {              // cancelled
                if ((s == null ? casTail(q, p) : s.casPrev(q, p)) &&
                    q.prev == p) {
                    p.casNext(q, s);         // OK if fails
                    if (p.prev == null)
                        signalNext(p);
                }
                break;
            }
            if ((n = p.next) != q) {         // help finish
                if (n != null && q.prev == p) {
                    p.casNext(n, q);
                    if (p.prev == null)
                        signalNext(p);
                }
                break;
            }
            s = q;
            q = q.prev;
        }
    }
}

该方法尝试遍历整个等待队列,使用p,q,s三个变量表示当前节点的前驱,当前节点,当前节点的后继。

  • 判断是否已达到队列的起点,若已达到则退出循环
  • 若当前节点的状态不是一致性,则退出内层循环,从尾部重新开始清理
  • 若当前节点q的状态是CANCELLED,尝试通过CAS更改当前节点的前驱,并且修改当前节点的后继的CAS操作可以失败,在再次循环中会判断这种情况(前驱正确,但是后继不正确),并帮助完成链接
  • 并且在修改链接关系后,判断当前节点是否可能为head节点(p.prev==null),若可能,则调用signalNext(p)唤醒下一个线程

[!NOTE] 唤醒下一个线程

  1. 维持锁的可用性:如果 phead 或近似于 head,并且 pnext 指向另一个有效的等待节点,那么这个节点现在可能有机会获取锁。因此,唤醒该节点上的线程是必要的,以便它可以尝试获取锁。
  2. 避免线程饥饿:在多线程并发控制中,保持队列的公平性和活跃性非常重要。如果 pnext 节点的线程处于等待状态,不及时唤醒它可能导致线程饥饿,即线程长时间等待而不得执行。
  3. 响应队列变化:当 cleanQueue() 方法移除一个或多个已取消的节点时,队列的状态发生了变化。更新 head 并唤醒相应的线程是响应这种变化、保证锁机制正常运作的必要步骤。
Sync#unlock()

内部仅调用AQS#release(1) ##### AQS#release(int arg)AQS#acquire(int arg)相似,调用实现类实现的方法tryRelease(int arg),如果成功释放,则唤醒等待队列中的第一个可用线程节点 ##### Sync#tryRelease()&AQS#signalNext() 两者逻辑较为简单,后者的判断条件决定了,只有当status == WAITING时,才能被唤醒,此时存在被park或者正在acquire中进行recheck,都保证了获取锁之前会将status置为0

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
protected final boolean tryRelease(int releases) {  
    int c = getState() - releases;  
    if (getExclusiveOwnerThread() != Thread.currentThread())  
        throw new IllegalMonitorStateException();  
    boolean free = (c == 0);  
    if (free)  
        setExclusiveOwnerThread(null);  
    setState(c);  
    return free;  
}
/**
 * Wakes up the successor of given node, if one exists, and unsets its
 * WAITING status to avoid park race. This may fail to wake up an
 * eligible thread when one or more have been cancelled, but
 * cancelAcquire ensures liveness.
 */
private static void signalNext(Node h) {
    Node s;
    if (h != null && (s = h.next) != null && s.status != 0) {
        s.getAndUnsetStatus(WAITING);
        LockSupport.unpark(s.waiter);
    }
}
Condition

condition类实际上是代替传统对象的wait & notify操作的,实现等待/通知模式,并且同一把锁下面可以创建复数个condition对象

在AQS内部,通过复用等待队列的Node结构实现condition等待队列,但是其中的Node节点状态为COND且仅维护后继节点(普通的单链表),并且condition队列是由ConditionObject实现类进行维护,其与AQS的等待队列结构相似,仅是修改了节点定义,实现了相关方法

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
static final class ConditionNode extends Node
    implements ForkJoinPool.ManagedBlocker {
    ConditionNode nextWaiter;            // link to next waiting node
    // ......
}
/**
 * Condition implementation for a {@link AbstractQueuedSynchronizer}
 * serving as the basis of a {@link Lock} implementation.
 *
 * <p>Method documentation for this class describes mechanics,
 * not behavioral specifications from the point of view of Lock
 * and Condition users. Exported versions of this class will in
 * general need to be accompanied by documentation describing
 * condition semantics that rely on those of the associated
 * {@code AbstractQueuedSynchronizer}.
 *
 * <p>This class is Serializable, but all fields are transient,
 * so deserialized conditions have no waiters.
 */
public class ConditionObject implements Condition, java.io.Serializable {
    private static final long serialVersionUID = 1173984872572414699L;
    /** First node of condition queue. */
    private transient ConditionNode firstWaiter;
    /** Last node of condition queue. */
    private transient ConditionNode lastWaiter;
    // ......
}

其核心在于await()signal(),接下来依次解析相应源码 ##### ConditionObject#await()

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
/**
 * Implements interruptible condition wait.
 * <ol>
 * <li>If current thread is interrupted, throw InterruptedException.
 * <li>Save lock state returned by {@link #getState}.
 * <li>Invoke {@link #release} with saved state as argument,
 *     throwing IllegalMonitorStateException if it fails.
 * <li>Block until signalled or interrupted.
 * <li>Reacquire by invoking specialized version of
 *     {@link #acquire} with saved state as argument.
 * <li>If interrupted while blocked in step 4, throw InterruptedException.
 * </ol>
 */
public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    ConditionNode node = newConditionNode();
    if (node == null)
        return;
    int savedState = enableWait(node);
    LockSupport.setCurrentBlocker(this); // for back-compatibility
    boolean interrupted = false, cancelled = false, rejected = false;
    while (!canReacquire(node)) {
        if (interrupted |= Thread.interrupted()) {
            if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0)
                break;              // else interrupted after signal
        } else if ((node.status & COND) != 0) {
            try {
                if (rejected)
                    node.block();
                else
                    ForkJoinPool.managedBlock(node);
            } catch (RejectedExecutionException ex) {
                rejected = true;
            } catch (InterruptedException ie) {
                interrupted = true;
            }
        } else
            Thread.onSpinWait();    // awoke while enqueuing
    }
    LockSupport.setCurrentBlocker(null);
    node.clearStatus();
    acquire(node, savedState, false, false, false, 0L);
    if (interrupted) {
        if (cancelled) {
            unlinkCancelledWaiters(node);
            throw new InterruptedException();
        }
        Thread.currentThread().interrupt();
    }
}

该函数的大体逻辑较为清晰:

  • 创建新的ConditionNode
  • 调用ConditionObject#enableWait()进行当前锁状态的存储与释放,设定状态为COND | WAITING,添加进入condition等待队列
  • 循环调用ConditionObject#canRequire()判断该节点是否可以获取锁,该方法通过判断该节点是否已经从condition等待队列移入AQS等待队列
  • 然后判断中断,通过之前设定的COND | WAITING状态进行判断是否在signal之前就被interrupt,具体来说,在signal之后,status中的COND位会被移除,若在此处移除COND位之前尚未被移除,说明该中断在signal之前
  • 并且在至少拥有COND状态的情况下调用AQS#block()进行park等待唤醒
  • 否则调用Thread.onSpinWait()等待入队进程完成,因为不满足前者的情况下说明现在是过渡态
  • 若已经成功移入AQS等待队列,清除当前状态为0,调用AQS#acquire(argvs...)进行普通的锁获取
ConditionObject#enableWait()
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
 * Adds node to condition list and releases lock.
 *
 * @param node the node
 * @return savedState to reacquire after wait
 */
private int enableWait(ConditionNode node) {
    if (isHeldExclusively()) {
        node.waiter = Thread.currentThread();
        node.setStatusRelaxed(COND | WAITING);
        ConditionNode last = lastWaiter;
        if (last == null)
            firstWaiter = node;
        else
            last.nextWaiter = node;
        lastWaiter = node;
        int savedState = getState();
        if (release(savedState))
            return savedState;
    }
    node.status = CANCELLED; // lock not held or inconsistent
    throw new IllegalMonitorStateException();
}

该函数逻辑简单,在获得排他锁的情况下将新节点加入condition等待队列,调用AQS#release()方法释放当前的锁

ConditionObject#canRequire()
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
/**
 * Returns true if a node that was initially placed on a condition
 * queue is now ready to reacquire on sync queue.
 * @param node the node
 * @return true if is reacquiring
 */
private boolean canReacquire(ConditionNode node) {
    // check links, not status to avoid enqueue race
    Node p; // traverse unless known to be bidirectionally linked
    return node != null && (p = node.prev) != null &&
        (p.next == node || isEnqueued(node));
}

该函数同样简单,检测队列的完整性,并且判断是否进入AQS等待队列,注意,只有AQS等待队列维护前驱,即已经进入AQS队列后才可能返回true,并且使用了recheck方法

Node#getAndUnsetStatus(int v)
1
2
3
final int getAndUnsetStatus(int v) {     // for signalling  
    return U.getAndBitwiseAndInt(this, STATUS, ~v);  
}

该工具函数为按位取反再与运算,返回值为操作之前的状态 ##### ConditionObject#signal() -> ConditionObject#doSignal()

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
/**
 * Removes and transfers one or all waiters to sync queue.
 */
private void doSignal(ConditionNode first, boolean all) {
    while (first != null) {
        ConditionNode next = first.nextWaiter;
        if ((firstWaiter = next) == null)
            lastWaiter = null;
        if ((first.getAndUnsetStatus(COND) & COND) != 0) {
            enqueue(first);
            if (!all)
                break;
        }
        first = next;
    }
}

该函数的核心在于判断当前节点是否有COND位并取消COND位,若拥有,则调用ConditionNode#enqueue()将该节点从condition队列中移入AQS等待队列

ConditionNode#enqueue()
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/**
 * Enqueues the node unless null. (Currently used only for
 * ConditionNodes; other cases are interleaved with acquires.)
 */
final void enqueue(ConditionNode node) {
    if (node != null) {
        boolean unpark = false;
        for (Node t;;) {
            if ((t = tail) == null && (t = tryInitializeHead()) == null) {
                unpark = true;             // wake up to spin on OOME
                break;
            }
            node.setPrevRelaxed(t);        // avoid unnecessary fence
            if (casTail(t, node)) {
                t.next = node;
                if (t.status < 0)          // wake up to clean link
                    unpark = true;
                break;
            }
        }
        if (unpark)
            LockSupport.unpark(node.waiter);
    }
}

该函数的核心思想在于unpark唤醒,若初始化AQS等待队列失败(OOME),或者该节点的前驱(之前的tail)处于CANCELLED,则需要唤醒当前节点(同样是一种recheck机制),唤醒后的节点一定会尽快进入AQS#acquire,无论是在哪个等待队列,对于condition队列,会调用await()之后的代码进入acquire,对于AQS队列,其已经进入acquire再被park。对于前者,会再次初始化抛出OOME,对于后者,会调用AQS#cleanQueue(),确保AQS等待队列的完整性,优化效率与内存

[!NOTE] Why unpark when predcessor is CANCELLED

  • Why Wake Up on Cancelled Status?: If the previous tail is cancelled, it might be necessary to wake up or signal other threads because the presence of a cancelled node at the tail can disrupt normal lock acquisition processes. The cancelled node may not be properly participating in the queue dynamics (like signaling next nodes), so handling or removing it quickly is crucial.

自行实现锁类

  • 重写Lock接口中的方法
  • 重写AQS提供的五个try方法中所需要使用的,并与Lock接口的重写方法相关联即可 ## 原子类

原子类介绍

  • AtomicInteger
  • AtomicLong
  • AtomicBoolean
  • AtomicReference<?>
  • AtomicIntegerArray
  • AtomicLongArray
  • AtomicReferenceArray
  • DoubleAdder
  • LongAdder

本质上是采取了volatile关键字+自旋CAS操作保证原子性 ### ABA问题

ABA JUC提供了带版本号的引用类型,只要每次操作都记录一下版本号,并且版本号不会重复,那么就可以解决ABA问题了,类比Redis

并发容器

传统容器

以ArrayList<>为例,其add方法如下:

1
2
3
4
5
public boolean add(E e) { 
    ensureCapacityInternal(size + 1); // Increments modCount!! 
    elementData[size++] = e; //这一句出现了数组越界 
    return true; 
}

在多线程的情况下就会出现数组越界的情况,而HashMap也存在相应的问题,于是我们需要线程安全的解决方法 ### 并发容器

使用synchronized关键字是一个可靠的解决方法,但是其效率较为低下,JUC包中提供了相应的线程安全集合类

CopyOnWriteArrayList<>

在写操作中获取锁,复制并扩容,修改数组并回写。 在读操作中不获取锁。

ConcurrentHashMap<>

HashMap HashMap就是利用了哈希表,哈希表的本质其实就是一个用于存放后续节点的头结点的数组,数组里面的每一个元素都是一个头结点(也可以说就是一个链表),当要新插入一个数据时,会先计算该数据的哈希值,找到数组下标,然后创建一个新的节点,添加到对应的链表后面。当链表的长度达到8时,会自动将链表转换为红黑树,这样能使得原有的查询效率大幅度升高!当使用红黑树之后,我们就可以利用二分搜索的思想,快速地去寻找我们想要的结果,而不是像链表一样挨个去看

JDK7之前,是将所有数据进行分段,对于每一段数据共享一把锁 concurrentHashMap1 JDK8之后,是对于每一个头节点给予一把锁 concurrentHashMap2

其核心在于putVal()get()

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
final V putVal(K key, V value, boolean onlyIfAbsent) {
    if (key == null || value == null) throw new NullPointerException();
    int hash = spread(key.hashCode());    //计算键的hash值,用于确定在哈希表中的位置
    int binCount = 0;   //用来记录链表长度
    for (Node<K,V>[] tab = table;;) {    //CAS自旋锁
        Node<K,V> f; int n, i, fh;
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();    //如果数组(哈希表)为空肯定是要进行初始化的,然后再重新进下一轮循环
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {   //如果哈希表该位置为null,直接CAS插入结点作为头结即可(注意这里会将f设置当前哈希表位置上的头结点)
            if (casTabAt(tab, i, null,
                         new Node<K,V>(hash, key, value, null)))  
                break;                   // 如果CAS成功,直接break结束put方法,失败那就继续下一轮循环
        } else if ((fh = f.hash) == MOVED)   //头结点哈希值为-1,正在扩容
            tab = helpTransfer(tab, f);   //帮助进行迁移,完事之后再来下一次循环
        else {     //特殊情况都完了,这里就该是正常情况了,
            V oldVal = null;
            synchronized (f) {   //在前面的循环中f肯定是被设定为了哈希表某个位置上的头结点,这里直接把它作为锁加锁了,防止同一时间其他线程也在操作哈希表中这个位置上的链表或是红黑树
                if (tabAt(tab, i) == f) {
                    if (fh >= 0) {    //头结点的哈希值大于等于0说明是链表,下面就是针对链表的一些列操作
                        ...实现细节略
                    } else if (f instanceof TreeBin) {   //肯定不大于0,肯定也不是-1,还判断是不是TreeBin,所以不用猜了,肯定是红黑树,下面就是针对红黑树的情况进行操作
                          //在ConcurrentHashMap并不是直接存储的TreeNode,而是TreeBin
                        ...实现细节略
                    }
                }
            }
              //根据链表长度决定是否要进化为红黑树
            if (binCount != 0) {
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i);   //注意这里只是可能会进化为红黑树,如果当前哈希表的长度小于64,它会优先考虑对哈希表进行扩容
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    addCount(1L, binCount);
    return null;
}

public V get(Object key) {
    Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
    int h = spread(key.hashCode());   //计算哈希值
    if ((tab = table) != null && (n = tab.length) > 0 &&
        (e = tabAt(tab, (n - 1) & h)) != null) {
          // 如果头结点就是我们要找的,那直接返回值就行了
        if ((eh = e.hash) == h) {
            if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                return e.val;
        }
          //要么是正在扩容,要么就是红黑树,负数只有这两种情况
        else if (eh < 0)
            return (p = e.find(h, key)) != null ? p.val : null;
          //确认无误,肯定在列表里,开找
        while ((e = e.next) != null) {
            if (e.hash == h &&
                ((ek = e.key) == key || (ek != null && key.equals(ek))))
                return e.val;
        }
    }
      //没找到只能null了
    return null;
}

综上,ConcurrentHashMap的put操作,实际上是对哈希表上的所有头结点元素分别加锁,理论上来说哈希表的长度很大程度上决定了ConcurrentHashMap在同一时间能够处理的线程数量,这也是为什么treeifyBin()会优先考虑为哈希表进行扩容的原因。显然,这种加锁方式比JDK7的分段锁机制性能更好。 ### 阻塞队列

BlockingQueue<E> extends Queue<E>

阻塞队列本身也是队列,但是是适应多线程环境下的,基于ReentrantLock实现的,接口定义如下

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public interface BlockingQueue<E> extends Queue<E> {
    boolean add(E e);
    
    //入队,如果队列已满,返回false否则返回true(非阻塞)
    boolean offer(E e);
    
    //入队,如果队列已满,阻塞线程直到能入队为止
    void put(E e) throws InterruptedException;
    
    //入队,如果队列已满,阻塞线程直到能入队或超时、中断为止,入队成功返回true否则false
    boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException;
        
    //出队,如果队列为空,阻塞线程直到能出队为止
    E take() throws InterruptedException;
    
    //出队,如果队列为空,阻塞线程直到能出队超时、中断为止,出队成功正常返回,否则返回null
    E poll(long timeout, TimeUnit unit)
        throws InterruptedException;

    //返回此队列理想情况下(在没有内存或资源限制的情况下)可以不阻塞地入队的数量,如果没有限制,则返回 Integer.MAX_VALUE
    int remainingCapacity();

    boolean remove(Object o);

    public boolean contains(Object o);

    //一次性从BlockingQueue中获取所有可用的数据对象(还可以指定获取数据的个数)
    int drainTo(Collection<? super E> c);

    int drainTo(Collection<? super E> c, int maxElements);

其常用的三个实现类,即常用的阻塞队列有:

  • ArrayBlockingQueue:有界带缓冲阻塞队列(就是队列是有容量限制的,装满了肯定是不能再装的,只能阻塞,数组实现)
  • SynchronousQueue:无缓冲阻塞队列(相当于没有容量的ArrayBlockingQueue,因此只有阻塞的情况)
  • LinkedBlockingQueue:无界带缓冲阻塞队列(没有容量限制,也可以限制容量,也会阻塞,链表实现)

基于这些实现类,可以轻易实现生产者消费者模型。

ArrayBlockingQueue

构造方法与基础变量
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/** Main lock guarding all access */  
final ReentrantLock lock;  
  
/** Condition for waiting takes */  
@SuppressWarnings("serial")  // Classes implementing Condition may be serializable.  
private final Condition notEmpty;  
  
/** Condition for waiting puts */  
@SuppressWarnings("serial")  // Classes implementing Condition may be serializable.  
private final Condition notFull;

/**
 * Creates an {@code ArrayBlockingQueue} with the given (fixed)
 * capacity and the specified access policy.
 *
 * @param capacity the capacity of this queue
 * @param fair if {@code true} then queue accesses for threads blocked
 *        on insertion or removal, are processed in FIFO order;
 *        if {@code false} the access order is unspecified.
 * @throws IllegalArgumentException if {@code capacity < 1}
 */
public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();
}

内部使用ReentrantLock与两个Condition对象,完成出队与入队的线程阻塞控制

put() & offer()
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public boolean offer(E e) {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;    //使用了类里面的ReentrantLock进行加锁操作
    lock.lock();    //保证同一时间只有一个线程进入
    try {
        if (count == items.length)   //直接看看队列是否已满,如果没满则直接入队,如果已满则返回false
            return false;
        else {
            enqueue(e);
            return true;
        }
    } finally {
        lock.unlock();
    }
}

public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;    //同样的,需要进行加锁操作
    lock.lockInterruptibly();    //注意这里是可以响应中断的
    try {
        while (count == items.length)
            notFull.await();    //当队列已满时会直接挂起当前线程,在其他线程出队操作时会被唤醒
        enqueue(e);   //直到队列有空位才将线程入队
    } finally {
        lock.unlock();
    }
}
poll() & take()
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();    //出队同样进行加锁操作,保证同一时间只能有一个线程执行
    try {
        return (count == 0) ? null : dequeue();   //如果队列不为空则出队,否则返回null
    } finally {
        lock.unlock();
    }
}

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();    //可以响应中断进行加锁
    try {
        while (count == 0)
            notEmpty.await();    //和入队相反,也是一直等直到队列中有元素之后才可以出队,在入队时会唤醒此线程
        return dequeue();
    } finally {
        lock.unlock();
    }
}
enqueue() & dequeue()
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
/**
 * Inserts element at current put position, advances, and signals.
 * Call only when holding lock.
 */
private void enqueue(E e) {
    // assert lock.isHeldByCurrentThread();
    // assert lock.getHoldCount() == 1;
    // assert items[putIndex] == null;
    final Object[] items = this.items;
    items[putIndex] = e;
    if (++putIndex == items.length) putIndex = 0;
    count++;
    notEmpty.signal();
}

/**
 * Extracts element at current take position, advances, and signals.
 * Call only when holding lock.
 */
private E dequeue() {
    // assert lock.isHeldByCurrentThread();
    // assert lock.getHoldCount() == 1;
    // assert items[takeIndex] != null;
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    E e = (E) items[takeIndex];
    items[takeIndex] = null;
    if (++takeIndex == items.length) takeIndex = 0;
    count--;
    if (itrs != null)
        itrs.elementDequeued();
    notFull.signal();
    return e;
}

注意在出队时会通知notFull,入队时通知notEmpty

SynchronousQueue

该阻塞队列没有任何容量,正常情况下出队必须和入队操作成对出现,即直接以生产者消费者模式进行的,直接通过内部抽象类维护的方法Transferer<E>#transfer()来对生产者和消费者之间的数据进行传递,具体来说,通过对传入transfer()方法的参数,来区别是put还是take相关方法。

同样地,该阻塞队列中也存在非公平和公平两种实现(前者是通过TransferStack<E>实现,后者是TransferQueue<E>),我们以公平模式为例

构造方法和基础变量
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/** Head of queue */
transient volatile QNode head;
/** Tail of queue */
transient volatile QNode tail;
/**
 * Reference to a cancelled node that might not yet have been
 * unlinked from queue because it was the last inserted node
 * when it was cancelled.
 */
transient volatile QNode cleanMe;

TransferQueue() {
    QNode h = new QNode(null, false); // initialize to dummy node.
    head = h;
    tail = h;
}

static final class QNode implements ForkJoinPool.ManagedBlocker {  
    volatile QNode next;          // next node in queue  
    volatile Object item;         // CAS'ed to or from null  
    volatile Thread waiter;       // to control park/unpark  
    final boolean isData;  
  
    QNode(Object item, boolean isData) {  
        this.item = item;  
        this.isData = isData;  
    }
    //......
}

我们可以发现,==其维护的QNode与AQS中的Node节点十分相似== ##### TransferQueue#transfer

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
/**
 * Puts or takes an item.
 */
@SuppressWarnings("unchecked")
E transfer(E e, boolean timed, long nanos) {
    /* Basic algorithm is to loop trying to take either of
     * two actions:
     *
     * 1. If queue apparently empty or holding same-mode nodes,
     *    try to add node to queue of waiters, wait to be
     *    fulfilled (or cancelled) and return matching item.
     *
     * 2. If queue apparently contains waiting items, and this
     *    call is of complementary mode, try to fulfill by CAS'ing
     *    item field of waiting node and dequeuing it, and then
     *    returning matching item.
     *
     * In each case, along the way, check for and try to help
     * advance head and tail on behalf of other stalled/slow
     * threads.
     *
     * The loop starts off with a null check guarding against
     * seeing uninitialized head or tail values. This never
     * happens in current SynchronousQueue, but could if
     * callers held non-volatile/final ref to the
     * transferer. The check is here anyway because it places
     * null checks at top of loop, which is usually faster
     * than having them implicitly interspersed.
     */

    QNode s = null;                  // constructed/reused as needed
    boolean isData = (e != null);
    for (;;) {
        QNode t = tail, h = head, m, tn;         // m is node to fulfill
        if (t == null || h == null)
            ;                                    // inconsistent
        else if (h == t || t.isData == isData) { // empty or same-mode
            if (t != tail)                       // inconsistent
                ;
            else if ((tn = t.next) != null)      // lagging tail
                advanceTail(t, tn);
            else if (timed && nanos <= 0L)       // can't wait
                return null;
            else if (t.casNext(null, (s != null) ? s :
                               (s = new QNode(e, isData)))) {
                advanceTail(t, s);
                long deadline = timed ? System.nanoTime() + nanos : 0L;
                Thread w = Thread.currentThread();
                int stat = -1; // same idea as TransferStack
                Object item;
                while ((item = s.item) == e) {
                    if ((timed &&
                         (nanos = deadline - System.nanoTime()) <= 0) ||
                        w.isInterrupted()) {
                        if (s.tryCancel(e)) {
                            clean(t, s);
                            return null;
                        }
                    } else if ((item = s.item) != e) {
                        break;                   // recheck
                    } else if (stat <= 0) {
                        if (t.next == s) {
                            if (stat < 0 && t.isFulfilled()) {
                                stat = 0;        // yield once if first
                                Thread.yield();
                            }
                            else {
                                stat = 1;
                                s.waiter = w;
                            }
                        }
                    } else if (!timed) {
                        LockSupport.setCurrentBlocker(this);
                        try {
                            ForkJoinPool.managedBlock(s);
                        } catch (InterruptedException cannotHappen) { }
                        LockSupport.setCurrentBlocker(null);
                    }
                    else if (nanos > SPIN_FOR_TIMEOUT_THRESHOLD)
                        LockSupport.parkNanos(this, nanos);
                }
                if (stat == 1)
                    s.forgetWaiter();
                if (!s.isOffList()) {            // not already unlinked
                    advanceHead(t, s);           // unlink if head
                    if (item != null)            // and forget fields
                        s.item = s;
                }
                return (item != null) ? (E)item : e;
            }

        } else if ((m = h.next) != null && t == tail && h == head) {
            Thread waiter;
            Object x = m.item;
            boolean fulfilled = ((isData == (x == null)) &&
                                 x != m && m.casItem(x, e));
            advanceHead(h, m);                    // (help) dequeue
            if (fulfilled) {
                if ((waiter = m.waiter) != null)
                    LockSupport.unpark(waiter);
                return (x != null) ? (E)x : e;
            }
        }
    }
}

该方法通过判断e是否为null,设定isData变量,true表示消费者反之表示生产者。

方法入口依旧是自旋,猜测下面是复数个CAS方法,维持多线程中代码操作的正确性和原子性。该方法的主要目的在于:

  • 将当前节点入队:当队列为空或者队列中都是状态相同的节点(全是生产者或者全是消费者)
  • 满足一个等待中的transfer:当队列中存在与当前状态相反的节点时,取出 接下来是代码核心循环逻辑:
  • 如果h或者t为空,证明正在初始化,队列一致性检验不通过,继续循环
  • 如果h == t,即当前队列为空,或者当前节点的状态与队列中的一致
  • 同样判断队列一致性
  • 在多线程上下文中,判断t是否仍为tail,并且更新(lagging tail check)
  • 判断是否超时,超时直接返回null
  • 否则,证明当前状态和队列都合法,开始尝试进行入队,使用CAS更改t.next字段(QNode s是lazily instantiated),若成功则修改tail
  • 通过判断当前节点中item的值是否改变,维持park等待或者自旋等待
  • 处理超时和中断情况
  • recheck item的值是否改变,常见的多线程recheck操作
  • 判断stat <= 0,初始值为-1
  • 判断队列有效性,即t.next==s,如果有效,尝试改变stat状态
  • 如果stat<0,即未改变过,且s的前驱t已经得到满足(t.isFulfilled()该方法中检查isData字段是否和当前item的状态相符,并且再次检查item字段是否已经取消 - 对应post-loop中item != null方法,已经满足的节点会将item设定为this),说明当前节点已经为first,更改一次stat,并且调用Thread.yield()等待
  • 如果已经改变过一次,则直接将stat置1,设置当前节点的等待线程s.waiter=w,准备被park调用(unpark()传入的参数是线程)
  • 根据是否可以超时进行park等待
  • 已经退出循环,证明可以被满足,无需等待(post-loop),根据stat和当前节点s的状态设置对应的队列状态,根据消费者或者是生产者返回相应的数据
  • 否则,recheck判断当前队列是否不为空,且队列满足一致性,不在过渡态(常规recheck),若满足,则证明队列中的first可以与当前节点配对,互相满足
  • 还是常规的recheck判断,与isFulfilled()相似,并且尝试CAS设置first.item为当前节点的e
  • 注意这里可以直接调用advanceHead(h,m)修改head,因为如果前面的CAS失败了,说明有其他线程已经抢先满足,那么也是满足修改head的前置条件的
  • 如果可以满足,并且该线程拥有待唤醒的线程,直接调用unpark(使得被阻塞等待的线程唤醒)并且返回相应的值。

总体来说,被阻塞的线程核心在第二个if条件,可以满足被阻塞线程的线程核心在第三个if条件

`TransferStack#transfer

大体思路一致,只不过将队列变为了stack,满足非公平模式

LinkedTransferQueue

该对象保留了SynchronousQueue的匹配交接机制,并且与等待队列进行融合,我们知道,SynchronousQueue并没有使用锁,而是采用CAS操作保证生产者与消费者的协调,但是它没有容量,而LinkedBlockingQueue虽然是有容量且无界的,但是内部基本都是基于锁实现的,性能并不是很好,这时,我们就可以将它们各自的优点单独拿出来,揉在一起,就成了性能更高的LinkedTransferQueue

相比 SynchronousQueue ,它多了一个可以存储的队列,我们依然可以像阻塞队列那样获取队列中所有元素的值,简单来说,LinkedTransferQueue其实就是一个多了存储队列的SynchronousQueue,插入时,会先检查是否有其他线程等待获取,如果是,直接进行交接,否则插入到存储队列中,不会像SynchronousQueue那样必须等一个匹配的才可以,并且可以打印队列中的元素

(前者被阻塞,进入内部队列,对外不可见,后者是可见的)

PriorityBlockingQueue

是一个支持优先级的阻塞队列,元素的获取顺序按优先级决定 #### DelayQueue

是一个支持延迟获取元素的队列,同样支持优先级,即考虑延迟的情况下也要考虑优先级,如果优先级更大的元素的延迟尚未结束,后面优先级靠后的元素,即使延迟已经结束也无法获取

其类定义与接口如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>  
    implements BlockingQueue<E> { 
    
    private final transient ReentrantLock lock = new ReentrantLock();
    private final PriorityQueue<E> q = new PriorityQueue<E>();
    /**
     * Thread designated to wait for the element at the head of
     * the queue.  This variant of the Leader-Follower pattern
     * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
     * minimize unnecessary timed waiting.  When a thread becomes
     * the leader, it waits only for the next delay to elapse, but
     * other threads await indefinitely.  The leader thread must
     * signal some other thread before returning from take() or
     * poll(...), unless some other thread becomes leader in the
     * interim.  Whenever the head of the queue is replaced with
     * an element with an earlier expiration time, the leader
     * field is invalidated by being reset to null, and some
     * waiting thread, but not necessarily the current leader, is
     * signalled.  So waiting threads must be prepared to acquire
     * and lose leadership while waiting.
     */
    private Thread leader;

    /**
     * Condition signalled when a newer element becomes available
     * at the head of the queue or a new thread may need to
     * become leader.
     */
    private final Condition available = lock.newCondition();
    // ......
}

public interface Delayed extends Comparable<Delayed> {

    /**
     * Returns the remaining delay associated with this object, in the
     * given time unit.
     *
     * @param unit the time unit
     * @return the remaining delay; zero or negative values indicate
     * that the delay has already elapsed
     */
    long getDelay(TimeUnit unit);

可以看到此类只接受Delayed的实现类作为元素,并且Delayed类继承了Comparable,支持优先级,其内部维护的leader变量减少不必要的等待,具体解释在类定义的注释中

offer()
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
public boolean offer(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        q.offer(e); // 向内部维护的优先队列添加元素
        if (q.peek() == e) { //如果入队后队首就是当前元素,那么直接进行一次唤醒操作(因为有可能之前就有其他线程等着take了)
            leader = null;
            available.signal();
        }
        return true;
    } finally {
        lock.unlock();
    }
}
take()
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
/**
 * Retrieves and removes the head of this queue, waiting if necessary
 * until an element with an expired delay is available on this queue.
 *
 * @return the head of this queue
 * @throws InterruptedException {@inheritDoc}
 */
public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            E first = q.peek();
            if (first == null)
                available.await();
            else {
                long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0L)
                    return q.poll();
                first = null; // don't retain ref while waiting
                if (leader != null)
                    available.await();
                else {
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        available.awaitNanos(delay);
                    } finally {
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        if (leader == null && q.peek() != null)
            available.signal();
        lock.unlock();
    }
}

该方法同样先获取锁,并且同样有自旋的操作,逻辑流程如下:

  • 首先获取队首元素,如果为空,那么说明队列一定为空,调用await()等待
  • 否则,获取队首元素的延迟时间,如果延迟结束,直接返回,延迟没有结束,放弃first,等待下一轮循环再次获取
  • 判断是否拥有leader线程,如果拥有,说明有其他的线程正在调用可超时的等待,当前线程直接await()
  • 否则,将当前线程设定为leader,并且设定当前线程的await()超时时间为delay,在超时后重新设定leader = null,继续循环获取队首元素进行判断
  • 在获取到元素后,如果判断没有可超时的等待(leader == null)并且队首元素不为空,则手动唤醒一个其他永久等待下的线程

多线程进阶

线程池

利用多线程,我们的程序可以更加合理地使用CPU多核心资源,在同一时间完成更多的工作。但是,如果我们的程序频繁地创建线程,由于线程的创建和销毁也需要占用系统资源,因此这样会降低我们整个程序的性能,为了解决这个开销,可以将已创建的线程复用,利用池化技术,就像数据库连接池一样,我们也可以创建很多个线程,然后反复地使用这些线程,而不对它们进行销毁。

比如我们的Tomcat服务器,要在同一时间接受和处理大量的请求,那么就必须要在短时间内创建大量的线程,结束后又进行销毁,这显然会导致很大的开销,因此这种情况下使用线程池显然是更好的解决方案。

由于线程池可以反复利用已有线程执行多线程操作,所以它一般是有容量限制的,当所有的线程都处于工作状态时,那么新的多线程请求会被阻塞,直到有一个线程空闲出来为止,实际上这里就会用到阻塞队列。

线程池的使用

直接通过解析ThreadPoolExecutor()对象的构造方法入手:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ?
            null :
            AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

其中的各项参数为:

  • corePoolSize:核心线程池大小,我们每向线程池提交一个多线程任务时,都会创建一个新的核心线程,无论是否存在其他空闲线程,直到到达核心线程池大小为止,之后会尝试复用线程资源。当然也可以在一开始就全部初始化好,调用 prestartAllCoreThreads()即可。
  • maximumPoolSize:最大线程池大小,当目前线程池中所有的线程都处于运行状态,并且等待队列已满,那么就会直接尝试继续创建新的非核心线程运行,但是不能超过最大线程池大小。
  • keepAliveTime:线程最大空闲时间,当一个非核心线程空闲超过一定时间,会自动销毁。
  • unit:线程最大空闲时间的时间单位
  • workQueue:线程等待队列,当线程池中核心线程数已满时,就会将任务暂时存到等待队列中,直到有线程资源可用为止,这里可以使用阻塞队列。
  • threadFactory:线程创建工厂,我们可以干涉线程池中线程的创建过程,进行自定义。
  • handler:拒绝策略,当等待队列和线程池都没有空间时,来了个新的多线程任务,那么只能拒绝了,这时就会根据当前设定的拒绝策略进行处理。

最为重要的就是线程池大小的限定,合理地分配大小会使得线程池的执行效率事半功倍:

  • 首先我们可以分析一下,线程池执行任务的特性,是CPU 密集型还是 IO 密集型
    • CPU密集型: 主要是执行计算任务,响应时间很快,CPU一直在运行,这种任务CPU的利用率很高,那么线程数应该是根据 CPU 核心数来决定,CPU 核心数 = 最大同时执行线程数,以 i5-9400F 处理器为例,CPU 核心数为 6,那么最多就能同时执行 6 个线程。
    • IO密集型: 主要是进行 IO 操作,因为执行 IO 操作的时间比较较长,比如从硬盘读取数据之类的,CPU就得等着IO操作,很容易出现空闲状态,导致 CPU 的利用率不高,这种情况下可以适当增加线程池的大小,让更多的线程可以一起进行IO操作,一般可以配置为CPU核心数的2倍。

其核心方法为ThreadPoolExecutor#execute(),接受一个Runnable作为线程待执行的任务

通常的拒绝策略有四个:

  • AbortPolicy(默认):直接抛异常。
  • CallerRunsPolicy:直接让提交任务的线程运行这个任务,比如在主线程向线程池提交了任务,那么就直接由主线程执行。
  • DiscardOldestPolicy:丢弃队列中oldest的一个任务,替换为当前任务。
  • DiscardPolicy:什么也不用做。

同样的,我们也可以重写RejectedExecutionHandler接口,实现自定义handler,ThreadFactory也是一个可重写的接口,提供干涉新线程创建的窗口

当线程池中的线程由于异常中断时,会进行销毁。

此外,Executors也提供了几个静态方法来快速创建线程池:

  • newFixedThreadPool
  • 内部实现是coreThreadSize=maxThreadSize,使用LinkedBlockingQueue<>作为等待队列
  • newSingleThreadExecutor
  • 该方法将创建的ExecutorService对象封装为FinalizableDelegatedExecutorService,提供一层保护,防止用户更改线程池大小(前者可以调用.setPoolSize()方法)- 与Spring中的Delegated方法一样,不是真正的进行销毁,而是进行保留复用
1
2
3
4
5
6
7
8
static class DelegatedExecutorService extends AbstractExecutorService {
    private final ExecutorService e;    //被委派对象
    DelegatedExecutorService(ExecutorService executor) { e = executor; }   //实际上所以的操作都是让委派对象执行的,有点像代理
    public void execute(Runnable command) { e.execute(command); }
    public void shutdown() { e.shutdown(); }
    public List<Runnable> shutdownNow() { return e.shutdownNow(); }
    // ...
}
  • newCachedThreadPool
    • 是一个核心线程数为0,最大线程数为Integer.MAX_VALUE

执行带返回值的任务

AbstractExecutorService#submit()可以接受三种形式的参数:

  • Runnable
  • Runnable + Result value
  • Callable 或者是直接传入FutureTask<>对象(该对象相当于后两者情况)

返回一个Future<?>对象,可以通过该对象获取当前任务的一些状态

执行定时任务

通过ScheduledThreadPoolExecutor来提交定时任务,它继承自ThreadPoolExecutor,并且所有的构造方法都要求最大线程池容量为Integer.MAX_VALUE,采用DelayedQueue作为等待队列

同样的,ScheduledThreadPoolExecutor#schedule()方法支持返回值任务,通过ScheduledFuture<?>对象进行接受

猜测,所有的任务先进入DelayedQueue后再进行取出

ScheduledThreadPoolExecutor#scheduleAtFixedRateScheduledThreadPoolExecutor#scheduleWithFixedDelay两个方法可以以一定的频率不断执行任务

线程池实现原理

同样的,我们先从核心变量入手,然后walkThrough其核心方法executeshutdown #### 核心变量

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
//使用AtomicInteger,用于同时保存线程池运行状态和线程数量(使用原子类是为了保证原子性)
//它是通过拆分32个bit位来保存数据的,前3位保存状态,后29位保存工作线程数量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;    //29位,线程数量位
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;   //计算得出最大容量

// 所有的运行状态,注意都是只占用前3位,不会占用后29位

// 接收新任务,并等待执行队列中的任务
private static final int RUNNING    = -1 << COUNT_BITS;   //111 | 数量位
// 不接收新任务,但是依然等待执行队列中的任务
private static final int SHUTDOWN   =  0 << COUNT_BITS;   //000 | 数量位
// 不接收新任务,也不执行队列中的任务,并且还要中断正在执行中的任务
private static final int STOP       =  1 << COUNT_BITS;   //001 | 数量位
// 所有的任务都已结束,线程数量为0,即将完全关闭
private static final int TIDYING    =  2 << COUNT_BITS;   //010 | 数量位
// 完全关闭
private static final int TERMINATED =  3 << COUNT_BITS;   //011 | 数量位

// 封装和解析ctl变量的一些方法
// 取前三位运行状态
private static int runStateOf(int c)     { return c & ~CAPACITY; } 
// 取后29位线程数量
private static int workerCountOf(int c)  { return c & CAPACITY; }
// 将运行状态与线程数量拼接
private static int ctlOf(int rs, int wc) { return rs | wc; }   

//指定的阻塞队列
private final BlockingQueue<Runnable> workQueue;

ThreadPoolExecutor#execute(Runnable)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
/**
 * Executes the given task sometime in the future.  The task
 * may execute in a new thread or in an existing pooled thread.
 *
 * If the task cannot be submitted for execution, either because this
 * executor has been shutdown or because its capacity has been reached,
 * the task is handled by the current {@link RejectedExecutionHandler}.
 *
 * @param command the task to execute
 * @throws RejectedExecutionException at discretion of
 *         {@code RejectedExecutionHandler}, if the task
 *         cannot be accepted for execution
 * @throws NullPointerException if {@code command} is null
 */
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

该方法的核心逻辑有三,在官方注释中已经详尽介绍:

  • 判断当前运行的线程数量,如果小于核心线程数量则尝试调用addWorker创建一个新的核心线程,将当前Runnable设定为该线程的任务,否则,证明在创建新线程过程中有其他线程抢先,需要重新获取线程池状态(ctl)继续判断
  • 进入当前条件判断的前提是运行线程数量不小于核心线程大小;判断当前线程池是否处于RUNNING态,并尝试将当前Runnable任务加入阻塞队列,同样的,由于该方法没有加锁,需要进行double-check,再次判断当前线程池的状态
  • 若当前线程池的状态不为RUNNING(进入了SHUTDOWN态),则证明该Runnable任务不该加入阻塞队列,从队列中取出并执行reject
  • 或者该线程池处于运行状态,但由于其他线程可能导致的不一致性与过渡态,或者线程池中的线程(worker)由于初始化、超时、中断等原因结束了其生命周期,调用addWorker添加一个first任务为空的非核心线程,确保新加入阻塞队列的Runnable可以被预期执行,并且维护其中的队列规则,例如FIFO,priority-based
  • 如果进入阻塞队列失败,或者线程池不处于RUNNING状态,尝试调用addWorker添加一个first为当前Runnable的非核心线程,若失败直接调用reject

ThreadPoolExecutor#addWorker(Runnable, boolean)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
/**
 * Checks if a new worker can be added with respect to current
 * pool state and the given bound (either core or maximum). If so,
 * the worker count is adjusted accordingly, and, if possible, a
 * new worker is created and started, running firstTask as its
 * first task. This method returns false if the pool is stopped or
 * eligible to shut down. It also returns false if the thread
 * factory fails to create a thread when asked.  If the thread
 * creation fails, either due to the thread factory returning
 * null, or due to an exception (typically OutOfMemoryError in
 * Thread.start()), we roll back cleanly.
 *
 * @param firstTask the task the new thread should run first (or
 * null if none). Workers are created with an initial first task
 * (in method execute()) to bypass queuing when there are fewer
 * than corePoolSize threads (in which case we always start one),
 * or when the queue is full (in which case we must bypass queue).
 * Initially idle threads are usually created via
 * prestartCoreThread or to replace other dying workers.
 *
 * @param core if true use corePoolSize as bound, else
 * maximumPoolSize. (A boolean indicator is used here rather than a
 * value to ensure reads of fresh values after checking other pool
 * state).
 * @return true if successful
 */
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (int c = ctl.get();;) {
        // Check if queue empty only if necessary.
        if (runStateAtLeast(c, SHUTDOWN)
            && (runStateAtLeast(c, STOP)
                || firstTask != null
                || workQueue.isEmpty()))
            return false;

        for (;;) {
            if (workerCountOf(c)
                >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateAtLeast(c, SHUTDOWN))
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int c = ctl.get();

                if (isRunning(c) ||
                    (runStateLessThan(c, STOP) && firstTask == null)) {
                    if (t.getState() != Thread.State.NEW)
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    workerAdded = true;
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                container.start(t);
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

同样该方法考虑多线程情况,因此使用两个for循环实现自旋锁,保证线程安全,确保阻塞队列状态与线程池工作状态合法,并且可以添加,才会进入实际的worker添加代码段

  • 对于外层循环,其主要任务是判断线程池的状态
    • 首先判断当前线程池是否处于RUNNING状态
    • 若不处于RUNNING状态,且处于STOP以上状态,或者处于SHUTDOWN状态(该状态下线程池不再接受新线程,但会执行剩余的线程)但firstTask不为空,或者处于SHUTDOWN状态且阻塞队列为空(满足状态进一步切换 - tryTerminate()),返回false,表明无法添加worker
  • 对于内层循环,其主要任务是将线程池中的worker计数增加,采取自旋+CAS方式,增加成功才会执行实际的worker添加代码段
    • 首先判断当前线程池的线程数量(worker)是否未超过设定值(核心与非核心),如果超过直接返回false
    • 若满足线程数量要求,尝试增加线程池中的worker数量,若CAS成功,退出外层循环,进入worker添加段
    • CAS操作失败,重新获取当前线程池的状态ctl,若当前线程池状态处于SHUTDOWN及以上状态,证明线程池状态已经不再处于RUNNING,退出内层循环,重新进行外层循环,判断线程池的状态,否则,重新进行内层循环,仅仅是CAS线程池的worker数量失败,不涉及线程池状态的变化
  • 退出了双层循环,进入了实际添加worker的代码段
  • 将当前的Runnable任务封装为Worker对象,该对象继承自AQS,本质上也是一个队列同步器,并且根据Worker对象获取线程,double-check其初始化过程
  • 尝试获取线程池中的ReentrantLock,在获取锁之后,进行一次recheck,判断当前线程池是否处于RUNNING状态,或者是SHUTDOWN状态且Runnablenull(含义为创建新线程Worker执行队列中的任务,但不接受新Runnable任务)
    • 判断当前Worker中的线程是否开始执行,若已经开始执行则抛出异常
    • 否则,将当前Worker对象加入线程池维护的可用线程对象集合
  • 如果成功将当前Worker对象加入线程池维护的可用线程对象集合,开始运行该线程

接下来分析Worker对象及其是如何开始运行Runnable任务的 #### Worker

该类是继承自AQS,本质上也是一把锁,也重写了tryAcquiretryRelease方法 ##### 基础变量

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
    /** Thread this worker is running in.  Null if factory fails. */
    @SuppressWarnings("serial") // Unlikely to be serializable
    final Thread thread;
    /** Initial task to run.  Possibly null. */
    @SuppressWarnings("serial") // Not statically typed as Serializable
    Runnable firstTask;
    /** Per-thread task counter */
    volatile long completedTasks;

    /**
     * Creates with given first task and thread from ThreadFactory.
     * @param firstTask the first task (null if none)
     */
    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }

    /** Delegates main run loop to outer runWorker. */
    public void run() {
        runWorker(this);
    }
    // Lock methods  
    //  
    // The value 0 represents the unlocked state.  
    // The value 1 represents the locked state.
    protected boolean isHeldExclusively() {  
        return getState() != 0;  
    }
    protected boolean tryAcquire(int unused) {
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }
    // .....
}

ThreadPoolExecutor#runWorker()

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
/**
 * Main worker run loop.  Repeatedly gets tasks from queue and
 * executes them, while coping with a number of issues:
 *
 * 1. We may start out with an initial task, in which case we
 * don't need to get the first one. Otherwise, as long as pool is
 * running, we get tasks from getTask. If it returns null then the
 * worker exits due to changed pool state or configuration
 * parameters.  Other exits result from exception throws in
 * external code, in which case completedAbruptly holds, which
 * usually leads processWorkerExit to replace this thread.
 *
 * 2. Before running any task, the lock is acquired to prevent
 * other pool interrupts while the task is executing, and then we
 * ensure that unless pool is stopping, this thread does not have
 * its interrupt set.
 *
 * 3. Each task run is preceded by a call to beforeExecute, which
 * might throw an exception, in which case we cause thread to die
 * (breaking loop with completedAbruptly true) without processing
 * the task.
 *
 * 4. Assuming beforeExecute completes normally, we run the task,
 * gathering any of its thrown exceptions to send to afterExecute.
 * We separately handle RuntimeException, Error (both of which the
 * specs guarantee that we trap) and arbitrary Throwables.
 * Because we cannot rethrow Throwables within Runnable.run, we
 * wrap them within Errors on the way out (to the thread's
 * UncaughtExceptionHandler).  Any thrown exception also
 * conservatively causes thread to die.
 *
 * 5. After task.run completes, we call afterExecute, which may
 * also throw an exception, which will also cause thread to
 * die. According to JLS Sec 14.20, this exception is the one that
 * will be in effect even if task.run throws.
 *
 * The net effect of the exception mechanics is that afterExecute
 * and the thread's UncaughtExceptionHandler have as accurate
 * information as we can provide about any problems encountered by
 * user code.
 *
 * @param w the worker
 */
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                try {
                    task.run();
                    afterExecute(task, null);
                } catch (Throwable ex) {
                    afterExecute(task, ex);
                    throw ex;
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

该方法的核心逻辑也在官方注释中已经详尽介绍:

  • 首先尝试获取Worker类中的Runnable,并且调用unlock()允许中断
  • Worker中存在first Runnable,或者调用getTask()成功获取Runnable任务(getTask()会阻塞线程直到超时或者得到任务),尝试获取Worker维护的锁,此处的锁在于在shutdown时保护此Worker完成任务
  • 判断线程池是否处于STOP及以上状态并且运行线程池的线程没有被中断标记,打上中断标记
  • 尝试获取当前线程是否被中断,重置中断标记,若处于STOP及以上状态,打上中断标记
  • 此处保证线程池STOP及以上状态时被中断,否则没有被中断 ==ShutdownNow==
  • 真正执行Runnablerun方法,然后解锁继续循环获取任务
  • 退出了循环获取任务,证明该Worker可以被丢弃,直接调用processWorkerExit()

ThreadPoolExecutor#getTask()

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
/**
 * Performs blocking or timed wait for a task, depending on
 * current configuration settings, or returns null if this worker
 * must exit because of any of:
 * 1. There are more than maximumPoolSize workers (due to
 *    a call to setMaximumPoolSize).
 * 2. The pool is stopped.
 * 3. The pool is shutdown and the queue is empty.
 * 4. This worker timed out waiting for a task, and timed-out
 *    workers are subject to termination (that is,
 *    {@code allowCoreThreadTimeOut || workerCount > corePoolSize})
 *    both before and after the timed wait, and if the queue is
 *    non-empty, this worker is not the last thread in the pool.
 *
 * @return task, or null if the worker must exit, in which case
 *         workerCount is decremented
 */
private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();

        // Check if queue empty only if necessary.
        if (runStateAtLeast(c, SHUTDOWN)
            && (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // Are workers subject to culling?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

该方法的核心思想也在官方注释中有详解,具体逻辑为:

  • 该方法不显式加锁,使用自旋锁,猜测使用了CAS,进行循环
  • 首先判断当前线程池状态,若线程池状态为SHUTDOWN且等待队列为空,或者线程池状态为STOP及以上,减少一个工作线程worker的计数,返回null告知runWorker方法
  • 判断当前线程池中的工作线程(worker)是否大于最大线程容量(通常为容量大小被动态修改)或者当前worker已经超时,并且线程池中的工作线程worker大于1(避免Last Thread Scenario及不一致性与过渡态,因为该方法的超时判断或容量判断是没有显式加锁的)或等待队列为空
  • 判断当前worker是否可超时,根据核心线程是否允许超时或者当前工作线程数量大于核心线程数量(当前worker为非核心线程)
  • 正常通过阻塞队列获取任务,根据是否可超时决定

ThreadPoolExecutor#shutdown()

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/**
 * Initiates an orderly shutdown in which previously submitted
 * tasks are executed, but no new tasks will be accepted.
 * Invocation has no additional effect if already shut down.
 *
 * <p>This method does not wait for previously submitted tasks to
 * complete execution.  Use {@link #awaitTermination awaitTermination}
 * to do that.
 *
 * @throws SecurityException {@inheritDoc}
 */
public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        advanceRunState(SHUTDOWN);
        interruptIdleWorkers();
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
}

该方法会优先获取锁,然后:

  • 调用checkShutdownAccess()检查是否有权限终止
  • 调用advanceRunState(SHUTDOWN)CAS将线程池的状态改为SHUTDOWN
  • 调用interruptIdleWorkers()让空闲的线程中断
  • 最后调用tryTerminate()尝试中止线程池 #### ThreadPoolExecutor#interruptIdleWorkers()
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
/**
 * Interrupts threads that might be waiting for tasks (as
 * indicated by not being locked) so they can check for
 * termination or configuration changes. Ignores
 * SecurityExceptions (in which case some threads may remain
 * uninterrupted).
 *
 * @param onlyOne If true, interrupt at most one worker. This is
 * called only from tryTerminate when termination is otherwise
 * enabled but there are still other workers.  In this case, at
 * most one waiting worker is interrupted to propagate shutdown
 * signals in case all threads are currently waiting.
 * Interrupting any arbitrary thread ensures that newly arriving
 * workers since shutdown began will also eventually exit.
 * To guarantee eventual termination, it suffices to always
 * interrupt only one idle worker, but shutdown() interrupts all
 * idle workers so that redundant workers exit promptly, not
 * waiting for a straggler task to finish.
 */
private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers) {
            Thread t = w.thread;
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock();
    }
}

该方法逻辑简单,核心点在于判断语句中的w.tryLock(),该语句解释了为什么在runWorker中的添加锁是为了防止shutdown()时错误停止任务

**shutdown()tryTerminate()都会调用这个方法,唯一的区别是参数不同,在注释中有写,当使用tryTerminate()方法调用时,仅会中断一个线程,这和前面的wc > 1,即Last Thread Scenario相关

ThreadPoolExecutor#tryTerminate()

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
/**
 * Transitions to TERMINATED state if either (SHUTDOWN and pool
 * and queue empty) or (STOP and pool empty).  If otherwise
 * eligible to terminate but workerCount is nonzero, interrupts an
 * idle worker to ensure that shutdown signals propagate. This
 * method must be called following any action that might make
 * termination possible -- reducing worker count or removing tasks
 * from the queue during shutdown. The method is non-private to
 * allow access from ScheduledThreadPoolExecutor.
 */
final void tryTerminate() {
    for (;;) {
        int c = ctl.get();
        if (isRunning(c) ||
            runStateAtLeast(c, TIDYING) ||
            (runStateLessThan(c, STOP) && ! workQueue.isEmpty()))
            return;
        if (workerCountOf(c) != 0) { // Eligible to terminate
            interruptIdleWorkers(ONLY_ONE);
            return;
        }

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    terminated();
                } finally {
                    ctl.set(ctlOf(TERMINATED, 0));
                    termination.signalAll();
                    container.close();
                }
                return;
            }
        } finally {
            mainLock.unlock();
        }
        // else retry on failed CAS
    }
}

该方法可以认为:判断当前线程池是否可以关闭或者已经接近关闭,如果不是(SHUTDOWN且阻塞队列为空或者是STOP),则中断一个空闲状态下的线程,再次自旋尝试

ThreadPoolExecutor#shutdownNow()

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        advanceRunState(STOP);
        interruptWorkers();
        tasks = drainQueue();
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
    return tasks;
}

该方法与shutdown()的区别仅为:1. 将状态设定为STOP。2.调用interruptWorkers()不尝试获取锁,若开始直接中断。3. 将队列中等待的任务返回。 其余核心逻辑一致 ## 并发工具类 ### CountDownLatch

本质上是使用共享锁机制的AQS实现,初始化锁数量为设定值,因此支持多个线程同时等待多个线程的情况 ### CyclicBarrier

CountDownLatch的区别:

  • CountDownLatch:
  1. 它只能使用一次,是一个一次性的工具
  2. 它是一个或多个线程用于等待其他线程完成的同步工具
  • CyclicBarrier
  1. 它可以反复使用,允许自动或手动重置计数
  2. 它是让一定数量的线程在同一时间开始运行的同步工具

该对象内部方法较为简单,维护了一个ReentrantLock及其Condition对象,直接源码分析即可,注意其可重用性,reset,及broken状态即可 ### Semaphore

同样的,本质上是使用共享锁机制的AQS实现 ### Exchanger

代码示例:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
public static void main(String[] args) throws InterruptedException {
    Exchanger<String> exchanger = new Exchanger<>();
    new Thread(() -> {
        try {
            System.out.println("收到主线程传递的交换数据:"+exchanger.exchange("AAAA"));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }).start();
    System.out.println("收到子线程传递的交换数据:"+exchanger.exchange("BBBB"));
}

Fork/Join Framework

其核心逻辑可以看作是拆分任务,并使用多线程,即多线程Context下的递归/分治算法,并且可以利用工作窃取算法,提高线程的利用率

工作窃取算法: 是指某个线程从其他队列里窃取任务来执行。一个大任务分割为若干个互不依赖的子任务,为了减少线程间的竞争,把这些子任务分别放到不同的队列里,并为每个队列创建一个单独的线程来执行队列里的任务,线程和队列一一对应。但是有的线程会先把自己队列里的任务干完,而其他线程对应的队列里还有任务待处理。干完活的线程与其等着,不如帮其他线程干活,于是它就去其他线程的队列里窃取一个任务来执行。

Arrays工具类提供的并行排序也是利用了ForkJoinPool来实现。

  • ☐ TODO: 单例模式,懒汉,饿汉,静态内部类
0%