Java——ThreadPool 布满荆棘的人生 2021-06-10 20:40 274阅读 0赞 # 1. 为什么使用线程池 # 当并发执行线程数量很多时,且每个线程执行很短的时间就结束了,这样,我们频繁的创建、销毁线程就大大降低了工作效率(创建和销毁线程需要时间、资源)。 java中的线程池可以达到这样的效果:一个线程执行完任务之后,继续去执行下一个任务,不被销毁,这样线程利用率提高了。 假设一个服务器完成一项任务所需时间为:T1 创建线程时间,T2 在线程中执行任务的时间,T3 销毁线程时间。如果:T1 + T3 远大于 T2,则可以采用线程池,以提高服务器性能。 # 2. 线程池作用 # 线程池作用就是限制系统中执行线程的数量。 线程池可以应对突然大爆发量的访问,通过有限个固定线程为大量的操作服务,减少创建和销毁线程所需的时间。 * 减少了创建和销毁线程的次数,提高系统响应速度,当有任务到达时,通过复用已存在的线程,无需等待新线程的创建便能立即执行; * 方便线程并发数的管控。因为线程若是无限制的创建,可能会导致内存占用过多而产生OOM,并且会造成cpu过度切换(cpu切换线程是有时间成本的(需要保持当前执行线程的现场,并恢复要执行线程的现场)。 * 提供更强大的功能,延时定时线程池。 ![在这里插入图片描述][watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2x5MDcyNG9r_size_16_color_FFFFFF_t_70] 根据系统的环境情况,可以自动或手动设置线程数量,达到运行的最佳效果;少了浪费了系统资源,多了造成系统拥挤效率不高。用线程池控制线程数量,其他线程排队等候。 一个任务执行完毕,再从队列的中取最前面的任务开始执行。若队列中没有等待进程,线程池的这一资源处于等待。当一个新任务需要运行时,如果线程池中有等待的工作线程,就可以开始运行了;否则进入等待队列。 # 3. 线程池的主要参数 # public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); } * **corePoolSize**(线程池基本大小):当向线程池提交一个任务时,若线程池已创建的线程数小于corePoolSize,即便此时存在空闲线程,也会通过创建一个新线程来执行该任务,直到已创建的线程数大于或等于corePoolSize时,(除了利用提交新任务来创建和启动线程(按需构造),也可以通过 prestartCoreThread() 或 prestartAllCoreThreads() 方法来提前启动线程池中的基本线程。 * **maximumPoolSize**(线程池最大大小):线程池所允许的最大线程个数。当队列满了,且已创建的线程数小于maximumPoolSize,则线程池会创建新的线程来执行任务。另外,对于无界队列,可忽略该参数。 * **keepAliveTime**(线程存活保持时间)当线程池中线程数大于核心线程数时,线程的空闲时间如果超过线程存活时间,那么这个线程就会被销毁,直到线程池中的线程数小于等于核心线程数。 * **workQueue**(任务队列):用于传输和保存等待执行任务的阻塞队列。 * **threadFactory**(线程工厂):用于创建新线程。threadFactory创建的线程也是采用new Thread()方式,threadFactory创建的线程名都具有统一的风格:pool-m-thread-n(m为线程池的编号,n为线程池内的线程编号)。 * **handler**(线程饱和策略):当线程池和队列都满了,再加入线程会执行此策略。 # 4. ThreadPool的创建,关闭,监控 # 一般通过Executors类下的四个成员函数创建相应的线程池: //创建一个定时任务的线程池 ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(4); //创建单线程的线程池 ExecutorService singleThreadPool = Executors.newSingleThreadExecutor(); //创建缓存线程池(重用先前的线程) ExecutorService cachedThreadPool = Executors.newCachedThreadPool(); //创建一个带有固定线程的线程池 ExecutorService threadPool = Executors.newFixedThreadPool(4); 我们尽量优先使用Executors提供的静态方法来创建线程池,如果Executors提供的方法无法满足要求,再自己通过ThreadPoolExecutor类来创建线程池。 **向线程池提交任务:** ① 我们可以使用execute提交的任务,但是execute方法没有返回值,所以无法判断任务是否被线程池执行成功。通过以下代码可知execute方法输入的任务是一个Runnable类的实例。 threadsPool.execute(new Runnable() { @Override public void run() { // TODO Auto-generated method stub } }); ② 使用submit 方法来提交任务,它会返回一个future,那么我们可以通过这个future来判断任务是否执行成功,通过future的get方法来获取返回值,get方法会阻塞住直到任务完成,而使用get(long timeout, TimeUnit unit)方法则会阻塞一段时间后立即返回,这时有可能任务没有执行完。 Future<Object> future = executor.submit(harReturnValuetask); try { Object s = future.get(); } catch (InterruptedException e) { // 处理中断异常 } catch (ExecutionException e) { // 处理无法执行任务异常 } finally { // 关闭线程池 executor.shutdown(); } **线程池的关闭:** 我们可以通过调用线程池(ExecutorService的对象): * 原理是遍历线程池中的工作线程,然后逐个调用线程的interrupt方法来中断线程,所以无法响应中断的任务可能永远无法终止 * shutdown方法,shutdownNow首先将线程池的状态设置成STOP,然后尝试停止所有的正在执行或暂停任务的线程,并返回等待执行任务的列表 * shutdownNow方法,而shutdown只是将线程池的状态设置成SHUTDOWN状态,然后中断所有没有正在执行任务的线程。 只要调用了这两个关闭方法的其中一个,isShutdown方法就会返回true。当所有的任务都已关闭后,才表示线程池关闭成功,这时调用isTerminaed方法会返回true。至于我们应该调用哪一种方法来关闭线程池,应该由提交到线程池的任务特性决定,通常调用shutdown来关闭线程池,如果任务不一定要执行完,则可以调用shutdownNow。 **线程池的监控:** 通过继承线程池并重写线程池的beforeExecute,afterExecute和terminated方法,我们可以在任务执行前,执行后和线程池关闭前干一些事情。如监控任务的平均执行时间,最大执行时间和最小执行时间等。这几个方法在线程池里是空方法。 # 5. 举个栗子 # ![在这里插入图片描述][watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2x5MDcyNG9r_size_16_color_FFFFFF_t_70 1] 上代码之前,要先补充一下线程池构造的核心几个点 * 线程池里的核心线程数与最大线程数 * 线程池里真正工作的线程——worker * 线程池里用来存取任务的队列——BlockingQueue * 线程中的任务——task 实现BlockingQueue存放任务,然后每个worker取任务并执行: 首先定义一个线程池ThreadExcutor class ThreadExcutor{ //创建 private volatile boolean RUNNING = true; //所有任务都放队列中,让工作线程来消费 private static BlockingQueue<Runnable> queue = null; private final HashSet<Worker> workers = new HashSet<Worker>(); private final List<Thread> threadList = new ArrayList<Thread>(); //工作线程数 int poolSize = 0; //核心线程数(创建了多少个工作线程) int coreSize = 0; boolean shutdown = false; public ThreadExcutor(int poolSize){ this.poolSize = poolSize; queue = new LinkedBlockingQueue<Runnable>(poolSize); } public void exec(Runnable runnable) { if (runnable == null) throw new NullPointerException(); if(coreSize < poolSize){ addThread(runnable); }else{ //System.out.println("offer" + runnable.toString() + " " + queue.size()); try { queue.put(runnable); } catch (InterruptedException e) { e.printStackTrace(); } } } public void addThread(Runnable runnable){ coreSize ++; Worker worker = new Worker(runnable); workers.add(worker); Thread t = new Thread(worker); threadList.add(t); try { t.start(); }catch (Exception e){ e.printStackTrace(); } } public void shutdown() { RUNNING = false; if(!workers.isEmpty()){ for (Worker worker : workers){ worker.interruptIfIdle(); } } shutdown = true; Thread.currentThread().interrupt(); } //这里留个位置放内部类Worker } 然后定义一个内部类Worker,这个内部类Worker是用来执行每个任务的,在创建线程池后,往线程里添加任务,每个任务都是由Worker一个一个来启动的。 /** * 工作线程 */ class Worker implements Runnable{ public Worker(Runnable runnable){ queue.offer(runnable); } @Override public void run() { while (true && RUNNING){ if(shutdown == true){ Thread.interrupted(); } Runnable task = null; try { task = getTask(); task.run(); } catch (InterruptedException e) { e.printStackTrace(); } } } public Runnable getTask() throws InterruptedException { return queue.take(); } public void interruptIfIdle() { for (Thread thread :threadList) { System.out.println(thread.getName() + " interrupt"); thread.interrupt(); } } } 首先注意的一点,这个Worker是个内部类,是在线程池内声明的。 **exec方法** ![在这里插入图片描述][20210510194445865.png] **Worker怎么工作** ![在这里插入图片描述][watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2x5MDcyNG9r_size_16_color_FFFFFF_t_70 2] 这个工作线程实例化的时候就先加入一个任务到队列中,也就是说在实例化这个工作线程时,这个工作线程也是一个任务被加入到线程池中。然后就是run方法,这个run方法是线程调start方法生成的线程,而Worker调的run方法并没有生成新的线程。就是一个循环,一直在不停的从队列中取任务,然后执行。可以看到,取队列的方法是take(),这个方法意思如果队列为空了,取不到数据时就阻塞队列。 **然后看shutdown()** ![在这里插入图片描述][watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2x5MDcyNG9r_size_16_color_FFFFFF_t_70 3] shutdown方法并不是用线程那种强制停止的搞法,而是先用一个标识符告诉工作线程,不要再接任务了。然后通知工作线程,你可以interrupt()了,当所有的线程停止后记得要把主线程也停掉,这样,一个简单任务的线程池就完成了。 测试一下: public class TheadBlockedQ { public static void main(String[] args) throws InterruptedException { ThreadExcutor excutor = new ThreadExcutor(3); for (int i = 0; i < 10; i++) { excutor.exec(new Runnable() { @Override public void run() { System.out.println("线程 " + Thread.currentThread().getName() + " 在帮我干活"); } }); } excutor.shutdown(); } } 结果为: 线程 Thread-0 在帮我干活 线程 Thread-2 在帮我干活 线程 Thread-1 在帮我干活 线程 Thread-0 在帮我干活 线程 Thread-2 在帮我干活 线程 Thread-2 在帮我干活 线程 Thread-1 在帮我干活 线程 Thread-0 在帮我干活 Thread-0 interrupt Thread-1 interrupt Thread-2 interrupt Thread-0 interrupt Thread-1 interrupt Thread-2 interrupt Thread-0 interrupt Thread-1 interrupt Thread-2 interrupt [watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2x5MDcyNG9r_size_16_color_FFFFFF_t_70]: /images/20210516/0e4261fb621d4f73826d2001a9a0f76c.png [watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2x5MDcyNG9r_size_16_color_FFFFFF_t_70 1]: /images/20210516/bd3a3da26e364f5781798fc6ff1b5370.png [20210510194445865.png]: /images/20210516/1e54377b7b43431280a42d49df784c95.png [watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2x5MDcyNG9r_size_16_color_FFFFFF_t_70 2]: /images/20210516/eb1d7014c7414e67a2c80f501c1f5d07.png [watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2x5MDcyNG9r_size_16_color_FFFFFF_t_70 3]: /images/20210516/49a4a6ac1bd04e38a57b565f01f1944b.png
还没有评论,来说两句吧...