上一篇总结了创建线程池的五种方法,现在挨个看一下其源码:
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
返回一个 ThreadPoolExecutor 对象。
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
返回一个 ScheduledThreadPoolExecutor 对象。
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
FinalizableDelegatedExecutorService 是类 Executors 的内部类,它继承自 DelegatedExecutorService,DelegatedExecutorService 也是类Executors 的内部类,它继承自 AbstractExecutorService。
static class FinalizableDelegatedExecutorService
extends DelegatedExecutorService {
FinalizableDelegatedExecutorService(ExecutorService executor) {
super(executor);
}
protected void finalize() {
super.shutdown();
}
}
再看 DelegatedExecutorService 类:
static class DelegatedExecutorService extends AbstractExecutorService {
private final ExecutorService e;
DelegatedExecutorService(ExecutorService executor) { e = executor; }
public void execute(Runnable command) { e.execute(command); }
public void shutdown() { e.shutdown(); }
public List<Runnable> shutdownNow() { return e.shutdownNow(); }
public boolean isShutdown() { return e.isShutdown(); }
public boolean isTerminated() { return e.isTerminated(); }
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
return e.awaitTermination(timeout, unit);
}
public Future<?> submit(Runnable task) {
return e.submit(task);
}
public <T> Future<T> submit(Callable<T> task) {
return e.submit(task);
}
public <T> Future<T> submit(Runnable task, T result) {
return e.submit(task, result);
}
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException {
return e.invokeAll(tasks);
}
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException {
return e.invokeAll(tasks, timeout, unit);
}
public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException {
return e.invokeAny(tasks);
}
public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
return e.invokeAny(tasks, timeout, unit);
}
}
实际上执行的还是 ThreadPoolExecutor 对象中的方法。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
返回的是一个 ThreadPoolExecutor 对象。
然后我们可以发现线程池大致可以分为两类:
ThreadPoolExecutor
newCachedThreadPool
newSingleThreadExecutor
newFixedThreadPool
ScheduledThreadPoolExecutor
那么我们来详细看一下这两个类:
先看参数:
int corePoolSize
池中所保存的线程数,包括空闲线程。
int maximumPoolSize
池中允许的最大线程数。
long keepAliveTime
当线程数大于核心时,此为终止前多余的空闲线程等待新任务的最长时间。
TimeUnit unit
keepAliveTime 参数的时间单位。
BlockingQueue
执行前用于保持任务的队列。此队列仅保持由 execute 方法提交的 Runnable 任务。
ThreadFactory threadFactory
执行程序创建新线程时使用的工厂。
RejectedExecutionHandler handler
由于超出线程范围和队列容量而使执行被阻塞时所使用的处理程序。
由于 ThreadPoolExecutor 将根据 corePoolSize 和 maximumPoolSize 设置的边界自动调整池大小,当新任务在方法 execute(java.lang.Runnable) 中提交时:
如果运行的线程少于 corePoolSize,则创建新线程来处理请求,即使其他辅助线程是空闲的。
如果设置的 corePoolSize 和 maximumPoolSize 相同,则创建的线程池是大小固定的,如果运行的线程数与 corePoolSize 相同,当有新请求过来时,若 workQueue 未满,则将请求放入 workQueue 中,等到有空闲的线程去从 workQueue 中去任务并处理。
如果运行的线程数多余 corePoolSize 而少于 maximumPoolSize,则仅当队列满时才创建新线程去处理请求。
如果运行的线程多余 corePoolSize 并且等于 maximumPoolSize,若队列已满,则通过 RejectedExecutionHandler 所指定的策略来处理新请求。
如果将 maximumPoolSize 设置为基本的无界值(如 Integer.MAX_VALUE),则允许线程池适应任意数量的并发任务。
ThreadPoolExecutor 提供了动态调整线程池容量的方法:setCorePoolSize() 和 setMaximumPoolSize()
默认情况下,核心线程最初只是在新任务到达时才创建和启动的,也可以使用方法 prestartCoreThread() 或 prestartAllCoreThreads() 对其进行动态重写,如果构造带有非空队列的线程池,则可能希望预先启动线程。
使用 ThreadFactory 创建新线程。如果没有另外说明,则在同一个 ThreadGroup 中一律使用 Executors.defaultThreadFactory() 创建线程,并且这些线程具有相同的 NORM_PRIORITY 优先级和非守护进程状态。通过提供不同的 ThreadFactory,可以改变线程的名称、线程组、优先级、守护进程状态,等等。如果从 newThread 返回 null 时 ThreadFactory 未能创建线程,则执行程序将继续进行,但不能执行任何任务。
如果线程池中有多于 corePoolSize 的线程,则这些多出的线程在空闲时间超过 keepAliveTime 时将会终止。这提供了当线程池处于非活动状态时减少资源消耗的方法。如果池后来变得更为活跃,则可以创建新的线程。也可以使用 setKeepAliveTime(long, java.util.cincurrent.TimeUnit) 动态的更改此参数。使用 Long.MAX_VALUE TimeUnit.NANOSECONDS 的只在关闭前有效的终止状态禁用控件线程,默认情况下,保持活动策略只在有多于 corePoolSizeThreads 的线程时应用。但是只要 keepAliveTime 值非 0,allowCoreThreadTimeOut(boolean) 方法也可将此超时策略用于核心线程。
BlockingQueue 用于保存等待执行的任务的阻塞队列。
如果运行的线程少于 corePoolSize,则 Executor 始终首选添加新的线程,而不进行排队。
如果运行的线程等于或多于 corePoolSize,则 Executor 始终首选将请求加入队列,而不添加新的线程。
如果无法将请求加入队列,则创建新的线程,除非创建此线程超出 maximumPoolSize,在这种情况下,任务将被拒绝。
排队通常有三种策略
直接提交:工作队列的默认选项是 SynchronousQueue,它将任务直接交给线程而不保持它们。在此,如果不存在可用于立即运行任务的线程,则试图把任务加入队列将会失败,因此会构造一个新的线程。该策略可以避免在处理可能具有内部以来的请求时出现锁。直接提交通常要求无界 maximumPoolSize 以避免拒绝新提交的任务。当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性。
无界队列。使用无界队列将导致在所有 corePoolSize 线程都忙时新任务在队列中等待。这样,创建的线程就不会超过 corePoolSize。(因此,maximumPoolSize 的值也就无效了。)当每个任务完全独立于其他任务,即任务执行互不影响时,适合于使用无界队列;例如,在 Web 页服务器中。这种排队可用于处理瞬态突发请求,当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性。
有界队列。当使用有限的 maximumPoolSizes 时,有界队列(如 ArrayBlockingQueue)有助于防止资源耗尽,但是可能较难调整和控制。队列大小和最大池大小可能需要相互折衷:使用大型队列和小型池可以最大限度地降低 CPU 使用率、操作系统资源和上下文切换开销,但是可能导致人工降低吞吐量。如果任务频繁阻塞(例如,如果它们是 I/O 边界),则系统可能为超过您许可的更多线程安排时间。使用小型队列通常要求较大的池大小,CPU 使用率较高,但是可能遇到不可接受的调度开销,这样也会降低吞吐量。
可以选择以下几个阻塞队列:
ArrayBlockingQueue
它是一个基于数组结果的有界阻塞队列,此队列按 FIFO 原则对元素进行排序。
LinkedBlockingQueue
一个基于链表结构的阻塞队列,此队列按 FIFO 排序元素,吞吐量通常要高于 ArrayBlockingQueue。newFixedThreadPool() 使用该队列。
SynchronousQueue
一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于 LinkedBlockQueue,newCachedThreadPool 使用该队列。
PrioriBlockingQueue
一个具有优先级的无线阻塞队列。
当线程池和队列都满了,说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务。这个策略默认情况下是 AbortPolicy,表示无法处理新任务时抛出异常。以下是 JDK 1.5 提供的四种策略:
AbortPolicy:直接抛出异常
CallerRunsPolicy:只用调用者所在的线程来运行任务
DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前任务。
DiscardPolicy:不处理,丢弃掉。
向线程池提交任务有两种方法:execute 和 submit 方法:
threadsPool.execute(new Runnable() {
@Override
public void run() {
// TODO Auto-generated method stub
}
});
Future<Object> future = executor.submit(harReturnValuetask);
try {
Object s = future.get();
} catch (InterruptedException e) {
// 处理中断异常
} catch (ExecutionException e) {
// 处理无法执行任务异常
} finally {
// 关闭线程池
executor.shutdown();
}
关闭线程池有两个方法:shutdown() 和 shutdownNow():
shutdown()
不会立即终止线程池,而是要等所有任务缓存队列中的任务都执行完毕才终止,但不会再接收新任务。
shutdownNow()
立即终止线程池,并尝试打断正在执行的任务,并且清空任务缓存队列,返回尚未执行的任务。
通过线程池提供的参数进行监控。线程池里有一些属性在监控线程池的时候可以使用
taskCount:线程池需要执行的任务数量。
completedTaskCount:线程池在运行过程中已完成的任务数量。小于或等于 taskCount。
largestPoolSize:线程池曾经创建过的最大线程数量。通过这个数据可以知道线程池是否满过。如等于线程池的最大大小,则表示线程池曾经满了。
getPoolSize: 线程池的线程数量。如果线程池不销毁的话,池里的线程不会自动销毁,所以这个大小只增不减。
getActiveCount:获取活动的线程数。
通过扩展线程池进行监控。通过继承线程池并重写线程池的 beforeExecute,afterExecute 和 terminated 方法,我们可以在任务执行前,执行后和线程池关闭前干一些事情。如监控任务的平均执行时间,最大执行时间和最小执行时间等。
ScheduledThreadPoolExecutor 是类 ThreadPoolExecutor 的子类,除了继承了父类的一些特性之外,它还可以另行安排在给定的延时后运行命令,或者定期执行命令。
虽然此类继承自 ThreadPoolExecutor,但是几个继承的调整方法对此类并无作用。特别是,因为它作为一个使用 corePoolSize 线程和一个无界队列的固定大小的池,所以调整 maximumPoolSize 没有什么效果。
此类重写 AbstractExecutorService 的 submit 方法,以生成内部对象控制每个任务的延迟和调度。若要保留功能性,子类中任何进一步重写的这些方法都必须调用超类版本,超类版本有效地禁用附加任务的定制。但是,此类提供替代受保护的扩展方法 decorateTask(为 Runnable 和 Callable 各提供一种版本),可定制用于通过 execute、submit、schedule、scheduleAtFixedRate 和 scheduleWithFixedDelay 进入的执行命令的具体任务类型。默认情况下,ScheduledThreadPoolExecutor 使用一个扩展 FutureTask 的任务类型。
使用 schedule 方法可以创建并执行在给定延迟后启用的一次性操作。
public class ThreadPoolTest {
public static void main(String[] args) throws InterruptedException {
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
scheduledThreadPool.schedule(new RunnableTest(100), 5, TimeUnit.SECONDS);
}
}
class RunnableTest implements Runnable {
private int number;
public RunnableTest(int number) {
this.number = number;
}
@Override
public void run() {
try {
System.out.println(Thread.currentThread().getId() + " - " + number + " start");
Thread.sleep(5000);
System.out.println(Thread.currentThread().getId() + " - " + number + " end");
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
-