0%

线程池

线程池

线程池的优点

  • 避免线程的创建和销毁带来的性能开销
  • 避免大量的线程间因互相抢占系统资源导致的阻塞现象
  • 能够对线程进行简单的管理并提供定时执行、间隔执行等功能

java在java.util.concurrent包中提供了Executor接口,在该接口的基础上整合出来一个Executor框架,该框架包含了线程池,ExecutorExecutorsExecutorService,ThreadPoolExecutorCompletionServiceFutureCallable等,该框架分离了任务的创建和执行,通过使用执行器,仅需要实现Runable接口的对象,然后将这些对象发送给执行器即可,执行器通过创建所需要的线程,来负责这些Runnable对象的创建、实例化以及运行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public interface Executor {
// 执行一个Runnable任务
void execute(Runnable command);
}

// 继承了Executor接口
public interface ExecutorService extends Executor {

// 关闭ExecutorService,停止接收新的任务,且等待已经提交的任务执行完,当所有任务都执行完之后将会关闭ExecutorService
void shutdown();


List<Runnable> shutdownNow();


boolean isShutdown();


boolean isTerminated();


boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;

// Callable执行完毕后有返回结果
<T> Future<T> submit(Callable<T> task);

// Runnable
<T> Future<T> submit(Runnable task, T result);

// Runnable执行完毕后,没有返回结果
Future<?> submit(Runnable task);


}

之前在使用线程的时候,不管是继承Thread类还是实现Runnable或者Callable接口,都需要使用new Thread来创建线程,用完之后直接销毁,频繁的创建销毁线程比较浪费资源,于是出现了线程池的概念。

线程池中提供了一个线程队列,队列中保存着所有等待状态的线程,避免了创建和销毁的额外开销,提高了响应速度

ThreadPoolExecutor

线程池

以参数最全的构造函数举例

1
2
3
4
5
6
7
8
9
10
11

public ThreadPoolExecutor(int corePoolSize, // 核心线程数,决定新提交的任务是新开线程去执行还是放到任务队列中,当线程数量小于corePoolSize时才回去创建线程
int maximumPoolSize, // 最大线程数,线程池能创建的最大线程数,达到该数值后将不会创建新的线程,后续任务会进入阻塞队列排队
long keepAliveTime, // 线程超过corePoolSize后,多余线程的最大闲置时间,如果超过,则会终止,但是对于核心线程,如果allowCoreThreadTimeOut(boolean value)设置为true的话,核心线程也会被终止
TimeUnit unit, // 时间单位
BlockingQueue<Runnable> workQueue, // 存放未处理的任务的阻塞队列,常用的队列有1)ArrayBlockingQueue:基于数组的先进先出队列,此队列创建时必须指定大小
//2)LinkedBlockingQueue:基于链表的先进先出队列,如果创建时没有指定此队列大小,则默认为Integer.MAX_VALUE
//3)SynchronousQueue:这个队列比较特殊,它不会进行存储,而是将直接新建一个线程来执行新来的任务
//4)PriorityBlockingQuene:具有优先级的无界阻塞队列
ThreadFactory threadFactory, // 生产线程的工厂类,可以用来定义线程名称以及线程的优先级
RejectedExecutionHandler handler)//拒绝策略

核心方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
// 变量的高3位代表线程池的状态,那么后29位(从低位往高位数)代表该线程池数量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 线程池数量的位数 29
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;

// 表示运行中的状态标识,此时可以接受任务执行队列中的任务 runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
// 表示关闭中的状态标识,调用了shutdown方法,此时不再接收新任务,但是队列里的任务还得执行
private static final int SHUTDOWN = 0 << COUNT_BITS;
// 表示已停止的状态标识,调用了shutdownNow方法不再接受新任务,同时抛弃阻塞队列中的所有任务并中断正在执行的任务
private static final int STOP = 1 << COUNT_BITS;
// 表示当前所有任务已经终止,任务数量为0时的状态标识,在调用shutdown方法和shutdownNow方法时都会尝试更新这个状态
private static final int TIDYING = 2 << COUNT_BITS;
// 表示线程池已经完全终止(关闭),调用了terminated方法会更新为该状态
private static final int TERMINATED = 3 << COUNT_BITS;
// 用来保存等待任务执行的阻塞队列
private final BlockingQueue<Runnable> workQueue;
// 可重入锁
private final ReentrantLock mainLock = new ReentrantLock();
// 包含了所有在工作的线程
private final HashSet<Worker> workers = new HashSet<Worker>();
// 锁条件队列,主要用于 awaitTermination
private final Condition termination = mainLock.newCondition();
// 记录线程池最大工作线程的数量
private int largestPoolSize;
// 完成任务的计时器,仅在中止工作任务时更新
private long completedTaskCount;
// 用于创建线程的工厂
private volatile ThreadFactory threadFactory;
// 拒绝策略
private volatile RejectedExecutionHandler handler;
// 空闲存活时间,如果线程池中的线程数量比核心线程数量还要多时,并且多出的这些线程都是闲置状态,该变量则是这些闲置状态的线程的存活时间
private volatile long keepAliveTime;
// 默认为 false,如果设为 true那么核心线程也会遵循 keepAliveTime的时间来做闲置处理
private volatile boolean allowCoreThreadTimeOut;
// 线程池核心线程数量
private volatile int corePoolSize;
// 线程池最大线程数量
private volatile int maximumPoolSize;

public void execute(Runnable command) {
// command为空,则抛出空指针
if (command == null)
throw new NullPointerException();

int c = ctl.get();
// workerCountOf方法根据ctl的低29位,得到线程池的当前线程数,如果线程数小于corePoolSize,则执行addWorker方法创建新的线程执行任务
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 判断线程池是否在运行
// 如果在,任务队列是否允许插入
if (isRunning(c) && workQueue.offer(command)) {
// 双重检查,插入成功再次验证线程池是否运行,
int recheck = ctl.get();
//再次检查如果不在运行,移除插入的任务,然后抛出拒绝策略
if (! isRunning(recheck) && remove(command))
reject(command);
// 如果在运行,没有线程了,就启用一个非核心线程
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 加入队列失败,尝试新增非核心线程失败,直接拒绝
else if (!addWorker(command, false))
reject(command);
}


// 添加线程 core表示是否为核心线程
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) { // 自旋
int c = ctl.get();
// 当前线程池运行状态
int rs = runStateOf(c);

// Check if queue empty only if necessary.
// 判断线程池状态,如果线程池的状态值大于或等于SHUTDOWN,则不处理提交任务,直接返回
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
// 自旋,更新线程数量
for (;;) {
// 当前线程数
int wc = workerCountOf(c);
// 根据core来判断是否为核心线程
// 当前线程数已经超过容量,就直接返回
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// CAS增加当前线程数量,更改成功结束自旋
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
// 判断当前线程池状态是否还是运行中,如果不是则跳过第一层自旋的第一次自旋开始第二次
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}

boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 创建工作线程
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
// 获取线程池主锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
// 获取当前线程池运行状态
int rs = runStateOf(ctl.get());

if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// 判断线程是否是 alive状态
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// 添加线程到workers中,workers是一个HashSet private final HashSet<Worker> workers = new HashSet<Worker>();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
// 设置当前工作者加入线程队列的已添加的标识为 true
workerAdded = true;
}
} finally {
mainLock.unlock();
}
// 启动新建的线程
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}

Executors

在一般情况下使用线程池都是直接使用Executors类中提供的静态方法来直接创建线程池,这是jdk提供创建线程池的工具类,但是要用好这个工具类,一定要清楚每个方法所生成的线程池的特性以及线程池中每个参数的含义。

几种常用的创建线程池的方法

newFixedThreadPool

创建固定大小的线程池,每提交一个任务就创建一个线程,直到达到最大线程数,核心线程数与最大线程数相同,线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程

1
2
3
4
5
6
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
// LinkedBlockingQueue是一个无界队列,可以有无限多的任务进行排队,所以创建的线程数不会超过corePoolSize,因此maximumPoolSize的值也就是无效的
new LinkedBlockingQueue<Runnable>());
}

使用方式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// 创建固定大小的线程池
ExecutorService pool = Executors.newFixedThreadPool(2);
for(int i = 0;i<5;i++){
int finalI = i;

pool.execute(() ->
{
if(finalI == 3){
throw new RuntimeException("出错了");
}
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}

System.out.println(Thread.currentThread().getName() + ":"+ finalI);
});
}
pool.shutdown();

-------------------------
pool-1-thread-2:1
pool-1-thread-1:0
Exception in thread "pool-1-thread-1" java.lang.RuntimeException: 出错了
at com.zhanghe.study.thread.pool.TestExecutors.lambda$testFixedThread$1(TestExecutors.java:49)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
pool-1-thread-2:2
pool-1-thread-3:4

每次提交一个任务就创建一个线程,直到线程达到线程池的最大大小。线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,而其他线程有没有空闲,那么线程池会补充一个新线程

newSingleThreadExecutor

创建单个线程的线程池,只有唯一的工作线程,所有的任务都是串行执行的,如果这个唯一的线程因为异常结束,会有新的线程来代替它,此线程保证所有任务的执行顺序按照任务的提交顺序执行

相当于newFixedThreadPool(1)

1
2
3
4
5
6
7
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
// LinkedBlockingQueue是一个无界队列,可以有无限多的任务进行排队,所以创建的线程数不会超过corePoolSize,因此maximumPoolSize的值也就是无效的
new LinkedBlockingQueue<Runnable>()));
}

使用方式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// 该线程池中只有一个线程 一直在复用这个线程 
ExecutorService pool = Executors.newSingleThreadExecutor();
for(int i = 0;i<5;i++){
int finalI = i;

pool.execute(() ->
{
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace(System.err);
}
// 该线程出现异常,之后线程池会创建一个新的线程来执行其他任务
if(finalI == 3){
throw new RuntimeException("出错了");
}
System.out.println(Thread.currentThread().getName() + ":"+ finalI);
});
}
pool.shutdown();

--------------------
pool-1-thread-1:0
pool-1-thread-1:1
pool-1-thread-1:2
Exception in thread "pool-1-thread-1" java.lang.RuntimeException: 出错了
at com.zhanghe.study.thread.pool.TestExecutors.lambda$testSingleThread$0(TestExecutors.java:30)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
pool-1-thread-2:4

这个线程池只有一个线程在工作,也就是相当于单线程串行执行所有任务,如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它,此线程池保证所有任务的执行顺序按照任务的提交顺序执行

newCachedThreadPool

缓存线程池,可以根据需求自动创建新线程,没有对线程池中线程的数量进行限制,最大可以为Integer.MAX_VALUE,如果有可用的线程就使用,如果没有的话,会自动创建新的线程,默认空闲60s移除

常用于执行一些生存期很短的任务,默认会移除60s内未被使用的线程,因此长时间保持空闲的线程池不会使用任何资源

1
2
3
4
5
6
7
public static ExecutorService newCachedThreadPool() {
// 默认空闲60s移除
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
// SynchronousQueue队列不进行存储,直接将任务交给线程处理,所以在没有空闲的线程时,会直接创建新的线程来执行任务
new SynchronousQueue<Runnable>());
}

使用方式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
ExecutorService pool = Executors.newCachedThreadPool();
for(int i = 0;i<5;i++){
int finalI = i;

pool.execute(() ->
{
if(finalI == 3){
throw new RuntimeException("出错了");
}
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}

System.out.println(Thread.currentThread().getName() + ":"+ finalI);
});
}
pool.shutdown();

--------------------
Exception in thread "pool-1-thread-4" java.lang.RuntimeException: 出错了
at com.zhanghe.study.thread.pool.TestExecutors.lambda$testCachedThread$2(TestExecutors.java:75)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
pool-1-thread-1:0
pool-1-thread-5:4
pool-1-thread-3:2
pool-1-thread-2:1

要执行多少任务就创建多少线程,如果线程池的大小超过了处理任务所需要的线程,那么就会回收部分空闲的线程,当任务数增加时,此线程池又添加新线程来处理任务

newScheduledThreadPool

调度型线程池,创建大小无限的的线程池,可以延迟或定时以及周期性的执行任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
// ScheduledThreadPoolExecutor是继承自ThreadPoolExecutor
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}

public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}

使用方式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
ScheduledExecutorService pool = Executors.newScheduledThreadPool(10);
for(int i = 0;i<5;i++){
int finalI = i;

pool.schedule(()->
{
if(finalI == 3){
throw new RuntimeException("出错了");
}

System.out.println(Thread.currentThread().getName() + ":"+ finalI);

},20, TimeUnit.SECONDS);
}
pool.shutdown();
1
2
3
// command是任务,delay指延迟的时间,unit为时间单位
// 该方法只执行一次,只是会延迟一段时间执行
schedule(Runnable command,long delay, TimeUnit unit)
1
2
// 该方法是每隔period时间执行一次,不管任务执行多长时间
scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit)
1
2
// 在每一次执行终止和下一次执行开始之间都存在给定的延迟
scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit)

newWorkStealingPool (工作窃取线程池)

大型的任务被分解成若干块放入到队列中,在队列中还可以继续细分,线程从队列中获取任务并进行执行,当所有的线程计算结束之后,再将各部分的结果进行合并来得到最终结果。

在处理器之间分配工作项,从而最大限度地利用所有可用的处理器来完成计算密集型任务,这项算法也用于 Java 的fork/join 框架

1
ExecutorService executorService = Executors.newWorkStealingPool();

线程池的提交和关闭

ExecutorService接口扩展了Executor接口并提供了提交任务和关闭线程池的方法

1
2
3
4
5
6
7
8
9
10
//提交  为线程池中的线程分配任务
// future对象 isDone()方法来检查任务是否已经完成
// get()方法获取返回的结果,该方法会一直等待直到Callable对象的call()方法执行完成并返回结果,如果get()方法在等待结果时线程中断了,则会抛出InterruptedException异常,如果call()方法抛出异常那么get()方法将随之抛出ExecutionException异常
<T> Future<T> submit(Callable<T> task);

<T> Future<T> submit(Runnable task, T result);

Future<?> submit(Runnable task);

void execute(Runnable command);
1
2
3
4
// 优雅的停止,停止接收新任务,等线程池任务执行完毕停止
void shutdown();
// 立即停止,并尝试打断正在执行的任务,并且清空阻塞队列,返回为执行的任务
List<Runnable> shutdownNow();

submit()方法和execute()方法都可以提交任务执行,这两者有什么不同呢

  • submit()方法有返回值Future,而execute()方法是没有返回值的

但是对于ScheduledExecutorService延时调度,使用以下方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public ScheduledFuture<?> schedule(Runnable command,
long delay, TimeUnit unit);

public <V> ScheduledFuture<V> schedule(Callable<V> callable,
long delay, TimeUnit unit);

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit);

public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit);

线程池拒绝策略

线程池是有一定的容量限制的,不可能来多少任务接多少任务,那么如何拒绝任务呢?就用到了线程池的拒绝策略RejectedExecutionHandler,在jdk中提供了四种拒绝策略,分别是AbortPolicy、CallerRunsPolicy、DiscardPolicy、DiscardOldestPolicy,主要是执行rejectedExecution方法,下面分别介绍这四种拒绝策略

AbortPolicy

线程池队列满了,直接抛出异常RejectedExecutionException,默认策略

1
2
3
4
5
6
7
8
9
10
11
public static class AbortPolicy implements RejectedExecutionHandler {

public AbortPolicy() { }


public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}

CallerRunsPolicy

只要线程池没有关闭,直接在调用者线程(主线程)中执行该任务。不会等待线程池中的线程去执行

1
2
3
4
5
6
7
8
9
10
11
public static class CallerRunsPolicy implements RejectedExecutionHandler {

public CallerRunsPolicy() { }


public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}

DiscardPolicy

什么事情都不干,直接丢弃当前将要加入队列的任务本身

1
2
3
4
5
6
7
8
public static class DiscardPolicy implements RejectedExecutionHandler {

public DiscardPolicy() { }


public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}

DiscardOldestPolicy

丢弃掉最老的一个请求,然后再次提交当前任务

1
2
3
4
5
6
7
8
9
10
11
12
public static class DiscardOldestPolicy implements RejectedExecutionHandler {

public DiscardOldestPolicy() { }


public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}

自定义拒绝策略

实现RejectedExecutionHandler接口,来重写rejectedExecution()方法即可

线程池工作步骤

  • 任务提交,如果线程池中正在运行的线程数量小于corePoolSize核心线程数,则创建新的线程执行任务
  • 如果正在运行的线程数量大于或等于corePoolSize核心线程数,则加入到workQueue等待队列
  • 如果workQueue等待队列已满,且正在运行的线程数量小于maximumPoolSize最大线程数,则创建线程执行任务
  • 如果workQueue队列已满,且正在运行的线程数量大于或等于maximumPoolSize最大线程数,则执行handler拒绝策略
  • 当线程完成任务之后,会从队列中取出下一个任务继续执行
  • 当线程没有任务可做时,超过keepAliveTime存活时间,如果当前运行的线程数大于corePoolSize核心线程数,则该线程停止,直到线程数达到corePoolSize核心线程数

ForkJoinPool(分支合并)

ForkJoinPool用来通过分治技术将问题拆分成小任务的问题,在一个任务中,先检查将要执行的任务的大小,如果大于一个设定的大小,则将任务拆分成可以通过框架来执行的小任务,如果任务的大小比设定的要小,就可以直接执行,然后根据需要返回的任务的结果,底层采用了工作窃取模式

  • Fork操作:当需要将一个任务拆分成更小的多个任务时,在框架中执行这些任务
  • Join操作:当一个主任务等待其创建的多个子任务的完成执行

核心类有两个

  • ForkJoinPool 实现了ExecutorService接口和工作窃取算法,管理工作者线程,并提供任务的状态信息,以及任务的执行信息
  • ForkJoinTask 是在ForkJoinPool中执行的任务的父类,其有两个子类,RecursiveAction用于任务没有返回结果的场景;RecursiveTask用于任务有返回结果的场景