0%

线程池

线程池

java在java.util.concurrent包中提供了Executor接口,在该接口的基础上整合出来一个Executor框架

1
2
3
public interface Executor {
void execute(Runnable command);
}

之前在使用线程的时候,不管是继承Thread类还是实现Runnable或者Callable接口,都需要使用new Thread来创建线程,用完之后直接销毁,频繁的创建销毁线程比较浪费资源,于是出现了线程池的概念。

线程池中提供了一个线程队列,队列中保存着所有等待状态的线程,避免了创建和销毁的额外开销,提高了响应速度

线程池

以参数最全的构造函数举例

1
2
3
4
5
6
7
8

public ThreadPoolExecutor(int corePoolSize, // 核心线程数,决定新提交的任务是新开线程去执行还是放到任务队列中,当线程数量小于corePoolSize时才回去创建线程
int maximumPoolSize, // 最大线程数,线程池能创建的最大线程数
long keepAliveTime, // 线程超过corePoolSize后,多余线程的最大存活时间
TimeUnit unit, // 时间单位
BlockingQueue<Runnable> workQueue, // 存放未处理的任务的阻塞队列
ThreadFactory threadFactory, // 生产线程的工厂类,可以用来定义线程名称以及线程的优先级
RejectedExecutionHandler handler)//拒接策略

Executors

在一般情况下使用线程池都是直接使用Executors类中提供的静态方法来直接创建线程池,这是jdk提供创建线程池的工具类,但是要用好这个工具类,一定要清楚每个方法所生成的线程池的特性以及线程池中每个参数的含义。

几种常用的创建线程池的方法

newFixedThreadPool

创建固定大小的线程池,每提交一个任务就创建一个线程,直到达到最大线程数,核心线程数与最大线程数相同

1
2
3
4
5
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}

使用方式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// 创建固定大小的线程池
ExecutorService pool = Executors.newFixedThreadPool(2);
for(int i = 0;i<5;i++){
int finalI = i;

pool.execute(() ->
{
if(finalI == 3){
throw new RuntimeException("出错了");
}
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}

System.out.println(Thread.currentThread().getName() + ":"+ finalI);
});
}
pool.shutdown();

-------------------------
pool-1-thread-2:1
pool-1-thread-1:0
Exception in thread "pool-1-thread-1" java.lang.RuntimeException: 出错了
at com.zhanghe.study.thread.pool.TestExecutors.lambda$testFixedThread$1(TestExecutors.java:49)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
pool-1-thread-2:2
pool-1-thread-3:4

每次提交一个任务就创建一个线程,直到线程达到线程池的最大大小。线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,而其他线程有没有空闲,那么线程池会补充一个新线程

newSingleThreadExecutor

创建单个线程的线程池,只有唯一的工作线程

1
2
3
4
5
6
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}

使用方式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// 该线程池中只有一个线程 一直在复用这个线程 
ExecutorService pool = Executors.newSingleThreadExecutor();
for(int i = 0;i<5;i++){
int finalI = i;

pool.execute(() ->
{
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace(System.err);
}
// 该线程出现异常,之后线程池会创建一个新的线程来执行其他任务
if(finalI == 3){
throw new RuntimeException("出错了");
}
System.out.println(Thread.currentThread().getName() + ":"+ finalI);
});
}
pool.shutdown();

--------------------
pool-1-thread-1:0
pool-1-thread-1:1
pool-1-thread-1:2
Exception in thread "pool-1-thread-1" java.lang.RuntimeException: 出错了
at com.zhanghe.study.thread.pool.TestExecutors.lambda$testSingleThread$0(TestExecutors.java:30)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
pool-1-thread-2:4

这个线程池只有一个线程在工作,也就是相当于单线程串行执行所有任务,如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它,此线程池保证所有任务的执行顺序按照任务的提交顺序执行

newCachedThreadPool

缓存线程池,可以根据需求自动的更改数量,没有对线程池中线程的数量进行限制

1
2
3
4
5
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

使用方式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
ExecutorService pool = Executors.newCachedThreadPool();
for(int i = 0;i<5;i++){
int finalI = i;

pool.execute(() ->
{
if(finalI == 3){
throw new RuntimeException("出错了");
}
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}

System.out.println(Thread.currentThread().getName() + ":"+ finalI);
});
}
pool.shutdown();

--------------------
Exception in thread "pool-1-thread-4" java.lang.RuntimeException: 出错了
at com.zhanghe.study.thread.pool.TestExecutors.lambda$testCachedThread$2(TestExecutors.java:75)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
pool-1-thread-1:0
pool-1-thread-5:4
pool-1-thread-3:2
pool-1-thread-2:1

要执行多少任务就创建多少线程,如果线程池的大小超过了处理任务所需要的线程,那么就会回收部分空闲的线程,当任务数增加时,此线程池又添加新线程来处理任务

newScheduledThreadPool

创建固定大小的线程池,可以延迟或定时的执行任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}

public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}

public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}

使用方式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
ScheduledExecutorService pool = Executors.newScheduledThreadPool(10);
for(int i = 0;i<5;i++){
int finalI = i;

pool.schedule(()->
{
if(finalI == 3){
throw new RuntimeException("出错了");
}

System.out.println(Thread.currentThread().getName() + ":"+ finalI);

},20, TimeUnit.SECONDS);
}
pool.shutdown();
1
2
3
// command是任务,delay指延迟的时间,unit为时间单位
// 该方法只执行一次,只是会延迟一段时间执行
schedule(Runnable command,long delay, TimeUnit unit)
1
2
// 该方法是每隔period时间执行一次,不管任务执行多长时间
scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit)
1
2
// 在每一次执行终止和下一次执行开始之间都存在给定的延迟
scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit)

newWorkStealingPool (工作窃取线程池)

大型的任务被分解成若干块放入到队列中,在队列中还可以继续细分,线程从队列中获取任务并进行执行,当所有的线程计算结束之后,再将各部分的结果进行合并来得到最终结果。

在处理器之间分配工作项,从而最大限度地利用所有可用的处理器来完成计算密集型任务,这项算法也用于 Java 的fork/join 框架

1
ExecutorService executorService = Executors.newWorkStealingPool();

线程池的提交和关闭

ExecutorService接口扩展了Executor接口并提供了提交任务和关闭线程池的方法

1
2
3
4
5
6
7
8
//提交  为线程池中的线程分配任务
<T> Future<T> submit(Callable<T> task);

<T> Future<T> submit(Runnable task, T result);

Future<?> submit(Runnable task);

void execute(Runnable command);
1
2
3
4
// 优雅的停止,停止接收新任务,等线程池任务执行完毕停止
void shutdown();
// 立即停止
List<Runnable> shutdownNow();

submit()方法和execute()方法都可以提交任务执行,这两者有什么不同呢

  • submit()方法有返回值Future,而execute()方法是没有返回值的

但是对于ScheduledExecutorService延时调度,使用以下方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public ScheduledFuture<?> schedule(Runnable command,
long delay, TimeUnit unit);

public <V> ScheduledFuture<V> schedule(Callable<V> callable,
long delay, TimeUnit unit);

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit);

public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit);

线程池拒绝策略

线程池是有一定的容量限制的,不可能来多少任务接多少任务,那么如何拒绝任务呢?就用到了线程池的拒绝策略RejectedExecutionHandler,在jdk中提供了四种拒绝策略,分别是AbortPolicy、CallerRunsPolicy、DiscardPolicy、DiscardOldestPolicy,主要是执行rejectedExecution方法,下面分别介绍这四种拒绝策略

AbortPolicy

总是抛出异常

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public static class AbortPolicy implements RejectedExecutionHandler {
/**
* Creates an {@code AbortPolicy}.
*/
public AbortPolicy() { }

/**
* Always throws RejectedExecutionException.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
* @throws RejectedExecutionException always
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}

CallerRunsPolicy

只要线程池没有关闭,直接在调用者线程中执行该任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public static class CallerRunsPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code CallerRunsPolicy}.
*/
public CallerRunsPolicy() { }

/**
* Executes task r in the caller's thread, unless the executor
* has been shut down, in which case the task is discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}

DiscardPolicy

什么事情都不干,直接丢弃

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public static class DiscardPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardPolicy}.
*/
public DiscardPolicy() { }

/**
* Does nothing, which has the effect of discarding task r.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}

DiscardOldestPolicy

丢弃掉最老的一个请求,然后再次提交当前任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardOldestPolicy} for the given executor.
*/
public DiscardOldestPolicy() { }

/**
* Obtains and ignores the next task that the executor
* would otherwise execute, if one is immediately available,
* and then retries execution of task r, unless the executor
* is shut down, in which case task r is instead discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}

线程池工作步骤

  • 如果线程池中正在运行的线程数量小于corePoolSize核心线程数,则创建新的线程执行任务
  • 如果正在运行的线程数量大于或等于corePoolSize核心线程数,则加入到workQueue等待队列
  • 如果workQueue等待队列已满,且正在运行的线程数量小于maximumPoolSize最大线程数,则创建线程执行任务
  • 如果workQueue队列已满,且正在运行的线程数量大于或等于maximumPoolSize最大线程数,则执行handler拒绝策略
  • 当线程完成任务之后,会从队列中取出下一个任务继续执行
  • 当线程没有任务可做时,超过keepAliveTime存活时间,如果当前运行的线程数大于corePoolSize核心线程数,则该线程停止,直到线程数达到corePoolSize核心线程数

ForkJoinPool(分支合并)

存在一个大任务,将大任务进行拆分(fork)成若干个小任务,再将一个个的小任务运算进行join操作

底层采用了工作窃取模式