线程总结(八):线程池详解

上一篇总结了创建线程池的五种方法,现在挨个看一下其源码:

newCachedThreadPool

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

返回一个 ThreadPoolExecutor 对象。

newScheduledThreadPool

    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }

返回一个 ScheduledThreadPoolExecutor 对象。

newSingleThreadExecutor

    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

FinalizableDelegatedExecutorService 是类 Executors 的内部类,它继承自 DelegatedExecutorService,DelegatedExecutorService 也是类Executors 的内部类,它继承自 AbstractExecutorService。

    static class FinalizableDelegatedExecutorService
        extends DelegatedExecutorService {
        FinalizableDelegatedExecutorService(ExecutorService executor) {
            super(executor);
        }
        protected void finalize() {
            super.shutdown();
        }
    }

再看 DelegatedExecutorService 类:

    static class DelegatedExecutorService extends AbstractExecutorService {
        private final ExecutorService e;
        DelegatedExecutorService(ExecutorService executor) { e = executor; }
        public void execute(Runnable command) { e.execute(command); }
        public void shutdown() { e.shutdown(); }
        public List<Runnable> shutdownNow() { return e.shutdownNow(); }
        public boolean isShutdown() { return e.isShutdown(); }
        public boolean isTerminated() { return e.isTerminated(); }
        public boolean awaitTermination(long timeout, TimeUnit unit)
            throws InterruptedException {
            return e.awaitTermination(timeout, unit);
        }
        public Future<?> submit(Runnable task) {
            return e.submit(task);
        }
        public <T> Future<T> submit(Callable<T> task) {
            return e.submit(task);
        }
        public <T> Future<T> submit(Runnable task, T result) {
            return e.submit(task, result);
        }
        public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
            throws InterruptedException {
            return e.invokeAll(tasks);
        }
        public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                             long timeout, TimeUnit unit)
            throws InterruptedException {
            return e.invokeAll(tasks, timeout, unit);
        }
        public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
            throws InterruptedException, ExecutionException {
            return e.invokeAny(tasks);
        }
        public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                               long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
            return e.invokeAny(tasks, timeout, unit);
        }
    }

实际上执行的还是 ThreadPoolExecutor 对象中的方法。

newFixedThreadPool

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

返回的是一个 ThreadPoolExecutor 对象。

然后我们可以发现线程池大致可以分为两类:

  • ThreadPoolExecutor

    • newCachedThreadPool

    • newSingleThreadExecutor

    • newFixedThreadPool

  • ScheduledThreadPoolExecutor

    • newScheduledThreadPool

那么我们来详细看一下这两个类:

ThreadPoolExecutor

先看参数:

  • int corePoolSize

    池中所保存的线程数,包括空闲线程。

  • int maximumPoolSize

    池中允许的最大线程数。

  • long keepAliveTime

    当线程数大于核心时,此为终止前多余的空闲线程等待新任务的最长时间。

  • TimeUnit unit

    keepAliveTime 参数的时间单位。

  • BlockingQueue workQueue

    执行前用于保持任务的队列。此队列仅保持由 execute 方法提交的 Runnable 任务。

  • ThreadFactory threadFactory

    执行程序创建新线程时使用的工厂。

  • RejectedExecutionHandler handler

    由于超出线程范围和队列容量而使执行被阻塞时所使用的处理程序。

核心线程数(corePoolSize)和最大线程数(maximumPoolSize)

由于 ThreadPoolExecutor 将根据 corePoolSize 和 maximumPoolSize 设置的边界自动调整池大小,当新任务在方法 execute(java.lang.Runnable) 中提交时:

  1. 如果运行的线程少于 corePoolSize,则创建新线程来处理请求,即使其他辅助线程是空闲的。

  2. 如果设置的 corePoolSize 和 maximumPoolSize 相同,则创建的线程池是大小固定的,如果运行的线程数与 corePoolSize 相同,当有新请求过来时,若 workQueue 未满,则将请求放入 workQueue 中,等到有空闲的线程去从 workQueue 中去任务并处理。

  3. 如果运行的线程数多余 corePoolSize 而少于 maximumPoolSize,则仅当队列满时才创建新线程去处理请求。

  4. 如果运行的线程多余 corePoolSize 并且等于 maximumPoolSize,若队列已满,则通过 RejectedExecutionHandler 所指定的策略来处理新请求。

  5. 如果将 maximumPoolSize 设置为基本的无界值(如 Integer.MAX_VALUE),则允许线程池适应任意数量的并发任务。

ThreadPoolExecutor 提供了动态调整线程池容量的方法:setCorePoolSize() 和 setMaximumPoolSize()

核心线程的创建和启动

默认情况下,核心线程最初只是在新任务到达时才创建和启动的,也可以使用方法 prestartCoreThread() 或 prestartAllCoreThreads() 对其进行动态重写,如果构造带有非空队列的线程池,则可能希望预先启动线程。

创建新线程

使用 ThreadFactory 创建新线程。如果没有另外说明,则在同一个 ThreadGroup 中一律使用 Executors.defaultThreadFactory() 创建线程,并且这些线程具有相同的 NORM_PRIORITY 优先级和非守护进程状态。通过提供不同的 ThreadFactory,可以改变线程的名称、线程组、优先级、守护进程状态,等等。如果从 newThread 返回 null 时 ThreadFactory 未能创建线程,则执行程序将继续进行,但不能执行任何任务。

线程保活时间

如果线程池中有多于 corePoolSize 的线程,则这些多出的线程在空闲时间超过 keepAliveTime 时将会终止。这提供了当线程池处于非活动状态时减少资源消耗的方法。如果池后来变得更为活跃,则可以创建新的线程。也可以使用 setKeepAliveTime(long, java.util.cincurrent.TimeUnit) 动态的更改此参数。使用 Long.MAX_VALUE TimeUnit.NANOSECONDS 的只在关闭前有效的终止状态禁用控件线程,默认情况下,保持活动策略只在有多于 corePoolSizeThreads 的线程时应用。但是只要 keepAliveTime 值非 0,allowCoreThreadTimeOut(boolean) 方法也可将此超时策略用于核心线程。

任务队列

BlockingQueue 用于保存等待执行的任务的阻塞队列。

  • 如果运行的线程少于 corePoolSize,则 Executor 始终首选添加新的线程,而不进行排队。

  • 如果运行的线程等于或多于 corePoolSize,则 Executor 始终首选将请求加入队列,而不添加新的线程。

  • 如果无法将请求加入队列,则创建新的线程,除非创建此线程超出 maximumPoolSize,在这种情况下,任务将被拒绝。

排队通常有三种策略

  1. 直接提交:工作队列的默认选项是 SynchronousQueue,它将任务直接交给线程而不保持它们。在此,如果不存在可用于立即运行任务的线程,则试图把任务加入队列将会失败,因此会构造一个新的线程。该策略可以避免在处理可能具有内部以来的请求时出现锁。直接提交通常要求无界 maximumPoolSize 以避免拒绝新提交的任务。当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性。

  2. 无界队列。使用无界队列将导致在所有 corePoolSize 线程都忙时新任务在队列中等待。这样,创建的线程就不会超过 corePoolSize。(因此,maximumPoolSize 的值也就无效了。)当每个任务完全独立于其他任务,即任务执行互不影响时,适合于使用无界队列;例如,在 Web 页服务器中。这种排队可用于处理瞬态突发请求,当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性。

  3. 有界队列。当使用有限的 maximumPoolSizes 时,有界队列(如 ArrayBlockingQueue)有助于防止资源耗尽,但是可能较难调整和控制。队列大小和最大池大小可能需要相互折衷:使用大型队列和小型池可以最大限度地降低 CPU 使用率、操作系统资源和上下文切换开销,但是可能导致人工降低吞吐量。如果任务频繁阻塞(例如,如果它们是 I/O 边界),则系统可能为超过您许可的更多线程安排时间。使用小型队列通常要求较大的池大小,CPU 使用率较高,但是可能遇到不可接受的调度开销,这样也会降低吞吐量。

可以选择以下几个阻塞队列:

  • ArrayBlockingQueue

    它是一个基于数组结果的有界阻塞队列,此队列按 FIFO 原则对元素进行排序。

  • LinkedBlockingQueue

    一个基于链表结构的阻塞队列,此队列按 FIFO 排序元素,吞吐量通常要高于 ArrayBlockingQueue。newFixedThreadPool() 使用该队列。

  • SynchronousQueue

    一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于 LinkedBlockQueue,newCachedThreadPool 使用该队列。

  • PrioriBlockingQueue

    一个具有优先级的无线阻塞队列。

任务饱和策略

当线程池和队列都满了,说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务。这个策略默认情况下是 AbortPolicy,表示无法处理新任务时抛出异常。以下是 JDK 1.5 提供的四种策略:

  1. AbortPolicy:直接抛出异常

  2. CallerRunsPolicy:只用调用者所在的线程来运行任务

  3. DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前任务。

  4. DiscardPolicy:不处理,丢弃掉。

向线程池提交任务

向线程池提交任务有两种方法:execute 和 submit 方法:

  • execute

    该方法提交任务没有返回值,所以无法判断任务是否被线程池执行成功。
threadsPool.execute(new Runnable() { 
     @Override 
     public void run() { 
         // TODO Auto-generated method stub 
     } 
 });
  • submit

    该方法会返回一个 future,可以通过这个 future 来判断任务是否执行成功,通过 future 的 get 方法来获取返回值,get 方法会阻塞直到任务完成,而使用 get(long timeout, TimeUnit unit) 方法会阻塞一段时间后立即返回,这时有可能任务还没有完成:
     Future<Object> future = executor.submit(harReturnValuetask);
     try {
         Object s = future.get();
     } catch (InterruptedException e) { 
         // 处理中断异常
     } catch (ExecutionException e) { 
         // 处理无法执行任务异常
     } finally { 
         // 关闭线程池 
         executor.shutdown();
     }

关闭线程池

关闭线程池有两个方法:shutdown() 和 shutdownNow():

  1. shutdown()

    不会立即终止线程池,而是要等所有任务缓存队列中的任务都执行完毕才终止,但不会再接收新任务。

  2. shutdownNow()

    立即终止线程池,并尝试打断正在执行的任务,并且清空任务缓存队列,返回尚未执行的任务。

线程池的监控

通过线程池提供的参数进行监控。线程池里有一些属性在监控线程池的时候可以使用

  • taskCount:线程池需要执行的任务数量。

  • completedTaskCount:线程池在运行过程中已完成的任务数量。小于或等于 taskCount。

  • largestPoolSize:线程池曾经创建过的最大线程数量。通过这个数据可以知道线程池是否满过。如等于线程池的最大大小,则表示线程池曾经满了。

  • getPoolSize: 线程池的线程数量。如果线程池不销毁的话,池里的线程不会自动销毁,所以这个大小只增不减。

  • getActiveCount:获取活动的线程数。

通过扩展线程池进行监控。通过继承线程池并重写线程池的 beforeExecute,afterExecute 和 terminated 方法,我们可以在任务执行前,执行后和线程池关闭前干一些事情。如监控任务的平均执行时间,最大执行时间和最小执行时间等。

ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor 是类 ThreadPoolExecutor 的子类,除了继承了父类的一些特性之外,它还可以另行安排在给定的延时后运行命令,或者定期执行命令。

虽然此类继承自 ThreadPoolExecutor,但是几个继承的调整方法对此类并无作用。特别是,因为它作为一个使用 corePoolSize 线程和一个无界队列的固定大小的池,所以调整 maximumPoolSize 没有什么效果

此类重写 AbstractExecutorService 的 submit 方法,以生成内部对象控制每个任务的延迟和调度。若要保留功能性,子类中任何进一步重写的这些方法都必须调用超类版本,超类版本有效地禁用附加任务的定制。但是,此类提供替代受保护的扩展方法 decorateTask(为 Runnable 和 Callable 各提供一种版本),可定制用于通过 execute、submit、schedule、scheduleAtFixedRate 和 scheduleWithFixedDelay 进入的执行命令的具体任务类型。默认情况下,ScheduledThreadPoolExecutor 使用一个扩展 FutureTask 的任务类型。

使用 schedule 方法可以创建并执行在给定延迟后启用的一次性操作。

public class ThreadPoolTest {
    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
        scheduledThreadPool.schedule(new RunnableTest(100), 5, TimeUnit.SECONDS);

    }
}

class RunnableTest implements Runnable {

    private int number;

    public RunnableTest(int number) {
        this.number = number;
    }

    @Override
    public void run() {
        try {
            System.out.println(Thread.currentThread().getId() + " - " + number + "  start");
            Thread.sleep(5000);
            System.out.println(Thread.currentThread().getId() + " - " + number + " end");
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}

-

Copyright© 2020-2022 li-xyz 冀ICP备2022001112号-1