因为除了内存,数据还会被缓存在寄存器和各级缓存中
- 访问变量时,不一定从内存中取
- 修改变量时,可能先写到缓存,稍后才同步到内存中
CAS CompareAndSwap
- 是一种非阻塞同步方式,采用基于冲突检测的乐观并发策略
- 包含三个操作数:需要读写的内存位置V、预期值A、拟写入的新值B
- 当且仅当V=A时,CAS才会通过原子方式用B更新V的值,否则不会执行任何操作
- 无论是否操作成功,都返回V原有的值
- 多个线程尝试使用CAS同时更新一个变量时,只有一个线程可以成功变更,其它都会失败,但是失败的线程不会被挂起,而是被告知失败,可以再次尝试(通常在失败时不执行任何操作)
- CAS 的整个过程是原子的,依赖于硬件指令集
- CAS 是 Java 并发包的基础,基于 CAS 可以实现高效、乐观、非阻塞的数据结构和算法,它也是并发包的锁、同步工具和各种容器的基础
java.util.concurrent.atomic
包含了一些原子变量类,也是基于 CAS 实现的,使用示例:// 线程不安全
public class ThreadSafeCounter {
private final long count = 0L;
public void plusOne() {
// 自增操作包含 读取-修改-写入 三个步骤,且非原子操作
count++;
}
}
// 线程安全
public class ThreadSafeCounter {
private final AtomicLong count = new AtomicLong(0);
public void plusOne() {
// 原子操作
count.incrementAndGet();
}
}
AtomicStampedReference
维护一个引用和对应的版本号,且修改操作是原子的 private void test() {
AtomicStampedReference<Integer> atomicStampedReference = new AtomicStampedReference<>(1, 111);
// 所有的修改操作都是原子的
atomicStampedReference.attemptStamp(2, 222);
atomicStampedReference.attemptStamp(1, 333); // 值为1,但版本号不同
}
this
引用向外抛出,并被其他线程访问了该引用this
引用逸出产生的条件:在构造函数中 创建 并 发布 内部类(即使发布的语句是构造函数的最后一行)this
隐式的作为一个参数传递给内部类// 逸出示例
public class ThisEscape {
public final int id;
public final String name;
public ThisEscape(EventSource<EventListener> source) {
id = 1;
name = "xjpking";
// 在此创建了内部类,内部类具有外部类的this引用,但外部类的构造方法并未执行完成
// 有可能因为指令重排序,先执行下面的注册监听,再执行上面的赋值
source.registerListener(new EventListener() {
public void onEvent(Object obj) {
System.out.println("id: "+ThisEscape.this.id);
System.out.println("name: "+ThisEscape.this.name);
}
});
}
}
final
修饰this
逸出)组合 | 行为 |
---|---|
同步阻塞 | 发送方发送请求之后一直等待响应;接收方处理请求如果不能马上等到返回结果,就一直等到返回结果后,才响应发送方 |
同步非阻塞 | 发送方发送请求之后一直等待响应;接收方立即返回结果 |
异步阻塞 | 发送方向接收方请求后,不等待响应,可以继续其他工作;接收方处理请求如果不能马上等到返回结果,就一直等到返回结果后,才响应发送方 |
异步非阻塞 | 发送方向接收方请求后,不等待响应,可以继续其他工作;接收方立即返回结果 |
volatile
变量同样遵循,只是看起来如同直接在主内存中读写访问volatile
关键字synchronized
关键字ReentrantLock
class Counter {
public int nonSyncCount = 0;
public volatile int volatileCount = 0;
public int synchronizedCount = 0;
public AtomicInteger atomicCount = new AtomicInteger(0);
public int lockCount = 0;
private final ReentrantLock lock = new ReentrantLock();
public void nonSyncIncrease() {
nonSyncCount++;
}
public void volatileIncrease() {
volatileCount++;
}
public synchronized void synchronizedIncrease() {
synchronizedCount++;
}
public void atomicIncrease() {
atomicCount.incrementAndGet();
}
public void lockIncrease() {
lock.lock();
lockCount++;
lock.unlock();
}
}
public static void main(String[] args) throws InterruptedException {
Counter counter = new Counter();
// 创建10个线程,每个线程调用5个计数值的自增方法
for (int i = 0; i < 10; i++) {
new Thread(() -> {
for (int j = 0; j < 10000; j++) {
counter.nonSyncIncrease();
counter.volatileIncrease();
counter.synchronizedIncrease();
counter.atomicIncrease();
counter.lockIncrease();
}
}).start();
}
while (Thread.activeCount() > 1) {
Thread.yield();
}
System.out.println(counter.nonSyncCount); // 未同步,结果随机
System.out.println(counter.volatileCount); // 仅使用volatile修饰变量,结果随机,因为执行结果依赖于旧值
System.out.println(counter.synchronizedCount); // 使用synchronized同步,100000
System.out.println(counter.atomicCount); // 使用原子变量,100000
System.out.println(counter.lockCount); // 使用显式锁,100000
}
AtomicInteger...
Lock, Condition, AbstractQueueSynchronizer...
Executor, Future, Callable, ThreadPoolExecutor...
LinkedBlockingQueue...
ConcurrentHashMap, CopyOnWriteArrayList...
CountDownLatch, Semaphore...
new Thread
时没有显式指定,那么默认将父线程(当前执行new Thread的线程)线程组设置为自己的线程组举例说明:线程A切换到线程B
1.先挂起线程A,将其在CPU中的状态保存在内存中
2.在内存中检索下一个线程B的上下文,并将其在 CPU 的寄存器中恢复,开始执行B线程
3.当B执行完,根据程序计数器中指向的位置恢复线程A
start()
方法后开始运行,线程这时候处于 READY(可运行) 状态。可运行状态的线程获得了 CPU 时间片后就处于 RUNNING(运行) 状态。以上两种状态都属于 RUNNABLEwait()
或 join()
方法之后,线程进入 WAITING(等待) 状态。进入等待状态的线程需要依靠其他线程的通知才能够返回到运行状态,而 TIMED_WAITING(超时等待) 状态相当于在等待状态的基础上增加了超时限制,比如通过 sleep(long millis)
、join(long millis)
方法或 wait(long millis)
方法可以将 Java 线程置于 TIMED_WAITING 状态。当超时时间到达后 Java 线程将会返回到 RUNNABLE 状态run()
方法之后将会进入到 TERMINATED(终止) 状态反复调用同一个线程的start()方法是否可行?假如一个线程执行完毕(此时处于TERMINATED状态),再次调用这个线程的 start() 方法是否可行?
两个问题的答案都是不可行,因为 threadStatus 的值会改变,调用 start() 的前提是 threadStatus==0 ,此时再次调用 start() 方法会抛 IllegalThreadStateException异常
另参考 十一、同步工具类
Object
类的 wait()
方法和 notify()
(随机叫醒一个正在等待的线程) ,notifyAll()
(叫醒所有正在等待的线程) 方法实现wait()
和 notify()
只能在对应的 synchronized
代码块内被调用,否则会抛出异常
synchronized
保护的对象的 wait()
和 notify()
synchronized
保护的是子线程对象 this
,所以调用的也是 this
的 wait()
和 notify()
方法wait()
的具体过程
WAITING/TIMED_WAITING
notify()/notifyAll()
,从条件等待队列中移除,尝试竞争对象锁
RUNNABLE
,并从 wait()
调用中返回BLOCKED
,只有获得锁后才从wait()
调用中返回wait()
返回后,线程重新获得锁,但不代表等待的条件一定成立,需要重新检查条件synchronized(obj) {
while(条件不成立) {
obj.wait();
}
满足条件后的操作
}
notify()
会唤醒条件队列的线程并将其移除,但不会立刻释放锁,只有所在的同步代码块执行完后才会释放锁,被唤醒的线程获得锁后才会从 wait()
返回wait-notify
DEMO public static void main(String[] args) throws InterruptedException {
WaitThread waitThread = new WaitThread();
waitThread.start(); // 子线程启动
Thread.sleep(2000L);
waitThread.setFlagTrue(); // 主线程改变标志位
}
class WaitThread extends Thread {
private volatile boolean flag = false;
@Override
public void run() {
System.out.println("sub thread running");
try {
synchronized (this) {
// 子线程执行循环
while (!flag) {
wait(); // 子线程加入子线程对象的条件队列,释放锁,等待唤醒
}
// 直到主线程将flag修改为true,并调用notify()唤醒子线程,跳出循环...
System.out.println("loop end");
}
} catch (InterruptedException ignored) {
}
}
public synchronized void setFlagTrue() {
// 主线程修改flag,并唤醒在子线程对象的条件队列上的子线程
flag = true;
System.out.println("main thread call notify()");
notify();
}
}
输出结果:
sub thread running
main thread call notify()
loop end
volatile
部分PipedWriter
、 PipedReader
、 PipedOutputStream
、 PipedInputStream
互斥条件
:该资源任意一个时刻只由一个线程占用请求与保持条件
:一个进程因请求资源而阻塞时,对已获得的资源保持不放不剥夺条件
:线程已获得的资源在未使用完之前不能被其他线程强行剥夺,只有自己使用完毕后才释放资源循环等待条件
:若干进程之间形成一种头尾相接的循环等待资源关系synchronized
) 避免同时持有多个锁,但这种做法也削弱了原子性
// 不使用开放调用
class Taxi {
private Position location;
private Position destination;
private Dispatcher dispatcher;
public synchronized void setLocation(Position location) {
this.location = location;
if (location.equals(destination)) {
dispatcher.notifyAvailable(this); // 持有本车的锁,请求车队的锁
}
}
public synchronized Position getLocation() {
return location;
}
}
class Dispatcher {
private Set<Taxi> taxis;
private Set<Taxi> availableTaxis;
public synchronized void notifyAvailable(Taxi taxi) {
availableTaxis.add(taxi);
}
public synchronized void showAllLocations() {
for (Taxi t : taxis) {
Position location = t.getLocation(); // 持有车队的锁,获取车的锁
// ...
}
}
}
// 使用开放调用
class Taxi {
private Position location;
private Position destination;
private Dispatcher dispatcher;
public void setLocation(Position location) {
boolean reach = false;
// 仅持有车的锁
synchronized (this) {
this.location = location;
if (location.equals(destination)) {
reach = true;
}
}
if (reach) {
// 仅持有车队的锁
dispatcher.notifyAvailable(this);
}
}
public synchronized Position getLocation() {
return location;
}
}
class Dispatcher {
private Set<Taxi> taxis;
private Set<Taxi> availableTaxis;
public synchronized void notifyAvailable(Taxi taxi) {
availableTaxis.add(taxi);
}
public void showAllLocations() {
Set<Taxi> copy;
// 仅持有车队的锁
synchronized (this) {
copy = new HashSet<>(this.taxis);
}
for (Taxi t : copy) {
// 仅持有车的锁
Position location = t.getLocation();
// ...
}
}
}
class LockByOrderDemo {
/**
* 加时锁,只有两个转账账户无法比较时使用
* 如果ID不重复则不会使用
*/
private static final Object lock = new Object();
public void transferMoney(Account from, Account to, double amount) {
// 优先加锁id小的账户
if (from.getId() < to.getId()) {
synchronized (from) {
synchronized (to) {
if (from.getBalance() > 0) {
from.decrease(amount);
to.increase(amount);
}
}
}
} else if (from.getId() > to.getId()) {
synchronized (to) {
synchronized (from) {
if (from.getBalance() > 0) {
from.decrease(amount);
to.increase(amount);
}
}
}
} else {
// 如果重复则需要加时锁,保证每次只有一个线程以未知的顺序获取两个账户的锁
synchronized (lock) {
synchronized (from) {
synchronized (to) {
if (from.getBalance() > 0) {
from.decrease(amount);
to.increase(amount);
}
}
}
}
}
}
}
sleep()
方法没有释放锁,而 wait()
方法释放了锁(从 wait()
返回后线程又重新获得锁)wait()
通常被用于线程间交互/通信,sleep()
通常被用于暂停执行wait()
无参方法被调用后,线程不会自动苏醒,需要别的线程调用同一个对象上的 notify()
或者 notifyAll()
方法,且线程竞争到锁之后,才能从方法中返回;sleep()
方法执行完成后,线程会自动苏醒wait()
可以指定时间,也可以不指定;而 sleep()
必须指定时间wait()
释放CPU资源,同时释放锁;sleep()
释放CPU资源,但是不释放锁,所以易死锁为什么 sleep 函数的精度很低?
- sleep函数并不能起到定时的作用,主要作用是延时
- 在一些多线程中可能会看到sleep(0),其主要目的是让出时间片
- 当系统越繁忙的时候它精度也就越低,因为它的精度取决于线程自身优先级、其他线程的优先级,以及线程的数量等因素
start()
方法,会启动一个线程并使线程进入了就绪状态,当分配到时间片后就可以开始运行了。 start()
会执行线程的相应准备工作,然后自动执行 run()
方法的内容,实现多线程工作run()
方法,会把 run()
方法当成一个 main 线程下的普通方法去执行,并不会在某个线程中执行它,所以不是多线程工作join()
方法,线程 a 进入阻塞状态,直到线程 b 完全执行完,线程 a 从阻塞状态中恢复join()
public static void main(String[] args) {
Thread t = new MyThread();
thread.start();
thread.join();
// 主线程的其它功能...(会在子线程执行完后开始执行)
}
Runnable
接口不会返回结果或抛出检查异常,Callable
接口可以Runnable
,这样代码看起来会更加简洁Thread
类,并重写 run()
方法,使用时直接创建该类的对象public class Demo {
public static class MyThread extends Thread {
@Override
public void run() {
System.out.println("MyThread");
}
}
public static void main(String[] args) {
Thread myThread = new MyThread();
myThread.start();
}
}
Runnable
接口并重写 run()
方法,使用时将该类的对象作为参数传递到 Thread
类的构造方法中
public class Demo {
public static class MyThread implements Runnable {
@Override
public void run() {
System.out.println("MyThread");
}
}
public static void main(String[] args) {
new Thread(new MyThread()).start();
}
}
Callable
接口,并重写 call()
方法
call()
可以有返回值//1.创建一个实现Callable的实现类
class NumThread implements Callable{
//2.实现call方法,将此线程需要执行的操作声明在call()中
@Override
public Object call() throws Exception {
int sum = 0;
for (int i = 1; i <= 100; i++) {
if(i % 2 == 0){
System.out.println(i);
sum += i;
}
}
return sum;
}
}
public class ThreadNew {
public static void main(String[] args) {
//3.创建Callable接口实现类的对象
NumThread numThread = new NumThread();
//4.将此Callable接口实现类的对象作为传递到FutureTask构造器中,创建FutureTask的对象
FutureTask futureTask = new FutureTask(numThread);
//5.将FutureTask的对象作为参数传递到Thread类的构造器中,创建Thread对象,并调用start()
new Thread(futureTask).start();
try {
//6.获取Callable中call方法的返回值
//get()返回值即为FutureTask构造器参数Callable实现类重写的call()的返回值。
Object sum = futureTask.get();
System.out.println("总和为:" + sum);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
public class ThreadPool {
public static void main(String[] args) {
//1. 提供指定线程数量的线程池
ExecutorService service = Executors.newFixedThreadPool(10);
ThreadPoolExecutor service1 = (ThreadPoolExecutor) service;
//2.执行指定的线程的操作。需要提供实现Runnable接口或Callable接口实现类的对象
service.execute(new NumberThread());//适合适用于Runnable
// service.submit(Callable callable);//适合使用于Callable
//3.关闭连接池
service.shutdown();
}
}
thread_instance.getPriority()
setPriority(int LEVEL)
,默认5,最高10,最低1,优先级越高,先执行的 概率 更大finally
块)Happens-Before
- 上图展示了两个线程使用同一个锁时,它们之间的
Happens-Before
关系- JMM为程序中的所有操作定义的
偏序关系
- 要保证执行操作B的线程看到操作A的结果(无论A和B是否由同一个线程执行),A和B之间必须满足
Happens-Before
关系- 如果不满足,则JVM可以对操作A和B进行任意重排序
synchronized
属于 重量级锁,效率低下
trap
指令)synchronized void method() {
// TODO
}
synchronized
方法,而线程 B 需要调用这个实例对象所属类的静态 synchronized
方法,是允许的,不会发生互斥现象,因为访问静态 synchronized
方法占用的锁是当前类的锁,而访问非静态 synchronized
方法占用的锁是当前实例对象锁。synchronized static void method() {
// TODO
}
synchronized(this|object)
表示进入同步代码库前要获得给定对象的锁synchronized(类.class)
表示进入同步代码前要获得 当前 class 的锁synchronized(this) {
// TODO
}
synchronized(String a)
,因为字符串常量池具有缓存功能synchronized
关键字修饰。因为构造方法本身就属于线程安全的public class Singleton {
private volatile static Singleton uniqueInstance; // 对象实例,需要用volatile修饰
private Singleton() { // 构造方法,设置为private
}
public static Singleton getUniqueInstance() { // 创建实例
//先判断对象是否已经实例过,没有实例化过才进入加锁代码
if (uniqueInstance == null) {
//类对象加锁,保证只能创建一个实例
synchronized (Singleton.class) {
if (uniqueInstance == null) {
uniqueInstance = new Singleton();
// 执行过程:
// 1.为 uniqueInstance 分配内存空间
// 2.初始化 uniqueInstance
// 3.将 uniqueInstance 指向分配的内存地址
}
}
}
return uniqueInstance;
}
}
volatile
关键字的原因:
uniqueInstance = new Singleton()
执行顺序有可能变成注释中的 1->3->2。指令重排在单线程环境下不会出现问题,但是在多线程环境下会导致一个线程获得还没有初始化的实例(仅仅是刚分配了内存空间)getUniqueInstance()
后发现 uniqueInstance 不为空,因此返回 uniqueInstance,但此时 uniqueInstance 还未被初始化volatile
禁止 JVM 的指令重排,保证在多线程环境下也能正常运行synchronized
关键字和 volatile
关键字是互补而非对立的关系volatile
关键字是线程同步的 轻量级 实现,性能比 synchronized
关键字好volatile
关键字只能用于变量(仅仅保证对单个volatile变量的读/写具有原子性),而 synchronized
关键字可以修饰方法以及代码块volatile
保证变量在多个线程之间的 可见性 ,而 synchronized
可以保证 变量的可见性 和 操作的原子性屏障类型 | 作用 |
---|---|
LoadLoad | 对于语句 Load1; LoadLoad; Load2 ,在 Load2 及后续读取操作要读取的数据被访问前,保证 Load1 要读取的数据被读取完毕 |
StoreStore | 对于语句 Store1; StoreStore; Store2 ,在 Store2 及后续写入操作执行前,保证 Store1 的写入操作对其它处理器可见 |
LoadStore | 对于语句 Load1; LoadStore; Store2 ,在 Store2 及后续写入操作被刷出前,保证 Load1 要读取的数据被读取完毕 |
StoreLoad | 对于语句 Store1; StoreLoad; Load2 ,在 Load2 及后续所有读取操作执行前,保证 Store1 的写入对所有处理器可见 |
volatile
读写前后均加上内存屏障,在一定程度上保证有序性在每个 读操作后 插入 LoadLoad 屏障和 LoadStore 屏障
volatile
不满足线程安全的要求,需要加锁class VolatileThreadUnsafeDemo {
private volatile int count = 0;
private void increase() {
count++; // 新值依赖原值,违背了条件1
}
public void test() {
int threadCount = 20;
for (int i = 0; i < threadCount; i++) {
new Thread(() -> {
for (int j = 0; j < 10000; j++) {
increase();
}
}).start();
}
while (Thread.activeCount() > 1) {
Thread.yield();
}
System.out.println(count); // 结果是随机的
}
}
state = 0
,表示 ReentrantLock
目前处于解锁状态lock()
方法进行加锁,state = 1
,如果该线程再次调用 lock()
方法加锁,就执行 state++
unlock()
方法释放锁,会让 state--
state
的数值,即可知道 ReentrantLock
被重入的次数了void m1() {
lock.lock();
try {
// 调用 m2,因为可重入,所以并不会被阻塞
m2();
} finally {
lock.unlock()
}
}
void m2() {
lock.lock();
try {
// do something
} finally {
lock.unlock()
}
}
synchronized
使用的是对象或类进行加锁,而 ReentrantLock
内部是通过 AQS(AbstractQueuedSynchronizer)
中的同步队列进行加锁class Window implements Runnable{
private int ticket = 100;
//1.实例化ReentrantLock
private ReentrantLock lock = new ReentrantLock();
@Override
public void run() {
while(true){
try{
//2.调用锁定方法lock()
lock.lock();
if(ticket > 0){
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + ":售票,票号为:" + ticket);
ticket--;
}else{
break;
}
}finally {
//3.调用解锁方法:unlock()
lock.unlock();
}
}
}
}
类型 | 数据结构 | 使用的锁 |
---|---|---|
ConcurrentHashMap JDK1.7 | Segment 数组 + HashEntry 数组 + 链表 | Segment (本质是 ReentrantLock ),每次锁若干 HashEntry |
ConcurrentHashMap JDK1.8 | Node 数组 + 链表/红黑树 | synchronized ,每次锁一个 Node |
Hashtable | 数组+链表 | synchronized ,每次锁全表 |
ConcurrentHashMap
采用分段锁机制,对整个桶数组进行了分割分段(Segment,每个 Segment 都是一个可重入锁),每一个 Segment 只锁容器其中一部分数据,多线程访问容器里不同数据段的数据不会存在锁竞争,提高并发访问率static class Segment<K,V> extends ReentrantLock implements Serializable {...}
Segment
的概念,synchronized
只锁定当前链表或红黑二叉树的首节点,并发控制使用 synchronized
和 CAS 来操作,虽然在 JDK1.8 中还能看到 Segment 的数据结构,但是已经简化了属性,只是为了兼容旧版本synchronized
来保证线程安全,效率非常低下
transferIndex
标志位,每个线程领取完任务就减去多少ForwardingNode
,如果不为空就加锁拷贝,拷贝完成之后也设置为 ForwardingNode
节点transferIndex = 0
,这个时候就说明所有的桶都领取完了,但是别的线程领取任务之后有没有处理完并不知道,该线程会将 sizeCtl
的值减1sizeCtl - 2 = rs<< RESIZE_STAMP_SHIFT
(此时的 sizeCtl
高16位是本次扩容的标识,低16位值为2,即只有当前线程正在参与扩容),才会将旧数组用新数组覆盖,并且会重新设置 sizeCtl
的值为0.75n,作为新数组的扩容阈值private transient volatile int sizeCtl;
ConcurrentHashMap
的状态sizeCtl
的高16位是标志位(每一轮扩容生成的一个唯一的标志),低16位等于 参与扩容的线程数+1sizeCtl - 2 == rs<<RESIZE_STAMP_SHIFT(值为16)
,说明所有参与扩容的线程都执行完,即扩容完毕,rs 是本次扩容的标志sizeCtl | 含义 |
---|---|
-1 | 正在初始化 |
0.75n | 正常状态,代表扩容阈值 |
其它负值 | 有其他线程正在扩容 |
sizeCtl
最初是 rs
带符号左移16位的结果,它的符号位为1,所以数值为负transferIndex = table.length = 64
,每个线程领取的桶个数是16,第一个线程领取完任务后 transferIndex = 48
,也就是说第二个线程这时进来是从第48个桶开始处理,再减去16,依次类推,这就是多线程协作处理的原理nextTable
,扩容期间可以通过 find
方法,访问已经迁移到了 nextTable
中的数据CopyOnWriteArrayList
基于 ReentrantLock
实现;CopyOnWriteArraySet
基于 CopyOnWriteArrayList
实现ConcurrentLinkedQueue
、ConcurrentLinkedDeque
ArrayBlockingQueue
、LinkedBlockingQueue
、LinkedBlockingDeque
BlockingQueue
接口,适合用于作为数据共享的通道ReentrantLock
和 Condition
实现BlockingQueue
提供了可阻塞的插入和移除的方法:当队列容器已满,生产者线程会被阻塞,直到队列未满;当队列容器为空时,消费者线程会被阻塞,直至队列非空时为止PriorityBlockingQueue
DelayQueue
SynchronousQueue
ConcurrentSkipListMap / ConcurrentSkipListSet
数据有序,没有使用锁,所有操作都是非阻塞的,包括写接口 | 功能 |
---|---|
Runnable, Callable | 要执行的异步任务 |
Executor, ExecutorService | 执行服务 |
Future | 异步执行的结果,包含返回值或异常 |
Executor
是一个顶层接口,在它里面只声明了一个方法 execute(Runnable)
,返回值为 void
,参数为 Runnable
类型ExecutorService
接口继承了 Executor
接口,并声明了一些方法:submit
、invokeAll
、invokeAny
以及shutDown
等AbstractExecutorService
实现了 ExecutorService
接口,基本实现了 ExecutorService
中声明的所有方法
execute(task)
异步执行,无返回值submit(task)
异步执行,返回 Future
对象,可通过 task.get()
同步到主线程invoke(task)
一直阻塞到任务执行完成返回
- 如果将运行时间较长和较短的任务提交给一个线程池,可能会产生饥饿
- 如果一些任务依赖于其它任务的执行结果,可能会产生死锁
参数 | 含义 | 说明 | 是否必须 |
---|---|---|---|
int corePoolSize | 线程池中核心线程数最大值 | 一旦创建,核心线程默认情况下会一直存在于线程池中(核心线程不会预先创建,有任务时才会创建);非核心线程如果长时间的闲置就会被销毁 | 是 |
int maximumPoolSize | 线程池中线程总数最大值 | 核心线程数量 + 非核心线程数量 | 是 |
long keepAliveTime | 非核心线程闲置超时时长 | 非核心线程如果处于闲置状态超过该值,就会被销毁。如果设置 allowCoreThreadTimeOut(true) ,则会也作用于核心线程 | 是 |
TimeUnit unit | keepAliveTime 的单位 | 枚举类型 | 是 |
BlockingQueue workQueue | 阻塞队列,维护着等待执行的 Runnable 任务对象 | 下面补充说明 | 是 |
ThreadFactory threadFactory | 线程创建工厂 | 用于批量创建线程,统一在创建线程时设置一些参数,如是否守护线程、线程的优先级等。如果不指定,会新建一个默认的线程工厂 | 否 |
RejectedExecutionHandler handler | 拒绝处理策略,线程数量大于最大线程数就会采用拒绝处理策略 | 下面补充说明 | 否 |
常用的阻塞队列
LinkedBlockingQueue
:底层数据结构是链表,默认大小是 Integer.MAX_VALUE
,也可以指定大小ArrayBlockingQueue
:底层数据结构是数组,需要指定队列的大小SynchronousQueue
:同步队列(容量为0),要将一个任务放到该队列中,必须有一个线程等待接收这个任务,否则执行拒绝策略DelayQueue
:延迟队列,该队列中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素
- 只有任务相互独立时,设置最大线程数、最大队列长度才是合理的,否则可能会出现死锁
- 如果任务之间存在依赖,应该使用无界的线程池
有关拒绝处理策略
ThreadPoolExecutor.AbortPolicy
:默认策略,丢弃任务并抛出 RejectedExecutionException
异常ThreadPoolExecutor.DiscardPolicy
:丢弃新来的任务,但是不抛出异常ThreadPoolExecutor.DiscardOldestPolicy
:丢弃队列头部(也就是最旧的,如果是优先队列则丢弃优先级最高的)任务,然后重新尝试执行程序,如果再次失败,重复此过程ThreadPoolExecutor.CallerRunsPolicy
:由主线程处理该任务,此时主线程暂停向队列中提交新任务
ThreadPoolExecutor.CallerRunsPolicy
在执行时,主线程处于忙碌状态,新到达的请求会保存在 TCP 队列中- 如果持续下去可能造成 TCP 队列丢弃请求,这种现象最终会蔓延到客户端,导致性能降低
状态 | 说明 |
---|---|
RUNNING | 线程池创建后处于 RUNNING 状态 |
SHUTDOWN | 调用 shutdown() 方法后处于 SHUTDOWN 状态,线程池不能接受新的任务,正在执行的线程不中断,完成阻塞队列的任务 |
STOP | 调用 shutdownNow() 方法后处于 STOP 状态,线程池不能接受新的任务,中断所有线程,阻塞队列中没有被执行的任务全部丢弃,工作线程全部停止,阻塞队列为空 |
TIDYING | 当所有的任务已终止,线程池会变为 TIDYING 状态,接着会执行 terminated() 函数 |
TERMINATED | 线程池处在 TIDYING 状态时,并且执行完 terminated() 方法之后 , 线程池被设置为 TERMINATED 状态 |
class ThreadPoolExecutorExtension extends ThreadPoolExecutor {
public ThreadPoolExecutorExtension(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
}
@Override
protected void beforeExecute(Thread t, Runnable r) {
// 任务执行前...
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
// 任务执行后...
}
@Override
protected void terminated() {
// 线程池关闭...
}
}
corePoolSize
,无论当前已经创建的线程是否空闲,都会新建一个核心线程执行任务(在核心线程数量 < corePoolSize
时,让核心线程数量快速达到 corePoolSize
)corePoolSize
时,新来的线程任务会进入任务队列中等待,然后空闲的核心线程会依次去缓存队列中取任务来执行(线程复用)。在加入队列前后,都进行线程池状态是否是 RUNNING
的检查maximumPoolSize
,则会采取拒绝策略进行处理为什么在步骤2中,要二次检查线程池的状态?
- 在多线程的环境下,线程池的状态是时刻发生变化的
- 有可能刚获取线程池状态后线程池状态就改变了。判断是否将 command 加入workqueue 是线程池之前的状态
- 倘若没有二次检查,万一线程池处于非 RUNNING 状态(在多线程环境下很有可能发生),那么 command 永远不会执行
- 类似于单例模式的双重校验
参数 | 含义 |
---|---|
parallelism | 线程池最大线程数量和该参数值有关系,但不是绝对的关联(有依据但并不全由它决定) |
factory | 线程创建工厂 |
handler | 线程因未知异常而终止的回调处理,执行的任务中出现异常,并从任务中被抛出时,会被 handler 捕获 |
asyncMode | 当设置为 ture 的时候,队列采用先进先出方式;反之则是采用后进先出的方式,默认false(参考上图) |
RecursiveAction(类比Runnable)
、RecursiveTask<V>(类比Callable<V>)
class ForkJoinPoolDemo {
private static final ForkJoinPool forkJoinPool = new ForkJoinPool();
public static void addTask() {
// 无返回值的任务 类比Runnable
RecursiveAction recursiveAction = new RecursiveAction() {
@SneakyThrows
@Override
protected void compute() {
Thread.sleep(5000);
System.out.println("recursiveAction" + Thread.currentThread());
}
};
// 有返回值的任务 类比Callable
RecursiveTask<Integer> recursiveTask = new RecursiveTask<Integer>() {
@SneakyThrows
@Override
protected Integer compute() {
Thread.sleep(5000);
System.out.println("recursiveTask" + Thread.currentThread());
return 1;
}
};
// execute: 异步,无返回值
forkJoinPool.execute(recursiveTask);
forkJoinPool.execute(recursiveAction);
// submit: 异步,返回Future,需要调用get同步结果
ForkJoinTask<Integer> taskSubmit = forkJoinPool.submit(recursiveTask);
ForkJoinTask<Void> actionSubmit = forkJoinPool.submit(recursiveAction);
try {
Integer res = taskSubmit.get();
actionSubmit.get(); // Void
} catch (InterruptedException | ExecutionException e) {
// ...
}
// invoke: 阻塞并直接返回结果
Integer taskInvoke = forkJoinPool.invoke(recursiveTask);
forkJoinPool.invoke(recursiveAction);
}
}
Executor
提交一组任务,并在计算完成后获得结果(例如每下载一张图片就执行一次渲染,而非下载完所有图片再渲染)
ThreadPoolExecutor.submit(task)
并对返回值轮询调用 future.get()
,效率低ThreadPoolExecutor.invoke(task)
会阻塞直到返回结果,同样效率低CompletionService.take()
可以在部分任务完成后,按完成的先后顺序获取任务 future
ExecutorCompletionService
是 CompletionService
的实现,将计算任务委托给线程池执行class CompletionServiceDemo {
/**
* 线程池
*/
private final Executor executor = Executors.newFixedThreadPool(4);
public void test() {
CompletionService<Integer> completionService = new ExecutorCompletionService<>(executor);
Future<Integer> future5000 = completionService.submit(new CustomTask(5000));
Future<Integer> future2000 = completionService.submit(new CustomTask(2000));
Future<Integer> future8000 = completionService.submit(new CustomTask(8000));
for (int i = 0; i < 3; i++) {
Future<Integer> future = null;
try {
future = completionService.take(); // 阻塞直到有任务完成,按照任务完成的先后顺序获得结果
System.out.println(future.get()); // 输出结果 2000, 5000, 8000
} catch (InterruptedException | ExecutionException e) {
// ...
}
}
}
static class CustomTask implements Callable<Integer> {
private final int i;
public CustomTask(int i) {
this.i = i;
}
@Override
public Integer call() throws Exception {
Thread.sleep(i);
return i;
}
}
}
run()
方法中主动捕获异常Thread.UncaughtExceptionHandler
,被动处理
Thread.UncaughtExceptionHandler
仅在使用execute(task)
向线程池提交任务时才生效
submit(task)
会在调用其返回值的get()
方法时将异常包装为ExecutionException
后重新抛出
/**
* 自定义线程工厂
**/
class MyThreadFactory implements ThreadFactory {
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
public MyThreadFactory(String namePrefix) {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
this.namePrefix = namePrefix + "-thread-";
}
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0) {
@Override
public void run() {
// 1.捕获未受检异常,例如RuntimeException(主动)
// 不被捕获的未受检异常将使线程终结
Throwable throwable = null;
try {
super.run();
} catch (Throwable t) {
throwable = t;
} finally {
// 处理策略,可以新建线程代替当前线程,也可以在活跃线程数足够多的时候不新建线程
}
}
};
// 2.处理未捕捉的异常(被动)
// 仅在通过execute方法提交任务时才会用该方式处理异常
// 使用submit提交的任务会在其返回值调用future.get()时将异常包装为ExecutionException后重新抛出
t.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread t, Throwable e) {
System.out.println("do something to handle uncaughtException");
}
});
return t;
}
}
Future
可以把计算结果从执行计算的线程传递到获取计算结果的线程,也可以在任务执行中取消future.get()
时,行为取决于任务的状态
Callable
对象内部的异常会被包装成 ExecutionException
并抛出e.getCause()
获得包装前的初始异常CancellationException
Future
调用 cancel()
不会产生影响RunnableFuture
同时拓展了 Runnable
(作为执行的任务)和 Future
(作为任务执行的结果)public class Solution {
public static void main(String[] args) {
// 创建Callable对象
Callable<Integer> callable = () -> {
if (System.currentTimeMillis() % 2 == 0) {
throw new MyException(); // 自定义异常
} else {
Thread.sleep(2000);
return 1;
}
};
// 创建FutureTask对象
FutureTask<Integer> futureTask = new FutureTask<>(callable);
// 将FutureTask对象交付线程并执行
Thread thread = new Thread(futureTask);
thread.start();
try {
futureTask.get(10, TimeUnit.SECONDS);
// 保持阻塞直到 1.执行完成 2.抛出异常(包含中断、超时、Callable内部checked异常、RuntimeException、Error)
} catch (TimeoutException e) {
// 超时...
} catch (InterruptedException e) {
// 中断...
} catch (ExecutionException e) {
// Callable内部异常,被包装后重新抛出
if (e.getCause() instanceof MyException) {
System.out.println("Callable Inner Exception, thrown by futureTask.get()");
}
} finally {
// 1.如果任务已经结束,执行cancel不会产生任何影响
// 2.如果任务正在运行,将会被中断
futureTask.cancel(true);
}
}
}
class ComputeCache<K, V> {
/**
* 因为计算过程较长,Value使用Future
*/
private final ConcurrentHashMap<K, Future<V>> cache = new ConcurrentHashMap<>();
private V compute(K args) {
while (true) {
Future<V> result = cache.get(args);
if (result == null) {
// 缓存中无该值,执行计算
Callable<V> callable = new Callable<V>() {
@Override
public V call() throws Exception {
// take a long time
// Thread.sleep(100000);
return null;
}
};
FutureTask<V> futureTask = new FutureTask<>(callable);
result = cache.putIfAbsent(args, futureTask); // 保证查询-放入操作是原子的,避免了put可能被多个线程同时执行
if (result == null) { // 加入成功,开始执行计算
result = futureTask;
futureTask.run();
}
}
try {
// 阻塞直到拿到计算结果
return result.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
}
await()
解除阻塞的情况:
/** 使用示例:等待线程池的任务全部执行完成 **/
public final void run(List<RuleDO> rules) {
CountDownLatch countDownLatch = new CountDownLatch(taskList.size());
for (ScanTask task : taskList) {
// 向线程池加入任务
threadPoolExecutor.execute(() -> {
doScan(task, rules);
// 计数值-1
countDownLatch.countDown();
});
}
try {
// 在此阻塞,直到计数值为0,或发生中断,或超时
countDownLatch.await();
} catch (InterruptedException e) {
// ...
}
}
CountDownLatch
不同的是,参与的线程角色相同,且可以重复使用Runnable
类型,所有线程到达后执行的动作,由最后一个到达的线程执行(可选) static void CyclicBarrierTest() {
int subThreadNum = 5;
// 所有子线程到达后执行的行为,由最后一个到达的子线程完成
Runnable commonAction = new Runnable() {
@Override
public void run() {
System.out.println("all sub threads terminated, print by thread " + Thread.currentThread().getName());
}
};
// 构造CyclicBarrier
CyclicBarrier cyclicBarrier = new CyclicBarrier(subThreadNum, commonAction);
for (int i = 0; i < subThreadNum; i++) {
new Thread(() -> {
System.out.println("round1: sub thread " + Thread.currentThread().getName() + " terminated");
try {
cyclicBarrier.await(); // 线程宣布到达
} catch (InterruptedException | BrokenBarrierException e) {
// ...
}
System.out.println("round2: sub thread " + Thread.currentThread().getName() + " terminated");
try {
cyclicBarrier.await(); // 线程宣布到达
} catch (InterruptedException | BrokenBarrierException e) {
// ...
}
}).start();
}
}
执行结果:
round1: sub thread Thread-1 terminated
round1: sub thread Thread-0 terminated
round1: sub thread Thread-3 terminated
round1: sub thread Thread-2 terminated
round1: sub thread Thread-4 terminated
all sub threads terminated, print by thread Thread-4
round2: sub thread Thread-4 terminated
round2: sub thread Thread-3 terminated
round2: sub thread Thread-0 terminated
round2: sub thread Thread-1 terminated
round2: sub thread Thread-2 terminated
all sub threads terminated, print by thread Thread-2
CountDownLatch
用于仅减的场景)permits=1
时,和一般的锁仍有区别:
Semaphore
可以由任意线程释放acquire()
获取一个令牌,在获取到令牌、或者被其他线程调用中断之前线程一直处于阻塞状态
?
acquire(int permits)
获取一个令牌,在获取到令牌、或者被其他线程调用中断、或超时之前线程一直处于阻塞状态
acquireUninterruptibly()
获取一个令牌,在获取到令牌之前线程一直处于阻塞状态(忽略中断)
tryAcquire()
尝试获得令牌,返回获取令牌成功或失败,不阻塞线程
?
tryAcquire(long timeout, TimeUnit unit)
尝试获得令牌,在超时时间内循环尝试获取,直到尝试获取成功或超时返回,不阻塞线程
?
release()
释放一个令牌,唤醒一个获取令牌不成功的阻塞线程
?
hasQueuedThreads()
等待队列里是否还存在等待线程
?
getQueueLength()
获取等待队列里阻塞的线程数
?
drainPermits()
清空令牌把可用令牌数置为0,返回清空令牌的数量
?
availablePermits()
返回可用的令牌数量。
class LimitTaskCommitWithSemaphoreDemo {
private ExecutorService executorService; // 线程池拒绝策略 AbortPolicy
private final Semaphore semaphore = new Semaphore(100); // 信号量上界 = 最大线程数 + 队列最大长度
public void addTask(Runnable task) throws InterruptedException {
semaphore.acquire(); // 信号量-1
try {
executorService.execute(() -> {
task.run();
semaphore.release(); // 执行完毕,信号量+1
});
} catch (RejectedExecutionException e) {
semaphore.release(); // 添加失败,信号量+1
}
}
}
ThreadLocal
类主要解决的就是让每个线程绑定自己的值,这个值不能被其它线程访问到JDBCConnection
,每个线程不会访问到其它线程的数据库连接Session
会话public class RequestContext {
public static class Request {...}
private static ThreadLocal<String> userId = new ThreadLocal<>();
private static ThreadLocal<Request> req = new ThreadLocal<>();
// getter&setter...
}
ThreadLocalMap
ThreadLocalMap
,而 ThreadLocalMap
可以存储以 ThreadLocal
为 key (ThreadLocal
的弱引用),Object 对象为 value 的键值对ThrealLocal
类可以通过 Thread.currentThread()
获取到当前线程对象后,直接通过 getMap(Thread t)
可以访问到该线程的 ThreadLocalMap
对象ThreadLocalMap
不使用拉链法解决哈希冲突,而是向后探测:如果先遇到了空位置则直接插入;如果先遇到了 key 过期的数据则进行垃圾回收并替换interrupt()
仅是一种协作机制,不会强制终止线程Thread.interrupted()
InterruptedException
,例如对 WAITING/TIMED_WAITING
状态的线程调用 interrupt()
public class Thread {
/**
* 由*本线程对象*之外的线程调用,作用为:
* 1.线程处于状态WAITING/TIMED_WAITING,则抛出InterruptedException
* 2.线程处于状态RUNNABLE/BLOCKED,将中断标志位设置为true(仅告知"需要中断"而不采取其它措施)
* 3.如果本线程对象处于状态NEW/TERMINATE,则无动作(不修改标志位)
**/
public void interrupt() {...}
/**
* 查看*本线程对象*的中断标志位
* 如果线程执行完成则返回false,即使之前对它调用了 interrupt()
**/
public boolean isInterrupted() {...}
/**
* 静态方法,查看*调用该方法的线程*的中断标志位,并清除状态
* 调用形式仅可以为 Thread.interrupted(),同时会将中断状态更新为false
**/
public static boolean interrupted() {...}
}
class ThreadInterruptDemo {
protected void test() throws Exception {
Thread.currentThread().interrupt(); // 将主线程的中断标志设为true
System.out.println("MainStatus after thread.interrupt(): " + Thread.currentThread().isInterrupted());
boolean mainThreadInterruptStatus = Thread.interrupted(); // 查看当前线程(主线程)的中断状态,并将其重置为false
System.out.println("MainStatus return by Thread.interrupted(): " + mainThreadInterruptStatus);
System.out.println("MainStatus after Thread.interrupted(): " + Thread.currentThread().isInterrupted());
}
}
返回结果:
MainStatus after thread.interrupt(): true
MainStatus return by Thread.interrupted(): true
MainStatus after Thread.interrupted(): false
class ThreadInterruptDemo {
protected void test() throws Exception {
Thread subThread = new Thread(() -> {
System.out.println("Start: " + Thread.currentThread().isInterrupted());
try {
System.out.println("Before sleep: " + Thread.currentThread().isInterrupted());
Thread.sleep(5000);
} catch (InterruptedException e) {
System.out.println("InterruptedException: " + Thread.currentThread().isInterrupted());
// 捕获到InterruptedException后,会将中断状态重置为false,如果需要维持还需要重设为true
Thread.currentThread().interrupt();
}
System.out.println("End: " + Thread.currentThread().isInterrupted());
});
subThread.start();
subThread.interrupt(); // 如果子线程处于阻塞状态,则抛出InterruptedException;否则仅将子线程的中断状态设置为true
Thread.sleep(10000); // 等待子线程执行完成
System.out.println("SubThread end status: " + subThread.isInterrupted());
}
}
返回结果1:
Start: false
Before sleep: false
---此时调用subThread.interrupt()---
InterruptedException: false // 发生了InterruptedException,中断状态被重置为false
End: true // 在catch块里又将中断状态设置为true
---子线程执行完成---
SubThread end status: false
返回结果2:
---此时调用subThread.interrupt()---
Start: true
Before sleep: true
InterruptedException: false // 证明了线程先中断状态true再sleep也会抛出异常!!!
End: true
---子线程执行完成---
SubThread end status: false
synchronized
和 volatile
保证的内存可见性