关于线程池,你只需要看这篇!
1、什么是线程池以及为什么要有线程池?
1.1 什么是线程池?
线程池其实是一种池化的技术实现,池化技术的核心思想就是实现资源的复用,避免资源的重复创建和销毁而带来的性能开销。
线程池可以管理一批线程,让线程执行完任务后不被销毁,而是继续去处理其它任务。
1.2 为什么要有线程池?
线程池存在的主要原因 (好处) 有以下四个:
- 降低资源消耗。 创建/销毁线程需要消耗系统资源,线程池可以复用已创建的线程降低系统资源消耗!
- 提高响应速度。 当任务到达时,任务能够立即被执行,而不需要等待线程创建好后才能被执行!
- 控制并发数量。 并发数量过多,可能会导致资源消耗过多,从而造成服务器崩溃(主要原因)!
- 统一管理线程。 线程是稀缺资源,如果无限制的创建不仅会消耗系统资源还会降低系统的稳定性,使用线程池可以进行统一的分配、调优和监控!
2、线程池的两种创建方式
2.1 通过Executors工具类来创建
Executors
类中提供的几个静态方法来创建线程池:newCachedThreadPool
、newFixedThreadPool
、newSingleThreadExecutor
、newScheduledThreadPool
!
newCachedThreadPool:该方法返回一个可根据实际情况调整线程数量的线程池。线程池的线程数量不确定,但若有空闲线程可以复用,则会优先使用可复用的线程;若所有线程均在工作,又有新的任务提交,则会创建新的线程处理任务。所有线程的手头任务执行完毕后,将会返回到线程池进行复用;线程池中的线程如果空闲时间达到60
秒将会被销毁,意味着即使没有任务进来,CacheThreadPool并不会占用很多资源!注意,使用改线程池可能导致导致 OOM!
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
// 同步队列,任务队列最大长度为 Integer.MAX_VALUE
new SynchronousQueue<Runnable>());
}
newFixedThreadPool:该方法返回一个固定线程数量的线程池。核心线程数量和总线程数量相等,都是传入的参数nThreads,所以创建的线程都是核心线程、没有创建非核心线程。因为LinkedBlockingQueue的默认大小是Integer.MAX_VALUE,故如果有空闲的核心线程,则交给核心线程处理;如果没有空闲的核心线程,则所有的新增任务都将入队等待,直到有空闲核心线程。注意,使用改线程池可能导致导致 OOM!
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
// 无界队列,任务队列最大长度为 Integer.MAX_VALUE
new LinkedBlockingQueue<Runnable>());
}
newSingleThreadExecutor:该方法返回一个只有一个线程的线程池。有且仅有一个核心线程( corePoolSize =maximumPoolSize=1),不会创建非核心线程,所有任务按照先来先执行的顺序执行。如果这个唯一的线程不空闲,那么新来的任务会存储在任务队列LinkedBlockingQueue里等待执行。注意,使用改线程池可能导致导致 OOM!
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
// 无界队列,任务队列最大长度为 Integer.MAX_VALUE
new LinkedBlockingQueue<Runnable>()));
}
newScheduledThreadPool:该方法返回一个定长线程池,支持执行定时任务。注意,使用改线程池可能导致导致 OOM!
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE,
DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
// 延迟阻塞队列,任务队列最大长度为 Integer.MAX_VALUE
new DelayedWorkQueue());
}
2.2 通过ThreadPoolExecutor构造函数创建(推荐)
ThreadPoolExecutor一共提供了四个构造方法:
// 五个参数的构造函数
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue)
// 六个参数的构造函数-1
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory)
// 六个参数的构造函数-2
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler)
// 七个参数的构造函数
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
int corePoolSize:该线程池的核心线程数
核心线程:线程池中有两类线程,核心线程和非核心线程。核心线程默认情况下会一直存在于线程池中,即使这个核心线程什么都不干(铁饭碗),而非核心线程如果长时间的闲置,就会被销毁(临时工)。
int maximumPoolSize:该线程池中线程总数最大值
该值等于核心线程数量 + 非核心线程数量。
long keepAliveTime:非核心线程闲置超时时长
非核心线程如果处于闲置状态的时间超过该值,就会被销毁;如果调用线程池的方法allowCoreThreadTimeOut(true),则会也作用于核心线程。
TimeUnit unit:keepAliveTime的单位。
TimeUnit是一个枚举类型 ,包括MILLISECONDS(1毫秒)、NANOSECONDS (1微毫秒)
BlockingQueue workQueue:阻塞队列,维护着等待执行的Runnable任务对象
常用的几个阻塞队列 LinkedBlockingQueue、ArrayBlockingQueue、SynchronousQueue、DelayQueue。
ThreadFactory threadFactory:创建线程的工厂,线程池内部在创建线程时使用该工厂创建;如果不指定,会新建一个默认的线程工厂。
用于批量创建线程,线程工厂在创建线程时统一设置一些参数,如是否守护线程、线程的优先级等
RejectedExecutionHandler handler:拒绝处理策略,也叫饱和策略;线程数量大于最大线程数就会采用饱和策略,共有4种饱和策略。
ThreadPoolExecutor.AbortPolicy:默认拒绝处理策略,丢弃任务并抛出RejectedExecutionException异常
ThreadPoolExecutor.DiscardPolicy:直接丢弃,但不抛出异常
ThreadPoolExecutor.DiscardOldestPolicy:丢弃最早未处理的任务请求,如果再次失败,重复此过程
ThreadPoolExecutor.CallerRunsPolicy:调用执行自己的线程运行任务,也就是直接在调用线程池的execute
方法的线程中执行被拒绝的任务,如果父线程已关闭,则会丢弃该任务。因此这种策略会降低对于新任务的提交速度,影响程序的整体性能。如果您的应用程序可以承受此延迟并且你要求任何一个任务请求都要被执行的话,你可以选择这个策略!
3、线程池的工作原理
Java中的各种线程池的顶层接口都是Executor
接口,ThreadPoolExecutor
就是这个接口的实现类,我们搞懂ThreadPoolExecutor
这个类也就明白了线程池的工作原理,现在让我们来看看ThreadPoolExecutor
这个类!
3.1 ThreadPoolExecutor提供的构造方法
再来回顾一下ThreadPoolExecutor提供的四个构造方法:
// 五个参数的构造函数
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue)
// 六个参数的构造函数-1
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory)
// 六个参数的构造函数-2
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler)
// 七个参数的构造函数
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
int corePoolSize:该线程池的核心线程数
核心线程:线程池中有两类线程,核心线程和非核心线程。核心线程默认情况下会一直存在于线程池中,即使这个核心线程什么都不干(铁饭碗),而非核心线程如果长时间的闲置,就会被销毁(临时工)。
int maximumPoolSize:该线程池中线程总数最大值
该值等于核心线程数量 + 非核心线程数量。
long keepAliveTime:非核心线程闲置超时时长
非核心线程如果处于闲置状态的时间超过该值,就会被销毁;如果调用线程池的方法allowCoreThreadTimeOut(true),则会也作用于核心线程。
TimeUnit unit:keepAliveTime的单位。
TimeUnit是一个枚举类型 ,包括MILLISECONDS(1毫秒)、NANOSECONDS (1微毫秒)
BlockingQueue workQueue:阻塞队列,维护着等待执行的Runnable任务对象
常用的几个阻塞队列 LinkedBlockingQueue、ArrayBlockingQueue、SynchronousQueue、DelayQueue。
ThreadFactory threadFactory:创建线程的工厂,线程池内部在创建线程时使用该工厂创建;如果不指定,会新建一个默认的线程工厂。
用于批量创建线程,线程工厂在创建线程时统一设置一些参数,如是否守护线程、线程的优先级等
RejectedExecutionHandler handler:拒绝处理策略,也叫饱和策略;线程数量大于最大线程数就会采用饱和策略,共有4种饱和策略。
ThreadPoolExecutor.AbortPolicy:默认拒绝处理策略,丢弃任务并抛出RejectedExecutionException异常
ThreadPoolExecutor.DiscardPolicy:直接丢弃,但不抛出异常
ThreadPoolExecutor.DiscardOldestPolicy:丢弃最早未处理的任务请求,如果再次失败,重复此过程
ThreadPoolExecutor.CallerRunsPolicy:调用执行自己的线程运行任务,也就是直接在调用线程池的execute
方法的线程中执行被拒绝的任务,如果父线程已关闭,则会丢弃该任务。因此这种策略会降低对于新任务的提交速度,影响程序的整体性能。如果您的应用程序可以承受此延迟并且你要求任何一个任务请求都要被执行的话,你可以选择这个策略!
3.2 ThreadPoolExecutor的5种状态
每种线程池内部都有一个调度线程,这个线程就是用于管理整个线程池里的各种任务和事务,例如创建线程、销毁线程、任务队列管理、线程队列管理等等,故线程池也有自己的状态。ThreadPoolExecutor
类中使用了一些final int
常量变量来表示线程池的状态 ,分别为RUNNING、SHUTDOWN、STOP、TIDYING 、TERMINATED!
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
RUNNING: 线程池创建后处于RUNNING状态。
SHUTDOWN: 调用shutdown()方法后处于SHUTDOWN状态,线程池不能接受新的任务,但会等待阻塞队列的任务全部处理完成。
STOP: 调用shutdownNow()方法后处于STOP状态,线程池不再接受新任务,也不会处理阻塞队列中的任务,而且会中断正在运行的任务。
TIDYING: 当所有的任务已终止,线程池会变为TIDYING状态,接着会执行terminated()
函数。
ThreadPoolExecutor中有一个控制状态的属性叫
ctl
,它是一个AtomicInteger类型的变量,线程池状态就是通过该成员变量ctl
来获取到的。
TERMINATED: 线程池处在TIDYING状态时,执行完terminated()方法之后,线程池就会被设置为TERMINATED状态。
3.3 ThreadPoolExecutor执行任务的主要流程
我们先来看看 JDK 1.8 源码中ThreadPoolExecutor
是如何处理线程任务的,重点看execute
方法:
// 存放线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount)
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//任务队列
private final BlockingQueue<Runnable> workQueue;
private static int workerCountOf(int c) {
return c & CAPACITY;
}
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
// 1.当前线程数小于corePoolSize,则调用addWorker创建核心线程执行任务
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 2.如果不小于corePoolSize,则将任务添加到workQueue队列。
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 2.1 如果isRunning返回false(状态检查),则remove这个任务,然后执行拒绝策略。
if (! isRunning(recheck) && remove(command))
reject(command);
// 2.2 线程池处于running状态,但是没有线程,则创建线程
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 3.如果放入workQueue失败,则创建非核心线程执行任务,
// 如果这时创建非核心线程失败(当前线程总数不小于maximumPoolSize时),就会执行拒绝策略。
else if (!addWorker(command, false))
reject(command);
}
第一步,通过ctl.get()
获取线程池的状态;第二步,入队前进行了一次isRunning
判断;第三步,入队之后又进行了一次isRunning
判断!
为什么需要double check线程池的状态?
在多线程环境下,线程池的状态时刻在变化,而ctl.get()
是非原子操作,很有可能刚获取了线程池状态后线程池状态就改变了;判断是否将command加入workque是用线程池当前的状态,倘若将command加入后没有进行double check,万一线程池处于非running状态(在多线程环境下很有可能发生),那么刚加入的command将永远不会被执行!
在 execute
方法中,多次调用 addWorker
方法!addWorker
这个方法主要用来创建新的工作线程,如果返回 true 说明创建和启动工作线程成功,否则返回 false,我们也来简单看下 addWorker
方法的源码:
// 全局锁,并发操作必备
private final ReentrantLock mainLock = new ReentrantLock();
// 跟踪线程池的最大大小,只有在持有全局锁mainLock的前提下才能访问此集合
private int largestPoolSize;
// 工作线程集合,存放线程池中所有的(活跃的)工作线程,只有在持有全局锁mainLock的前提下才能访问此集合
private final HashSet<Worker> workers = new HashSet<>();
//获取线程池状态
private static int runStateOf(int c) { return c & ~CAPACITY; }
//判断线程池的状态是否为 Running
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
/**
* 添加新的工作线程到线程池
* @param firstTask 要执行
* @param core参数为true的话表示使用线程池的基本大小,为false使用线程池最大大小
* @return 添加成功就返回true否则返回false
*/
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
//这两句用来获取线程池的状态
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
//获取线程池中工作的线程的数量
int wc = workerCountOf(c);
// core参数为false的话表明队列也满了,线程池大小变为 maximumPoolSize
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//原子操作将workcount的数量加1
if (compareAndIncrementWorkerCount(c))
break retry;
// 如果线程的状态改变了就再次执行上述操作
c = ctl.get();
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
// 标记工作线程是否启动成功
boolean workerStarted = false;
// 标记工作线程是否创建成功
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
// 加锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//获取线程池状态
int rs = runStateOf(ctl.get());
//rs < SHUTDOWN 如果线程池状态依然为RUNNING,并且线程的状态是存活的话,就会将工作线程添加到工作线程集合中
//(rs=SHUTDOWN && firstTask == null)如果线程池状态小于STOP,也就是RUNNING或者SHUTDOWN状态下,同时传入的任务实例firstTask为null,则需要添加到工作线程集合和启动新的Worker
// firstTask == null证明只新建线程而不执行任务
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
//更新当前工作线程的最大容量
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
// 工作线程是否启动成功
workerAdded = true;
}
} finally {
// 释放锁
mainLock.unlock();
}
//// 如果成功添加工作线程,则调用Worker内部的线程实例t的Thread#start()方法启动真实的线程实例
if (workerAdded) {
t.start();
/// 标记线程启动成功
workerStarted = true;
}
}
} finally {
// 线程启动失败,需要从工作线程中移除对应的Worker
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
总结一下处理流程:
- 线程总数量 < corePoolSize,无论核心线程是否空闲,都会新建一个核心线程执行任务。注意,这一步需要获得全局锁!
- 线程总数量 >= corePoolSize时,新来的线程任务会进入等待队列中,然后空闲的核心线程会依次去缓存队列中取任务来执行(体现了线程复用)!
- 当缓存队列满了,说明这个时候任务已经多到爆棚、需要一些“临时工”来处理这些任务了,于是开始创建非核心线程。注意,这一步也需要获得全局锁!
- 缓存队列满了,且总线程数达到了maximumPoolSize,则会采取上面提到的拒绝策略进行处理新来的任务!
注意:如果遇到面试官问线程池的原理,你能说清楚3.1中各个参数的意义,以及3.3中的任务处理流程即可吊打99%的面试官!
如果还想让那最后1%的面试官跪下来唱征服😂,那你还需要了解一下线程池是如何做到线程复用的,感兴趣的小伙伴自己去研究!
4、关于线程池的几个问题
4.1 为什么很多公司不允许使用Executors创建线程池?
《阿里巴巴开发手册》建议我们不要直接使用Executors
工具类来创建线程池,而是通过ThreadPoolExecutor
的构建函数显示的创建,这样的处理方式需要开发同学更加明确线程池的运行规则,规避资源耗尽的风险。
但如果你以及你们团队对线程池非常熟悉,又确定业务规模不会大到资源耗尽的程度(比如线程数量或任务队列长度可能达到Integer.MAX_VALUE)时,其实是可以使用JDK提供的Executors
工具类,它能让我们的代码具有更强的可读性!
4.2 配置线程池需要考虑哪些因素?
线程数
线程数的设置主要取决于业务是 IO 密集型还是 CPU 密集型。
CPU密集型:指的是任务主要使用来进行大量的计算,没有什么导致线程阻塞,一般这种场景的线程数设置为 CPU核心数+1
!
IO 密集型:当任务需要大量的 io,比如磁盘 io、网络 io,单线程处理可能会出现大量的阻塞,所以针对 IO 密集型任务使用多线程可以提高处理速度,一般线程数设置为 CPU核心数x2
!
Java 中用来获取 CPU 核心数的方法是:Runtime.getRuntime().availableProcessors()
!
线程工厂
一般建议自定义线程工厂,定义线程工厂的时候设置好线程名称的前缀
,这样在查日志的时候就方便知道是哪个线程执行的代码。例如:
import java.util.concurrent.ThreadFactory;
public class MyThreadFactory implements ThreadFactory {
private final AtomicInteger threadNumber = new AtomicInteger(1);
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("zaohuojian-" + threadNumber.getAndIncrement());
return thread;
}
}
ExecutorService service = new ThreadPoolExecutor(5, 5, 0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
new MyThreadFactory());
有界队列
一般需要设置有界队列的大小,比如 LinkedBlockingQueue
在构造的时候可以传入参数来限制队列中任务数据的大小,这样就不会因为无限往队列中扔任务导致系统的 OOM。比如:
ExecutorService service = new ThreadPoolExecutor(5, 5, 0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(10),
new MyThreadFactory());
4.3 几种常见的对比
(1) Runnable
vs Callable
Runnable
接口不会返回结果或抛出检查异常,但是 Callable
接口可以。
所以,如果任务不需要返回结果或抛出异常推荐使用 Runnable
接口,这样代码看起来会更加简洁。
Runnable.java
@FunctionalInterface
public interface Runnable {
/**
* 被线程执行,没有返回值也无法抛出异常
*/
public abstract void run();
}
Callable.java
@FunctionalInterface
public interface Callable<V> {
/**
* 计算结果,或在无法这样做时抛出异常。
* @return 计算得出的结果
* @throws 如果无法计算结果,则抛出异常
*/
V call() throws Exception;
}
(2) execute()
vs submit()
- execute()方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功与否;
- submit()方法用于提交需要返回值的任务,线程池会返回一个
Future
类型的对象,通过这个Future
对象可以判断任务是否执行成功,并且可以通过Future
的get
方法来获取返回值;其中,get()
方法会阻塞当前线程直到任务完成,而get(long timeout,TimeUnit unit)
方法如果任务在timeout
时间内任务还没有执行完就会抛出java.util.concurrent.TimeoutException
。
示例 1:使用 get()
方法获取返回值。
//这里只是为了演示使用,推荐使用 `ThreadPoolExecutor` 构造方法来创建线程池
ExecutorService executorService = Executors.newFixedThreadPool(3);
Future<String> submit = executorService.submit(() -> {
try {
Thread.sleep(5000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "abc";
});
String s = submit.get();
System.out.println(s);
executorService.shutdown();
输出:
abc
示例 2:使用 get(long timeout,TimeUnit unit)
方法获取返回值。
//这里只是为了演示使用,推荐使用 `ThreadPoolExecutor` 构造方法来创建线程池
ExecutorService executorService = Executors.newFixedThreadPool(3);
Future<String> submit = executorService.submit(() -> {
try {
Thread.sleep(5000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "abc";
});
String s = submit.get(3, TimeUnit.SECONDS);
System.out.println(s);
executorService.shutdown();
输出:
Exception in thread "main" java.util.concurrent.TimeoutException
at java.util.concurrent.FutureTask.get(FutureTask.java:205)
(3) shutdown()
VSshutdownNow()
shutdown()
:关闭线程池,线程池的状态变为SHUTDOWN
。线程池不再接受新任务,但会等待阻塞队列的任务全部处理完成!shutdownNow()
: 关闭线程池,线程池的状态变为STOP
。线程池不再接受新任务,也不会处理阻塞队列中的任务,而且会中断当前正在运行的任务!
(4) isTerminated()
VS isShutdown()
- 当调用
shutdown()
方法后,isShutdown()
返回为 true - 当调用
shutdown()
方法后,并且所有提交的任务处理完成后,isTerminated()
返回为 true
5、线程池几个典型的应用场景
5.1 并行计算
比如我们要处理 6 个文件,这 6 个文件的处理无需关心顺序,但是处理完毕后需要将这几个文件的处理结果进行统计整理并返回。为此我们定义了一个线程池和 count 为 6 的CountDownLatch
对象 ,使用线程池处理读取任务,每一个线程处理完之后就将 count-1,调用CountDownLatch
对象的 await()
方法,直到所有文件处理完毕之后,才会接着执行后面的逻辑。伪代码:
public class ParallelCompute {
// 处理文件的数量
private static final int threadCount = 6;
public static void main(String[] args) throws InterruptedException {
// 创建一个具有固定线程数量的线程池对象(推荐使用构造方法创建,这里只是为了演示)
ExecutorService threadPool = Executors.newFixedThreadPool(6);
final CountDownLatch countDownLatch = new CountDownLatch(threadCount);
for (int i = 0; i < threadCount; i++) {
threadPool.execute(() -> {
try {
//处理文件的操作
//...
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//表示一个文件已经处理完成
countDownLatch.countDown();
}
});
}
countDownLatch.await();
threadPool.shutdown();
System.out.println("finish");
}
}
5.2 异步任务
一个常见的异步任务场景是在Web开发中处理用户上传的文件。当用户上传一个文件时,通常需要进行一些耗时的处理,比如文件的解析、存储、压缩等操作。为了不阻塞Web服务器的主线程,我们可以使用线程池来异步处理这些任务,以提高系统的并发能力和响应速度。伪代码:
public class AsynTask {
// 创建一个固定大小的线程池
private ExecutorService executor = Executors.newFixedThreadPool(5);
// 用户上传文件的处理方法
public void handleFileUpload(File uploadedFile) {
// 异步处理文件上传任务
executor.submit(() -> {
parseFile(uploadedFile);
storeFile(uploadedFile);
compressFile(uploadedFile);
System.out.println("finish");
});
}
}
5.3 并发控制
一个常见的并发控制场景是控制对数据库的并发写请求。无论是关系型数据库Mysql还是非关系型数据库Elasticsearch等等,他们并发处理读写请求的能力都有一个上限,如果超过这个极值则很可能会压垮数据库、导致数据丢失,带来无法挽回的损失!伪代码:
public class SinkWriter {
private ExecutorService submitService;
//公共队列,生产者、消费者共用
private final BlockingQueue<SinkRequest> commonQueue;
public SinkWriter() {
//固定线程数量10,则写并发最大为10
submitService = Executors.newFixedThreadPool(10);
commonQueue = new LinkedBlockingQueue<>(1000);
buildWriters(10);
}
//单线程生产
public void put(SinkRequest sinkRequest) {
try {
commonQueue.put(sinkRequest);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
//多线程消费
private void buildWriters(int threadNum) {
try {
for (int i = 0; i < threadNum; i++) {
submitService.submit(() -> {
//消费
SinkRequest request = commonQueue.poll();
//落库
//...
});
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
6、总结
6.1 使用线程池的4个好处
①降低资源消耗。 创建/销毁线程需要消耗系统资源,线程池可以复用已创建的线程降低系统资源消耗!
②提高响应速度。 当任务到达时,任务能够立即被执行,而不需要等待线程创建好后才能被执行!
③控制并发数量。 并发数量过多,可能会导致资源消耗过多,从而造成服务器崩溃(主要原因)!
④统一管理线程。 线程是稀缺资源,如果无限制的创建不仅会消耗系统资源还会降低系统的稳定性,使用线程池可以进行统一的分配、调优和监控!
6.2 创建线程池的两种方式
①通过Executors
工具类来创建,Executors
提供了4种内置的线程池创建方法。
②通过ThreadPoolExecutor
的构造函数创建,ThreadPoolExecutor
提供了4种构造函数。
6.3 线程池执行任务的主要流程
①线程总数量 < corePoolSize,无论核心线程是否空闲,都会新建一个核心线程执行任务。注意,这一步需要获得全局锁!
②线程总数量 >= corePoolSize时,新来的线程任务会进入等待队列中,然后空闲的核心线程会依次去缓存队列中取任务来执行(体现了线程复用)!
③当缓存队列满了,说明这个时候任务已经多到爆棚、需要一些“临时工”来处理这些任务了,于是开始创建非核心线程。注意,这一步也需要获得全局锁!
④缓存队列满了,且总线程数达到了maximumPoolSize,则会采取上面提到的拒绝策略进行处理新来的任务!
6.4 关于线程池几个问题
①不要直接使用Executors
工具类来创建线程池,而是通过ThreadPoolExecutor
的构建函数显示的创建,这样的处理方式需要开发同学更加明确线程池的运行规则,规避资源耗尽的风险!
②配置线程池需要考虑的3个因素:线程数、线程工厂、有界队列
③几种常见的对比:Runnable
vs Callable
、execute()
vs submit()
、shutdown()
VSshutdownNow()
、isTerminated()
VS isShutdown()
6.5 线程池几个典型的应用场景
①并行计算: 多线程处理多个excel小文件,处理完毕后,把多个excel小文件合并成一个大文件。
②异步任务: 文件上传,解析、存储、压缩。
③并发控制: 控制对数据库的并发写请求。