C++11标准引入thread对象,从此以后在Windows/Linux中再也不用在Createthread和pthread函数切换了,但是C++11并没有给出线程池的实现。jdk中ThreadPoolexecutor是由美国计算机科学家、纽约州立大学 Oswego 分校教授Doug Lea编写的,设计ThreadPoolexecutor时作者考虑的诸多细节,最大的特点是极致的追求优雅的退出和平稳的拒绝策略,学习ThreadPoolexecutor代码可以并发编程的诸多技术。
一、ThreadPoolexecutor分析
ThreadPoolexecutor有两种方式执行任务方式,一种带返回值的submit,一种是不带返回值的execute,重点来看execute函数,submit核心也是调用execute:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}同一个原子Integer类型变量ctl,以Integer为32位为例,前3位用来表示线程池状态,依次是Runing<0<Shutdown<Stop,注意!Runing是个负数;ctl后面29位用来存储线程池中线程的数量。线程池里定义了两类线程,一种是核心线程另一种是非核心线程,corePoolSize为核心线程数量,maximumPoolSize为最大线程数量,需要注意的是,创建线程时,并未规定创建的线程是否是核心线程。每个线程在检查工作线程数量时,当下没超过corePoolSize时都是核心线程,超过corePoolSize时为非核心线程,在介绍工作线程运行函数runWorker时会再具体介绍。
execute先检查当前工作线程数是否超过核心线程数,如果没有通过
addWorker(command, true)
添加核心线程,这里true告诉addWorker函数,此时检查当前工作线程数是否超过核心线程数,如果是 false则检查当前工作线程数是否超过maximumPoolSize最大线程数量。
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (int c = ctl.get();;) {
// Check if queue empty only if necessary.
if (runStateAtLeast(c, SHUTDOWN)
&& (runStateAtLeast(c, STOP)
|| firstTask != null
|| workQueue.isEmpty()))
return false;
for (;;) {
if (workerCountOf(c)
>= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateAtLeast(c, SHUTDOWN))
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int c = ctl.get();
if (isRunning(c) ||
(runStateLessThan(c, STOP) && firstTask == null)) {
if (t.getState() != Thread.State.NEW)
throw new IllegalThreadStateException();
workers.add(w);
workerAdded = true;
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
container.start(t);
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}addWorker利用原子数实现无锁流程,首先判断当前线程池状态
if (runStateAtLeast(c, SHUTDOWN) && (runStateAtLeast(c, STOP) || firstTask != null || workQueue.isEmpty())) return false;
由于Runing<0<Shutdown<Stop,这句判断现成池至少是即将关闭状态,在此基础上,如果是以下三种情况
1、Stop
2、传入的任务不是空的
3、任务队列是空的
只要出现三个条件中的一个,那么返回终止线程池。1和3都可以理解,这里说下第二种情况,threadpool允许核心线程池数量为0,这就可能在一定空闲之后工作线程数数量为0,而任务是放入一个同步队列workQueue中,所有的工作线程从workQueue抢任务来做。再开回看execute下面一段代码,
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}当任务成功的插入队列中时,如果发现工作线程总数为0:workerCountOf(recheck) == 0这时创建没有任务工作线程:
addWorker(null, false)
这时的线程池即将关闭,这个线程主要是为了清理一些少量的未完成任,且此时任务队列也没多少任务了。再回到上面addWorker第二个条件,如果firstTask== null则不能退出,要继续创建一个空任务的工作线程Worker。jdk中为了做到优雅的退出,细节做到了极致。如果通过上面的检查线程池还在运行中,进入第二项检查,如果传入参数core,检查目前工作线程数是否超过核心线程数量或者最大线程数。如果都通过则创建工作线程Worker,并把任务传递给Worker(参数名firstTask),作为工作线程的第一个任务,由此可见任务通过两种方式传递给工作线程Worker
1、直接传递给工作线程,作为第一个任务。
2、任务传入同步队列workQueue中,工作线程不断从workQueue中取出任务执行。
对应execute中代码:
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);从if (addWorker(command, true))如果为真,代表创建核心线程成功并返回,否则为false,通过分析addWorker代码false时可能有两种情况:
1、线程池即将关闭
2、线程数量超过核心线程数量或者最大线程数量。
execute接下来针对这两种情况分别处理,取出控制变量ctl, isRunning(c) 这个条件实际排除了第一种情形:
isRunning(c) =false时会第二次进入addWorker(command, false),这时会返回false,会直接引发拒绝策略。
具体看看怎么处理第二种情况的,此时工作线程数肯定是超过核心线程数量,此时先尝试将任务传入任务同步队列workQueue中,如果能成功传入,则做二次检查,主要检查两项
1、线程池是否关闭
2、工作线程是否为0但是任务队列中有任务,这种情况前面已经讨论过。
如果任务不能插入队列中,则尝试创建非核心线程来消化任务。
!addWorker(command, false)
由此可见,如果任务队列是一些无界的队列,如LinkedBlockingQueue,线程池永远不会创建超过核心线程数量的线程。接下来分析工作 线程Woker实现,Woker是线程池一个内部嵌套类,基于抽象类AQS,所以本身就具备了锁的同步功能。Woker封装任务和线程对象,线程执行函数run()将worker封装,以访问者模式传递给线程池的runWorker()函数,又回到了ThreadPoolexecutor。
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
try {
task.run();
afterExecute(task, null);
} catch (Throwable ex) {
afterExecute(task, ex);
throw ex;
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}runWorker首先检查当前工作线程是否自带任务,即工作线程的firstTask是否为空,有运行firstTask,没有从任务队列中取任务:
while (task != null || (task = getTask()) != null)
task!=null为真的话,后面(task = getTask()) != null不会运行,针对具体工作线程任务为空的条件是
workQueue.isempty() && firstTask==null
即自带任务NULL且任务队列为空,工作线程退出任务条件是没有任务可运行,线程池取任务函数为getTask()函数,既然线程没有任务就退出,所以在getTask()函数中区分工作线程是否为核心线程 。
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
// Check if queue empty only if necessary.
if (runStateAtLeast(c, SHUTDOWN)
&& (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}用变量timed来判断是否对当下线程做超时处理,当小于corePoolSize时,timed恒为false;而大于corePoolSize时,timed=true代表当下线程为非核心线程。之前所说的线程池创建线程时并未指定哪些线程是核心线程。在线程取任务时,如果当时线程数大于corePoolSize,即使这个线程当时是addWoker(command,true)创建的,也被视为非核心线程。
Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r;
如果time=true,利用poll(),这是一个非阻塞函数一定时间内返回,如果取出任务是空则会再次循环进入条件判断:
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}此刻工作线程数减去1,由于取出的task是null,回到runWorker会退出循环,代表一个线程执行完毕。对于核心线程而言timed=false,会执行take()函数,take()是个阻塞函数,可以理解为核心线程不取任务就坚决不退出,这也保证了科核心线程一直处于运行状态。
二、C++实现线程池
真正CPU调度的是内核线程,ThreadPoolexecutor线程池创建的线程与内核线程是1:1,一一对应,jdk21或go语言中实现了协程,协程可将线程进一步细分,提高线程对时间片的利用效率,从感官上看,形成多个用户线程与一个内核线程对应:M:N,M>>N。c++11的thread对象也是1:1模型,1:1结构适合CPU密集型业务,即业务需要大量计算的需求,而M:N混合型适合IO密集型业务。
| 上一篇 时间序列分析:AR(p),MA(q) | |
| 评论区 | |