Java线程池和阻塞队列

痛定思痛。 2023-03-06 03:26 197阅读 0赞

一.Java线程池的优点

1.**降低资源消耗**:通过重复利用线程池中已创建好的线程来降低线程创建和销毁造成的消耗。

2.**提高响应速度**:当任务到达时,任务可以直接拿到线程池中已创建好的线程立即执行。

3.提高线程的可管理性:线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。

线程池任务增加过程:

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3h5eTEwMjg_size_16_color_FFFFFF_t_70

注:此图为ThreadPoolExecutor的机制,但不代表全部,比如dubbo提供的线程池 EagerThreadPoolExecutor,就是先起线程到最大值,再进队列

二.ThreadPoolExecutor

1. ThreadPoolExecutor类中共提供四种构造方法:

a.拥有五个参数:

  1. int corePoolSize,
  2. int maximumPoolSize,
  3. long keepAliveTime,
  4. TimeUnit unit,
  5. BlockingQueue<Runnable> workQueue
  6. public ThreadPoolExecutor(int corePoolSize,
  7. int maximumPoolSize,
  8. long keepAliveTime,
  9. TimeUnit unit,
  10. BlockingQueue<Runnable> workQueue) {
  11. this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
  12. Executors.defaultThreadFactory(), defaultHandler);
  13. }

b.拥有六个参数:

比a多了参数:

  1. ThreadFactory threadFactory
  2. public ThreadPoolExecutor(int corePoolSize,
  3. int maximumPoolSize,
  4. long keepAliveTime,
  5. TimeUnit unit,
  6. BlockingQueue<Runnable> workQueue,
  7. ThreadFactory threadFactory) {
  8. this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
  9. threadFactory, defaultHandler);
  10. }

c.拥有六个参数:

比a多了参数:

  1. RejectedExecutionHandler handler
  2. public ThreadPoolExecutor(int corePoolSize,
  3. int maximumPoolSize,
  4. long keepAliveTime,
  5. TimeUnit unit,
  6. BlockingQueue<Runnable> workQueue,
  7. RejectedExecutionHandler handler) {
  8. this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
  9. Executors.defaultThreadFactory(), handler);
  10. }

d.拥有七个参数:

  1. ThreadFactory threadFactory,
  2. RejectedExecutionHandler handler
  3. public ThreadPoolExecutor(int corePoolSize,
  4. int maximumPoolSize,
  5. long keepAliveTime,
  6. TimeUnit unit,
  7. BlockingQueue<Runnable> workQueue,
  8. ThreadFactory threadFactory,
  9. RejectedExecutionHandler handler) {
  10. if (corePoolSize < 0 ||
  11. maximumPoolSize <= 0 ||
  12. maximumPoolSize < corePoolSize ||
  13. keepAliveTime < 0)
  14. throw new IllegalArgumentException();
  15. if (workQueue == null || threadFactory == null || handler == null)
  16. throw new NullPointerException();
  17. this.corePoolSize = corePoolSize;
  18. this.maximumPoolSize = maximumPoolSize;
  19. this.workQueue = workQueue;
  20. this.keepAliveTime = unit.toNanos(keepAliveTime);
  21. this.threadFactory = threadFactory;
  22. this.handler = handler;
  23. }

但是仔细看的话,ThreadPoolExecutor虽然提供了四个5到7个参数,但是在方法体中,均是 7 个参数。

2.参数说明:

1. corePoolSize**(必需):核心线程数,默认情况下,核心线程会一直存活,但是当将allowCoreThreadTimeout设置为true时,核心线程也会超时回收。**

2. maximumPoolSize**(必需):线程池维护线程的最大数量。**

3. keepAliveTime**(必需):线程闲置超时时长。如果超过该时长,非核心线程就会被回收。如果将allowCoreThreadTimeout设置为true时,核心线程也会超时回收。**

4. unit**(必需):指定keepAliveTime参数的时间单位。常用的有:TimeUnit.MILLISECONDS(毫秒)、TimeUnit.SECONDS(秒)、TimeUnit.MINUTES(分)。**

5. workQueue**(必需):任务队列。通过线程池的execute()方法提交的Runnable对象将存储在该参数中。其采用阻塞队列实现。**

6. threadFactory**(可选):线程工厂,用于指定为线程池创建新线程的方式。**

7. handler**(可选):线程池对拒绝任务的处理策略。**

3.任务队列

此队列常用方法:

  1. // 将指定元素插入到此队列的尾部,如果此队列已满则直接抛出异常
  2. boolean add(E e)
  3. // 将指定元素插入到此队列的尾部,在成功时返回 true,如果此队列已满,则返回 false。
  4. boolean offer(E e)
  5. // 将指定元素插入到此队列的尾部,如有队列已满,则等待队列中的数据被消费。
  6. void put(E e)
  7. // 获取但不移除此队列的头;如果队列为空,则返回 null。
  8. E peek()
  9. // 获取并移除此队列的头,若队列为空,返回null。
  10. E poll()
  11. // 获取并移除此队列的头部,若队列为空,则发生阻塞,等待队列中被插入数据。
  12. E take()
  13. //移除此队列中所有可用的元素,并将它们添加到给定collection中。
  14. int drainTo(Collection<? super E> c)
  15. //最多从此队列中移除给定数量的可用元素,并将这些元素添加到给定collection 中。
  16. int drainTo(Collection<? super E> c, int maxElements)
  17. // 移除队列中指定元素的单个实例,若队列为空,抛出NoSuchElementException异常。
  18. boolean remove(Object o)

任务队列是基于阻塞队列实现的,即采用生产者消费者模式,需要实现BlockingQueue接口。,Java并发包中的阻塞队列一共是7个,它们都是线程安全的。

BlockingQueue不光实现了一个完整队列所具有的基本功能,同时在多线程环境下,他还自动管理了多线间的自动等待于唤醒功能,从而使得程序员可以忽略这些细节,关注更高级的功能。

3.1 LinkedBlockingQueue

1.LinkedBlockingQueue 是一个基于链表的无界阻塞队列(也可以指定队列长度),**当生产者往队列中放一个数据时,队列会从生产者手中获取数据并缓存在队列内部,而生产者立即返回,只有当队列缓存区达到所指定的最大容量时才会阻塞生产队列,直到消费者从队列中消费掉一份数据,生产者线程才会被唤醒,消费者这端的处理也基于同样的原理。**

2.**LinkedBlockingQueue可以高效的处理并发数据,这是因为生产者和消费者端分别采用了独立的锁来控制数据同步,所以在高并发的情况下生产者和消费者可以并行地操作队列中的数据,从而提高整个队列的并发性能。**

3.如果没有给 LinkedBlockingQueue 指定其容量大小,则默认为 Integer.MAX_VALUE,这样的话,**如果生产者的速度大于消费者的速度,则可能还没等到队列阻塞,系统内存就被消耗完了,从而导致内存溢出**

以下为一个简单的用阻塞队列实现的生产者消费者模式

  1. public static void main(String[] args) {
  2. LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue(10);
  3. new Thread(new Runnable() {
  4. @Override
  5. public void run() {
  6. int a = 1;
  7. while (true) {
  8. try {
  9. linkedBlockingQueue.put("第" + a++ + "次推送");
  10. } catch (InterruptedException e) {
  11. e.printStackTrace();
  12. }
  13. }
  14. }
  15. }).start();
  16. while (true) {
  17. Object ob = null;
  18. try {
  19. Thread.sleep(1000);
  20. ob = linkedBlockingQueue.take();
  21. } catch (InterruptedException e) {
  22. e.printStackTrace();
  23. }
  24. if (null != ob) {
  25. System.out.println(ob);
  26. }
  27. }
  28. }

每次取固定条数的数据:

  1. public static void main(String[] args) {
  2. LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue (10);
  3. new Thread(new Runnable() {
  4. @Override
  5. public void run() {
  6. int a = 0;
  7. while (true){
  8. try {
  9. linkedBlockingQueue.put("第"+ a++ +"次推送");
  10. } catch (InterruptedException e) {
  11. e.printStackTrace();
  12. }
  13. }
  14. }
  15. }).start();
  16. while (true){
  17. Object ob = null;
  18. try {
  19. if (linkedBlockingQueue.size() == 10){
  20. List data = new ArrayList();
  21. linkedBlockingQueue.drainTo(data);
  22. System.out.println("取一次数据:" + data);
  23. }
  24. Thread.sleep(1000);
  25. } catch (InterruptedException e) {
  26. e.printStackTrace();
  27. }
  28. }
  29. }

3.2 ArrayBlockingQueue

1.基于数组的有界阻塞队列实现,在ArrayBlockingQueue内部,维护了一个定长数组,在初始化时必须指定大小。

2.ArrayBlockingQueue采用的是数组作为数据存储容器,而LinkedBlockingQueue采用的则是以Node节点作为连接对象的链表。**因为ArrayBlockingQueue采用的是数组作为数据存储容器,因此在插入或删除元素时不会产生或销毁任何额外的对象实例,而LinkedBlockingQueue则会生成一个额外的Node对象。这可能在长时间内需要高效并发地处理大批量数据的时,对于GC可能存在较大影响。**

3.ArrayBlockingQueue**内部的阻塞队列由重入锁ReenterLock和Condition条件队列实现,并且实现的队列中的锁是没有分离的,所以ArrayBlockingQueue中可以通过构造方法设置公平访问或非公平访问,当队列可用时,阻塞的线程将进入争夺访问资源的竞争中,也就是说谁先抢到谁就执行,没有固定的先后顺序**。,而LinkedBlockingQueue实现的队列中的锁是分离的,其添加采用的是putLock,移除采用的则是takeLock,这样能大大提高队列的吞吐量,也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。

使用方法如 LinkedBlockingQueue

3.3 PriorityBlockingQueue

1.一个支持优先级排序的无界阻塞队列,对元素没有要求。

2.可以实现Comparable接口也可以提供Comparator来对队列中的元素进行比较。

3.跟时间没有任何关系,仅仅是按照优先级取任务。每次出队都返回优先级最高或者最低的元素。

4.内部是使用平衡二叉树实现的,遍历不保证有序。

  1. import java.util.ArrayList;
  2. import java.util.List;
  3. import java.util.concurrent.PriorityBlockingQueue;
  4. public class PriorityBlockingQueueTest implements Runnable, Comparable {
  5. public int num;
  6. public PriorityBlockingQueueTest(int num) {
  7. this.num = num;
  8. }
  9. public void run() {
  10. System.out.println("消费任务 - " + num);
  11. }
  12. public int compareTo(Object o) {
  13. PriorityBlockingQueueTest runnable = (PriorityBlockingQueueTest) o;
  14. return this.num < runnable.num ? 1 : -1;
  15. }
  16. public static void main(String[] args) {
  17. PriorityBlockingQueue priorityBlockingQueue = new PriorityBlockingQueue();
  18. new Thread(new Runnable() {
  19. @Override
  20. public void run() {
  21. int num = 1;
  22. while (true) {
  23. try {
  24. Thread.sleep(1000);
  25. } catch (InterruptedException e) {
  26. e.printStackTrace();
  27. }
  28. System.out.println("push : " + num);
  29. PriorityBlockingQueueTest priorityBlockingQueueTest = new PriorityBlockingQueueTest(num++);
  30. priorityBlockingQueue.put(priorityBlockingQueueTest);
  31. }
  32. }
  33. }).start();
  34. int cnt = 1;
  35. while (true) {
  36. if (priorityBlockingQueue.size() == 10){
  37. System.out.println("第 " + cnt++ + " 次取数据");
  38. List<PriorityBlockingQueueTest> priorityBlockingQueueTestList = new ArrayList<>();
  39. try {
  40. Thread.sleep(50);
  41. priorityBlockingQueue.drainTo(priorityBlockingQueueTestList);
  42. } catch (InterruptedException e) {
  43. e.printStackTrace();
  44. }
  45. priorityBlockingQueueTestList.forEach(a -> a.run());
  46. }
  47. }
  48. }
  49. }

输出:

  1. push 1
  2. push 2
  3. push 3
  4. push 4
  5. push 5
  6. push 6
  7. push 7
  8. push 8
  9. push 9
  10. push 10
  11. 1 次取数据
  12. 消费任务 - 10
  13. 消费任务 - 9
  14. 消费任务 - 8
  15. 消费任务 - 7
  16. 消费任务 - 6
  17. 消费任务 - 5
  18. 消费任务 - 4
  19. 消费任务 - 3
  20. 消费任务 - 2
  21. 消费任务 - 1
  22. push 11
  23. push 12
  24. push 13
  25. push 14
  26. push 15
  27. push 16
  28. push 17
  29. push 18
  30. push 19
  31. push 20
  32. 2 次取数据
  33. 消费任务 - 20
  34. 消费任务 - 19
  35. 消费任务 - 18
  36. 消费任务 - 17
  37. 消费任务 - 16
  38. 消费任务 - 15
  39. 消费任务 - 14
  40. 消费任务 - 13
  41. 消费任务 - 12
  42. 消费任务 - 11

3.4 DelayQueue

1.DelayQueue类似于PriorityBlockingQueue,是**二叉堆实现的无界优先级阻塞队列**。

2.**要求元素都实现Delayed接口,通过执行时延从队列中提取任务,时间没到任务取不出来。同样要实现compareTo()方法,将队列中的任务排序(一般是按时间排序)**

3.**应用场景有缓存系统,任务调度系统,或者网吧上机下机系统还有的就是线上考试系统,比如考驾照开始答题时间不同,但是大家同样都有两小时的时间。这些系统虽然依靠遍历也能实现,但是耗时效率低下。**

  1. package thread_pool_test;
  2. import java.text.ParseException;
  3. import java.text.SimpleDateFormat;
  4. import java.util.Date;
  5. import java.util.concurrent.DelayQueue;
  6. import java.util.concurrent.Delayed;
  7. import java.util.concurrent.TimeUnit;
  8. public class DelayQueueTest implements Delayed {
  9. private String name;
  10. //开始时间
  11. private Date startTime;
  12. //定义时间工具类
  13. private TimeUnit timeUnit = TimeUnit.HOURS;
  14. public DelayQueueTest(String name, Date endTime) {
  15. this.name = name;
  16. this.startTime = endTime;
  17. }
  18. public void sendMessage() {
  19. System.out.println(this.name + "开始时间为:" + formatDateToStr(this.startTime) +
  20. ", 结束时间为:" + formatDateToStr(new Date()) + ", 现在结束");
  21. }
  22. @Override
  23. public long getDelay(TimeUnit unit) {
  24. return startTime.getTime() + 2 * 3600000 - System.currentTimeMillis();
  25. }
  26. @Override
  27. public int compareTo(Delayed delayed) {
  28. DelayQueueTest delayQueueTest = (DelayQueueTest) delayed;
  29. return this.getDelay(this.timeUnit) - delayQueueTest.getDelay(this.timeUnit) > 0 ? 1 : -1;
  30. }
  31. public static Date formatStrToDate(String dateStr) {
  32. SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  33. Date transDate = null;
  34. try {
  35. transDate = sdf.parse(dateStr);
  36. } catch (ParseException e) {
  37. e.printStackTrace();
  38. }
  39. return transDate;
  40. }
  41. public static String formatDateToStr(Date date) {
  42. SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  43. String transDate = sdf.format(date);
  44. return transDate;
  45. }
  46. public static void main(String[] args) {
  47. DelayQueue delayQueue = new DelayQueue();
  48. new Thread(new Runnable() {
  49. @Override
  50. public void run() {
  51. DelayQueueTest a = new DelayQueueTest("小红", DelayQueueTest.formatStrToDate("2020-08-10 01:19:00"));
  52. DelayQueueTest b = new DelayQueueTest("小黄", DelayQueueTest.formatStrToDate("2020-08-10 01:18:40"));
  53. DelayQueueTest c = new DelayQueueTest("小紫", DelayQueueTest.formatStrToDate("2020-08-10 01:20:00"));
  54. DelayQueueTest d = new DelayQueueTest("小王", DelayQueueTest.formatStrToDate("2020-08-10 01:22:00"));
  55. DelayQueueTest e = new DelayQueueTest("小李", DelayQueueTest.formatStrToDate("2020-08-10 01:21:30"));
  56. delayQueue.put(a);
  57. delayQueue.put(b);
  58. delayQueue.put(c);
  59. delayQueue.put(d);
  60. delayQueue.put(e);
  61. }
  62. }).start();
  63. while (true) {
  64. DelayQueueTest delayQueueTest = null;
  65. try {
  66. delayQueueTest = (DelayQueueTest) delayQueue.take();
  67. } catch (InterruptedException e) {
  68. e.printStackTrace();
  69. }
  70. delayQueueTest.sendMessage();
  71. }
  72. }
  73. }

输出:

  1. 小黄开始时间为:2020-08-10 01:18:40, 结束时间为:2020-08-10 03:18:40, 现在结束
  2. 小红开始时间为:2020-08-10 01:19:00, 结束时间为:2020-08-10 03:19:00, 现在结束
  3. 小紫开始时间为:2020-08-10 01:20:00, 结束时间为:2020-08-10 03:20:00, 现在结束
  4. 小李开始时间为:2020-08-10 01:21:30, 结束时间为:2020-08-10 03:21:30, 现在结束
  5. 小王开始时间为:2020-08-10 01:22:00, 结束时间为:2020-08-10 03:22:00, 现在结束

3.5 SynchronousQueue

1.**LinkedBlockingDeque是一个由链表结构组成的双向阻塞队列,可以从队列的两端插入和移除元素。**

2.**相比于其他队列,它多了一些带 Last或 First 的方法,代表对队列进行两端的操作。**

3.如果没有给 LinkedBlockingDeque 指定其容量大小,则默认为 Integer.MAX_VALUE。

实现以新加入的任务为优先消费的程序:

  1. public static void main(String[] args) {
  2. LinkedBlockingDeque linkedBlockingDeque = new LinkedBlockingDeque(10);
  3. new Thread(new Runnable() {
  4. @Override
  5. public void run() {
  6. int a = 1;
  7. while (a <= 20) {
  8. try {
  9. linkedBlockingDeque.putFirst("第" + a++ + "次推送");
  10. } catch (InterruptedException e) {
  11. e.printStackTrace();
  12. }
  13. }
  14. }
  15. }).start();
  16. while (true) {
  17. Object ob = null;
  18. try {
  19. Thread.sleep(1000);
  20. ob = linkedBlockingDeque.takeFirst();
  21. } catch (InterruptedException e) {
  22. e.printStackTrace();
  23. }
  24. if (null != ob) {
  25. System.out.println(ob);
  26. }
  27. }
  28. }

输出:

  1. 10次推送
  2. 11次推送
  3. 12次推送
  4. 13次推送
  5. 14次推送
  6. 15次推送
  7. 16次推送
  8. 17次推送
  9. 18次推送
  10. 19次推送
  11. 20次推送
  12. 9次推送
  13. 8次推送
  14. 7次推送
  15. 6次推送
  16. 5次推送
  17. 4次推送
  18. 3次推送
  19. 2次推送
  20. 1次推送

3.6 SynchronousQueue

1.SynchronousQueue是**一个无缓冲不存储元素的阻塞队列**,消费者线程调用take()方法的时候就会发生阻塞,直到有一个生产者线程生产了一个元素,消费者线程就可以拿到这个元素并返回;生产者线程调用put()方法的时候也会发生阻塞,直到有一个消费者线程消费了一个元素,生产者才会返回。

2.**其他存储元素的阻塞队的吞吐量会高一些,但因为队列对元素进行了缓存,所以及时响应性能可能会降低。**

3.**SynchronousQueue可以声明使用公平锁或非公平锁。**

3.7 LinkedTransferQueue

1.LinkedTransferQueue 实现了 TransferQueue 接口,**是一个由链表结构组成的无界阻塞队列。相对于其他阻塞队列,LinkedTransferQueue多了tryTransfer和transfer方法**。

2.LinkedTransferQueue 采用一种**预占模式当消费者线程取元素时,如果队列不为空,则直接取走数据,如果队列为空,则生成一个节点(节点元素为null)入队,然后消费者线程在这个节点等待,后面生产者线程入队时如果发现有一个元素为null的节点,生产者线程直接将元素填充到该节点,并唤醒在该节点等待的消费者线程取走元素**。

3.LinkedTransferQueue 是 ConcurrentLinkedQueue、SynchronousQueue(公平模式下转交元素)、LinkedBlockingQueue的集合体。**相较于 SynchronousQueue 多了一个可以存储的队列,相较于 LinkedBlockingQueue 可以直接传递元素,少了用锁来同步,因此性能更好**。

4.线程工厂(ThreadFactory)

用于设置创建线程的工厂

DefaultThreadFactory 为线程池提供的默认的线程工厂:

  1. static class DefaultThreadFactory implements ThreadFactory {
  2. private static final AtomicInteger poolNumber = new AtomicInteger(1);
  3. private final ThreadGroup group;
  4. private final AtomicInteger threadNumber = new AtomicInteger(1);
  5. private final String namePrefix;
  6. DefaultThreadFactory() {
  7. // 声明安全管理器
  8. SecurityManager s = System.getSecurityManager();
  9. // 获取或创建线程组
  10. group = (s != null) ? s.getThreadGroup() :
  11. Thread.currentThread().getThreadGroup();
  12. // 为线程取个名
  13. namePrefix = "pool-" +
  14. poolNumber.getAndIncrement() +
  15. "-thread-";
  16. }
  17. // 创建一个新线程
  18. public Thread newThread(Runnable r) {
  19. Thread t = new Thread(group, r,
  20. namePrefix + threadNumber.getAndIncrement(),
  21. 0);
  22. // 设置为非守护线程,即设置为用户线程
  23. if (t.isDaemon())
  24. t.setDaemon(false);
  25. // 置线程t的优先级为5
  26. if (t.getPriority() != Thread.NORM_PRIORITY)
  27. t.setPriority(Thread.NORM_PRIORITY);
  28. return t;
  29. }
  30. }

5.拒绝策略(RejectedExecutionHandler handler)

5.1 AbortPolicy:默认策略,直接抛出一个异常

  1. public static class AbortPolicy implements RejectedExecutionHandler {
  2. public AbortPolicy() { }
  3. public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
  4. throw new RejectedExecutionException("Task " + r.toString() +
  5. " rejected from " +
  6. e.toString());
  7. }
  8. }

5.2 DiscardPolicy: 直接丢弃任务

  1. public static class DiscardPolicy implements RejectedExecutionHandler {
  2. public DiscardPolicy() { }
  3. // 什么也不做,这样就会丢弃任务
  4. public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
  5. }
  6. }

5.3 DiscardOldestPolicy:抛弃下一个将要被执行的任务(最旧任务)

  1. public static class DiscardOldestPolicy implements RejectedExecutionHandler {
  2. public DiscardOldestPolicy() { }
  3. // 如果执行器没有被关闭,则获取并忽略掉即将执行的任务,并重试执行任务r
  4. public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
  5. if (!e.isShutdown()) {
  6. e.getQueue().poll();
  7. e.execute(r);
  8. }
  9. }
  10. }

5.4 CallerRunsPolicy:在任务被拒绝添加后,会用调用execute函数的上层线程去执行被拒绝的任务。策略的缺点就是可能会阻塞主线程。

  1. public static class CallerRunsPolicy implements RejectedExecutionHandler {
  2. public CallerRunsPolicy() { }
  3. // 在调用者的线程中执行task r
  4. public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
  5. if (!e.isShutdown()) {
  6. r.run();
  7. }
  8. }
  9. }

三.jdk 提供的几种线程池

1.newFixedThreadPool

定长线程池:**最大线程数与核心线程数一样多,采用链表结构的无界队列 LinkedBlockingQueue**

任务流程:**向 newFixedThreadPool 线程池提交任务时,如果线程数少于核心线程,创建核心线程执行任务,如果线程数等于核心线程,把任务添加到 LinkedBlockingQueue 阻塞队列,如果有线程执行完任务,去阻塞队列取任务,继续执行。**

适用场景:**FixedThreadPool 能够保证当前的线程数能够比较稳定,适用于处理CPU密集型的任务确保CPU在长期被工作线程使用的情况下,尽可能的少的分配线程,控制线程最大并发数。即适用执行长期的任务。**

注意事项:**newFixedThreadPool 使用了无界的阻塞队列 LinkedBlockingQueue,如果线程获取一个任务后,如果单个任务的执行时间比较长,会导致队列中累积的任务越积越多,导致机器内存不断飙升, 从而导致OOM。**

  1. public static ExecutorService newFixedThreadPool(int nThreads) {
  2. return new ThreadPoolExecutor(nThreads, nThreads,
  3. 0L, TimeUnit.MILLISECONDS,
  4. new LinkedBlockingQueue<Runnable>());
  5. }
  6. public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
  7. return new ThreadPoolExecutor(nThreads, nThreads,
  8. 0L, TimeUnit.MILLISECONDS,
  9. new LinkedBlockingQueue<Runnable>(),
  10. threadFactory);
  11. }

示例:

  1. public static void main(String[] args) {
  2. ExecutorService threadPool = Executors.newFixedThreadPool(3);
  3. for (int i = 1; i < 5; i++) {
  4. final int taskID = i;
  5. threadPool.execute(new Runnable() {
  6. @Override
  7. public void run() {
  8. for (int i = 0; i < 5; i++) {
  9. try {
  10. Thread.sleep(500);
  11. } catch (Exception e) {
  12. e.printStackTrace();
  13. }
  14. System.out.println("第" + taskID + " 个任务的第" + i + " 次执行");
  15. }
  16. }
  17. });
  18. }
  19. // 关闭线程池
  20. threadPool.shutdown();
  21. System.out.println("主线程执行结束");
  22. }

2.newCachedThreadPool

可缓存线程池:**无核心线程,非核心线程数量为 Interger. MAX_VALUE,采用 无缓冲不存储元素的阻塞队列 SynchronousQueue,非核心线程空闲存活时间为60秒。所以如果长时间处于空闲的,该线程池不会占用任何资源。**

任务流程:**因为没有核心线程,所以任务直接加到SynchronousQueue队列,如果此时有空闲线程,则取出任务执行,如果没有空闲线程,就新建一个线程去执行。执行完任务的线程,可以存活60秒,如果在这期间接到任务,则可以继续活下去,否则将被销毁。**

适用场景:**适合执行大量、耗时少的任务**。

注意事项:**当提交任务的速度大于处理任务的速度时,每次提交一个任务,就必然会创建一个线程**。这样会导致创建的线程过多,耗尽 CPU 和内存资源,导致OOM。

  1. public static ExecutorService newCachedThreadPool() {
  2. return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
  3. 60L, TimeUnit.SECONDS,
  4. new SynchronousQueue<Runnable>());
  5. }
  6. public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
  7. return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
  8. 60L, TimeUnit.SECONDS,
  9. new SynchronousQueue<Runnable>(),
  10. threadFactory);
  11. }

示例:

  1. public static void main(String[] args) {
  2. ExecutorService threadPool = Executors.newCachedThreadPool();
  3. for (int i = 1; i < 5; i++) {
  4. final int taskID = i;
  5. threadPool.execute(new Runnable() {
  6. @Override
  7. public void run() {
  8. for (int i = 0; i < 5; i++) {
  9. try {
  10. Thread.sleep(500);
  11. } catch (Exception e) {
  12. e.printStackTrace();
  13. }
  14. System.out.println("第" + taskID + " 个任务的第" + i + " 次执行");
  15. }
  16. }
  17. });
  18. }
  19. // 关闭线程池
  20. threadPool.shutdown();
  21. System.out.println("主线程执行结束");
  22. }

3.newSingleThreadExecutor

单线程化线程池:**核心线程数和最大线程数均为1,采用链表结构的无界队列 LinkedBlockingQueue,keepAliveTime为0,执行完任务立即回收。如果当前线程意外终止,则会创建一个新的线程继续执行该任务**。

任务流程:**判断线程池中是否有一条线程在,如果没有,新建线程执行任务,如果有,则将任务加到阻塞队列,执行完线程后,再继续从队列中取。**

适用场景:**适用于串行按顺序执行任务的场景**。不适合可能引起IO阻塞性及影响UI线程响应的并发操作,如数据库操作、文件操作等。

注意事项:**使用了无界的阻塞队列 LinkedBlockingQueue,如果线程获取一个任务后,如果单个任务的执行时间比较长,会导致队列中累积的任务越积越多,导致机器内存不断飙升, 从而导致OOM。**

  1. public static ExecutorService newSingleThreadExecutor() {
  2. return new FinalizableDelegatedExecutorService
  3. (new ThreadPoolExecutor(1, 1,
  4. 0L, TimeUnit.MILLISECONDS,
  5. new LinkedBlockingQueue<Runnable>()));
  6. }
  7. public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
  8. return new FinalizableDelegatedExecutorService
  9. (new ThreadPoolExecutor(1, 1,
  10. 0L, TimeUnit.MILLISECONDS,
  11. new LinkedBlockingQueue<Runnable>(),
  12. threadFactory));
  13. }

示例:

  1. public static void main(String[] args) {
  2. ExecutorService threadPool = Executors.newSingleThreadExecutor();
  3. for (int i = 1; i < 5; i++) {
  4. final int taskID = i;
  5. threadPool.execute(new Runnable() {
  6. @Override
  7. public void run() {
  8. for (int i = 0; i < 5; i++) {
  9. try {
  10. Thread.sleep(500);
  11. } catch (Exception e) {
  12. e.printStackTrace();
  13. }
  14. System.out.println("第" + taskID + " 个任务的第" + i + " 次执行");
  15. }
  16. }
  17. });
  18. }
  19. // 关闭线程池
  20. threadPool.shutdown();
  21. System.out.println("主线程执行结束");
  22. }

4.newSingleThreadExecutor

定时线程池:**核心线程数固定,非核心最大线程数量为 Interger. MAX_VALUE,采用链表结构的无界队列 DelayedWorkQueue(用数组储存元素的优先级延迟阻塞队列),keepAliveTime为0,执行完任务立即回收。可以按某种速率周期执行scheduleAtFixedRate(),也可以在某个延迟后执行 scheduleWithFixedDelay()。**

任务流程:**当添加一个任务,线程池中的线程从 DelayedWorkQueue 中取任务,获取 time 大于等于当前时间的task,执行完后修改这个 task 的 time 为下次被执行的时间,然后把 task 放回DelayedWorkQueue 队列中**

适用场景:**执行定时或周期性的任务,需要限制线程数量的场景。**

注意事项:**线程数最大数是Integer.MAX_VALUE,可能会创建数量非常多的线程,甚至OOM。**

  1. public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
  2. return new ScheduledThreadPoolExecutor(corePoolSize);
  3. }
  4. public static ScheduledExecutorService newScheduledThreadPool(
  5. int corePoolSize, ThreadFactory threadFactory) {
  6. return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
  7. }

示例:

schedule方法:**延迟指定时间后执行一次。**

  1. public static void main(String[] args) {
  2. ScheduledExecutorService threadPool = Executors.newScheduledThreadPool(5);
  3. for (int i = 1; i < 5; i++) {
  4. final int taskID = i;
  5. // 延迟 5s 后执行任务
  6. threadPool.schedule(new Runnable() {
  7. @Override
  8. public void run() {
  9. try {
  10. Thread.sleep(500);
  11. } catch (Exception e) {
  12. e.printStackTrace();
  13. }
  14. System.out.println("第" + taskID + " 个任务");
  15. }
  16. }, 5, TimeUnit.SECONDS);
  17. }
  18. System.out.println("主线程执行结束");
  19. }

scheduleAtFixedRate方法:**以固定的频率执行,period(周期)指的是两次成功执行之间的时间 ,该示例的意思就是,5秒延迟后执行一次,然后每2秒循环执行。**

  1. public static void main(String[] args) {
  2. ScheduledExecutorService threadPool = Executors.newScheduledThreadPool(5);
  3. for (int i = 1; i < 5; i++) {
  4. final int taskID = i;
  5. //5秒后执行任务,以后每2秒循环执行一次
  6. threadPool.scheduleAtFixedRate(new Runnable() {
  7. @Override
  8. public void run() {
  9. try {
  10. Thread.sleep(500);
  11. } catch (Exception e) {
  12. e.printStackTrace();
  13. }
  14. System.out.println("第" + taskID + " 个任务");
  15. }
  16. }, 2, 5, TimeUnit.SECONDS);
  17. }
  18. System.out.println("主线程执行结束");
  19. }

scheduleWithFixedDelay:**以固定的延时执行,delay(延时)指的是一次执行终止和下一次执行开始之间的延迟该示例中,假设执行一次任务需要10秒,则第一次是在延迟5秒后执行一次,然后10秒后执行结束,再过2秒又开始执行,再过10秒结束,以此类推。**

  1. public static void main(String[] args) {
  2. ScheduledExecutorService threadPool = Executors.newScheduledThreadPool(5);
  3. for (int i = 1; i < 5; i++) {
  4. final int taskID = i;
  5. //5秒后执行任务,结束后,再过2秒循环执行一次,以此类推
  6. threadPool.scheduleWithFixedDelay(new Runnable() {
  7. @Override
  8. public void run() {
  9. System.out.println("第" + taskID + " 个任务");
  10. }
  11. }, 5, 2, TimeUnit.SECONDS);
  12. }
  13. System.out.println("主线程执行结束");
  14. }

5.newSingleThreadScheduledExecutor

单线程定时线程池: 创建一个单线程执行程序,它可安排在给定延迟后运行命令或者周期期地执行。

适用场景:一些轻量级的定时操作,如定时查数据库,将数据加载到内存中。

  1. public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
  2. return new DelegatedScheduledExecutorService
  3. (new ScheduledThreadPoolExecutor(1));
  4. }
  5. public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {
  6. return new DelegatedScheduledExecutorService
  7. (new ScheduledThreadPoolExecutor(1, threadFactory));
  8. }

示例:

  1. public static void main(String[] args) {
  2. ScheduledExecutorService threadPool = Executors.newSingleThreadScheduledExecutor();
  3. for (int i = 1; i < 5; i++) {
  4. final int taskID = i;
  5. //5秒后执行任务,以后每2秒循环执行一次
  6. threadPool.scheduleAtFixedRate(new Runnable() {
  7. @Override
  8. public void run() {
  9. try {
  10. Thread.sleep(500);
  11. } catch (Exception e) {
  12. e.printStackTrace();
  13. }
  14. System.out.println("第" + taskID + " 个任务");
  15. }
  16. }, 2, 5, TimeUnit.SECONDS);
  17. }
  18. System.out.println("主线程执行结束");
  19. }
  20. public static void main(String[] args) {
  21. ScheduledExecutorService threadPool = Executors.newSingleThreadScheduledExecutor();
  22. for (int i = 1; i < 5; i++) {
  23. final int taskID = i;
  24. // 延迟 5s 后执行任务
  25. threadPool.schedule(new Runnable() {
  26. @Override
  27. public void run() {
  28. try {
  29. Thread.sleep(500);
  30. } catch (Exception e) {
  31. e.printStackTrace();
  32. }
  33. System.out.println("第" + taskID + " 个任务");
  34. }
  35. }, 5, TimeUnit.SECONDS);
  36. }
  37. System.out.println("主线程执行结束");
  38. }

四.线程池的几种状态

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3h5eTEwMjg_size_16_color_FFFFFF_t_70 1

1.RUNNING

a.该状态的线程池会接收新任务,并处理阻塞队列中的任务;
b.调用线程池的shutdown()方法,可以切换到SHUTDOWN状态;
c.调用线程池的shutdownNow()方法,可以切换到STOP状态;

2.SHUTDOWN

a.该状态的线程池不会接收新任务,但会处理阻塞队列中的任务;
b.队列为空,并且线程池中执行的任务也为空,进入TIDYING状态;

3.STOP

a.该状态的线程不会接收新任务,也不会处理阻塞队列中的任务,而且会中断正在运行的任务;
b.线程池中执行的任务为空,进入TIDYING状态;

4.TIDYING

a.当所有的任务已终止,ctl记录的”任务数量”为0,线程池会变为TIDYING状态。
b.当线程池变为TIDYING状态时,会执行钩子函数terminated(),进入TERMINATED状态。

5.TERMINATED

a.该状态表示线程池彻底终止

五.线程池的 submit 方法

1.execute传参是Runnable,而submit传参是Callable或Runnable类型。

2.execute执行没有返回值,而submit会返回一个Future类型的对象。

3.execute执行的时候,如果有异常,会直接抛出异常,而submit在遇到异常的时候通常不会立刻抛出,而是会将异常暂时存储起来,等调用Future.get()方法的时候才会抛出异常。

  1. public <T> Future<T> submit(Callable<T> task);
  2. public <T> Future<T> submit(Runnable task, T result);
  3. public Future<?> submit(Runnable task);

1. submit(Callable task)

  1. public static void main(String[] args) {
  2. ExecutorService threadPool = Executors.newFixedThreadPool(3);
  3. for (int i = 1; i < 5; i++) {
  4. final int taskID = i;
  5. Future<String> future = threadPool.submit(new Callable<String>() {
  6. @Override
  7. public String call() throws Exception {
  8. return "第" + taskID + " 个任务";
  9. }
  10. });
  11. try {
  12. System.out.println(future.get());
  13. } catch (InterruptedException e) {
  14. e.printStackTrace();
  15. } catch (ExecutionException e) {
  16. e.printStackTrace();
  17. }
  18. }
  19. // 关闭线程池
  20. threadPool.shutdown();
  21. System.out.println("主线程执行结束");
  22. }

2.submit(Runnable task)

Runnable 不会返回任何值,我们用这个测试下异常:

  1. public static void main(String[] args) {
  2. ExecutorService threadPool = Executors.newFixedThreadPool(3);
  3. for (int i = 1; i < 5; i++) {
  4. final int taskID = i;
  5. Future future = threadPool.submit(new Runnable() {
  6. @Override
  7. public void run() {
  8. throw new RuntimeException("这是第" + taskID + "个异常");
  9. }
  10. });
  11. try {
  12. System.out.println(future.get());
  13. } catch (Exception e) {
  14. System.out.println("在这里抛异常");
  15. }
  16. }
  17. // 关闭线程池
  18. threadPool.shutdown();
  19. System.out.println("主线程执行结束");
  20. }

3.submit(Runnable task, T result)

同样不会有返回值,T为一个预设的默认值而已,使当 get 时返回个默认值,而不必返回 null

  1. public static void main(String[] args) {
  2. ExecutorService threadPool = Executors.newFixedThreadPool(3);
  3. for (int i = 1; i < 5; i++) {
  4. final int taskID = i;
  5. Future future = threadPool.submit(new Runnable() {
  6. @Override
  7. public void run() {
  8. System.out.println("这是第" + taskID + "个任务");
  9. }
  10. }, "这是一个默认的返回值" + taskID);
  11. try {
  12. System.out.println(future.get());
  13. } catch (Exception e) {
  14. System.out.println("在这里抛异常");
  15. }
  16. }
  17. // 关闭线程池
  18. threadPool.shutdown();
  19. System.out.println("主线程执行结束");
  20. }

发表评论

表情:
评论列表 (有 0 条评论,197人围观)

还没有评论,来说两句吧...

相关阅读