
Java Thread Pool Source Code Analysis

2015-06-20 posted in [Programming Journey]

ThreadPoolExecutor constructor

The ThreadPoolExecutor constructor takes quite a few parameters, so developers can tailor the pool to their needs:


public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}
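To see these argument checks from the outside, here is a small sketch that calls the public 5-argument constructor, which delegates to the 7-argument one above with a default thread factory and rejection handler. The class name ConstructorChecks and its helper methods are made up for illustration; getKeepAliveTime is the real accessor and shows that keepAliveTime is stored in nanoseconds.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical helper class, for illustration only
class ConstructorChecks {
    // Tries to build a pool and reports which exception (if any) the argument checks throw
    static String tryCreate(int core, int max, long keepAliveSeconds, BlockingQueue<Runnable> queue) {
        try {
            ThreadPoolExecutor pool = new ThreadPoolExecutor(
                    core, max, keepAliveSeconds, TimeUnit.SECONDS, queue);
            pool.shutdown();
            return "ok";
        } catch (IllegalArgumentException | NullPointerException e) {
            return e.getClass().getSimpleName();
        }
    }

    // The constructor converts keepAliveTime to nanoseconds with unit.toNanos
    static long storedKeepAliveNanos(long keepAliveSeconds) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, keepAliveSeconds, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        try {
            return pool.getKeepAliveTime(TimeUnit.NANOSECONDS);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(tryCreate(2, 4, 60, new LinkedBlockingQueue<>())); // valid arguments
        System.out.println(tryCreate(4, 2, 60, new LinkedBlockingQueue<>())); // maximumPoolSize < corePoolSize
        System.out.println(tryCreate(2, 4, -1, new LinkedBlockingQueue<>())); // negative keepAliveTime
        System.out.println(tryCreate(2, 4, 60, null));                        // null workQueue
        System.out.println(storedKeepAliveNanos(2));
    }
}
```

Argument-range problems surface as IllegalArgumentException and missing collaborators as NullPointerException, in that order, exactly as in the constructor body.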
    

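If no threadFactory is given (that is, one of the overloaded constructors without that parameter is used), Executors.defaultThreadFactory() is used. All threads created by the default factory belong to the same ThreadGroup, have the same priority (Thread.NORM_PRIORITY) and are non-daemon threads. As a consequence, as long as the pool still has live worker threads (for example, because shutdown() was never called), the JVM will not exit, even after the main thread that uses the pool has finished. The default factory names threads pool-N-thread-M, where N is the pool (factory) number and M is the thread number within that pool; the names are visible in jstack output.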
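A quick way to check the default factory's behavior without jstack is to ask a worker about itself. This is a sketch: DefaultFactoryDemo is a hypothetical name, and the pool number N in the thread name depends on how many default factories were created earlier in the same JVM.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, for illustration only
class DefaultFactoryDemo {
    // Runs one task and reports "name|daemon|priority" of the worker thread that ran it
    static String describeWorker() throws InterruptedException, ExecutionException {
        // The 5-argument constructor falls back to Executors.defaultThreadFactory()
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        try {
            return pool.submit(() -> {
                Thread t = Thread.currentThread();
                return t.getName() + "|" + t.isDaemon() + "|" + t.getPriority();
            }).get();
        } finally {
            // Without shutdown(), the non-daemon worker would keep the JVM alive
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(describeWorker()); // e.g. pool-1-thread-1|false|5
    }
}
```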

Thread lifecycle

A thread pool creates new threads only when they are needed. Starting from the execute method, let's follow the lifecycle of the pool's threads:


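public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    // Fewer than corePoolSize workers: start a core worker with this task as its first task
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // Pool is running: try to queue the task
    if (isRunning(c) && workQueue.offer(command)) {
        // Double-check: the pool may have shut down, or all workers died, after queuing
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // Could not queue: try a non-core worker, and reject the task if that fails too
    else if (!addWorker(command, false))
        reject(command);
}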

The main flow is:

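  1. If the number of workers is below corePoolSize, try to add a new core worker that runs the submitted task as its first task;
  2. Otherwise, if the pool is running, put the task into the work queue;
  3. Even if the task was queued successfully, double-check the pool state: if the pool stopped in the meantime, remove the task and reject it; if no workers are left, start one without a first task;
  4. If the task could not be queued (the pool is not running or the queue is full), try to add a non-core worker (up to maximumPoolSize), and reject the task if that fails.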
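The four steps can be observed with a deliberately tiny pool. In this sketch (the class name ExecuteFlowDemo is hypothetical), every task blocks on a CountDownLatch so no worker frees up during the demo. With corePoolSize=1, maximumPoolSize=2 and an ArrayBlockingQueue of capacity 1, the first task should start a core worker, the second should be queued, the third should start a non-core worker and the fourth should be rejected by the default AbortPolicy.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, for illustration only
class ExecuteFlowDemo {
    // corePoolSize=1, maximumPoolSize=2, bounded queue of capacity 1;
    // records the pool state after each of four execute() calls
    static List<String> trace() throws InterruptedException {
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();  // keep every worker busy until the end of the demo
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        List<String> steps = new ArrayList<>();
        for (int i = 0; i < 4; i++) {
            try {
                pool.execute(blocker);
                steps.add("workers=" + pool.getPoolSize() + " queued=" + pool.getQueue().size());
            } catch (RejectedExecutionException e) {
                steps.add("rejected");  // the default handler (AbortPolicy) throws
            }
        }
        release.countDown();
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return steps;
    }

    public static void main(String[] args) throws InterruptedException {
        trace().forEach(System.out::println);
    }
}
```

Note that the third task is handed directly to the new non-core worker as its first task; it never passes through the queue.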

addWorker

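addWorker decides, based on the current pool state and the bound selected by core (corePoolSize if core is true, maximumPoolSize otherwise), whether a new worker can be created. If the worker is added successfully, its thread is started and the task passed in from execute becomes the thread's first task. If the pool state does not allow another worker, or the worker count has reached the bound, the method returns false. The logic is as follows: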


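private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        // Check the pool state: after SHUTDOWN, only a worker without a first task
        // may be added, and only while the queue still holds tasks
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))
            return false;

        for (;;) {
            // Check whether the worker count would exceed the configured bound
            int wc = workerCountOf(c);
            if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // Increment the worker count via CAS; no worker has been created yet
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // re-read ctl
            if (runStateOf(c) != rs)
                continue retry;  // run state changed: restart the outer loop
            // otherwise the CAS failed because workerCount changed: retry the inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            mainLock.lock();
            try {
                // Recheck the state while holding the lock
                int rs = runStateOf(ctl.get());
                if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
                    workers.add(w);
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        // Roll back the worker count (and the workers set) if the worker did not start
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}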

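Incrementing the worker count and creating the new worker and adding it to the workers set are not done under a single lock: the count is increased with CAS, and only the insertion into workers is guarded by mainLock. At the end of the method, if the worker could not be added or started, the pool takes the lock again and rolls back (addWorkerFailed). The thread pool decouples task submission from task execution: once addWorker has been covered, the submission side is complete, and if all went well the task has now been handed over to the pool.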
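The pattern of reserving capacity with a CAS first, taking the lock only to publish the new object, and giving the reservation back on failure is not specific to thread pools. Below is a stripped-down, hypothetical SlotRegistry (not part of the JDK) that mimics that structure: an AtomicInteger plays the role of the worker count in ctl, a ReentrantLock plays mainLock, and a HashSet plays workers. It is a sketch of the idea under those assumptions, not a reproduction of addWorker.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical illustration of the "CAS first, lock later, roll back on failure" pattern
class SlotRegistry {
    private final int capacity;
    private final AtomicInteger count = new AtomicInteger(); // stands in for workerCount in ctl
    private final ReentrantLock lock = new ReentrantLock();  // stands in for mainLock
    private final Set<String> members = new HashSet<>();    // stands in for the workers set

    SlotRegistry(int capacity) {
        this.capacity = capacity;
    }

    boolean register(String member) {
        // Phase 1: reserve a slot with a CAS retry loop, without holding the lock
        for (;;) {
            int c = count.get();
            if (c >= capacity)
                return false;
            if (count.compareAndSet(c, c + 1))
                break;
            // lost a race with another thread: re-read and retry
        }
        boolean added = false;
        try {
            if (member == null)  // stands in for "creating the worker failed"
                throw new IllegalArgumentException("member must not be null");
            // Phase 2: publish the member under the lock
            lock.lock();
            try {
                added = members.add(member);
            } finally {
                lock.unlock();
            }
        } finally {
            // Phase 3: give the reserved slot back if it was not used
            if (!added)
                count.decrementAndGet();
        }
        return added;
    }

    int reserved() {
        return count.get();
    }

    int size() {
        lock.lock();
        try {
            return members.size();
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        SlotRegistry r = new SlotRegistry(2);
        System.out.println(r.register("a") + " " + r.register("b") + " " + r.register("c"));

        SlotRegistry r2 = new SlotRegistry(1);
        try {
            r2.register(null);  // fails after the slot was reserved
        } catch (IllegalArgumentException e) {
            System.out.println("rolled back, reserved=" + r2.reserved());
        }
        System.out.println(r2.register("x"));  // the released slot can be reused
    }
}
```

The lock-free reservation keeps the common path cheap, while the rollback in finally keeps the count consistent even when creation throws.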

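Worker implements the Runnable interface and has two main fields, firstTask and thread, which hold the worker's first task and its thread. As the addWorker method above shows, the worker's thread is started as soon as the worker is added. Next, let's look at how that thread runs; Worker's run method works roughly like this: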


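public void run() {
    Runnable task = firstTask;
    firstTask = null;
    boolean completedAbruptly = true;
    try {
        // Run the first task, then keep pulling tasks from the queue
        while (task != null || (task = getTask()) != null) {
            beforeExecute(thread, task);  // hook, empty by default
            Throwable thrown = null;
            try { task.run(); }
            catch (RuntimeException | Error x) { thrown = x; throw x; }
            finally { afterExecute(task, thrown); }  // hook, empty by default
            task = null;
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(this, completedAbruptly);  // getTask() returned null or a task threw
    }
}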

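In other words, once a worker's thread has started, it keeps running tasks for as long as getTask() returns them. beforeExecute and afterExecute are hook methods that do nothing by default; a custom subclass of ThreadPoolExecutor can override them to add statistics or monitoring logic.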
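As an example of using the hooks, here is a sketch of a subclass that counts finished tasks and accumulates their run time. The class and its accessors are hypothetical; beforeExecute(Thread, Runnable) and afterExecute(Runnable, Throwable) are the real protected hook signatures. A ThreadLocal carries the start time because both hooks run on the worker thread that runs the task.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical example subclass: uses the two hooks to collect simple statistics
class TimingThreadPool extends ThreadPoolExecutor {
    private final ThreadLocal<Long> startNanos = new ThreadLocal<>();
    private final AtomicLong finished = new AtomicLong();
    private final AtomicLong elapsedNanos = new AtomicLong();

    TimingThreadPool(int corePoolSize, int maximumPoolSize) {
        super(corePoolSize, maximumPoolSize, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        startNanos.set(System.nanoTime());  // called on the worker thread right before r.run()
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        try {
            elapsedNanos.addAndGet(System.nanoTime() - startNanos.get());
            finished.incrementAndGet();
        } finally {
            startNanos.remove();
            super.afterExecute(r, t);
        }
    }

    long finishedTasks() {
        return finished.get();
    }

    long totalNanos() {
        return elapsedNanos.get();
    }

    // Runs n small tasks and returns how many of them afterExecute saw
    static long runTasks(int n) throws InterruptedException {
        TimingThreadPool pool = new TimingThreadPool(2, 2);
        AtomicLong sum = new AtomicLong();
        for (int i = 0; i < n; i++) {
            final int k = i;
            pool.execute(() -> sum.addAndGet(k));
        }
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return pool.finishedTasks();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runTasks(5));
    }
}
```

Since afterExecute runs inside the worker loop, every call has completed by the time awaitTermination returns.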

Threads in the pool can also be reclaimed. Two settings control this, allowCoreThreadTimeOut and keepAliveTime; let's see how the pool uses them, mainly in the getTask and processWorkerExit methods.

getTask

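getTask is essentially an infinite loop that ends in one of the following ways:

  1. the pool is stopping, or has been shut down and the queue is empty: return null;
  2. a task was obtained: return it;
  3. the wait timed out and this worker may be reclaimed: return null.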

The part of getTask that takes a task from the queue looks like this:


boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
try {
    Runnable r = timed ?
        workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
        workQueue.take();
    if (r != null)
        return r;
    timedOut = true;
} catch (InterruptedException retry) {
    timedOut = false;
}
     

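First, getTask decides whether the wait should time out: if allowCoreThreadTimeOut is set or the current number of workers exceeds corePoolSize, it waits at most keepAliveTime using poll; otherwise it blocks indefinitely with take. If a task is obtained, it is returned immediately. If poll times out, timedOut is set and the loop runs again; on that next iteration, if this worker may be reclaimed, the worker count is decremented with CAS and getTask returns null, which ends the worker's run loop. getTask adjusts the worker count with CAS instead of taking a lock, which improves the pool's concurrency.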
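The effect of keepAliveTime and allowCoreThreadTimeOut can be watched from outside with getPoolSize(). This sketch (the class name KeepAliveDemo is hypothetical) uses a SynchronousQueue so that extra tasks force non-core workers to be created, then waits for idle workers to time out in getTask. The sleep-based polling is only for the demo; exact timings depend on the scheduler, but the pool is expected to go from 3 workers to 1 and, after allowCoreThreadTimeOut(true), to 0.

```java
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, for illustration only
class KeepAliveDemo {
    // Polls getPoolSize() until it reaches target or the timeout expires (demo only)
    static int waitForPoolSize(ThreadPoolExecutor pool, int target, long timeoutMillis)
            throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (pool.getPoolSize() != target && System.currentTimeMillis() < deadline) {
            Thread.sleep(20);
        }
        return pool.getPoolSize();
    }

    // Returns {workers while busy, workers after idling, workers after allowCoreThreadTimeOut(true)}
    static int[] run() throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 3, 100L, TimeUnit.MILLISECONDS, new SynchronousQueue<>());
        CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < 3; i++) {
            // offer() on a SynchronousQueue fails while every worker is busy,
            // so tasks 2 and 3 each get a new non-core worker
            pool.execute(() -> {
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        int busy = pool.getPoolSize();
        release.countDown();
        // Workers above corePoolSize wait with poll(keepAliveTime) and time out
        int idle = waitForPoolSize(pool, 1, 5_000);
        // Now the remaining core worker is allowed to time out as well
        pool.allowCoreThreadTimeOut(true);
        int drained = waitForPoolSize(pool, 0, 5_000);
        pool.shutdown();
        return new int[] {busy, idle, drained};
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(Arrays.toString(run()));
    }
}
```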

processWorkerExit

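processWorkerExit runs just before a worker thread exits; once it returns, the worker's run method finishes and the thread terminates. A worker leaves its while loop either normally (getTask returned null) or abruptly (a task threw an exception), so the two cases are handled separately. After an abrupt exit, processWorkerExit decrements the worker count itself and always tries to start a replacement worker. After a normal exit, it calls addWorker to add a replacement only if the pool now has too few threads: fewer than corePoolSize (or 0 when allowCoreThreadTimeOut is set), but at least one while the queue still holds tasks.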
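The abrupt-exit path can be observed with a custom ThreadFactory that numbers its threads and installs an UncaughtExceptionHandler. In this sketch (the class name and the worker-N thread names are hypothetical), the failing task runs on worker-1. Because processWorkerExit runs in runWorker's finally block before the exception leaves the thread, by the time the handler sees the exception a replacement worker-2 already exists, and it runs the next task.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class, for illustration only
class ExecuteExceptionDemo {
    // Returns {thread that ran the failing task, thread that ran the next task, message seen by the handler}
    static String[] run() throws InterruptedException, ExecutionException {
        AtomicInteger seq = new AtomicInteger();
        AtomicReference<String> failedOn = new AtomicReference<>();
        AtomicReference<String> handledMessage = new AtomicReference<>();
        CountDownLatch handled = new CountDownLatch(1);

        // Names threads worker-1, worker-2, ... and installs an UncaughtExceptionHandler on each
        ThreadFactory factory = r -> {
            Thread t = new Thread(r, "worker-" + seq.incrementAndGet());
            t.setUncaughtExceptionHandler((thread, ex) -> {
                handledMessage.set(ex.getMessage());
                handled.countDown();
            });
            return t;
        };
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), factory);

        // The exception escapes the worker loop, so worker-1's thread dies
        pool.execute(() -> {
            failedOn.set(Thread.currentThread().getName());
            throw new IllegalStateException("boom");
        });
        handled.await(5, TimeUnit.SECONDS);

        String next = pool.submit(() -> Thread.currentThread().getName()).get();
        pool.shutdown();
        return new String[] {failedOn.get(), next, handledMessage.get()};
    }

    public static void main(String[] args) throws Exception {
        System.out.println(String.join(" / ", run()));
    }
}
```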

Using the thread pool

execute vs submit

Both execute and submit hand tasks to the pool. Although submit is ultimately implemented by calling execute, there are several differences:

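  1. execute accepts only a Runnable, while submit accepts either a Runnable or a Callable and returns a Future;
  2. If a task submitted through execute throws an unchecked exception (a RuntimeException or an Error), the exception propagates out of the Worker, the worker's thread exits, and the thread's UncaughtExceptionHandler handles the exception. All task execution in the pool ends up in the Worker, so let's look at the relevant source, runWorker: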

    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        // 1
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

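At comment 1, task is the Runnable passed to execute. submit also calls execute, so the task runs along the same path; the difference is that before calling execute, submit wraps the task in a FutureTask (via newTaskFor) and hands that FutureTask to the pool. So for a task submitted with submit, the object at comment 1 is a FutureTask. Here is the relevant part of FutureTask's run method: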


    try {
        result = c.call();
        ran = true;
    } catch (Throwable ex) {
        result = null;
        ran = false;
        setException(ex);
    }

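FutureTask catches the exception and stores it (setException) instead of rethrowing it, so a task submitted via submit never kills its Worker, even if it throws. To retrieve the exception, call FutureTask#get, which rethrows it wrapped in an ExecutionException: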


    private V report(int s) throws ExecutionException {
        Object x = outcome;
        if (s == NORMAL)
            return (V)x;
        if (s >= CANCELLED)
            throw new CancellationException();
        throw new ExecutionException((Throwable)x);
    }
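Putting these pieces together, the sketch below (the class name SubmitExceptionDemo is hypothetical) submits a failing Runnable. A beforeExecute override confirms that the worker actually runs a FutureTask, get() rethrows the original exception wrapped in ExecutionException, and the next task runs on the same thread because the worker never died.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class, for illustration only
class SubmitExceptionDemo {
    // Returns {whether the worker ran a FutureTask, cause message from get(), thread of task 1, thread of task 2}
    static String[] run() throws InterruptedException, ExecutionException {
        AtomicBoolean sawFutureTask = new AtomicBoolean();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>()) {
            @Override
            protected void beforeExecute(Thread t, Runnable r) {
                // submit() wraps the task, so the worker sees a FutureTask here
                if (r instanceof FutureTask)
                    sawFutureTask.set(true);
            }
        };
        AtomicReference<String> failedOn = new AtomicReference<>();
        Runnable failing = () -> {
            failedOn.set(Thread.currentThread().getName());
            throw new IllegalStateException("boom");
        };
        Future<?> f = pool.submit(failing);
        String cause;
        try {
            f.get();
            cause = "no exception";
        } catch (ExecutionException e) {
            // FutureTask stored the exception; get() rethrows it wrapped
            cause = e.getCause().getMessage();
        }
        // The worker survived, so the next task runs on the same thread
        String next = pool.submit(() -> Thread.currentThread().getName()).get();
        pool.shutdown();
        return new String[] {String.valueOf(sawFutureTask.get()), cause, failedOn.get(), next};
    }

    public static void main(String[] args) throws Exception {
        System.out.println(String.join(" / ", run()));
    }
}
```

In practice this means exceptions from submit are easy to lose: if nobody calls get(), the exception is never reported.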
