ThreadPoolexecutor源码分析、C++11线程池实现

C++11标准引入thread对象,从此以后在Windows/Linux中再也不用在Createthread和pthread函数切换了,但是C++11并没有给出线程池的实现。jdk中ThreadPoolexecutor是由美国计算机科学家、纽约州立大学 Oswego 分校教授Doug Lea编写的,设计ThreadPoolexecutor时作者考虑的诸多细节,最大的特点是极致的追求优雅的退出和平稳的拒绝策略,学习ThreadPoolexecutor代码可以并发编程的诸多技术。

一、ThreadPoolexecutor分析

ThreadPoolexecutor有两种方式执行任务方式,一种带返回值的submit,一种是不带返回值的execute,重点来看execute函数,submit核心也是调用execute:

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
   
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

同一个原子Integer类型变量ctl,以Integer32位为例,前3位用来表示线程池状态,依次是Runing<0<Shutdown<Stop,注意!Runing是个负数;ctl后面29位用来存储线程池中线程的数量。线程池里定义了两类线程,一种是核心线程另一种是非核心线程,corePoolSize为核心线程数量,maximumPoolSize为最大线程数量,需要注意的是,创建线程时,并未规定创建的线程是否是核心线程。每个线程在检查工作线程数量时,当下没超过corePoolSize时都是核心线程,超过corePoolSize时为非核心线程,在介绍工作线程运行函数runWorker时会再具体介绍。

execute先检查当前工作线程数是否超过核心线程数,如果没有通过

addWorker(command, true)

添加核心线程,这里true告诉addWorker函数,此时检查当前工作线程数是否超过核心线程数,如果是 false则检查当前工作线程数是否超过maximumPoolSize最大线程数量。

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (int c = ctl.get();;) {
        // Check if queue empty only if necessary.
        if (runStateAtLeast(c, SHUTDOWN)
            && (runStateAtLeast(c, STOP)
                || firstTask != null
                || workQueue.isEmpty()))
            return false;

        for (;;) {
            if (workerCountOf(c)
                >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateAtLeast(c, SHUTDOWN))
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int c = ctl.get();

                if (isRunning(c) ||
                    (runStateLessThan(c, STOP) && firstTask == null)) {
                    if (t.getState() != Thread.State.NEW)
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    workerAdded = true;
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                container.start(t);
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

addWorker利用原子数实现无锁流程,首先判断当前线程池状态

 if (runStateAtLeast(c, SHUTDOWN)
            && (runStateAtLeast(c, STOP)
                || firstTask != null
                || workQueue.isEmpty()))
            return false;

由于Runing<0<Shutdown<Stop,这句判断现成池至少是即将关闭状态,在此基础上,如果是以下三种情况

1、Stop

2、传入的任务不是空的

3、任务队列是空的

只要出现三个条件中的一个,那么返回终止线程池。1和3都可以理解,这里说下第二种情况,threadpool允许核心线程池数量为0,这就可能在一定空闲之后工作线程数数量为0,而任务是放入一个同步队列workQueue中,所有的工作线程从workQueue抢任务来做。再开回看execute下面一段代码,

 if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }

当任务成功的插入队列中时,如果发现工作线程总数为0:workerCountOf(recheck) == 0这时创建没有任务工作线程:

addWorker(null, false)

这时的线程池即将关闭,这个线程主要是为了清理一些少量的未完成任,且此时任务队列也没多少任务了。再回到上面addWorker第二个条件,如果firstTask== null则不能退出,要继续创建一个空任务的工作线程Worker。jdk中为了做到优雅的退出,细节做到了极致。如果通过上面的检查线程池还在运行中,进入第二项检查,如果传入参数core,检查目前工作线程数是否超过核心线程数量或者最大线程数。如果都通过则创建工作线程Worker,并把任务传递给Worker(参数名firstTask),作为工作线程的第一个任务,由此可见任务通过两种方式传递给工作线程Worker

1、直接传递给工作线程,作为第一个任务。

2、任务传入同步队列workQueue中,工作线程不断从workQueue中取出任务执行。

对应execute中代码:

if (workerCountOf(c) < corePoolSize) {
    if (addWorker(command, true))
        return;
    c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
    int recheck = ctl.get();
    if (! isRunning(recheck) && remove(command))
        reject(command);
    else if (workerCountOf(recheck) == 0)
        addWorker(null, false);
}
else if (!addWorker(command, false))
    reject(command);

从if (addWorker(command, true))如果为真,代表创建核心线程成功并返回,否则为false,通过分析addWorker代码false时可能有两种情况:

1、线程池即将关闭

2、线程数量超过核心线程数量或者最大线程数量。

execute接下来针对这两种情况分别处理,取出控制变量ctl, isRunning(c) 这个条件实际排除了第一种情形:

isRunning(c) =false时会第二次进入addWorker(command, false),这时会返回false,会直接引发拒绝策略。

具体看看怎么处理第二种情况的,此时工作线程数肯定是超过核心线程数量,此时先尝试将任务传入任务同步队列workQueue中,如果能成功传入,则做二次检查,主要检查两项

1、线程池是否关闭

2、工作线程是否为0但是任务队列中有任务,这种情况前面已经讨论过。

如果任务不能插入队列中,则尝试创建非核心线程来消化任务。

!addWorker(command, false)

由此可见,如果任务队列是一些无界的队列,如LinkedBlockingQueue,线程池永远不会创建超过核心线程数量的线程。接下来分析工作 线程Woker实现,Woker是线程池一个内部嵌套类,基于抽象类AQS,所以本身就具备了锁的同步功能。Woker封装任务和线程对象,线程执行函数run()将worker封装,以访问者模式传递给线程池的runWorker()函数,又回到了ThreadPoolexecutor。

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                try {
                    task.run();
                    afterExecute(task, null);
                } catch (Throwable ex) {
                    afterExecute(task, ex);
                    throw ex;
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

runWorker首先检查当前工作线程是否自带任务,即工作线程的firstTask是否为空,有运行firstTask,没有从任务队列中取任务:

while (task != null || (task = getTask()) != null)

task!=null为真的话,后面(task = getTask()) != null不会运行,针对具体工作线程任务为空的条件是

workQueue.isempty() && firstTask==null

即自带任务NULL且任务队列为空,工作线程退出任务条件是没有任务可运行,线程池取任务函数为getTask()函数,既然线程没有任务就退出,所以在getTask()函数中区分工作线程是否为核心线程 。

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();

        // Check if queue empty only if necessary.
        if (runStateAtLeast(c, SHUTDOWN)
            && (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // Are workers subject to culling?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

用变量timed来判断是否对当下线程做超时处理,当小于corePoolSize时,timed恒为false;而大于corePoolSize时,timed=true代表当下线程为非核心线程。之前所说的线程池创建线程时并未指定哪些线程是核心线程。在线程取任务时,如果当时线程数大于corePoolSize,即使这个线程当时是addWoker(command,true)创建的,也被视为非核心线程。

Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;

如果time=true,利用poll(),这是一个非阻塞函数一定时间内返回,如果取出任务是空则会再次循环进入条件判断:

if ((wc > maximumPoolSize || (timed && timedOut))
    && (wc > 1 || workQueue.isEmpty())) {
    if (compareAndDecrementWorkerCount(c))
        return null;
    continue;
}

此刻工作线程数减去1,由于取出的task是null,回到runWorker会退出循环,代表一个线程执行完毕。对于核心线程而言timed=false,会执行take()函数,take()是个阻塞函数,可以理解为核心线程不取任务就坚决不退出,这也保证了科核心线程一直处于运行状态。

二、C++实现线程池

真正CPU调度的是内核线程,ThreadPoolexecutor线程池创建的线程与内核线程是1:1,一一对应,jdk21或go语言中实现了协程,协程可将线程进一步细分,提高线程对时间片的利用效率,从感官上看,形成多个用户线程与一个内核线程对应:M:N,M>>N。c++11的thread对象也是1:1模型,1:1结构适合CPU密集型业务,即业务需要大量计算的需求,而M:N混合型适合IO密集型业务。

-免费试读结束-
登录|注册后打赏作者吧! 0.8元
上一篇  时间序列分析:AR(p),MA(q)
评论区