Java/ExecutorService中多线程服务ExecuteService的使用

亦凉 2023-10-15 23:08 30阅读 0赞

什么是ExecutorService

ExecutorService 是 Java 中的一个接口,它扩展了 Executor 接口,并提供了更多的方法来处理多线程任务。它是 Java 中用于执行多线程任务的框架之一,可以创建一个线程池,将多个任务提交到线程池中执行。ExecutorService 接口提供了许多方法,如 shutdown()、shutdownNow()、submit()、execute()、invokeAll() 等,可以更方便地提交任务、执行任务、关闭线程池等操作。同时,ExecutorService 还提供了线程池的管理和监控功能,可以更好地控制和管理线程池中的线程。在实际应用中,ExecutorService 通常与 Callable 和 Future 接口一起使用,可以实现更加灵活和高效的多线程编程。

by zhengkai.blog.csdn.net

ExecutorService基本原理

ExecutorService 的实现原理主要是基于线程池的概念。当我们创建一个 ExecutorService 对象时,实际上就是创建了一个线程池。线程池中包含了若干个线程,这些线程可以执行我们提交的任务。当线程池中的线程空闲时,它们会等待任务的到来,一旦有任务提交,就会从线程池中选择一个空闲的线程执行该任务。如果线程池中的线程都在执行任务,那么新的任务就会被暂时放在任务队列中,等待线程空闲时再来执行。

在 ExecutorService 的实现中,任务的提交和执行是异步的,也就是说,我们提交任务时不会阻塞当前线程,而是将任务交给线程池中的线程去执行。当任务执行完成后,线程会将执行结果返回给我们。同时,我们可以通过调用 ExecutorService 的方法来管理和控制线程池,如增加或减少线程数量、关闭线程池等。总之,ExecutorService 的实现原理是基于线程池的概念,通过管理和调度线程,提高程序的效率和性能,同时避免线程阻塞和死锁等问题,从而更好地管理和调度线程,提高应用程序的并发处理能力。

6f142397b4d3405ca575ce67e61eb1af.png

附加:线程池中的五种状态

线程池中有五种状态,分别是 RUNNING、STOP、SHUTDOWN、TIDYING 和 TERMINATED。它们的含义和区别如下:

  • RUNNING:表示线程池处于运行状态,接受新的任务并且处理任务队列中的任务,直到线程池被显式地关闭。
  • SHUTDOWN:表示线程池处于关闭状态,不再接受新的任务,但是会尝试执行任务队列中的任务,直到任务队列为空。在任务队列为空后,线程池会进入 TIDYING 状态。
  • STOP:表示线程池处于停止状态,不再接受新的任务,也不会继续执行任务队列中的任务。此时,线程池会尝试中断正在执行的任务,并立即返回任务队列中的所有任务。在任务队列为空后,线程池会进入 TIDYING 状态。
  • TIDYING:表示线程池正在进行线程回收的操作,此时线程池中的所有任务都已经执行完成,而线程池中的线程也已经被销毁。在线程回收完成后,线程池会进入 TERMINATED 状态。
  • TERMINATED:表示线程池已经完全终止,不再接受任何任务,也不会执行任何任务。此时,线程池中的所有线程都已经被销毁,线程池对象也可以被垃圾回收。

总之,这五种状态代表了 ThreadPoolExecutor 在不同时间点的不同状态,分别表示线程池的运行状态、关闭状态、停止状态、回收状态和终止状态。它们的区别在于线程池在不同状态下的行为和状态转换。

ExecuteService提供了什么方法

使用ExecuteService代表Executors创建线程池

  • submit提交的是Callable方法,返回Future,说明submit是有返回值的
  • execute执行的是Runnable方法,没有返回值

所以submit和execute的区别是提交的方法和是否有返回值,取决于你的业务需求。

d5cfb0c28140446895522dc60b519429.png

  1. /*
  2. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  3. *
  4. * This code is free software; you can redistribute it and/or modify it
  5. * under the terms of the GNU General Public License version 2 only, as
  6. * published by the Free Software Foundation. Oracle designates this
  7. * particular file as subject to the "Classpath" exception as provided
  8. * by Oracle in the LICENSE file that accompanied this code.
  9. *
  10. * This code is distributed in the hope that it will be useful, but WITHOUT
  11. * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  12. * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
  13. * version 2 for more details (a copy is included in the LICENSE file that
  14. * accompanied this code).
  15. *
  16. * You should have received a copy of the GNU General Public License version
  17. * 2 along with this work; if not, write to the Free Software Foundation,
  18. * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  19. *
  20. * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  21. * or visit www.oracle.com if you need additional information or have any
  22. * questions.
  23. */
  24. /*
  25. * This file is available under and governed by the GNU General Public
  26. * License version 2 only, as published by the Free Software Foundation.
  27. * However, the following notice accompanied the original version of this
  28. * file:
  29. *
  30. * Written by Doug Lea with assistance from members of JCP JSR-166
  31. * Expert Group and released to the public domain, as explained at
  32. * http://creativecommons.org/publicdomain/zero/1.0/
  33. */
  34. package java.util.concurrent;
  35. import java.util.Collection;
  36. import java.util.List;
  37. /**
  38. * An {@link Executor} that provides methods to manage termination and
  39. * methods that can produce a {@link Future} for tracking progress of
  40. * one or more asynchronous tasks.
  41. *
  42. * <p>An {@code ExecutorService} can be shut down, which will cause
  43. * it to reject new tasks. Two different methods are provided for
  44. * shutting down an {@code ExecutorService}. The {@link #shutdown}
  45. * method will allow previously submitted tasks to execute before
  46. * terminating, while the {@link #shutdownNow} method prevents waiting
  47. * tasks from starting and attempts to stop currently executing tasks.
  48. * Upon termination, an executor has no tasks actively executing, no
  49. * tasks awaiting execution, and no new tasks can be submitted. An
  50. * unused {@code ExecutorService} should be shut down to allow
  51. * reclamation of its resources.
  52. *
  53. * <p>Method {@code submit} extends base method {@link
  54. * Executor#execute(Runnable)} by creating and returning a {@link Future}
  55. * that can be used to cancel execution and/or wait for completion.
  56. * Methods {@code invokeAny} and {@code invokeAll} perform the most
  57. * commonly useful forms of bulk execution, executing a collection of
  58. * tasks and then waiting for at least one, or all, to
  59. * complete. (Class {@link ExecutorCompletionService} can be used to
  60. * write customized variants of these methods.)
  61. *
  62. * <p>The {@link Executors} class provides factory methods for the
  63. * executor services provided in this package.
  64. *
  65. * <h2>Usage Examples</h2>
  66. *
  67. * Here is a sketch of a network service in which threads in a thread
  68. * pool service incoming requests. It uses the preconfigured {@link
  69. * Executors#newFixedThreadPool} factory method:
  70. *
  71. * <pre> {@code
  72. * class NetworkService implements Runnable {
  73. * private final ServerSocket serverSocket;
  74. * private final ExecutorService pool;
  75. *
  76. * public NetworkService(int port, int poolSize)
  77. * throws IOException {
  78. * serverSocket = new ServerSocket(port);
  79. * pool = Executors.newFixedThreadPool(poolSize);
  80. * }
  81. *
  82. * public void run() { // run the service
  83. * try {
  84. * for (;;) {
  85. * pool.execute(new Handler(serverSocket.accept()));
  86. * }
  87. * } catch (IOException ex) {
  88. * pool.shutdown();
  89. * }
  90. * }
  91. * }
  92. *
  93. * class Handler implements Runnable {
  94. * private final Socket socket;
  95. * Handler(Socket socket) { this.socket = socket; }
  96. * public void run() {
  97. * // read and service request on socket
  98. * }
  99. * }}</pre>
  100. *
  101. * The following method shuts down an {@code ExecutorService} in two phases,
  102. * first by calling {@code shutdown} to reject incoming tasks, and then
  103. * calling {@code shutdownNow}, if necessary, to cancel any lingering tasks:
  104. *
  105. * <pre> {@code
  106. * void shutdownAndAwaitTermination(ExecutorService pool) {
  107. * pool.shutdown(); // Disable new tasks from being submitted
  108. * try {
  109. * // Wait a while for existing tasks to terminate
  110. * if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
  111. * pool.shutdownNow(); // Cancel currently executing tasks
  112. * // Wait a while for tasks to respond to being cancelled
  113. * if (!pool.awaitTermination(60, TimeUnit.SECONDS))
  114. * System.err.println("Pool did not terminate");
  115. * }
  116. * } catch (InterruptedException ex) {
  117. * // (Re-)Cancel if current thread also interrupted
  118. * pool.shutdownNow();
  119. * // Preserve interrupt status
  120. * Thread.currentThread().interrupt();
  121. * }
  122. * }}</pre>
  123. *
  124. * <p>Memory consistency effects: Actions in a thread prior to the
  125. * submission of a {@code Runnable} or {@code Callable} task to an
  126. * {@code ExecutorService}
  127. * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
  128. * any actions taken by that task, which in turn <i>happen-before</i> the
  129. * result is retrieved via {@code Future.get()}.
  130. *
  131. * @since 1.5
  132. * @author Doug Lea
  133. */
  134. public interface ExecutorService extends Executor {
  135. /**
  136. * Initiates an orderly shutdown in which previously submitted
  137. * tasks are executed, but no new tasks will be accepted.
  138. * Invocation has no additional effect if already shut down.
  139. *
  140. * <p>This method does not wait for previously submitted tasks to
  141. * complete execution. Use {@link #awaitTermination awaitTermination}
  142. * to do that.
  143. *
  144. * @throws SecurityException if a security manager exists and
  145. * shutting down this ExecutorService may manipulate
  146. * threads that the caller is not permitted to modify
  147. * because it does not hold {@link
  148. * java.lang.RuntimePermission}{@code ("modifyThread")},
  149. * or the security manager's {@code checkAccess} method
  150. * denies access.
  151. */
  152. void shutdown();
  153. /**
  154. * Attempts to stop all actively executing tasks, halts the
  155. * processing of waiting tasks, and returns a list of the tasks
  156. * that were awaiting execution.
  157. *
  158. * <p>This method does not wait for actively executing tasks to
  159. * terminate. Use {@link #awaitTermination awaitTermination} to
  160. * do that.
  161. *
  162. * <p>There are no guarantees beyond best-effort attempts to stop
  163. * processing actively executing tasks. For example, typical
  164. * implementations will cancel via {@link Thread#interrupt}, so any
  165. * task that fails to respond to interrupts may never terminate.
  166. *
  167. * @return list of tasks that never commenced execution
  168. * @throws SecurityException if a security manager exists and
  169. * shutting down this ExecutorService may manipulate
  170. * threads that the caller is not permitted to modify
  171. * because it does not hold {@link
  172. * java.lang.RuntimePermission}{@code ("modifyThread")},
  173. * or the security manager's {@code checkAccess} method
  174. * denies access.
  175. */
  176. List<Runnable> shutdownNow();
  177. /**
  178. * Returns {@code true} if this executor has been shut down.
  179. *
  180. * @return {@code true} if this executor has been shut down
  181. */
  182. boolean isShutdown();
  183. /**
  184. * Returns {@code true} if all tasks have completed following shut down.
  185. * Note that {@code isTerminated} is never {@code true} unless
  186. * either {@code shutdown} or {@code shutdownNow} was called first.
  187. *
  188. * @return {@code true} if all tasks have completed following shut down
  189. */
  190. boolean isTerminated();
  191. /**
  192. * Blocks until all tasks have completed execution after a shutdown
  193. * request, or the timeout occurs, or the current thread is
  194. * interrupted, whichever happens first.
  195. *
  196. * @param timeout the maximum time to wait
  197. * @param unit the time unit of the timeout argument
  198. * @return {@code true} if this executor terminated and
  199. * {@code false} if the timeout elapsed before termination
  200. * @throws InterruptedException if interrupted while waiting
  201. */
  202. boolean awaitTermination(long timeout, TimeUnit unit)
  203. throws InterruptedException;
  204. /**
  205. * Submits a value-returning task for execution and returns a
  206. * Future representing the pending results of the task. The
  207. * Future's {@code get} method will return the task's result upon
  208. * successful completion.
  209. *
  210. * <p>
  211. * If you would like to immediately block waiting
  212. * for a task, you can use constructions of the form
  213. * {@code result = exec.submit(aCallable).get();}
  214. *
  215. * <p>Note: The {@link Executors} class includes a set of methods
  216. * that can convert some other common closure-like objects,
  217. * for example, {@link java.security.PrivilegedAction} to
  218. * {@link Callable} form so they can be submitted.
  219. *
  220. * @param task the task to submit
  221. * @param <T> the type of the task's result
  222. * @return a Future representing pending completion of the task
  223. * @throws RejectedExecutionException if the task cannot be
  224. * scheduled for execution
  225. * @throws NullPointerException if the task is null
  226. */
  227. <T> Future<T> submit(Callable<T> task);
  228. /**
  229. * Submits a Runnable task for execution and returns a Future
  230. * representing that task. The Future's {@code get} method will
  231. * return the given result upon successful completion.
  232. *
  233. * @param task the task to submit
  234. * @param result the result to return
  235. * @param <T> the type of the result
  236. * @return a Future representing pending completion of the task
  237. * @throws RejectedExecutionException if the task cannot be
  238. * scheduled for execution
  239. * @throws NullPointerException if the task is null
  240. */
  241. <T> Future<T> submit(Runnable task, T result);
  242. /**
  243. * Submits a Runnable task for execution and returns a Future
  244. * representing that task. The Future's {@code get} method will
  245. * return {@code null} upon <em>successful</em> completion.
  246. *
  247. * @param task the task to submit
  248. * @return a Future representing pending completion of the task
  249. * @throws RejectedExecutionException if the task cannot be
  250. * scheduled for execution
  251. * @throws NullPointerException if the task is null
  252. */
  253. Future<?> submit(Runnable task);
  254. /**
  255. * Executes the given tasks, returning a list of Futures holding
  256. * their status and results when all complete.
  257. * {@link Future#isDone} is {@code true} for each
  258. * element of the returned list.
  259. * Note that a <em>completed</em> task could have
  260. * terminated either normally or by throwing an exception.
  261. * The results of this method are undefined if the given
  262. * collection is modified while this operation is in progress.
  263. *
  264. * @param tasks the collection of tasks
  265. * @param <T> the type of the values returned from the tasks
  266. * @return a list of Futures representing the tasks, in the same
  267. * sequential order as produced by the iterator for the
  268. * given task list, each of which has completed
  269. * @throws InterruptedException if interrupted while waiting, in
  270. * which case unfinished tasks are cancelled
  271. * @throws NullPointerException if tasks or any of its elements are {@code null}
  272. * @throws RejectedExecutionException if any task cannot be
  273. * scheduled for execution
  274. */
  275. <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
  276. throws InterruptedException;
  277. /**
  278. * Executes the given tasks, returning a list of Futures holding
  279. * their status and results
  280. * when all complete or the timeout expires, whichever happens first.
  281. * {@link Future#isDone} is {@code true} for each
  282. * element of the returned list.
  283. * Upon return, tasks that have not completed are cancelled.
  284. * Note that a <em>completed</em> task could have
  285. * terminated either normally or by throwing an exception.
  286. * The results of this method are undefined if the given
  287. * collection is modified while this operation is in progress.
  288. *
  289. * @param tasks the collection of tasks
  290. * @param timeout the maximum time to wait
  291. * @param unit the time unit of the timeout argument
  292. * @param <T> the type of the values returned from the tasks
  293. * @return a list of Futures representing the tasks, in the same
  294. * sequential order as produced by the iterator for the
  295. * given task list. If the operation did not time out,
  296. * each task will have completed. If it did time out, some
  297. * of these tasks will not have completed.
  298. * @throws InterruptedException if interrupted while waiting, in
  299. * which case unfinished tasks are cancelled
  300. * @throws NullPointerException if tasks, any of its elements, or
  301. * unit are {@code null}
  302. * @throws RejectedExecutionException if any task cannot be scheduled
  303. * for execution
  304. */
  305. <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
  306. long timeout, TimeUnit unit)
  307. throws InterruptedException;
  308. /**
  309. * Executes the given tasks, returning the result
  310. * of one that has completed successfully (i.e., without throwing
  311. * an exception), if any do. Upon normal or exceptional return,
  312. * tasks that have not completed are cancelled.
  313. * The results of this method are undefined if the given
  314. * collection is modified while this operation is in progress.
  315. *
  316. * @param tasks the collection of tasks
  317. * @param <T> the type of the values returned from the tasks
  318. * @return the result returned by one of the tasks
  319. * @throws InterruptedException if interrupted while waiting
  320. * @throws NullPointerException if tasks or any element task
  321. * subject to execution is {@code null}
  322. * @throws IllegalArgumentException if tasks is empty
  323. * @throws ExecutionException if no task successfully completes
  324. * @throws RejectedExecutionException if tasks cannot be scheduled
  325. * for execution
  326. */
  327. <T> T invokeAny(Collection<? extends Callable<T>> tasks)
  328. throws InterruptedException, ExecutionException;
  329. /**
  330. * Executes the given tasks, returning the result
  331. * of one that has completed successfully (i.e., without throwing
  332. * an exception), if any do before the given timeout elapses.
  333. * Upon normal or exceptional return, tasks that have not
  334. * completed are cancelled.
  335. * The results of this method are undefined if the given
  336. * collection is modified while this operation is in progress.
  337. *
  338. * @param tasks the collection of tasks
  339. * @param timeout the maximum time to wait
  340. * @param unit the time unit of the timeout argument
  341. * @param <T> the type of the values returned from the tasks
  342. * @return the result returned by one of the tasks
  343. * @throws InterruptedException if interrupted while waiting
  344. * @throws NullPointerException if tasks, or unit, or any element
  345. * task subject to execution is {@code null}
  346. * @throws TimeoutException if the given timeout elapses before
  347. * any task successfully completes
  348. * @throws ExecutionException if no task successfully completes
  349. * @throws RejectedExecutionException if tasks cannot be scheduled
  350. * for execution
  351. */
  352. <T> T invokeAny(Collection<? extends Callable<T>> tasks,
  353. long timeout, TimeUnit unit)
  354. throws InterruptedException, ExecutionException, TimeoutException;
  355. }

new ExecutorService的创建

创建一个什么样的ExecutorService的实例(即线程池)需要g根据具体应用场景而定,不过Java给我们提供了一个Executors工厂类,它可以帮助我们很方便的创建各种类型ExecutorService线程池,Executors一共可以创建下面这四类线程池:

  • newCachedThreadPool 创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
  • newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
  • newScheduledThreadPool 创建一个定长线程池,支持定时及周期性任务执行。
  • newSingleThreadExecutor 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。

注意:Executors只是一个工厂类,它所有的方法返回的都是ThreadPoolExecutor、ScheduledThreadPoolExecutor这两个类的实例。

execute(Runnable)

这个方法接收一个Runnable实例,并且异步的执行,缺点就是只负责执行不负责返回:

  1. ExecutorService executorService = Executors.newSingleThreadExecutor();
  2. executorService.execute(new Runnable() {
  3. public void run() {
  4. System.out.println("Asynchronous task");
  5. }
  6. });
  7. executorService.shutdown();

submit(Runnable)

submit(Runnable)和execute(Runnable)区别是前者可以返回一个Future对象,通过返回的Future对象,我们可以检查提交的任务是否执行完毕,请看下面执行的例子:

  1. Future future = executorService.submit(new Runnable() {
  2. public void run() {
  3. System.out.println("Asynchronous task");
  4. }
  5. });
  6. future.get();
  7. //如果返回了空,则代表没有任务正在执行中,任务已经运行完毕

如果任务执行完成,future.get()方法会返回一个null。注意,future.get()方法会产生阻塞。

submit(Callable)

submit(Callable)和submit(Runnable)类似,也会返回一个Future对象,但是除此之外,submit(Callable)接收的是一个Callable的实现,Callable接口中的call()方法有一个返回值,可以返回任务的执行结果,而Runnable接口中的run()方法是void的,没有返回值。请看下面实例:

  1. Future future = executorService.submit(new Callable(){
  2. public Object call() throws Exception {
  3. System.out.println("Asynchronous Callable");
  4. return "Callable Result";
  5. }
  6. });
  7. System.out.println("future.get() = " + future.get());

如果任务执行完成,future.get()方法会返回Callable任务的执行结果。注意,future.get()方法会产生阻塞。

invokeAny(Collection<? extends Callable> tasks)

invokeAny(Collection<? extends Callable> tasks)方法接收的是一个Callable的集合,执行这个方法不会返回Future,但是会返回所有Callable任务中其中一个任务的执行结果。这个方法也无法保证返回的是哪个任务的执行结果,反正是其中的某一个。

以下代码每次执行都会返回一个结果,并且返回的结果是变化的,可能会返回“Task2”也可是“Task1”或者“Task3”。

  1. ExecutorService executorService = Executors.newSingleThreadExecutor();
  2. Set<Callable<String>> callables = new HashSet<Callable<String>>();
  3. callables.add(new Callable<String>() {
  4. public String call() throws Exception {
  5. return "Task 1";
  6. }
  7. });
  8. callables.add(new Callable<String>() {
  9. public String call() throws Exception {
  10. return "Task 2";
  11. }
  12. });
  13. callables.add(new Callable<String>() {
  14. public String call() throws Exception {
  15. return "Task 3";
  16. }
  17. });
  18. String result = executorService.invokeAny(callables);
  19. System.out.println("result = " + result);
  20. executorService.shutdown();

invokeAll(Collection<? extends Callable> tasks)

invokeAll(…)与 invokeAny(…)类似也是接收一个Callable集合,但是前者执行之后会返回一个Future的List,其中对应着每个Callable任务执行后的Future对象。r

  1. ExecutorService executorService = Executors.newSingleThreadExecutor();
  2. Set<Callable<String>> callables = new HashSet<Callable<String>>();
  3. callables.add(new Callable<String>() {
  4. public String call() throws Exception {
  5. return "Task 1";
  6. }
  7. });
  8. callables.add(new Callable<String>() {
  9. public String call() throws Exception {
  10. return "Task 2";
  11. }
  12. });
  13. callables.add(new Callable<String>() {
  14. public String call() throws Exception {
  15. return "Task 3";
  16. }
  17. });
  18. List<Future<String>> futures = executorService.invokeAll(callables);
  19. for(Future<String> future : futures){
  20. System.out.println("future.get = " + future.get());
  21. }
  22. executorService.shutdown();

优雅的关闭ExecutorService

如果的应用程序是通过main()方法启动的,在这个main()退出之后,如果应用程序中的ExecutorService没有关闭,这个应用将一直运行。之所以会出现这种情况,是因为ExecutorService中运行的线程会阻止JVM关闭。

如果要关闭ExecutorService中执行的线程,我们可以调用ExecutorService.shutdown()方法。在调用shutdown()方法之后,ExecutorService不会立即关闭,但是它不再接收新的任务,直到当前所有线程执行完成才会关闭,所有在shutdown()执行之前提交的任务都会被执行。

如果我们想立即关闭ExecutorService,我们可以调用ExecutorService.shutdownNow()方法。这个动作将跳过所有正在执行的任务和被提交还没有执行的任务。但是它并不对正在执行的任务做任何保证,有可能它们都会停止,也有可能执行完成。

优雅一点就是

  1. //使用这种方式,ExecutorService 首先停止执行新任务,等待指定的时间段完成所有任务。如果该时间到期,则立即停止执行。
  2. executorService.shutdown();
  3. try {
  4. if (!executorService.awaitTermination(800, TimeUnit.MILLISECONDS)) {
  5. executorService.shutdownNow();
  6. }
  7. } catch (InterruptedException e) {
  8. executorService.shutdownNow();
  9. }

SpringBoot中注入ExecutorService

  1. import lombok.extern.slf4j.Slf4j;
  2. import org.springframework.context.annotation.Bean;
  3. import org.springframework.context.annotation.Configuration;
  4. import org.springframework.scheduling.annotation.EnableAsync;
  5. import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
  6. import java.util.concurrent.Executor;
  7. import java.util.concurrent.ExecutorService;
  8. import java.util.concurrent.Executors;
  9. import java.util.concurrent.ThreadPoolExecutor;
  10. /**
  11. * 多线程池配置
  12. */
  13. @Slf4j
  14. @EnableAsync
  15. @Configuration
  16. public class ThreadExecutorConfig {
  17. /**
  18. * SpringBoot会优先使用名称为"taskExecutor"的线程池。
  19. * 如果没有找到,才会使用其他类型为TaskExecutor或其子类的线程池。
  20. *
  21. * @return
  22. */
  23. @Bean
  24. public Executor taskExecutor() {
  25. log.info("start taskExecutor");
  26. int size = Runtime.getRuntime().availableProcessors();
  27. //return new ThreadPoolExecutor(size, size, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
  28. ThreadPoolTaskExecutor executor = new VisiableThreadPoolTaskExecutor();
  29. // 配置核心线程数
  30. executor.setCorePoolSize(2);
  31. // 设置最大线程数
  32. executor.setMaxPoolSize(8);
  33. // 设置队列容量
  34. executor.setQueueCapacity(20);
  35. // 设置线程活跃时间(秒)
  36. executor.setKeepAliveSeconds(0);
  37. // 配置线程池中的线程的名称前缀
  38. executor.setThreadNamePrefix("executor-");
  39. // 设置拒绝策略
  40. // rejection-policy:当pool已经达到max size的时候,如何处理新任务
  41. // CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
  42. executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
  43. // 等待所有任务结束后再关闭线程池
  44. executor.setWaitForTasksToCompleteOnShutdown(true);
  45. // 执行初始化
  46. executor.initialize();
  47. return executor;
  48. }
  49. //Bean销毁时会执行线程池销毁方法
  50. @Bean(destroyMethod = "shutdown")
  51. public ExecutorService executorService() {
  52. return Executors.newScheduledThreadPool(10);
  53. }
  54. }
  55. @AutoWired
  56. private ExecutorService executorService;

使用Callable+FutureTask

  1. public class Test {
  2. public static void main(String[] args) {
  3. //第一种方式
  4. ExecutorService executor = Executors.newCachedThreadPool();
  5. Task task = new Task();
  6. FutureTask<Integer> futureTask = new FutureTask<Integer>(task);
  7. executor.submit(futureTask);
  8. executor.shutdown();
  9. //第二种方式,注意这种方式和第一种方式效果是类似的,只不过一个使用的是ExecutorService,一个使用的是Thread
  10. /*Task task = new Task();
  11. FutureTask<Integer> futureTask = new FutureTask<Integer>(task);
  12. Thread thread = new Thread(futureTask);
  13. thread.start();*/
  14. try {
  15. Thread.sleep(1000);
  16. } catch (InterruptedException e1) {
  17. e1.printStackTrace();
  18. }
  19. System.out.println("主线程在执行任务");
  20. try {
  21. System.out.println("task运行结果"+futureTask.get());
  22. } catch (InterruptedException e) {
  23. e.printStackTrace();
  24. } catch (ExecutionException e) {
  25. e.printStackTrace();
  26. }
  27. System.out.println("所有任务执行完毕");
  28. }
  29. }
  30. class Task implements Callable<Integer>{
  31. @Override
  32. public Integer call() throws Exception {
  33. System.out.println("子线程在进行计算");
  34. Thread.sleep(3000);
  35. int sum = 0;
  36. for(int i=0;i<100;i++)
  37. sum += i;
  38. return sum;
  39. }
  40. }

#

Future 接口

submit() 方法和 invokeAll() 方法返回一个 Future 接口的对象或 Future 类型的对象集合。这些 Future 接口的对象允许我们获取任务执行的结果或检查任务的状态 ( 是正在运行还是执行完毕 )。

Future 接口 get() 方法

Future 接口提供了一个特殊的阻塞方法 get(),它返回 Callable 任务执行的实际结果,但如果是 Runnable 任务,则只会返回 null。

因为 get() 方法是阻塞的。如果调用 get() 方法时任务仍在运行,那么调用将会一直被执阻塞,直到任务正确执行完毕并且结果可用时才返回。

而且更重要的是,正在被执行的任务随时都可能抛出异常或中断执行。因此我们要将 get() 调用放在 try catch 语句块中,并捕捉 InterruptedExceptionExecutionException 异常。

  1. Future<String> future = executorService.submit(callableTask);
  2. String result = null;
  3. try {
  4. result = future.get();
  5. } catch (InterruptedException | ExecutionException e) {
  6. e.printStackTrace();
  7. }

因为 get() 方法是阻塞的,而且并不知道要阻塞多长时间。因此可能导致应用程序的性能降低。如果结果数据并不重要,那么我们可以使用超时机制来避免长时间阻塞。

  1. String result = future.get(200, TimeUnit.MILLISECONDS);

这个 get() 的重载,第一个参数为超时的时间,第二个参数为时间的单位。上面的实例所表示就的就是等待 200 毫秒。

注意,这个 get() 重载方法,如果在超时时间内正常结束,那么返回的是 Future 类型的结果,如果超时了还没结束,那么将抛出 TimeoutException 异常。

除了 get() 方法之外,Future 还提供了其它很多方法,我们将几个重要的方法罗列在此






















方法 说明
isDone() 检查已分配的任务是否已处理
cancel() 取消任务执行
isCancelled() 检查任务是否已取消

这些方法的使用方式如下

  1. boolean isDone = future.isDone();
  2. boolean canceled = future.cancel(true);
  3. boolean isCancelled = future.isCancelled();

发表评论

表情:
评论列表 (有 0 条评论,30人围观)

还没有评论,来说两句吧...

相关阅读