guava——ListenableFuture 谁践踏了优雅 2022-12-14 03:35 138阅读 0赞 ## ListenableFuture简介 ## public interface ListenableFuture extends Future ListenableFuture继承自Futures,同样是接口,主要的功能是允许给多线程任务添加监听器listener,当任务执行完之后,会自动触发listener的调用 不同的listener可以在不同的Executors中执行,同一个的ListenableFuture可以注册多个listener。 ## 初始化ListenableFuture对象 ## 通过MoreExecutors.listeningDecorator方法初始化一个ListeningExecutorService,然后使用此实例的submit方法即可初始化ListenableFuture对象 ListeningExecutorService executorService = MoreExecutors.listeningDecorator( new ThreadPoolExecutor(2, 10,20, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10), new ThreadPoolExecutor.DiscardOldestPolicy())); ListenableFuture<String> listenableFuture = executorService.submit(() -> { System.out.println("call execute.."); TimeUnit.SECONDS.sleep(6); return "I'am yellow duck"; }); ## 使用ListenableFuture注册监听器 ## 接口声明了addListener方法,给future添加相应的listener,当future执行完之后,listener就会被调用执行 addListener(Runnable listener, Executor executor) Registers a listener to be run on the given executor. 使用如下: listenableFuture.addListener(() -> { try { System.out.println("get listenable future's result -- " + listenableFuture.get()); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } }, executorService); ## 给ListenableFuture添加回调函数 ## 通过Futures.addCallback给ListenableFuture添加回调函数 使用如下: Futures.addCallback(listenableFuture, new FutureCallback<String>() { @Override public void onSuccess(String result) { System.out.println("get listenable future's result with callback -- " + result); } @Override public void onFailure(Throwable t) { t.printStackTrace(); } }); ## ListenableFuture其他功能 ## * SettableFuture:不需要实现一个方法来计算返回值,而只需要返回一个固定值来做为返回值,可以通过程序设置此Future的返回值或者异常信息 * CheckedFuture: 这是一个继承自ListenableFuture接口,他提供了checkedGet方法,此方法在Future执行发生异常时,可以抛出指定类型的异常 Function<Exception, ApplicationException> mapper = from -> { if (from != null && from.getCause() instanceof ApplicationException) { try { throw (ApplicationException) from.getCause(); } catch (ApplicationException e) { e.printStackTrace(); } } return new ApplicationException("1", null); }; CheckedFuture<String, ApplicationException> checkedFuture = Futures.makeChecked(listenableFuture, mapper); try { checkedFuture.checkedGet(1, TimeUnit.SECONDS); } catch (TimeoutException | ApplicationException e) { e.printStackTrace(); } ## transform ## transform(ListenableFuture<I> future, Function<? super I,? extends O> function, Executor exec) transform是Future提供的静态函数,可以对future的返回值进行自定义函数的转换,使用如下: Function<String, String> transFunction = queryResult -> "transformed --" + queryResult; ListenableFuture<String> transformFuture = transform(listenableFuture, transFunction, executorService); try { System.out.println(transformFuture.get()); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } ## 完整测试代码 ## import org.omg.CORBA.portable.ApplicationException; import org.weakref.jmx.internal.guava.base.Function; import org.weakref.jmx.internal.guava.util.concurrent.CheckedFuture; import org.weakref.jmx.internal.guava.util.concurrent.FutureCallback; import org.weakref.jmx.internal.guava.util.concurrent.Futures; import org.weakref.jmx.internal.guava.util.concurrent.ListenableFuture; import org.weakref.jmx.internal.guava.util.concurrent.ListeningExecutorService; import org.weakref.jmx.internal.guava.util.concurrent.MoreExecutors; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ExecutionException; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import static org.weakref.jmx.internal.guava.util.concurrent.Futures.transform; /** * @Author KeXin * @Date 2020/10/12 下午6:54 **/ public class ListenerFutureTest { public static void main(String[] args) { ListeningExecutorService executorService = MoreExecutors.listeningDecorator( new ThreadPoolExecutor(2, 10,20, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10), new ThreadPoolExecutor.DiscardOldestPolicy())); ListenableFuture<String> listenableFuture = executorService.submit(() -> { System.out.println("call execute.."); TimeUnit.SECONDS.sleep(6); return "I'am yellow duck"; }); listenableFuture.addListener(() -> { try { System.out.println("get listenable future's result -- " + listenableFuture.get()); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } }, executorService); Futures.addCallback(listenableFuture, new FutureCallback<String>() { @Override public void onSuccess(String result) { System.out.println("get listenable future's result with callback -- " + result); } @Override public void onFailure(Throwable t) { t.printStackTrace(); } }); Function<String, String> transFunction = queryResult -> "transformed --" + queryResult; ListenableFuture<String> transformFuture = transform(listenableFuture, transFunction, executorService); try { System.out.println(transformFuture.get()); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } // Function<Exception, ApplicationException> mapper = from -> { // if (from != null && from.getCause() instanceof ApplicationException) { // try { // throw (ApplicationException) from.getCause(); // } catch (ApplicationException e) { // e.printStackTrace(); // } // } // return new ApplicationException("1", null); // }; // CheckedFuture<String, ApplicationException> checkedFuture = Futures.makeChecked(listenableFuture, mapper); // try { // checkedFuture.checkedGet(1, TimeUnit.SECONDS); // } catch (TimeoutException | ApplicationException e) { // e.printStackTrace(); // } } }
还没有评论,来说两句吧...