jdk源码解析四之FutureTask Dear 丶 2023-02-20 12:08 3阅读 0赞 ### 文章目录 ### * FutureTask * * 构造 * 带返回构造 * run * get * cancel * finishCompletion * 总结 # FutureTask # 提前加载稍后需要的数据 ![在这里插入图片描述][watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L051YW5fRmVuZw_size_16_color_FFFFFF_t_70] Future表示异步计算的结果。它提供了检查计算是否完成的方法,以等待计算的完成,并获取计算的结果。计算完成后只能使用 get 方法来获取结果,如有必要,计算完成前可以阻塞此方法。 ![在这里插入图片描述][watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L051YW5fRmVuZw_size_16_color_FFFFFF_t_70 1] Callable返回结果并且可能抛出异常的任务。实现者定义了一个不带任何参数的叫做 call 的方法。 ![在这里插入图片描述][watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L051YW5fRmVuZw_size_16_color_FFFFFF_t_70 2] /** * The run state of this task, initially NEW. The run state * transitions to a terminal state only in methods set, * setException, and cancel. During completion, state may take on * transient values of COMPLETING (while outcome is being set) or * INTERRUPTING (only while interrupting the runner to satisfy a * cancel(true)). Transitions from these intermediate to final * states use cheaper ordered/lazy writes because values are unique * and cannot be further modified. * * Possible state transitions: * NEW -> COMPLETING -> NORMAL * NEW -> COMPLETING -> EXCEPTIONAL * NEW -> CANCELLED * NEW -> INTERRUPTING -> INTERRUPTED */ private volatile int state; //表示是个新的任务或者还没被执行完的任务。这是初始状态。 private static final int NEW = 0; //任务已经执行完成或者执行任务的时候发生异常,但是任务执行结果或者异常原因还没有保存到outcome字段 // (outcome字段用来保存任务执行结果,如果发生异常,则用来保存异常原因)的时候, // 状态会从NEW变更到COMPLETING。但是这个状态会时间会比较短,属于中间状态。 private static final int COMPLETING = 1; //任务已经执行完成并且任务执行结果已经保存到outcome字段,状态会从COMPLETING转换到NORMAL。这是一个最终态。 private static final int NORMAL = 2; //任务执行发生异常并且异常原因已经保存到outcome字段中后,状态会从COMPLETING转换到EXCEPTIONAL。这是一个最终态。 private static final int EXCEPTIONAL = 3; //任务还没开始执行或者已经开始执行但是还没有执行完成的时候,用户调用了cancel(false)方法取消任务且不中断任务执行线程, // 这个时候状态会从NEW转化为CANCELLED状态。这是一个最终态。 private static final int CANCELLED = 4; //任务还没开始执行或者已经执行但是还没有执行完成的时候, // 用户调用了cancel(true)方法取消任务并且要中断任务执行线程但是还没有中断任务执行线程之前, // 状态会从NEW转化为INTERRUPTING。这是一个中间状态。 private static final int INTERRUPTING = 5; //调用interrupt()中断任务执行线程之后状态会从INTERRUPTING转换到INTERRUPTED。这是一个最终态。 private static final int INTERRUPTED = 6; ## 构造 ## public FutureTask(Callable<V> callable) { if (callable == null) throw new NullPointerException(); //设置回调函数,返回value this.callable = callable; //初始状态 this.state = NEW; // ensure visibility of callable } ## 带返回构造 ## public FutureTask(Runnable runnable, V result) { //任务执行成果,返回result this.callable = Executors.callable(runnable, result); this.state = NEW; // ensure visibility of callable } public static <T> Callable<T> callable(Runnable task, T result) { if (task == null) throw new NullPointerException(); return new RunnableAdapter<T>(task, result); } static final class RunnableAdapter<T> implements Callable<T> { final Runnable task; final T result; //适配器,底层包装task RunnableAdapter(Runnable task, T result) { this.task = task; this.result = result; } public T call() { task.run(); //返回设置结果 return result; } } ## run ## 因为继承了runnable接口,所以使用线程时,直接调用run public void run() { //当前状态不是新建,说明已经执行过或者取消了,直接返回 if (state != NEW || // 状态为新建,则尝试添加当前线程到runner中,如果失败直接返回 !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; //回调函数有值且状态新建 if (c != null && state == NEW) { V result; boolean ran; try { //调用回调函数,保存值到result中 result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; //异常处理 setException(ex); } //正确执行,则赋值 if (ran) set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; //任务中断,中断执行 if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } } protected void setException(Throwable t) { //将当前状态 NEW => COMPLETING if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { //异常保存到outcome中 outcome = t; //将当前状态COMPLETING => EXCEPTIONAL UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state finishCompletion(); } } protected void set(V v) { // NEW => COMPLETING if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { // outcome保存返回结果 outcome = v; //COMPLETING => NORMAL UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state finishCompletion(); } } ## get ## public V get() throws InterruptedException, ExecutionException { int s = state; //还在执行,则阻塞等待 if (s <= COMPLETING) s = awaitDone(false, 0L); //返回结果 return report(s); } private int awaitDone(boolean timed, long nanos) throws InterruptedException { //计算阻塞时间 final long deadline = timed ? System.nanoTime() + nanos : 0L; WaitNode q = null; boolean queued = false; for (;;) { //对中断的处理 if (Thread.interrupted()) { removeWaiter(q); throw new InterruptedException(); } int s = state; //已经取消或者出现异常,则清空线程,返回值 if (s > COMPLETING) { if (q != null) q.thread = null; return s; } else if (s == COMPLETING) // cannot time out yet //正在执行则暂停当前线程,并允许执行其他线程 Thread.yield(); else if (q == null) //创建等待节点 q = new WaitNode(); else if (!queued) //当前节点添加到头 queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); else if (timed) { //计算超时时间 nanos = deadline - System.nanoTime(); //对超时的处理 if (nanos <= 0L) { //删除节点,返回状态值 removeWaiter(q); return state; } //阻塞等待特定时间 LockSupport.parkNanos(this, nanos); } else //阻塞等待直到被其他线程唤醒 LockSupport.park(this); } } private void removeWaiter(WaitNode node) { if (node != null) { //清空当前线程 node.thread = null; retry: for (;;) { // restart on removeWaiter race for (WaitNode pred = null, q = waiters, s; q != null; q = s) { s = q.next; // pred => q => s if (q.thread != null) pred = q; else if (pred != null) { //删除中间thread为null的数据 //当q.thread=null且,pred不为null,则删除q节点 pred.next = s; if (pred.thread == null) // check for race continue retry; } else if (! //删除waiters节点的头数据,并更新为waiters.next //当q.thread=null且,pred为null,则替换为s节点,如果q为waiters节点,且修改失败,则跳过当前循环,继续执行下一个循环 UNSAFE.compareAndSwapObject(this, waitersOffset, q, s)) continue retry; } break; } } } private V report(int s) throws ExecutionException { //获取结果值,有可能是异常 Object x = outcome; //执行完毕,则返回结果 if (s == NORMAL) return (V)x; //异常或取消,则抛出异常 if (s >= CANCELLED) throw new CancellationException(); //抛出异常 throw new ExecutionException((Throwable)x); } ## cancel ## public boolean cancel(boolean mayInterruptIfRunning) { if (!( //如果任务已经结束 state == NEW && //且将 NEW => mayInterruptIfRunning需要中断则设置为 INTERRUPTING 否则 CANCELLED UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) //直接返回 return false; try { // in case call to interrupt throws exception //需要中断 if (mayInterruptIfRunning) { try { Thread t = runner; if (t != null) //中断 t.interrupt(); } finally { // final state //修改状态为INTERRUPTED UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); } } } finally { finishCompletion(); } return true; } ## finishCompletion ## private void finishCompletion() { //成功,异常,中断的时候会调用 // assert state > COMPLETING; //依次遍历waiters链表,唤醒节点中的线程,然后把callable置空。 for (WaitNode q; (q = waiters) != null;) { //清空waiters if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { for (;;) { Thread t = q.thread; if (t != null) { //清空线程,且唤醒 q.thread = null; //唤醒 LockSupport.unpark(t); } WaitNode next = q.next; //到达边界,则跳出 if (next == null) break; //q更新为下一个节点 q.next = null; // unlink to help gc q = next; } break; } } done(); callable = null; // to reduce footprint } ## 总结 ## 每次涉及到成功,异常,中断的时候会唤醒所有等待的线程 最后执行的会先添加到等待线程头节点中.也就是最先唤醒的时最后执行的线程 内部维护7个状态,分别监控执行,取消,中断3个执行过程. [watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L051YW5fRmVuZw_size_16_color_FFFFFF_t_70]: https://img-blog.csdnimg.cn/20200527143825895.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L051YW5fRmVuZw==,size_16,color_FFFFFF,t_70 [watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L051YW5fRmVuZw_size_16_color_FFFFFF_t_70 1]: https://img-blog.csdnimg.cn/20200530094844834.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L051YW5fRmVuZw==,size_16,color_FFFFFF,t_70 [watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L051YW5fRmVuZw_size_16_color_FFFFFF_t_70 2]: https://img-blog.csdnimg.cn/20200530095040909.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L051YW5fRmVuZw==,size_16,color_FFFFFF,t_70
相关 jdk源码解析八之NIO 文章目录 Buffer ByteBuffer MappedByteBuffer DirectByteBu 素颜马尾好姑娘i/ 2023年02月22日 05:12/ 0 赞/ 114 阅读
相关 jdk源码解析八之BIO 文章目录 字节流 InputStream FilterInputStream ByteArrayInputSt 怼烎@/ 2023年02月21日 14:21/ 0 赞/ 30 阅读
相关 jdk源码解析七之ReadWriteLock 文章目录 ReadWriteLock ReentrantReadWriteLock 构造 获取读写锁 灰太狼/ 2023年02月21日 11:42/ 0 赞/ 39 阅读
相关 jdk源码解析四之FutureTask 文章目录 FutureTask 构造 带返回构造 run get cancel f Dear 丶/ 2023年02月20日 12:08/ 0 赞/ 4 阅读
相关 jdk源码解析四之CountDownLatch 文章目录 CountDownLatch await countDown cancelAcquire 总结 怼烎@/ 2023年02月20日 12:08/ 0 赞/ 12 阅读
相关 jdk源码解析三之ConcurrentHashMap 文章目录 ConcurrentHashMap put 初始化 扩容 ge た 入场券/ 2023年02月20日 12:07/ 0 赞/ 29 阅读
相关 JUC框架 FutureTask源码解析 JDK8 文章目录 前言 状态 消费者链表 成员 构造器 实现Runnable接口 实现Future接口 普通get、超时ge 深碍√TFBOYSˉ_/ 2022年11月27日 15:38/ 0 赞/ 198 阅读
相关 追踪解析 FutureTask 源码 零 前期准备 0 FBI WARNING 文章异常啰嗦且绕弯。 1 版本 JDK 版本 : OpenJDK 11.0.1 IDE : idea 2018. ゝ一世哀愁。/ 2022年03月07日 06:56/ 0 赞/ 290 阅读
还没有评论,来说两句吧...