0%

线程池

线程池

Java线程池中将线程抽象为Worker,并存储在workers变量中进行维护,将需要执行的任务存储在workQueue中。其大致思想就是从workQueue中不断地取出需要执行的任务,放到workers中进行处理

线程池的优点

如果不用类似线程池的容器,每当我们需要异步执行用户任务的时候都去创建新的线程,任务执行完之后将线程销毁,这样频繁地创建和销毁线程会浪费大量的系统资源,线程池通过线程复用机制,并对线程进行统一管理

  • 避免线程的创建和销毁带来的性能开销
  • 避免大量的线程间因互相抢占系统资源导致的阻塞现象
  • 能够对线程进行简单的管理并提供定时执行、间隔执行等功能

java在java.util.concurrent包中提供了Executor接口,在该接口的基础上整合出来一个Executor框架,该框架包含了线程池,ExecutorExecutorsExecutorServiceThreadPoolExecutorCompletionServiceFutureCallable等,该框架分离了任务的创建和执行,通过使用执行器,仅需要构建实现Runable接口的对象,然后将这些对象交给执行器即可,执行器通过创建所需要的线程,来负责这些Runnable对象任务执行

线程池
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public interface Executor {
// 执行一个Runnable任务
void execute(Runnable command);
}

// 继承了Executor接口
public interface ExecutorService extends Executor {

// 关闭ExecutorService,停止接收新的任务,且等待已经提交的任务执行完,当所有任务都执行完之后将会关闭ExecutorService
void shutdown();

List<Runnable> shutdownNow();

boolean isShutdown();

boolean isTerminated();

boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;

// Callable执行完毕后有返回结果
<T> Future<T> submit(Callable<T> task);

// Runnable
<T> Future<T> submit(Runnable task, T result);

// Runnable执行完毕后,没有返回结果
Future<?> submit(Runnable task);
}

之前在使用线程的时候,不管是继承Thread类还是实现Runnable或者Callable接口,都需要使用new Thread来创建线程,用完之后直接销毁,频繁的创建销毁线程比较浪费资源,于是出现了线程池的概念。

线程池中提供了一个线程队列,队列中保存着所有等待状态的线程,避免了创建和销毁的额外开销,提高了响应速度

ThreadPoolExecutor

构造方法

以参数最全的构造函数举例

1
2
3
4
5
6
7
8
9
10
11
12

public ThreadPoolExecutor(int corePoolSize, // 核心线程数,决定新提交的任务是新开线程去执行还是放到任务队列中,当线程数量小于corePoolSize时才会去创建线程,如果大于corePoolSize会将任务放入workQueue队列中
int maximumPoolSize, // 最大线程数,线程池能创建的最大线程数,达到该数值后将不会创建新的线程,再添加任务则采用拒绝策略
long keepAliveTime, // 线程超过corePoolSize后,多余线程的最大闲置时间,如果超过,则会终止,但是对于核心线程,如果allowCoreThreadTimeOut(boolean value)设置为true的话,核心线程也会被终止
TimeUnit unit, // 时间单位
BlockingQueue<Runnable> workQueue, // 线程数如果大于corePoolSize会将任务放入workQueue队列中,存放未处理的任务的阻塞队列,常用的队列有
//1)ArrayBlockingQueue:基于数组的先进先出队列,此队列创建时必须指定大小
//2)LinkedBlockingQueue:基于链表的先进先出队列,如果创建时没有指定此队列大小,则默认为Integer.MAX_VALUE
//3)SynchronousQueue:这个队列比较特殊,它不会进行存储,而是将直接新建一个线程来执行新来的任务
//4)PriorityBlockingQuene:具有优先级的无界阻塞队列
ThreadFactory threadFactory, // 生产线程的工厂类,可以用来定义线程名称以及线程的优先级
RejectedExecutionHandler handler)//拒绝策略

在ThreadPoolExecutor中有一个方法可以控制核心线程是否回收,如果设置allowCoreThreadTimeOut为false,则核心线程一直存活;如果为true,则空闲时间超过keepAliveTime时会回收

1
2
// 允许核心线程回收
((ThreadPoolExecutor) (exec)).allowCoreThreadTimeOut(true);

核心方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
// 变量的高3位代表线程池的状态,后29位(从低位往高位数)代表该线程数量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 线程池数量的位数 29
private static final int COUNT_BITS = Integer.SIZE - 3;
// 线程最大个数
private static final int CAPACITY = (1 << COUNT_BITS) - 1;

// RUNNING -> SHUTDOWN On invocation of shutdown(), perhaps implicitly in finalize()
// (RUNNING or SHUTDOWN) -> STOP On invocation of shutdownNow()
// SHUTDOWN -> TIDYING When both queue and pool are empty
// STOP -> TIDYING When pool is empty
// TIDYING -> TERMINATED When the terminated() hook method has completed

// 线程池状态 高3位
// 高三位111,表示运行中的状态标识,此时可以接受新任务并且执行队列中的任务 runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
// 高三位000,表示关闭中的状态标识,调用了shutdown方法,此时不再接收新任务,但是队列里的任务还得执行
private static final int SHUTDOWN = 0 << COUNT_BITS;
// 高三位001,表示已停止的状态标识,调用了shutdownNow方法不再接受新任务,同时抛弃阻塞队列中的所有任务并中断正在执行的任务
private static final int STOP = 1 << COUNT_BITS;
// 高三位010,表示当前所有任务已经终止,任务数量为0时的状态标识,在调用shutdown方法和shutdownNow方法时都会尝试更新这个状态
private static final int TIDYING = 2 << COUNT_BITS;
// 高三位011,表示线程池已经完全终止(关闭),调用了terminated方法会更新为该状态
private static final int TERMINATED = 3 << COUNT_BITS;

// 获取运行状态 高3位
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 获取线程数量 低29位
private static int workerCountOf(int c) { return c & CAPACITY; }
// 计算ctl值 线程状态和线程个数 或操作
private static int ctlOf(int rs, int wc) { return rs | wc; }

// 用来保存等待任务执行的阻塞队列
private final BlockingQueue<Runnable> workQueue;
// 可重入锁,对线程池状态等属性进行修改时需要持有该锁
private final ReentrantLock mainLock = new ReentrantLock();
// 包含了所有在工作的线程
private final HashSet<Worker> workers = new HashSet<Worker>();
// 锁条件队列,主要用于 awaitTermination 终止条件
private final Condition termination = mainLock.newCondition();
// 记录线程池最大工作线程的数量
private int largestPoolSize;
// 完成任务的数量,仅在中止工作任务时更新
private long completedTaskCount;
// 用于创建线程的工厂
private volatile ThreadFactory threadFactory;
// 拒绝策略
private volatile RejectedExecutionHandler handler;
// 空闲存活时间,如果线程池中的线程数量比核心线程数量还要多时,并且多出的这些线程都是闲置状态,该变量则是这些闲置状态的线程的存活时间
private volatile long keepAliveTime;
// 默认为 false,如果设为 true那么核心线程也会遵循 keepAliveTime的时间来做闲置处理
private volatile boolean allowCoreThreadTimeOut;
// 线程池核心线程数量
private volatile int corePoolSize;
// 线程池最大线程数量
private volatile int maximumPoolSize;

// submit是将任务进行一次封装,然后执行execute方法
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}

public void execute(Runnable command) {
// command为空,则抛出空指针
if (command == null)
throw new NullPointerException();
// 获取线程池状态 和 线程数量的组合值
int c = ctl.get();
// workerCountOf方法根据ctl的低29位,得到线程池的当前线程数,如果线程数小于corePoolSize,则执行addWorker方法创建新的线程执行任务
// 判断是否创建核心线程
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true)) // 创建核心线程
// 成功则直接返回
return;
// 失败需要再次获取线程池的状态
c = ctl.get();
}
// 经过上面的if语句,走到这里可能会因为两种情况1:当前工作线程数大于等于核心线程数 2:创建线程失败
// 判断线程池是否在运行
// 如果是RUNNING状态,则添加任务到阻塞队列中
if (isRunning(c) && workQueue.offer(command)) {
// 双重检查,再次获取线程池状态,插入成功再次验证线程池是否运行,防止任务入队的过程中ctl的值发生变化
int recheck = ctl.get();
//再次检查如果不在运行,移除已入队的任务,然后抛出拒绝策略
if (! isRunning(recheck) && remove(command))
reject(command);
// 如果在运行,当前线程数为0,就启用一个非核心线程
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 加入阻塞队列失败(队列已满),尝试创建非核心线程
//如果创建非核心线程失败,直接拒绝
else if (!addWorker(command, false))
reject(command);
}


// 添加线程 core表示是否为核心线程
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) { // 自旋
int c = ctl.get();
// 当前线程池运行状态
int rs = runStateOf(c);

// Check if queue empty only if necessary.
// 判断线程池状态,如果线程池的状态值大于或等于SHUTDOWN,则不处理提交任务,直接返回
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
// 自旋,更新线程数量
for (;;) {
// 当前线程数
int wc = workerCountOf(c);
// 根据core来判断是否为核心线程
// 当前线程数已经超过容量,就直接返回
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// CAS增加当前线程数量,更改成功结束自旋
if (compareAndIncrementWorkerCount(c))
break retry;
// CAS操作失败了,获取最新值
c = ctl.get(); // Re-read ctl
// 检查线程池状态是不是修改了,线程池状态修改则需要重新获取线程池
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
// CAS操作成功,创建线程的所有条件都满足了,可以开始创建线程来执行任务
// worker是否启动
boolean workerStarted = false;
// 是否将worker添加到workers集合中
boolean workerAdded = false;
Worker w = null;
try {
// 创建工作线程
w = new Worker(firstTask);
// 取出worker中的线程对象
final Thread t = w.thread;
if (t != null) {
// 获取线程池主锁,为了保证workers同步
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
// 获取当前线程池运行状态
int rs = runStateOf(ctl.get());

//小于SHUTTDOWN即RUNNING
//等于SHUTDOWN并且firstTask为null,不接受新的任务,但是会继续执行等待队列中的任务
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// 判断线程是否是 alive状态
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// 添加线程到workers中,workers是一个HashSet private final HashSet<Worker> workers = new HashSet<Worker>();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
// 设置当前工作者加入线程队列的已添加的标识为 true
workerAdded = true;
}
} finally {
mainLock.unlock();
}
// 线程添加线程池成功,启动新建的线程
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
// 线程启动失败
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}


Worker

Worker实现了Runnable接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable

public void run() {
runWorker(this);
}
// worker的执行方法
final void runWorker(Worker w) {
// 获取当前线程
Thread wt = Thread.currentThread();
// 获取worker的firstTask
Runnable task = w.firstTask;
w.firstTask = null;
// 释放锁,设置aqs的state为0,允许中断
w.unlock(); // allow interrupts
// 用于标识线程是否异常终止
boolean completedAbruptly = true;
try {
// 循环getTask获取任务
while (task != null || (task = getTask()) != null) {
// 获取到可执行的任务,对worker对象加锁,保证线程在执行任务过程中不会被中断
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) || // 线程池状态大于等于STOP
(Thread.interrupted() && // 线程被中断 且 是线程池内部的状态变化中断的
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted()) // 当前线程未被中断
wt.interrupt(); // 中断
try {
// 线程执行前
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 开始执行任务
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 线程执行后
afterExecute(task, thrown);
}
} finally {
task = null;
// 完成数加一
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}

// 获取任务
private Runnable getTask() {
// 标识是否超时
boolean timedOut = false; // Did the last poll() time out?

for (;;) {
// 线程池控制状态
int c = ctl.get();
// 线程池运行状态
int rs = runStateOf(c);

// 如果线程池状态大于等于STOP,或者处于SHUTDOWN状态,并且阻塞队列为空,线程池工作线程数量递减,方法返回null,回收线程
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}

int wc = workerCountOf(c);

// Are workers subject to culling?
// 如果allowCoreThreadTimeOut为ture,或当前线程数大于核心池大小,则需要超时回收
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
//如果worker数量大于maximumPoolSize或者超时了
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c)) //线程池工作线程数量递减,方法返回null,回收线程
return null;
continue;//线程池工作线程数量递减失败,跳过剩余部分,继续循环
}

try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}

其他方法

1
2
3
4
5
6
7
8
// 取得第一个完成任务的结果值,当第一个任务执行完成后,会调用interrupt()将其他任务中断,需要结合if(Thread.currentThread().isInterrupted())来判断
// 会阻塞
public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException
// 等全部线程任务执行完毕后,取得所有完成任务的结果值
// 会阻塞
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException

Executors

在一般情况下使用线程池都是直接使用Executors类中提供的静态方法来直接创建线程池,这是jdk提供创建线程池的工具类,要用好这个工具类,一定要清楚每个方法所生成的线程池的特性以及线程池中每个参数的含义。

几种常用的创建线程池的方法

newFixedThreadPool固定大小线程池

创建固定大小的线程池,每提交一个任务就创建一个线程,直到达到最大线程数,核心线程数与最大线程数相同,都为nThreads,线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程,阻塞队列长度为Integer.MAX_VALUE

1
2
3
4
5
6
7
// corePoolSize和maximumPoolSize相等
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
// LinkedBlockingQueue是一个无界队列,可以有无限多的任务进行排队,所以创建的线程数不会超过corePoolSize,因此maximumPoolSize的值也就是无效的
new LinkedBlockingQueue<Runnable>());
}
特点
  • 核心线程数和最大线程数大小一样
  • 没有所谓的空闲时间,keepAliveTime为0
  • 阻塞队列为LinkedBlockingQueue
工作机制
  • 提交任务
  • 如果线程数少于核心线程,创建核心线程执行任务
  • 如果线程数等于核心线程,把任务添加到LinkedBlockingQueue阻塞队列中
  • 如果线程执行完任务,则去阻塞队列中取任务,继续执行
使用方式
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// 创建固定大小的线程池
ExecutorService pool = Executors.newFixedThreadPool(2);
for(int i = 0;i<5;i++){
int finalI = i;

pool.execute(() ->
{
if(finalI == 3){
throw new RuntimeException("出错了");
}
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}

System.out.println(Thread.currentThread().getName() + ":"+ finalI);
});
}
pool.shutdown();

-------------------------
pool-1-thread-2:1
pool-1-thread-1:0
Exception in thread "pool-1-thread-1" java.lang.RuntimeException: 出错了
at com.zhanghe.study.thread.pool.TestExecutors.lambda$testFixedThread$1(TestExecutors.java:49)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
pool-1-thread-2:2
pool-1-thread-3:4

每次提交一个任务就创建一个线程,直到线程达到线程池的最大线程数。一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,而其他线程又没有空闲,那么线程池会补充一个新线程

适用于处理CPU密集型的任务,适用于执行长期的任务,防止任务时间长,且任务太多,导致队列中任务越积越多,使得机器内存飙升

newSingleThreadExecutor单一线程池

创建单个线程的线程池,只有唯一的工作线程,所有的任务都是串行执行的,如果这个唯一的线程因为异常结束,会有新的线程来代替它,此线程保证所有任务的执行顺序按照任务的提交顺序执行

相当于newFixedThreadPool(1)

1
2
3
4
5
6
7
8
// corePoolSize和maximumPoolSize都是1
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
// LinkedBlockingQueue是一个无界队列,可以有无限多的任务进行排队,所以创建的线程数不会超过corePoolSize,因此maximumPoolSize的值也就是无效的
new LinkedBlockingQueue<Runnable>()));
}
特点
  • 核心线程数和最大线程数都是1
  • 没有所谓的空闲时间,keepAliveTime为0
  • 阻塞队列为LinkedBlockingQueue
  • 就相当于newFixedThreadPool(1)
使用方式
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// 该线程池中只有一个线程 一直在复用这个线程 
ExecutorService pool = Executors.newSingleThreadExecutor();
for(int i = 0;i<5;i++){
int finalI = i;

pool.execute(() ->
{
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace(System.err);
}
// 该线程出现异常,之后线程池会创建一个新的线程来执行其他任务
if(finalI == 3){
throw new RuntimeException("出错了");
}
System.out.println(Thread.currentThread().getName() + ":"+ finalI);
});
}
pool.shutdown();

--------------------
pool-1-thread-1:0
pool-1-thread-1:1
pool-1-thread-1:2
Exception in thread "pool-1-thread-1" java.lang.RuntimeException: 出错了
at com.zhanghe.study.thread.pool.TestExecutors.lambda$testSingleThread$0(TestExecutors.java:30)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
pool-1-thread-2:4

这个线程池只有一个线程在工作,也就是相当于单线程串行执行所有任务,如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它,此线程池保证所有任务的执行顺序按照任务的提交顺序执行

newCachedThreadPool缓存线程池

缓存线程池,可以根据需求自动创建新线程,没有对线程池中线程的数量进行限制,最大可以为Integer.MAX_VALUE,如果有可用的线程就使用,如果没有的话,会自动创建新的线程,默认空闲60s移除

常用于执行一些生存期很短的任务,默认会移除60s内未被使用的线程,因此长时间保持空闲的线程池不会使用任何资源

1
2
3
4
5
6
7
public static ExecutorService newCachedThreadPool() {
// 默认空闲60s移除
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
// SynchronousQueue队列不进行存储,直接将任务交给线程处理,所以在没有空闲的线程时,会直接创建新的线程来执行任务
new SynchronousQueue<Runnable>());
}
特点
  • 核心线程数为0
  • 最大线程数为Integer.MAX_VALUE
  • 阻塞队列是SynchronousQueue
  • 非核心线程空闲存活时间为60s
工作机制
  • 提交任务
  • 没有核心线程,会先加入到队列SynchronousQueue
  • 如果此时有空闲线程,则从队列中取出任务执行
  • 如果没有空闲线程,由于SynchronousQueue不会进行存储,就会新建一个线程去执行
  • 执行完任务的线程,还可以存活60s,如果该期间,接到任务,则会继续存活;否则会被销毁
使用方式
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
ExecutorService pool = Executors.newCachedThreadPool();
for(int i = 0;i<5;i++){
int finalI = i;

pool.execute(() ->
{
if(finalI == 3){
throw new RuntimeException("出错了");
}
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}

System.out.println(Thread.currentThread().getName() + ":"+ finalI);
});
}
pool.shutdown();

--------------------
Exception in thread "pool-1-thread-4" java.lang.RuntimeException: 出错了
at com.zhanghe.study.thread.pool.TestExecutors.lambda$testCachedThread$2(TestExecutors.java:75)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
pool-1-thread-1:0
pool-1-thread-5:4
pool-1-thread-3:2
pool-1-thread-2:1

要执行多少任务就创建多少线程,如果线程池的大小超过了处理任务所需要的线程,那么就会回收部分空闲的线程,当任务数增加时,此线程池又添加新线程来处理任务

适用于并发执行大量短期的小任务

newScheduledThreadPool

调度型线程池,创建大小无限的的线程池,可以延迟或定时以及周期性的执行任务,使用的队列是DelayedWorkQueue,是一个延迟队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
// ScheduledThreadPoolExecutor是继承自ThreadPoolExecutor
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}

public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
特点
  • 可以设定核心线程数
  • 最大线程数为Integer.MAX_VALUE
  • keepAliveTime为0
使用方式
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
ScheduledExecutorService pool = Executors.newScheduledThreadPool(10);
for(int i = 0;i<5;i++){
int finalI = i;

pool.schedule(()->
{
if(finalI == 3){
throw new RuntimeException("出错了");
}

System.out.println(Thread.currentThread().getName() + ":"+ finalI);

},20, TimeUnit.SECONDS);
}
pool.shutdown();
1
2
3
// command是任务,delay指延迟的时间,unit为时间单位
// 该方法只执行一次,只是会延迟一段时间执行,并不是周期性任务
schedule(Runnable command,long delay, TimeUnit unit)
1
2
3
// 按照固定的频率来进行执行
// 该方法是在initialDelay时长后第一次执行任务,每隔period时间执行一次,period是从任务开始执行算起的,不管任务执行多长时间
scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit)
1
2
3
// 延迟固定的时间, delay是从任务执行完成之后开始计算时间的
// 在每一次执行终止和下一次执行开始之间都存在给定的延迟
scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit)
1
2
3
4
// 取得队列中的任务,还未执行的任务
public BlockingQueue<Runnable> getQueue() {
return super.getQueue();
}

newWorkStealingPool (工作窃取线程池)

大型的任务被分解成若干块放入到队列中,在队列中还可以继续细分,线程从队列中获取任务并进行执行,当所有的线程计算结束之后,再将各部分的结果进行合并来得到最终结果。

在处理器之间分配工作项,从而最大限度地利用所有可用的处理器来完成计算密集型任务,这项算法也用于 Java 的fork/join 框架

1
ExecutorService executorService = Executors.newWorkStealingPool();

线程池的提交和关闭

ExecutorService接口扩展了Executor接口并提供了提交任务和关闭线程池的方法

提交

1
2
3
4
5
6
7
8
9
10
11
//提交  为线程池中的线程分配任务
// future对象 isDone()方法来检查任务是否已经完成
// get()方法获取返回的结果,该方法会一直等待直到Callable对象的call()方法执行完成并返回结果,如果get()方法在等待结果时线程中断了,则会抛出InterruptedException异常,如果call()方法抛出异常那么get()方法将随之抛出ExecutionException异常
// submit有返回值
<T> Future<T> submit(Callable<T> task);

<T> Future<T> submit(Runnable task, T result);

Future<?> submit(Runnable task);
// execute没有返回值
void execute(Runnable command);

submit()方法和execute()方法都可以提交任务执行,这两者有什么不同呢

  • submit()方法有返回值Future,而execute()方法是没有返回值的

但是对于ScheduledExecutorService延时调度,使用以下方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public ScheduledFuture<?> schedule(Runnable command,
long delay, TimeUnit unit);

public <V> ScheduledFuture<V> schedule(Callable<V> callable,
long delay, TimeUnit unit);

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit);

public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit);

关闭

1
2
3
4
5
6
7
8
9
// 优雅的停止,停止接收新任务,等线程池任务执行完毕停止
// 状态变为SHUTDOWN
// 中断的是所有空闲的线程
// shutdown方法不会进行阻塞
void shutdown();
// 立即停止,并尝试打断正在执行的任务(如果Runnable中有判断中断的话if(Thread.currentThread().isInterrupted())则会中断,否则会将正在执行完的任务执行完),并且清空阻塞队列,返回为执行的任务
// 状态变为STOP
// 中断所有的线程
List<Runnable> shutdownNow();

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

pool.execute(() ->
{
int num = 0;
while (true){
// 判断是否被中断,shutdownNow方法会中断
if(!Thread.currentThread().isInterrupted()){
num++;
if(num % 1000 == 0){
System.out.println("---");
}
} else {
System.out.println("被中断了");
break;
}

}

});

线程池拒绝策略

线程池是有一定的容量限制的,不可能来多少任务接多少任务,那么如何拒绝任务呢?就用到了线程池的拒绝策略RejectedExecutionHandler,在jdk中提供了四种拒绝策略,分别是AbortPolicy、CallerRunsPolicy、DiscardPolicy、DiscardOldestPolicy,主要是执行rejectedExecution方法,下面分别介绍这四种拒绝策略

AbortPolicy

线程池队列满了,直接抛出异常RejectedExecutionException,默认策略

1
2
3
4
5
6
7
8
9
10
11
public static class AbortPolicy implements RejectedExecutionHandler {

public AbortPolicy() { }


public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}

CallerRunsPolicy

只要线程池没有关闭,直接在调用者线程(主线程)中执行该任务。不会等待线程池中的线程去执行

1
2
3
4
5
6
7
8
9
10
11
public static class CallerRunsPolicy implements RejectedExecutionHandler {

public CallerRunsPolicy() { }


public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}

DiscardPolicy

什么事情都不干,直接丢弃当前将要加入队列的任务本身

1
2
3
4
5
6
7
8
public static class DiscardPolicy implements RejectedExecutionHandler {

public DiscardPolicy() { }


public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}

DiscardOldestPolicy

丢弃掉等待队列中最老的一个请求,然后再次提交当前任务

1
2
3
4
5
6
7
8
9
10
11
12
public static class DiscardOldestPolicy implements RejectedExecutionHandler {

public DiscardOldestPolicy() { }


public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}

自定义拒绝策略

实现RejectedExecutionHandler接口,来重写rejectedExecution()方法即可

线程池工作步骤

  • 任务提交后,如果线程池中正在运行的线程数量小于corePoolSize核心线程数,则创建新的线程执行任务
  • 如果正在运行的线程数量大于或等于corePoolSize核心线程数,则加入到workQueue等待队列
  • 如果workQueue等待队列已满,且正在运行的线程数量小于maximumPoolSize最大线程数,则创建非核心线程执行任务
  • 如果workQueue队列已满,且正在运行的线程数量大于或等于maximumPoolSize最大线程数,则执行handler拒绝策略
  • 当线程完成任务之后,会从队列中取出下一个任务继续执行
  • 当线程没有任务可做时,超过keepAliveTime存活时间,如果当前运行的线程数大于corePoolSize核心线程数,则该线程停止,直到线程数达到corePoolSize核心线程数

ForkJoinPool(分支合并)

ForkJoinPool用来通过分治技术将大任务拆分成小任务,再对每个小任务得到的结果进行汇总,在一个任务中,先检查将要执行的任务的大小,如果大于一个设定的大小,则将任务拆分成可以通过框架来执行的小任务,如果任务的大小比设定的要小,就可以直接执行,然后根据需要返回的任务的结果,底层采用了工作窃取模式(有的线程把自己队列中的任务执行完毕之后,发现其他线程的队列中还有任务等待处理,就去这些线程的队列中窃取一个任务来执行)

  • Fork操作:当需要将一个任务拆分成更小的多个任务时,在框架中执行这些任务
  • Join操作:当一个主任务等待其创建的多个子任务的完成执行

核心类有两个

  • ForkJoinPool 实现了ExecutorService接口和工作窃取算法,管理工作者线程,并提供任务的状态信息,以及任务的执行信息

    1
    public class ForkJoinPool extends AbstractExecutorService
  • ForkJoinTask 是在ForkJoinPool中执行的任务的父类,其有三个子类,RecursiveAction用于任务没有返回结果且仅执行一次任务的场景;RecursiveTask用于任务有返回结果的场景;CountedCompleter

    1
    public abstract class ForkJoinTask<V> implements Future<V>, Serializable

RecursiveAction

任务没有返回结果且仅执行一次任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public class TestRecursiveAction {

public static void main(String[] args) {
ForkJoinPool pool = new ForkJoinPool();
pool.submit(new MyRecursiveAction(10,20));
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}


static class MyRecursiveAction extends RecursiveAction {

private int begin;
private int end;

public MyRecursiveAction(int begin,int end){
this.begin = begin;
this.end = end;
}

@Override
protected void compute() {
if(end - begin > 2){ // fork分解进行计算
int mid = (begin + end) / 2;
MyRecursiveAction leftAction = new MyRecursiveAction(begin,mid);
MyRecursiveAction rightAction = new MyRecursiveAction(mid+1,end);
invokeAll(leftAction,rightAction);
} else {
System.out.println(Thread.currentThread().getName()+"===计算值为"+begin+"-"+end);
}

}
}
}

---
ForkJoinPool-1-worker-1===计算值为10-12
ForkJoinPool-1-worker-4===计算值为19-20
ForkJoinPool-1-worker-3===计算值为13-15
ForkJoinPool-1-worker-2===计算值为16-18

RecursiveTask

任务有返回结果,可以使用ForkJoinTask的get()或者join()方法来获取结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public class TestRecursiveTask {

public static void main(String[] args) {
ForkJoinPool pool = new ForkJoinPool();
MyRecursiveTask task = new MyRecursiveTask(10,20);
ForkJoinTask result = pool.submit(task);
System.out.println(result.join());
}

static class MyRecursiveTask extends RecursiveTask<Integer> {

private int begin;
private int end;

public MyRecursiveTask(int begin, int end) {
this.begin = begin;
this.end = end;
}

@Override
protected Integer compute() {
if (end - begin > 2) { // fork分解进行计算
int mid = (begin + end) / 2;
MyRecursiveTask leftTask = new MyRecursiveTask(begin, mid);
MyRecursiveTask rightTask = new MyRecursiveTask(mid + 1, end);
invokeAll(leftTask, rightTask);
return leftTask.join() + rightTask.join();
} else {
int result = 0;
for (int i = begin; i <= end; i++) {
result = i + result;
}
System.out.println(begin + "~" + end + "累加结果为"+result);
return result;
}

}
}
}

---
10~12累加结果为33
16~18累加结果为51
19~20累加结果为39
13~15累加结果为42
165

获取pool状态的方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 活动的线程个数
public int getActiveThreadCount()
// 并行数量,与CPU的内核数相关
public int getParallelism()
// 任务池的大小
public int getPoolSize()
// 已经提交但尚未被执行的任务数量
public int getQueuedSubmissionCount()
// 任务的总个数
public long getQueuedTaskCount()
// 正在运行并且不在阻塞状态下的线程个数
public int getRunningThreadCount()
// 偷窃的任务个数
public long getStealCount()

扩展线程池

线程池中有几个方法是可以由开发者去继承重写来自定义自己的逻辑的

  • protected void beforeExecute(Thread t, Runnable r) 该方法在执行器中的某一任务执行之前被调用,接收将要执行的Runnable对象和将要执行这些对象的Thread对象
  • protected void afterExecute(Runnable r, Throwable t) 该方法在执行器中的某一任务执行之后被调用,接收的是已执行的Runnable对象和一个Throwable对象,存储了该任务中可能抛出的异常

欢迎关注我的其它发布渠道