jetty6中ThreadPool实现的代码解析
QueuedThreadPool承载了jetty6提交任务至线程工作的使命,在没有用concurrent包的情况下,作者实现了漂亮的线程池。
?
QueuedThreadPool内部维护了一个FIFO的job队列,该队列基于数组实现。
?
QueuedThreadPool的核心接口是dispatch(Runnable),正如字面所表达的那样,该方法将派发一个job到一个线程来执行。下面是对该方法的注释解析。
?
?
public boolean dispatch(Runnable job) { //前置条件的判断 if (!isRunning() || job==null) return false; //可以执行该job的线程 PoolThread thread=null; //是否需要创建新线程的标识 boolean spawn=false; synchronized(_lock) { // Look for an idle thread int idle=_idle.size(); //判断是否有空闲线程,如果有,取出空闲线程列表的最后一个线程 if (idle>0) thread=(PoolThread)_idle.remove(idle-1); else//已经没有空闲线程了,将该job放入到job队列中 { // queue the job //增加队列的大小,得到最新的队列大小 _queued++; //如果最新的队列大小大于最大值,则将最大值设置为最新的队列大小 if (_queued>_maxQueued) _maxQueued=_queued; //设置插入位置的job为最新dispatch的job,并且递增下一个插入位置,_nextJobSlot记录的是下一个job可以放置的索引 _jobs[_nextJobSlot++]=job; //如果已经插入位置的游标已经到达了队列的尾部,则从头开始 if (_nextJobSlot==_jobs.length) _nextJobSlot=0; //如果队列已经满了,则扩容队列。_nextJob是下一个可以获取job的位置,可以认为是队列的尾部,这是一个FIFO的队列 if (_nextJobSlot==_nextJob) { // Grow the job queue Runnable[] jobs= new Runnable[_jobs.length+_maxThreads]; int split=_jobs.length-_nextJob; if (split>0) System.arraycopy(_jobs,_nextJob,jobs,0,split); if (_nextJob!=0) System.arraycopy(_jobs,0,jobs,split,_nextJobSlot); _jobs=jobs; _nextJob=0; _nextJobSlot=_queued; } spawn=_queued>_spawnOrShrinkAt; } } //如果得到了可以工作的空闲线程,则将利用该线程执行job if (thread!=null) { thread.dispatch(job); } else if (spawn)//如果需要增加新的线程,则创建一个新的线程 { newThread(); } //不管怎么说,这个job要么被一个线程运行,要么成功放到了job的队列,我们的工作算是成功做完了。 return true; }?
可以看到实现逻辑大致如下:首先判断是否有空闲线程,如果有,从空闲线程列表中移除最后一个线程,用该线程来执行提交的job。如果没有空闲线程,就把该job放到job的队列中。如果放入队列后,队列的长度大于_spawnOrShrinkAt,就说明可以创建一个新的线程。
?
QueuedThreadPool是用PoolThread作为工作线程的。PoolThread是Thread的子类。看看run方法有什么事情要做。
?
?
public void run() { boolean idle=false; Runnable job=null; try { //只要系统还在运行,就不让该线程停止工作 while (isRunning()) { //有job要做了 if (job!=null) { final Runnable todo=job; job=null; //标识这个线程不在空闲,因为我有工作要做了。 idle=false; todo.run(); //我觉得这个地方应该加上 idle = true;下面if(!idle)改成if(idle)语义上更清楚一些 } //需要操作FIFO的job队列,给临界代码加锁 synchronized(_lock) { //哈哈,有job可以做了 if (_queued>0) { //递减队列的长度 _queued--; //从队列的头部取出一个job job=_jobs[_nextJob]; //将头部的位置值为null,并且递增_nextJob,更新头部 _jobs[_nextJob++]=null; //如果到达了数组的尾部,则从数组的头开始 if (_nextJob==_jobs.length) _nextJob=0; //非常从容地得到了一个需要执行的job,跳到while的开始,执行该job continue; } //如果job队列为空,那是不是我这个线程是多余的线程,job能很快地被执行掉,需要亲手干掉自己?下面做一些判断 //所有线程的数量总和 final int threads=_threads.size(); //如果从数量上判断可以清除掉该线程 if (threads>_minThreads && (threads>_maxThreads || _idle.size()>_spawnOrShrinkAt)) { //空闲时间的条件是否能满足。如果有一个兄弟在不久之前已经自杀了,我需要再晚点。 long now = System.currentTimeMillis(); if ((now-_lastShrink)>getMaxIdleTimeMs()) { //记录我自杀的时间(即最后一个线程自杀的时间) _lastShrink=now; //从空闲队列中移除自己 _idle.remove(this); //使命完成,寿终正寝 return; } } //如果我做完了自己的job,我已经空闲了,进入空闲线程池里 if (!idle) { // Add ourselves to the idle set. _idle.add(this); idle=true; } } //既然我已经空闲了,如果这个时候还没有job让我来做,我就再等一会吧。 //如果等待的时间到了,或者等待的时候有新的job分配给我做,我就不再等待。 synchronized (this) { if (_job==null) this.wait(getMaxIdleTimeMs()); job=_job; _job=null; } } } catch (InterruptedException e) { Log.ignore(e); } //我死掉了(自杀,或者在执行job时发生异常意外死亡),善后工作 finally { //从空闲线程池中把我的尸体抬走 synchronized (_lock) { _idle.remove(this); } //从总的线程池中把我的尸体抬走。这个地方可以看到锁的分离。_idle和_threads两个集合用不同的锁控制,提高了性能 synchronized (_threadsLock) { _threads.remove(this); } synchronized (this) { job=_job; } //如果我在处理job时意外死亡,就把我未完成的事业交给下一个兄弟来处理 if (job!=null) { QueuedThreadPool.this.dispatch(job); } } } ??
?
?
?