Callable接口 Runnable的缺陷 在创建线程的时候不管是继承Thread类(Thread本身也是实现的Runnable接口)还是实现Runnable接口,所实现的run()方法都没有返回值,使得需要将返回值写到map中,等到线程结束再去从map中取数据,特别的不方便
而且Runnable还有另一个问题是不能抛出任何异常,必须在run()方法中自己处理异常。
由于Runnable存在的两个问题,所以Callable接口和Future接口应运而生,这里来介绍一下Callable接口和Future接口
Callable的改善 Callable与Runnable不同,提供的方法为call()方法,大家看到,该方法是有返回值的,且可以抛出异常
获取Callable的返回值需要FutureTask的支持
1 2 3 4 5 6 7 8 9 public interface Callable <V > { V call () throws Exception ; } public interface Runnable { public abstract void run () ; }
Runnable和Callable接口的区别
Runnable中提供的是run()方法,Callable中提供的是call()方法
Runnable中的run()方法返回值为void,Callable中的call()方法有返回值
Runnable的run()方法不能抛出异常,Callable中的call方法可以抛出异常
Callable可以和Future、FutureTask配合获取异步执行的结果
自旋锁执行时间短、线程数少的时候使用(由于占用CPU) ——-> Atomic和lock都是使用的自旋锁
FutureTask Future接口 使用ExecutorService来执行Callable对象
1 2 3 4 5 6 <T> Future<T> submit (Callable<T> task) ; Future<?> submit(Runnable task); <T> Future<T> submit (Runnable task, T result) ;
看到该方法返回的是一个Future对象,Future对象是什么呢?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public interface Future <V > { boolean cancel (boolean mayInterruptIfRunning) ; boolean isCancelled () ; boolean isDone () ; V get () throws InterruptedException, ExecutionException ; V get (long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException ;}
但是线程中没有可以使用Callable接口和Future接口的地方,Thread唯一可用的参数是Runnable对象,那么如何才能用到这个好用的技术呢?
答案是FutureTask
FutureTask类 FutureTask类同时实现了Runnable接口和Future接口,同时拥有异步获取结果以及取消任务的能力,可以创建出可取消的任务。
通过传入Runnable或者Callable的任务给FutureTask,通过FutureTask的get方法异步获取执行结果
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 public class FutureTask <V > implements RunnableFuture <V > { private static final int NEW = 0 ; private static final int COMPLETING = 1 ; private static final int NORMAL = 2 ; private static final int EXCEPTIONAL = 3 ; private static final int CANCELLED = 4 ; private static final int INTERRUPTING = 5 ; private static final int INTERRUPTED = 6 ; private Callable<V> callable; private Object outcome; private volatile Thread runner; private volatile WaitNode waiters; private static final sun.misc.Unsafe UNSAFE; private static final long stateOffset; private static final long runnerOffset; private static final long waitersOffset; public V get () throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) s = awaitDone(false , 0L ); return report(s); } private int awaitDone (boolean timed, long nanos) throws InterruptedException { final long deadline = timed ? System.nanoTime() + nanos : 0L ; WaitNode q = null ; boolean queued = false ; for (;;) { if (Thread.interrupted()) { removeWaiter(q); throw new InterruptedException(); } int s = state; if (s > COMPLETING) { if (q != null ) q.thread = null ; return s; } else if (s == COMPLETING) Thread.yield(); else if (q == null ) q = new WaitNode(); else if (!queued) queued = UNSAFE.compareAndSwapObject(this , waitersOffset, q.next = waiters, q); else if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L ) { removeWaiter(q); return state; } LockSupport.parkNanos(this , nanos); } else LockSupport.park(this ); } } public void run () { if (state != NEW || !UNSAFE.compareAndSwapObject(this , runnerOffset, null , Thread.currentThread())) return ; try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); ran = true ; } catch (Throwable ex) { result = null ; ran = false ; setException(ex); } if (ran) set(result); } } finally { runner = null ; int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } } protected void set (V v) { if (UNSAFE.compareAndSwapInt(this , stateOffset, NEW, COMPLETING)) { outcome = v; UNSAFE.putOrderedInt(this , stateOffset, NORMAL); finishCompletion(); } } protected void setException (Throwable t) { if (UNSAFE.compareAndSwapInt(this , stateOffset, NEW, COMPLETING)) { outcome = t; UNSAFE.putOrderedInt(this , stateOffset, EXCEPTIONAL); finishCompletion(); } } private void finishCompletion () { for (WaitNode q; (q = waiters) != null ;) { if (UNSAFE.compareAndSwapObject(this , waitersOffset, q, null )) { for (;;) { Thread t = q.thread; if (t != null ) { q.thread = null ; LockSupport.unpark(t); } WaitNode next = q.next; if (next == null ) break ; q.next = null ; q = next; } break ; } } done(); callable = null ; } } public interface RunnableFuture <V > extends Runnable , Future <V > { void run () ; }
示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 public class TestCallable { public static void main (String[] args) { CallDemo call = new CallDemo(); FutureTask<Integer> futureTask = new FutureTask<>(call); new Thread(futureTask).start(); try { int sum = futureTask.get(); System.out.println("---------" ); System.out.println(sum); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } } } class CallDemo implements Callable <Integer > { @Override public Integer call () throws Exception { int sum = 0 ; for (int i = 0 ;i<1000 ;i++){ sum +=i; } return sum; } }
CompletionService Future具有阻塞同步性的特点,由于阻塞代码的运行效率会比较差,而CompletionService可以解决这样的问题
CompletionService的功能是以异步的方式一边生产新的任务,一边处理已完成任务的结果,这样可以将执行任务与处理任务分离开进行处理,使用submit执行任务,使用take取得已完成的任务,并按照完成这些任务的时间顺序处理它们的结果,把线程池Executor和阻塞队列BlockingQueue融合在一起
ExecutorCompletionService是CompletionService的唯一实现类
1 2 3 4 5 6 7 8 9 public ExecutorCompletionService (Executor executor) { if (executor == null ) throw new NullPointerException(); this .executor = executor; this .aes = (executor instanceof AbstractExecutorService) ? (AbstractExecutorService) executor : null ; this .completionQueue = new LinkedBlockingQueue<Future<V>>(); }
源码 take 用来取得completionQueue队列中的Future对象,最先完成的会先进入completionQueue队列中,执行时间最短的最先返回,虽然调用completionService.take().get()也会进行阻塞,但是并不会阻塞后续的任务,哪个任务先执行完,哪个任务的返回值就先打印
1 2 3 public Future<V> take () throws InterruptedException { return completionQueue.take(); }
poll 获取并移除表示下一个已完成任务的Future,如果不存在这样的任务,则返回null,无阻塞效果
1 2 3 public Future<V> poll () { return completionQueue.poll(); }
示例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 public class TestCompletionService { public static void main (String[] args) { try { ExecutorService executorService = Executors.newFixedThreadPool(2 ); ExecutorCompletionService<String> executorCompletionService = new ExecutorCompletionService<>(executorService); for (int i = 0 ; i < 10 ; i++) { executorCompletionService.submit(new Callable<String>() { @Override public String call () throws Exception { long sleep = (long ) (Math.random() * 1000 ); System.out.println("sleep=" + sleep + " " + Thread.currentThread().getName()); Thread.sleep(sleep); return Thread.currentThread().getName() + " " + sleep; } }); } for (int i = 0 ; i < 10 ; i++) { System.out.println(executorCompletionService.take().get()); } } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } } }