线程池 Java线程池中将线程抽象为Worker,并存储在workers变量中进行维护,将需要执行的任务存储在workQueue中。其大致思想就是从workQueue中不断地取出需要执行的任务,放到workers中进行处理
线程池的优点 如果不用类似线程池的容器,每当我们需要异步执行用户任务的时候都去创建新的线程,任务执行完之后将线程销毁,这样频繁地创建和销毁线程会浪费大量的系统资源,线程池通过线程复用机制,并对线程进行统一管理
避免线程的创建和销毁带来的性能开销
避免大量的线程间因互相抢占系统资源导致的阻塞现象
能够对线程进行简单的管理并提供定时执行、间隔执行等功能
java在java.util.concurrent包中提供了Executor接口,在该接口的基础上整合出来一个Executor框架,该框架包含了线程池,Executor ,Executors ,ExecutorService ,ThreadPoolExecutor ,CompletionService ,Future ,Callable 等,该框架分离了任务的创建和执行,通过使用执行器,仅需要构建实现Runable接口的对象,然后将这些对象交给执行器即可,执行器通过创建所需要的线程,来负责这些Runnable对象任务执行
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 public interface Executor { void execute (Runnable command) ; } public interface ExecutorService extends Executor { void shutdown () ; List<Runnable> shutdownNow () ; boolean isShutdown () ; boolean isTerminated () ; boolean awaitTermination (long timeout, TimeUnit unit) throws InterruptedException ; <T> Future<T> submit (Callable<T> task) ; <T> Future<T> submit (Runnable task, T result) ; Future<?> submit(Runnable task); }
之前在使用线程的时候,不管是继承Thread类还是实现Runnable或者Callable接口,都需要使用new Thread来创建线程,用完之后直接销毁,频繁的创建销毁线程比较浪费资源,于是出现了线程池的概念。
线程池中提供了一个线程队列,队列中保存着所有等待状态的线程,避免了创建和销毁的额外开销,提高了响应速度
ThreadPoolExecutor 构造方法 以参数最全的构造函数举例
1 2 3 4 5 6 7 8 9 10 11 12 public ThreadPoolExecutor (int corePoolSize, // 核心线程数,决定新提交的任务是新开线程去执行还是放到任务队列中,当线程数量小于corePoolSize时才会去创建线程,如果大于corePoolSize会将任务放入workQueue队列中 int maximumPoolSize, // 最大线程数,线程池能创建的最大线程数,达到该数值后将不会创建新的线程,再添加任务则采用拒绝策略 long keepAliveTime, // 线程超过corePoolSize后,多余线程的最大闲置时间,如果超过,则会终止,但是对于核心线程,如果allowCoreThreadTimeOut(boolean value) 设置为true 的话,核心线程也会被终止 TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
在ThreadPoolExecutor中有一个方法可以控制核心线程是否回收,如果设置allowCoreThreadTimeOut为false,则核心线程一直存活;如果为true,则空闲时间超过keepAliveTime时会回收
1 2 ((ThreadPoolExecutor) (exec)).allowCoreThreadTimeOut(true );
核心方法 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0 ));private static final int COUNT_BITS = Integer.SIZE - 3 ;private static final int CAPACITY = (1 << COUNT_BITS) - 1 ;private static final int RUNNING = -1 << COUNT_BITS;private static final int SHUTDOWN = 0 << COUNT_BITS;private static final int STOP = 1 << COUNT_BITS;private static final int TIDYING = 2 << COUNT_BITS;private static final int TERMINATED = 3 << COUNT_BITS;private static int runStateOf (int c) { return c & ~CAPACITY; }private static int workerCountOf (int c) { return c & CAPACITY; }private static int ctlOf (int rs, int wc) { return rs | wc; }private final BlockingQueue<Runnable> workQueue;private final ReentrantLock mainLock = new ReentrantLock();private final HashSet<Worker> workers = new HashSet<Worker>();private final Condition termination = mainLock.newCondition();private int largestPoolSize;private long completedTaskCount;private volatile ThreadFactory threadFactory;private volatile RejectedExecutionHandler handler;private volatile long keepAliveTime;private volatile boolean allowCoreThreadTimeOut;private volatile int corePoolSize;private volatile int maximumPoolSize;public <T> Future<T> submit (Callable<T> task) { if (task == null ) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask; } public void execute (Runnable command) { if (command == null ) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true )) return ; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0 ) addWorker(null , false ); } else if (!addWorker(command, false )) reject(command); } private boolean addWorker (Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false ; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false ; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); if (runStateOf(c) != rs) continue retry; } } boolean workerStarted = false ; boolean workerAdded = false ; Worker w = null ; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null ) { final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null )) { if (t.isAlive()) throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true ; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true ; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
Worker Worker实现了Runnable接口
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 private final class Worker extends AbstractQueuedSynchronizer implements Runnable public void run () { runWorker(this ); } final void runWorker (Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null ; w.unlock(); boolean completedAbruptly = true ; try { while (task != null || (task = getTask()) != null ) { w.lock(); if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null ; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null ; w.completedTasks++; w.unlock(); } } completedAbruptly = false ; } finally { processWorkerExit(w, completedAbruptly); } } private Runnable getTask () { boolean timedOut = false ; for (;;) { int c = ctl.get(); int rs = runStateOf(c); if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null ; } int wc = workerCountOf(c); boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null ; continue ; } try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null ) return r; timedOut = true ; } catch (InterruptedException retry) { timedOut = false ; } } }
其他方法 1 2 3 4 5 6 7 8 public <T> T invokeAny (Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException public <T> List<Future<T>> invokeAll (Collection<? extends Callable<T>> tasks) throws InterruptedException
Executors 在一般情况下使用线程池都是直接使用Executors类中提供的静态方法来直接创建线程池,这是jdk提供创建线程池的工具类,要用好这个工具类,一定要清楚每个方法所生成的线程池的特性以及线程池中每个参数的含义。
几种常用的创建线程池的方法
newFixedThreadPool固定大小线程池 创建固定大小 的线程池,每提交一个任务就创建一个线程,直到达到最大线程数,核心线程数与最大线程数相同,都为nThreads ,线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程,阻塞队列长度为Integer.MAX_VALUE
1 2 3 4 5 6 7 public static ExecutorService newFixedThreadPool (int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L , TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
特点
核心线程数和最大线程数大小一样
没有所谓的空闲时间,keepAliveTime为0
阻塞队列为LinkedBlockingQueue
工作机制
提交任务
如果线程数少于核心线程,创建核心线程执行任务
如果线程数等于核心线程,把任务添加到LinkedBlockingQueue阻塞队列中
如果线程执行完任务,则去阻塞队列中取任务,继续执行
使用方式 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 ExecutorService pool = Executors.newFixedThreadPool(2 ); for (int i = 0 ;i<5 ;i++){ int finalI = i; pool.execute(() -> { if (finalI == 3 ){ throw new RuntimeException("出错了" ); } try { Thread.sleep(2000 ); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + ":" + finalI); }); } pool.shutdown(); ------------------------- pool-1 -thread-2 :1 pool-1 -thread-1 :0 Exception in thread "pool-1-thread-1" java.lang.RuntimeException: 出错了 at com.zhanghe.study.thread.pool.TestExecutors.lambda$testFixedThread$1 (TestExecutors.java:49 ) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149 ) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624 ) at java.lang.Thread.run(Thread.java:748 ) pool-1 -thread-2 :2 pool-1 -thread-3 :4
每次提交一个任务就创建一个线程,直到线程达到线程池的最大线程数。一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,而其他线程又没有空闲,那么线程池会补充一个新线程
适用于处理CPU密集型的任务,适用于执行长期的任务,防止任务时间长,且任务太多,导致队列中任务越积越多,使得机器内存飙升
newSingleThreadExecutor单一线程池 创建单个线程 的线程池,只有唯一的工作线程,所有的任务都是串行执行的,如果这个唯一的线程因为异常结束,会有新的线程来代替它,此线程保证所有任务的执行顺序按照任务的提交顺序执行
相当于newFixedThreadPool(1)
1 2 3 4 5 6 7 8 public static ExecutorService newSingleThreadExecutor () { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1 , 1 , 0L , TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
特点
核心线程数和最大线程数都是1
没有所谓的空闲时间,keepAliveTime为0
阻塞队列为LinkedBlockingQueue
就相当于newFixedThreadPool(1)
使用方式 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 ExecutorService pool = Executors.newSingleThreadExecutor(); for (int i = 0 ;i<5 ;i++){ int finalI = i; pool.execute(() -> { try { Thread.sleep(3000 ); } catch (InterruptedException e) { e.printStackTrace(System.err); } if (finalI == 3 ){ throw new RuntimeException("出错了" ); } System.out.println(Thread.currentThread().getName() + ":" + finalI); }); } pool.shutdown(); -------------------- pool-1 -thread-1 :0 pool-1 -thread-1 :1 pool-1 -thread-1 :2 Exception in thread "pool-1-thread-1" java.lang.RuntimeException: 出错了 at com.zhanghe.study.thread.pool.TestExecutors.lambda$testSingleThread$0 (TestExecutors.java:30 ) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149 ) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624 ) at java.lang.Thread.run(Thread.java:748 ) pool-1 -thread-2 :4
这个线程池只有一个线程在工作,也就是相当于单线程串行执行所有任务,如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它,此线程池保证所有任务的执行顺序按照任务的提交顺序执行
newCachedThreadPool缓存线程池 缓存线程池 ,可以根据需求自动创建新线程,没有对线程池中线程的数量进行限制 ,最大可以为Integer.MAX_VALUE,如果有可用的线程就使用,如果没有的话,会自动创建新的线程,默认空闲60s移除
常用于执行一些生存期很短的任务,默认会移除60s内未被使用的线程,因此长时间保持空闲的线程池不会使用任何资源
1 2 3 4 5 6 7 public static ExecutorService newCachedThreadPool () { return new ThreadPoolExecutor(0 , Integer.MAX_VALUE, 60L , TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
特点
核心线程数为0
最大线程数为Integer.MAX_VALUE
阻塞队列是SynchronousQueue
非核心线程空闲存活时间为60s
工作机制
提交任务
没有核心线程,会先加入到队列SynchronousQueue
如果此时有空闲线程,则从队列中取出任务执行
如果没有空闲线程,由于SynchronousQueue不会进行存储,就会新建一个线程去执行
执行完任务的线程,还可以存活60s,如果该期间,接到任务,则会继续存活;否则会被销毁
使用方式 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 ExecutorService pool = Executors.newCachedThreadPool(); for (int i = 0 ;i<5 ;i++){ int finalI = i; pool.execute(() -> { if (finalI == 3 ){ throw new RuntimeException("出错了" ); } try { Thread.sleep(2000 ); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + ":" + finalI); }); } pool.shutdown(); -------------------- Exception in thread "pool-1-thread-4" java.lang.RuntimeException: 出错了 at com.zhanghe.study.thread.pool.TestExecutors.lambda$testCachedThread$2 (TestExecutors.java:75 ) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149 ) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624 ) at java.lang.Thread.run(Thread.java:748 ) pool-1 -thread-1 :0 pool-1 -thread-5 :4 pool-1 -thread-3 :2 pool-1 -thread-2 :1
要执行多少任务就创建多少线程,如果线程池的大小超过了处理任务所需要的线程,那么就会回收部分空闲的线程,当任务数增加时,此线程池又添加新线程来处理任务
适用于并发执行大量短期的小任务
newScheduledThreadPool 调度型线程池,创建大小无限的的线程池,可以延迟或定时以及周期性的执行任务,使用的队列是DelayedWorkQueue,是一个延迟队列
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public static ScheduledExecutorService newScheduledThreadPool (int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); } public ScheduledThreadPoolExecutor (int corePoolSize) { super (corePoolSize, Integer.MAX_VALUE, 0 , NANOSECONDS, new DelayedWorkQueue()); } public ThreadPoolExecutor (int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this (corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); }
特点
可以设定核心线程数
最大线程数为Integer.MAX_VALUE
keepAliveTime为0
使用方式 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 ScheduledExecutorService pool = Executors.newScheduledThreadPool(10 ); for (int i = 0 ;i<5 ;i++){ int finalI = i; pool.schedule(()-> { if (finalI == 3 ){ throw new RuntimeException("出错了" ); } System.out.println(Thread.currentThread().getName() + ":" + finalI); },20 , TimeUnit.SECONDS); } pool.shutdown();
1 2 3 schedule(Runnable command,long delay, TimeUnit unit)
1 2 3 scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit)
1 2 3 scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit)
1 2 3 4 public BlockingQueue<Runnable> getQueue () { return super .getQueue(); }
newWorkStealingPool (工作窃取线程池) 大型的任务被分解成若干块放入到队列中,在队列中还可以继续细分,线程从队列中获取任务并进行执行,当所有的线程计算结束之后,再将各部分的结果进行合并来得到最终结果。
在处理器之间分配工作项,从而最大限度地利用所有可用的处理器来完成计算密集型任务,这项算法也用于 Java 的fork/join 框架
1 ExecutorService executorService = Executors.newWorkStealingPool();
线程池的提交和关闭 ExecutorService接口扩展了Executor接口并提供了提交任务和关闭线程池的方法
提交 1 2 3 4 5 6 7 8 9 10 11 <T> Future<T> submit (Callable<T> task) ; <T> Future<T> submit (Runnable task, T result) ; Future<?> submit(Runnable task); void execute (Runnable command) ;
submit()方法和execute()方法都可以提交任务执行,这两者有什么不同呢
submit()方法有返回值Future,而execute()方法是没有返回值的
但是对于ScheduledExecutorService延时调度,使用以下方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit); public <V> ScheduledFuture<V> schedule (Callable<V> callable, long delay, TimeUnit unit) ;public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit); public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);
关闭 1 2 3 4 5 6 7 8 9 void shutdown () ;List<Runnable> shutdownNow () ;
示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 pool.execute(() -> { int num = 0 ; while (true ){ if (!Thread.currentThread().isInterrupted()){ num++; if (num % 1000 == 0 ){ System.out.println("---" ); } } else { System.out.println("被中断了" ); break ; } } });
线程池拒绝策略 线程池是有一定的容量限制的,不可能来多少任务接多少任务,那么如何拒绝任务呢?就用到了线程池的拒绝策略RejectedExecutionHandler,在jdk中提供了四种拒绝策略,分别是AbortPolicy、CallerRunsPolicy、DiscardPolicy、DiscardOldestPolicy,主要是执行rejectedExecution方法,下面分别介绍这四种拒绝策略
AbortPolicy 线程池队列满了,直接抛出异常RejectedExecutionException,默认策略
1 2 3 4 5 6 7 8 9 10 11 public static class AbortPolicy implements RejectedExecutionHandler { public AbortPolicy () { } public void rejectedExecution (Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()); } }
CallerRunsPolicy 只要线程池没有关闭,直接在调用者线程(主线程)中执行该任务。不会等待线程池中的线程去执行
1 2 3 4 5 6 7 8 9 10 11 public static class CallerRunsPolicy implements RejectedExecutionHandler { public CallerRunsPolicy () { } public void rejectedExecution (Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } } }
DiscardPolicy 什么事情都不干,直接丢弃当前将要加入队列的任务本身
1 2 3 4 5 6 7 8 public static class DiscardPolicy implements RejectedExecutionHandler { public DiscardPolicy () { } public void rejectedExecution (Runnable r, ThreadPoolExecutor e) { } }
DiscardOldestPolicy 丢弃掉等待队列中最老的一个请求,然后再次提交当前任务
1 2 3 4 5 6 7 8 9 10 11 12 public static class DiscardOldestPolicy implements RejectedExecutionHandler { public DiscardOldestPolicy () { } public void rejectedExecution (Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { e.getQueue().poll(); e.execute(r); } } }
自定义拒绝策略 实现RejectedExecutionHandler接口,来重写rejectedExecution()方法即可
线程池工作步骤
任务提交后,如果线程池中正在运行的线程数量小于corePoolSize核心线程数 ,则创建新的线程执行任务
如果正在运行的线程数量大于或等于corePoolSize核心线程数 ,则加入到workQueue等待队列 中
如果workQueue等待队列 已满,且正在运行的线程数量小于maximumPoolSize最大线程数 ,则创建非核心线程执行任务
如果workQueue队列 已满,且正在运行的线程数量大于或等于maximumPoolSize最大线程数 ,则执行handler拒绝策略
当线程完成任务之后,会从队列中取出下一个任务继续执行
当线程没有任务可做时,超过keepAliveTime存活时间 ,如果当前运行的线程数大于corePoolSize核心线程数 ,则该线程停止,直到线程数达到corePoolSize核心线程数
ForkJoinPool(分支合并) ForkJoinPool用来通过分治技术将大任务拆分成小任务,再对每个小任务得到的结果进行汇总,在一个任务中,先检查将要执行的任务的大小,如果大于一个设定的大小,则将任务拆分成可以通过框架来执行的小任务,如果任务的大小比设定的要小,就可以直接执行,然后根据需要返回的任务的结果,底层采用了工作窃取模式 (有的线程把自己队列中的任务执行完毕之后,发现其他线程的队列中还有任务等待处理,就去这些线程的队列中窃取一个任务来执行)
Fork操作:当需要将一个任务拆分成更小的多个任务时,在框架中执行这些任务
Join操作:当一个主任务等待其创建的多个子任务的完成执行
核心类有两个
RecursiveAction 任务没有返回结果且仅执行一次任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 public class TestRecursiveAction { public static void main (String[] args) { ForkJoinPool pool = new ForkJoinPool(); pool.submit(new MyRecursiveAction(10 ,20 )); try { Thread.sleep(5000 ); } catch (InterruptedException e) { e.printStackTrace(); } } static class MyRecursiveAction extends RecursiveAction { private int begin; private int end; public MyRecursiveAction (int begin,int end) { this .begin = begin; this .end = end; } @Override protected void compute () { if (end - begin > 2 ){ int mid = (begin + end) / 2 ; MyRecursiveAction leftAction = new MyRecursiveAction(begin,mid); MyRecursiveAction rightAction = new MyRecursiveAction(mid+1 ,end); invokeAll(leftAction,rightAction); } else { System.out.println(Thread.currentThread().getName()+"===计算值为" +begin+"-" +end); } } } } --- ForkJoinPool-1 -worker-1 ===计算值为10 -12 ForkJoinPool-1 -worker-4 ===计算值为19 -20 ForkJoinPool-1 -worker-3 ===计算值为13 -15 ForkJoinPool-1 -worker-2 ===计算值为16 -18
RecursiveTask 任务有返回结果,可以使用ForkJoinTask的get()或者join()方法来获取结果
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 public class TestRecursiveTask { public static void main (String[] args) { ForkJoinPool pool = new ForkJoinPool(); MyRecursiveTask task = new MyRecursiveTask(10 ,20 ); ForkJoinTask result = pool.submit(task); System.out.println(result.join()); } static class MyRecursiveTask extends RecursiveTask <Integer > { private int begin; private int end; public MyRecursiveTask (int begin, int end) { this .begin = begin; this .end = end; } @Override protected Integer compute () { if (end - begin > 2 ) { int mid = (begin + end) / 2 ; MyRecursiveTask leftTask = new MyRecursiveTask(begin, mid); MyRecursiveTask rightTask = new MyRecursiveTask(mid + 1 , end); invokeAll(leftTask, rightTask); return leftTask.join() + rightTask.join(); } else { int result = 0 ; for (int i = begin; i <= end; i++) { result = i + result; } System.out.println(begin + "~" + end + "累加结果为" +result); return result; } } } } --- 10 ~12 累加结果为33 16 ~18 累加结果为51 19 ~20 累加结果为39 13 ~15 累加结果为42 165
获取pool状态的方法 1 2 3 4 5 6 7 8 9 10 11 12 13 14 public int getActiveThreadCount () public int getParallelism () public int getPoolSize () public int getQueuedSubmissionCount () public long getQueuedTaskCount () public int getRunningThreadCount () public long getStealCount ()
扩展线程池 线程池中有几个方法是可以由开发者去继承重写来自定义自己的逻辑的
protected void beforeExecute(Thread t, Runnable r) 该方法在执行器中的某一任务执行之前被调用,接收将要执行的Runnable对象和将要执行这些对象的Thread对象
protected void afterExecute(Runnable r, Throwable t) 该方法在执行器中的某一任务执行之后被调用,接收的是已执行的Runnable对象和一个Throwable对象,存储了该任务中可能抛出的异常