0%

Callable接口

Callable接口

Runnable的缺陷

在创建线程的时候不管是继承Thread类(Thread本身也是实现的Runnable接口)还是实现Runnable接口,所实现的run()方法都没有返回值,使得需要将返回值写到map中,等到线程结束再去从map中取数据,特别的不方便

而且Runnable还有另一个问题是不能抛出任何异常,必须在run()方法中自己处理异常。

由于Runnable存在的两个问题,所以Callable接口和Future接口应运而生,这里来介绍一下Callable接口和Future接口

Callable的改善

Callable与Runnable不同,提供的方法为call()方法,大家看到,该方法是有返回值的,且可以抛出异常

获取Callable的返回值需要FutureTask的支持

1
2
3
4
5
6
7
8
9
public interface Callable<V> {

V call() throws Exception;
}

public interface Runnable {

public abstract void run();
}

Runnable和Callable接口的区别

  • Runnable中提供的是run()方法,Callable中提供的是call()方法
  • Runnable中的run()方法返回值为void,Callable中的call()方法有返回值
  • Runnable的run()方法不能抛出异常,Callable中的call方法可以抛出异常
  • Callable可以和Future、FutureTask配合获取异步执行的结果

自旋锁执行时间短、线程数少的时候使用(由于占用CPU) —–> Atomic和lock都是使用的自旋锁

FutureTask

Future接口

使用ExecutorService来执行Callable对象

1
2
3
4
5
6
// submit方法既可以传入Callable,也可以传入Runnable,如果传入Callable的话,可以使用get方法获取到值;如果传入Runnable的话,get方法不可以获取到值
<T> Future<T> submit(Callable<T> task);

Future<?> submit(Runnable task);
// result参数来作为执行结果的返回值
<T> Future<T> submit(Runnable task, T result);

看到该方法返回的是一个Future对象,Future对象是什么呢?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public interface Future<V> {

// 在任务正常结束之前可以尝试取消任务,如果任务已完成或已取消,则次尝试失败。如果调用成功,且此任务还没有启动,则该任务将不会执行;如果任务已经启动,则mayInterruptIfRunning参数确定是否应该以试图停止任务的方式来中断此任务的线程
// 中断需要判断 if(Thread.currentThread().isInterrupted())
boolean cancel(boolean mayInterruptIfRunning);

// 如果任务正常结束之前被取消,isCancelled会返回true
boolean isCancelled();

// 检测任务是否结束,任务完成返回true,由于正常终止、异常或取消而完成,也会返回true
boolean isDone();

// get方法会将结果返回给调用方,会阻塞
V get() throws InterruptedException, ExecutionException;

// get方法会将结果返回给调用方,可以限制限制超时时间
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}

但是线程中没有可以使用Callable接口和Future接口的地方,Thread唯一可用的参数是Runnable对象,那么如何才能用到这个好用的技术呢?

答案是FutureTask

FutureTask类

FutureTask类同时实现了Runnable接口和Future接口,同时拥有异步能力以及取消任务的能力,可以创建出可取消的任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206

public class FutureTask<V> implements RunnableFuture<V> {
//NEW -> COMPLETING -> NORMAL 异步任务的正常结束,COMPLETING是在执行set方法为outcome赋值时的一个过渡状态,赋值完成后状态变为NORMAL
//NEW -> COMPLETING -> EXCEPTIONAL 异步任务执行过程中抛出异常,COMPLETING是在执行setException方法为outcome赋值时的一个过渡状态,赋值完成后状态变为EXCEPTIONAL
//NEW -> CANCELLED 调用cancel(false)将状态设置为CANCELLED
//NEW -> INTERRUPTING -> INTERRUPTED 调用了cancel(true)
// 新建状态
private static final int NEW = 0;
// 任务正在完成状态
private static final int COMPLETING = 1;
// 正常执行结束
private static final int NORMAL = 2;
// 非正常结束
private static final int EXCEPTIONAL = 3;
// 任务被取消,对应cancel(false)
private static final int CANCELLED = 4;
// 任务中断
private static final int INTERRUPTING = 5;
// 任务被中断,对应cancel(true)
private static final int INTERRUPTED = 6;

/** The underlying callable; nulled out after running */
private Callable<V> callable;
// 异步任务的结果
/** The result to return or exception to throw from get() */
private Object outcome; // non-volatile, protected by state reads/writes
// 执行Callable任务的线程
/** The thread running the callable; CASed during run() */
private volatile Thread runner;
// 线程等待节点
/** Treiber stack of waiting threads */
private volatile WaitNode waiters;

// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long stateOffset;
private static final long runnerOffset;
private static final long waitersOffset;

// get方法阻塞
public V get() throws InterruptedException, ExecutionException {
int s = state;
// state小于等于COMPLETING表示还没开始执行
if (s <= COMPLETING)
// 休眠等待执行结果
s = awaitDone(false, 0L);
// 休眠返回,调用report获取结果
return report(s);
}

private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
// 线程等待节点
WaitNode q = null;
// 是否入队
boolean queued = false;
// 自旋
for (;;) {
// 中断之后移除自身对应的等待的节点
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}

int s = state;
// 状态大于COMPLETING,任务完成或者取消或者中断,则返回
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
// 状态等于COMPLETING,任务正在执行,获取任务结果的线程让出cpu
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
// 若当前线程还没有进入线程等待链表的WaitNode节点,此时新建一个WaitNode节点,并把当前线程赋值给WaitNode节点的thread属性,创建等待节点,在下一轮自旋时会入队
else if (q == null)
q = new WaitNode();
// 还没有入队,则入队等待执行结果
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
// 定时,则观察时间是否到了
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
else
// 被finishCompletion唤醒,自旋回到for循环重新往下执行
LockSupport.park(this);
}
}

// 执行
public void run() {
// 不是新建的任务 并且不能将执行线程由null变为当前线程,则直接返回
// 如果任务状态为NEW且runner为null,说明还未有线程执行过异步任务,则满足条件,可以执行
// 如果任务状态不为NEW,说明已经有线程执行过异步任务,没有必要再次执行,直接返回
// 如果任务状态为NEW且runner不为null,说明异步任务正在执行,直接返回

if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
// 异步任务结果
V result;
// 异步任务执行成功还是失败标志
boolean ran;
try {
// 执行任务的call方法
// Callable.call
result = c.call();
// 正常执行完成,设置为true
ran = true;
} catch (Throwable ex) { // 执行失败,也会修改状态,然后唤醒get方法阻塞的线程
result = null;
// 抛出异常,设置为false
ran = false;
// 异常结果赋值给outcome
setException(ex);
}
// 执行成功,则设置结果
if (ran)
// 赋值给outcome
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
// 异步任务正在执行过程中,runner一直都是非空的,防止并发执行
// 任务执行结束后,不管成功还是失败,都将runner设置为null
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
// 表示被中断了,调用了cancel(true),调用handlePossibleCancellationInterrupt处理中断
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
// 设置结果并进行通知
protected void set(V v) {
// 将状态由NEW 变为COMPLETING
// 任务不会被并发执行,导致状态不为NEW的原因是中断
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
// 将结果赋给outcome
outcome = v;
// 将状态变为NORMAL
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
// 唤醒调用get方法阻塞等待结果的线程
finishCompletion();
}
}

// 设置异常
protected void setException(Throwable t) {
// 任务不会被并发执行,导致状态不为NEW的原因是中断
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
// 将异常赋给outcome
outcome = t;
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
finishCompletion();
}
}

private void finishCompletion() {
// assert state > COMPLETING;
// waiters中存储的是调用了get方法的线程集合,遍历
for (WaitNode q; (q = waiters) != null;) {
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
// waitNode节点的线程
Thread t = q.thread;
if (t != null) {
q.thread = null;
// 唤醒阻塞的线程
LockSupport.unpark(t);
}
// 继续取下一个节点
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
// 正常执行完成还是抛出异常都会执行done方法
done();

callable = null; // to reduce footprint
}
}

public interface RunnableFuture<V> extends Runnable, Future<V> {

void run();
}

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class TestCallable {

public static void main(String[] args) {
CallDemo call = new CallDemo();
FutureTask<Integer> futureTask = new FutureTask<>(call);
new Thread(futureTask).start();

try {
// get方法是一个阻塞方法 在没有执行结束之前一直阻塞,直到执行完毕
int sum = futureTask.get();
System.out.println("---------");
System.out.println(sum);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}

}
}

/**
* Callable相较于Runnable有返回值和异常
*/
class CallDemo implements Callable<Integer>{

@Override
public Integer call() throws Exception {
int sum = 0;
for(int i = 0;i<1000;i++){
sum +=i;
}
return sum;
}
}

CompletionService

Future具有阻塞同步性的特点,由于阻塞代码的运行效率会比较差,而CompletionService可以解决这样的问题

CompletionService的功能是以异步的方式一边生产新的任务,一边处理已完成任务的结果,这样可以将执行任务与处理任务分离开进行处理,使用submit执行任务,使用take取得已完成的任务,并按照完成这些任务的时间顺序处理它们的结果,把线程池Executor和阻塞队列BlockingQueue融合在一起

ExecutorCompletionService是CompletionService的唯一实现类

1
2
3
4
5
6
7
8
9
// 依赖于Executor对象,completionQueue作为完成队列
public ExecutorCompletionService(Executor executor) {
if (executor == null)
throw new NullPointerException();
this.executor = executor;
this.aes = (executor instanceof AbstractExecutorService) ?
(AbstractExecutorService) executor : null;
this.completionQueue = new LinkedBlockingQueue<Future<V>>();
}

源码

take

用来取得completionQueue队列中的Future对象,最先完成的会先进入completionQueue队列中,执行时间最短的最先返回,虽然调用completionService.take().get()也会进行阻塞,但是并不会阻塞后续的任务,哪个任务先执行完,哪个任务的返回值就先打印

1
2
3
public Future<V> take() throws InterruptedException {
return completionQueue.take();
}
poll

获取并移除表示下一个已完成任务的Future,如果不存在这样的任务,则返回null,无阻塞效果

1
2
3
public Future<V> poll() {
return completionQueue.poll();
}

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class TestCompletionService {
public static void main(String[] args) {

try{
ExecutorService executorService = Executors.newFixedThreadPool(2);

ExecutorCompletionService<String> executorCompletionService = new ExecutorCompletionService<>(executorService);

for (int i = 0; i < 10; i++) {
executorCompletionService.submit(new Callable<String>() {
@Override
public String call() throws Exception {
long sleep = (long) (Math.random() * 1000);
System.out.println("sleep=" + sleep + " " + Thread.currentThread().getName());
Thread.sleep(sleep);
return Thread.currentThread().getName() + " " + sleep;
}
});
}

for (int i = 0; i < 10; i++) {
System.out.println(executorCompletionService.take().get());
}
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}

}

}

欢迎关注我的其它发布渠道