1.
        
                 
        
         ExecutorService
        
      
      
         
      
      
      
         
      
    
    
      Java
      
        从
      
       1.5
      
        开始正式提供了并发包
      
       ,
      
        而这个并发包里面除了原子变量
      
       ,synchronizer,
      
        并发容器
      
       ,
      
        另外一个非常重要的特性就是线程池
      
       .
      
        对于线程池的意义
      
       ,
      
        我们这边不再多说
      
       . 
    
  
上图是线程池的主体类图 ,ThreadPoolExecutor 是应用最为广泛的一个线程池实现 ( 我也将在接下来的文字中详细描述我对这个类的理解和执行机制 ),ScheduledThreadPoolExecutor 则在 ThreadPoolExecutor 上提供了定时执行的等附加功能 , 这个可以从 ScheduledExecutorService 接口的定义中看出来 .Executors 则类似工厂方法 , 提供了几个非常常用的线程池初始化方法 .
ThreadPoolExecutor
这个类继承了 AbstractExecutorService 抽象类 , AbstractExecutorService 主要的职责有 2 部分 , 一部分定义和实现提交任务的方法 (3 个 submit 方法的实现 ) , 实例化 FutureTask 并且交给子类执行 , 另外一部分实现 invokeAny,invokeAll 方法 . 留给子类的方法为 execute 方法 , 也就是 Executor 接口定义的方法 .
 //
    
    
      实例化一个FutureTask,交给子类的execute方法执行.这种设计能够保证callable和runnable的执行接口方法的一致性(FutureTask包装了这个差别)
    
      //
    
    
      实例化一个FutureTask,交给子类的execute方法执行.这种设计能够保证callable和runnable的执行接口方法的一致性(FutureTask包装了这个差别)
    
    
       public
    
    
       
    
    
      <
    
    
      T
    
    
      >
    
    
       Future
    
    
      <
    
    
      T
    
    
      >
    
    
       submit(Runnable task, T result) 
    
    
    
      
        {
      
    
    
      public
    
    
       
    
    
      <
    
    
      T
    
    
      >
    
    
       Future
    
    
      <
    
    
      T
    
    
      >
    
    
       submit(Runnable task, T result) 
    
    
    
      
        {
         if
      
      
         (task 
      
      
        ==
      
      
         
      
      
        null
      
      
        ) 
      
      
        throw
      
      
         
      
      
        new
      
      
         NullPointerException();
             
      
      
        if
      
      
         (task 
      
      
        ==
      
      
         
      
      
        null
      
      
        ) 
      
      
        throw
      
      
         
      
      
        new
      
      
         NullPointerException();
         RunnableFuture
      
      
        <
      
      
        T
      
      
        >
      
      
         ftask 
      
      
        =
      
      
         newTaskFor(task, result);
             RunnableFuture
      
      
        <
      
      
        T
      
      
        >
      
      
         ftask 
      
      
        =
      
      
         newTaskFor(task, result);
         execute(ftask);
             execute(ftask);
         return
      
      
         ftask;
             
      
      
        return
      
      
         ftask;
         }
         }
      
    
    
       
       protected
    
    
       
    
    
      <
    
    
      T
    
    
      >
    
    
       RunnableFuture
    
    
      <
    
    
      T
    
    
      >
    
    
       newTaskFor(Runnable runnable, T value) 
    
    
    
      
        {
      
    
    
      protected
    
    
       
    
    
      <
    
    
      T
    
    
      >
    
    
       RunnableFuture
    
    
      <
    
    
      T
    
    
      >
    
    
       newTaskFor(Runnable runnable, T value) 
    
    
    
      
        {
         return
      
      
         
      
      
        new
      
      
         FutureTask
      
      
        <
      
      
        T
      
      
        >
      
      
        (runnable, value);
             
      
      
        return
      
      
         
      
      
        new
      
      
         FutureTask
      
      
        <
      
      
        T
      
      
        >
      
      
        (runnable, value);
         }
         }
      
    
  
    
      关于
    
     FutureTask
    
      这个类的实现
    
     ,
    
      我在前面的
    
     JAVA LOCK
    
      代码浅析有讲过其实现原理
    
     ,
    
      主要的思想就是关注任务完成与未完成的状态
    
     ,
    
      任务提交线程
    
     get()
    
      结果时被
    
     park
    
      住
    
     ,
    
      等待任务执行完成被唤醒
    
     ,
    
      任务执行线程在任务执行完毕后设置结果
    
     ,
    
      并且
    
     unpark
    
      对应线程并且让其得到执行结果
    
     . 
    
    
    
      回到
    
     ThreadPoolExecutor
    
      类
    
     .ThreadPoolExecutor
    
      需要实现除了我们刚才说的
    
     execute(Runnable command)
    
      方法外
    
     ,
    
      还得实现
    
     ExecutorService
    
      接口定义的部分方法
    
     .
    
      但
    
     ThreadPoolExecutor
    
      所提供的不光是这些
    
     ,
    
      以下根据我的理解来列一下它所具有的特性
      
    
     1.
    
             
    
     execute
    
      流程
      
    
     2.
    
             
    
    
      池
      
    
     3.
    
             
    
    
      工作队列
      
    
     4.
    
             
    
    
      饱和拒绝策略
      
    
     5.
    
             
    
    
      线程工厂
      
    
     6.
    
             
    
     beforeExecute
    
      和
    
     afterExecute
    
      扩展
      
      
    
     execute
    
      方法的实现有个机制非常重要
    
     ,
    
      当当前线程池线程数量小于
    
     corePoolSize,
    
      那么生成一个新的
    
     worker
    
      并把提交的任务置为这个工作线程的头一个执行任务
    
     ,
    
      如果大于
    
     corePoolSize,
    
      那么会试着将提交的任务塞到
    
     workQueue
    
      里面供线程池里面的worker稍后执行
    
     ,
    
      并不是直接再起一个
    
     worker,
    
      但是当
    
     workQueue
    
      也满
    
     ,
    
      并且当前线程池小于
    
     maxPoolSize,
    
      那么起一个新的
    
     worker
    
      并将该任务设为该
    
     worker
    
      执行的第一个任务执行
    
     ,
    
      大于
    
     maxPoolSize,workQueue
    
      也满负荷
    
     ,
    
      那么调用饱和策略里面的行为
    
     . 
    
    
     worker
    
      线程在执行完一个任务之后并不会立刻关闭
    
     ,
    
      而是尝试着去
    
     workQueue
    
      里面取任务
    
     ,
    
      如果取不到
    
     ,
    
      根据策略关闭或者保持空闲状态
    
     .
    
      所以
    
     submit
    
      任务的时候
    
     ,
    
      提交的顺序为
      
        核心线程池
      
    
    
      ------
    
    
      
        工作队列
      
       ------
    
    
      
        扩展线程池
      
    
     . 
    
    
      
       池包括核心池
    
     ,
    
      扩展池
    
    
      (2
      
        者的线程在同一个
      
       hashset
      
        中,这里只是为了方便才这么称呼,并不是分离的
      
       ),
    
    
      核心池在池内
    
     worker
    
      没有用完的情况下
    
     ,
    
      只要有任务提交都会创建新的线程
    
     ,
    
      其代表线程池正常处理任务的能力
    
     .
    
      扩展池
    
     ,
    
      是在核心线程池用完
    
     ,
    
      并且工作队列也已排满任务的情况下才会开始初始化线程
    
     ,
    
      其代表的是线程池超出正常负载时的解决方案
    
     ,
    
      一旦任务完成
    
     ,
    
      并且试图从
    
     workQueue
    
      取不到任务
    
     ,
    
      那么会比较当前线程池与核心线程池的大小
    
     ,
    
      大于核心线程池数的
    
     worker
    
      将被销毁
    
     .
  
 Runnable getTask() 
    
    
    
      
        {
    
    
      Runnable getTask() 
    
    
    
      
        {
         for
      
      
         (;;) 
      
      
      
        
          {
        
             
      
      
        for
      
      
         (;;) 
      
      
      
        
          {
           try
        
        
           
        
        
        
          
            {
          
                   
        
        
          try
        
        
           
        
        
        
          
            {
             int
          
          
             state 
          
          
            =
          
          
             runState;
                         
          
          
            int
          
          
             state 
          
          
            =
          
          
             runState;
             //
          
          
            >SHUTDOWN就是STOP或者TERMINATED
                         
          
          
            //
          
          
            >SHUTDOWN就是STOP或者TERMINATED
             //
          
          
            直接返回
                         
          
          
            //
          
          
            直接返回
          
          
             if
          
          
             (state 
          
          
            >
          
          
             SHUTDOWN)
          
          
                        
          
          
            if
          
          
             (state 
          
          
            >
          
          
             SHUTDOWN)
             return
          
          
             
          
          
            null
          
          
            ;
                             
          
          
            return
          
          
             
          
          
            null
          
          
            ;
             Runnable r;
                         Runnable r;
             //
          
          
            如果是SHUTDOWN状态,那么取任务,如果有
                         
          
          
            //
          
          
            如果是SHUTDOWN状态,那么取任务,如果有
             //
          
          
            将剩余任务执行完毕,否则就结束了
                           
          
          
            //
          
          
            将剩余任务执行完毕,否则就结束了
          
          
             if
          
          
             (state 
          
          
            ==
          
          
             SHUTDOWN)  
          
          
            //
          
          
             Help drain queue
          
          
                        
          
          
            if
          
          
             (state 
          
          
            ==
          
          
             SHUTDOWN)  
          
          
            //
          
          
             Help drain queue
          
          
             r 
          
          
            =
          
          
             workQueue.poll();
          
          
                            r 
          
          
            =
          
          
             workQueue.poll();
             //
          
          
            如果不是以上状态的(也就是RUNNING状态的),那么如果当前池大于核心池数量,
                         
          
          
            //
          
          
            如果不是以上状态的(也就是RUNNING状态的),那么如果当前池大于核心池数量,
             //
          
          
            或者允许核心线程池取任务超时就可以关闭,那么从任务队列取任务,
                         
          
          
            //
          
          
            或者允许核心线程池取任务超时就可以关闭,那么从任务队列取任务,
             //
          
          
            如果超出keepAliveTime,那么就返回null了,也就意味着这个worker结束了
                         
          
          
            //
          
          
            如果超出keepAliveTime,那么就返回null了,也就意味着这个worker结束了
          
          
             else
          
          
             
          
          
            if
          
          
             (poolSize 
          
          
            >
          
          
             corePoolSize 
          
          
            ||
          
          
             allowCoreThreadTimeOut)
          
          
                        
          
          
            else
          
          
             
          
          
            if
          
          
             (poolSize 
          
          
            >
          
          
             corePoolSize 
          
          
            ||
          
          
             allowCoreThreadTimeOut)
             r 
          
          
            =
          
          
             workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
                             r 
          
          
            =
          
          
             workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
             //
          
          
            如果当前池小于核心池,并且不允许核心线程池取任务超时就关闭,那么take(),直到拿到任务或者被interrupt
                         
          
          
            //
          
          
            如果当前池小于核心池,并且不允许核心线程池取任务超时就关闭,那么take(),直到拿到任务或者被interrupt
          
          
             else
          
          
                        
          
          
            else
          
          
             r 
          
          
            =
          
          
             workQueue.take();
                             r 
          
          
            =
          
          
             workQueue.take();
             //
          
          
            如果经过以上判定,任务不为空,那么返回任务
                         
          
          
            //
          
          
            如果经过以上判定,任务不为空,那么返回任务
          
          
             if
          
          
             (r 
          
          
            !=
          
          
             
          
          
            null
          
          
            )
          
          
                        
          
          
            if
          
          
             (r 
          
          
            !=
          
          
             
          
          
            null
          
          
            )
             return
          
          
             r;
                             
          
          
            return
          
          
             r;
             //
          
          
            如果取到任务为空,那么判定是否可以退出
                         
          
          
            //
          
          
            如果取到任务为空,那么判定是否可以退出
          
          
             if
          
          
             (workerCanExit()) 
          
          
          
            
              {
            
          
          
                        
          
          
            if
          
          
             (workerCanExit()) 
          
          
          
            
              {
               //
            
            
              如果整个线程池状态变为SHUTDOWN或者TERMINATED,那么将所有worker interrupt (如果正在执行,那继续让其执行)
                               
            
            
              //
            
            
              如果整个线程池状态变为SHUTDOWN或者TERMINATED,那么将所有worker interrupt (如果正在执行,那继续让其执行)
            
            
               if
            
            
               (runState 
            
            
              >=
            
            
               SHUTDOWN) 
            
            
              //
            
            
               Wake up others
            
            
                              
            
            
              if
            
            
               (runState 
            
            
              >=
            
            
               SHUTDOWN) 
            
            
              //
            
            
               Wake up others
            
            
               interruptIdleWorkers();
            
            
                                  interruptIdleWorkers();
               return
            
            
               
            
            
              null
            
            
              ;
                               
            
            
              return
            
            
               
            
            
              null
            
            
              ;
               }
                           }
            
          
          
             //
          
          
             Else retry
                         
          
          
            //
          
          
             Else retry
          
          
             }
          
        
        
           
        
        
          catch
        
        
           (InterruptedException ie) 
        
        
        
          
            {
            
          
          
                    }
          
        
        
           
        
        
          catch
        
        
           (InterruptedException ie) 
        
        
        
          
            {
             //
          
          
             On interruption, re-check runState
                         
          
          
            //
          
          
             On interruption, re-check runState
          
          
             }
          
          
                    }
          
        
        
           }
           }
        
      
      
         }
             }
      
    
  
    
  
 //
    
    
      worker从workQueue中取不到数据的时候调用此方法,以决定自己是否跳出取任务的无限循环,从而结束此worker的运行
    
      //
    
    
      worker从workQueue中取不到数据的时候调用此方法,以决定自己是否跳出取任务的无限循环,从而结束此worker的运行
    
    
       private
    
    
       
    
    
      boolean
    
    
       workerCanExit() 
    
    
    
      
        {
      
    
    
      private
    
    
       
    
    
      boolean
    
    
       workerCanExit() 
    
    
    
      
        {
         final
      
      
         ReentrantLock mainLock 
      
      
        =
      
      
         
      
      
        this
      
      
        .mainLock;
             
      
      
        final
      
      
         ReentrantLock mainLock 
      
      
        =
      
      
         
      
      
        this
      
      
        .mainLock;
         mainLock.lock();
             mainLock.lock();
         boolean
      
      
         canExit;
             
      
      
        boolean
      
      
         canExit;
         try
      
      
         
      
      
      
        
          {
        
             
      
      
        try
      
      
         
      
      
      
        
          {
           /*
          
                   
        
        
        
          
            /*
          
          
             *线程池状态为stop或者terminated,
                     *线程池状态为stop或者terminated,
             *或者任务队列里面任务已经为空,
                     *或者任务队列里面任务已经为空,
             *或者允许线程池线程空闲超时(实现方式是从工作队列拿最多keepAliveTime的任务,超过这个时间就返回null了)并且
                     *或者允许线程池线程空闲超时(实现方式是从工作队列拿最多keepAliveTime的任务,超过这个时间就返回null了)并且
             *当前线程池大于corePoolSize(>1)
                      *当前线程池大于corePoolSize(>1)
             *那么允许线程结束
                     *那么允许线程结束
             *static final int RUNNING    = 0;
                     *static final int RUNNING    = 0;
             *static final int SHUTDOWN   = 1;
                     *static final int SHUTDOWN   = 1;
             *static final int STOP       = 2;
                     *static final int STOP       = 2;
             *static final int TERMINATED = 3;
                     *static final int TERMINATED = 3;
             */
                     
          
          
            */
          
        
        
           canExit 
        
        
          =
        
        
           runState 
        
        
          >=
        
        
           STOP 
        
        
          ||
                   canExit 
        
        
          =
        
        
           runState 
        
        
          >=
        
        
           STOP 
        
        
          ||
        
        
           workQueue.isEmpty() 
        
        
          ||
                   workQueue.isEmpty() 
        
        
          ||
        
        
           (allowCoreThreadTimeOut 
        
        
          &&
                  (allowCoreThreadTimeOut 
        
        
          &&
        
        
           poolSize 
        
        
          >
        
        
           Math.max(
        
        
          1
        
        
          ,corePoolSize));
                   poolSize 
        
        
          >
        
        
           Math.max(
        
        
          1
        
        
          ,corePoolSize));
           }
        
      
      
         
      
      
        finally
      
      
         
      
      
      
        
          {
          
               }
        
      
      
         
      
      
        finally
      
      
         
      
      
      
        
          {
           mainLock.unlock();
                   mainLock.unlock();
           }
               }
        
      
      
         return
      
      
         canExit;
             
      
      
        return
      
      
         canExit;
         }
         }
      
    
  
    
    
      当提交任务是
    
     ,
    
      线程池都已满
    
     ,
    
      并且工作队列也无空闲位置的情况下
    
     ,ThreadPoolExecutor
    
      会执行
    
     reject
    
      操作
    
     ,JDK
    
      提供了四种
    
     reject
    
      策略
    
     ,
    
      包括
    
     AbortPolicy(
    
      直接抛
    
     RejectedException Exception),CallerRunsPolicy(
    
      提交任务线程自己执行
    
     ,
    
      当然这时剩余任务也将无法提交
    
     ),DiscardOldestPolicy(
    
      将线程池的
    
     workQueue
    
      任务队列里面最老的任务剔除
    
     ,
    
      将新任务丢入
    
     ),DiscardPolicy(
    
      无视
    
     ,
    
      忽略此任务
    
     ,
    
      并且立即返回
    
     ).
    
      实例化
    
     ThreadPoolExecutor
    
      时
    
     ,
    
      如果不指定任何饱和策略
    
     ,
    
      默认将使用
    
     AbortPolicy.
    
    
      
       个人认为这些饱和策略并不十分理想
    
     ,
    
      特别是在应用既要保证快速
    
     ,
    
      又要高可用的情况下
    
     ,
    
      我的想法是能够加入超时等待策略
    
     ,
    
      也就是提交线程时线程池满
    
     ,
    
      能够
    
     park
    
      住提交任务的线程
    
     ,
    
      一旦有空闲
    
     ,
    
      能在第一时间通知到等待线程
    
     . 
    
      这个实际上和主线程执行相似
    
     ,
    
      但是主线程执行期间即使线程池有大量空闲也不会立即可以提交任务
    
     ,
    
      效率上后者可能会比较低
    
     ,
    
      特别是执行慢速任务
    
     .
    
    
    
      实例化
    
     Worker
    
      的时候会调用
    
     ThreadFactory
    
      的
    
     addThread(Runnable r)
    
      方法返回一个
    
     Thread,
    
      这个线程工厂是可以在
    
     ThreadPoolExecutor
    
      实例化的时候指定的
    
     ,
    
      如果不指定
    
     ,
    
      那么将会使用
    
     DefaultThreadFactory, 
    
      这个也就是提供给使用者命名线程
    
     ,
    
      线程归组
    
     ,
    
      是否是
    
     demon
    
      等线程相关属性设置的机会
    
     . 
  
beforeExecute 和 afterExecute 是提供给使用者扩展的 , 这两个方法会在 worker runTask 之前和 run 完毕之后分别调用 .JDK 注释里 Doug Lea(concurrent 包作者 ) 展示了 beforeExecute 一个很有趣的示例 . 代码如下 .
 class
    
    
       PausableThreadPoolExecutor 
    
    
      extends
    
    
       ThreadPoolExecutor 
    
    
    
      
        {
    
    
      class
    
    
       PausableThreadPoolExecutor 
    
    
      extends
    
    
       ThreadPoolExecutor 
    
    
    
      
        {
         private
      
      
         
      
      
        boolean
      
      
         isPaused;
             
      
      
        private
      
      
         
      
      
        boolean
      
      
         isPaused;
         private
      
      
         ReentrantLock pauseLock 
      
      
        =
      
      
         
      
      
        new
      
      
         ReentrantLock();
             
      
      
        private
      
      
         ReentrantLock pauseLock 
      
      
        =
      
      
         
      
      
        new
      
      
         ReentrantLock();
         private
      
      
         Condition unpaused 
      
      
        =
      
      
         pauseLock.newCondition();
             
      
      
        private
      
      
         Condition unpaused 
      
      
        =
      
      
         pauseLock.newCondition();
         
          
         public
      
      
         PausableThreadPoolExecutor(
        
      
      
        public
      
      
         PausableThreadPoolExecutor(
         ) 
      
      
      
        
          { 
        
        
          super
        
        
          (
         ) 
      
      
      
        
          { 
        
        
          super
        
        
          (
           ); }
           ); }
        
      
      
         
         protected
      
      
         
      
      
        void
      
      
         beforeExecute(Thread t, Runnable r) 
      
      
      
        
          {
        
      
      
        protected
      
      
         
      
      
        void
      
      
         beforeExecute(Thread t, Runnable r) 
      
      
      
        
          {
           super
        
        
          .beforeExecute(t, r);
               
        
        
          super
        
        
          .beforeExecute(t, r);
           pauseLock.lock();
               pauseLock.lock();
           try
        
        
           
        
        
        
          
            {
          
               
        
        
          try
        
        
           
        
        
        
          
            {
             while
          
          
             (isPaused) unpaused.await();
                     
          
          
            while
          
          
             (isPaused) unpaused.await();
             }
          
        
        
           
        
        
          catch
        
        
           (InterruptedException ie) 
        
        
        
          
            {
            
                 }
          
        
        
           
        
        
          catch
        
        
           (InterruptedException ie) 
        
        
        
          
            {
             t.interrupt();
                     t.interrupt();
             }
          
        
        
           
        
        
          finally
        
        
           
        
        
        
          
            {
            
                 }
          
        
        
           
        
        
          finally
        
        
           
        
        
        
          
            {
             pauseLock.unlock();
                     pauseLock.unlock();
             }
                 }
          
        
        
           }
           }
        
      
      
         
          
         public
      
      
         
      
      
        void
      
      
         pause() 
      
      
      
        
          {
        
      
      
        public
      
      
         
      
      
        void
      
      
         pause() 
      
      
      
        
          {
           pauseLock.lock();
               pauseLock.lock();
           try
        
        
           
        
        
        
          
            {
          
               
        
        
          try
        
        
           
        
        
        
          
            {
             isPaused 
          
          
            =
          
          
             
          
          
            true
          
          
            ;
                     isPaused 
          
          
            =
          
          
             
          
          
            true
          
          
            ;
             }
          
        
        
           
        
        
          finally
        
        
           
        
        
        
          
            {
            
                 }
          
        
        
           
        
        
          finally
        
        
           
        
        
        
          
            {
             pauseLock.unlock();
                     pauseLock.unlock();
             }
                 }
          
        
        
           }
           }
        
      
      
         
         public
      
      
         
      
      
        void
      
      
         resume() 
      
      
      
        
          {
        
      
      
        public
      
      
         
      
      
        void
      
      
         resume() 
      
      
      
        
          {
           pauseLock.lock();
               pauseLock.lock();
           try
        
        
           
        
        
        
          
            {
          
               
        
        
          try
        
        
           
        
        
        
          
            {
             isPaused 
          
          
            =
          
          
             
          
          
            false
          
          
            ;
                     isPaused 
          
          
            =
          
          
             
          
          
            false
          
          
            ;
             unpaused.signalAll();
                     unpaused.signalAll();
             }
          
        
        
           
        
        
          finally
        
        
           
        
        
        
          
            {
            
                 }
          
        
        
           
        
        
          finally
        
        
           
        
        
        
          
            {
             pauseLock.unlock();
                     pauseLock.unlock();
             }
                 }
          
        
        
           }
           }
        
      
      
         }
           }
      
    
    
       
    
  
    
      使用这个线程池
    
    
      ,
    
    
      用户可以随时调用
    
    
      pause
    
    
      中止剩余任务执行
    
    
      ,
    
    
      当然也可以使用
    
    
      resume
    
    
      重新开始执行剩余任务
    
    
      .
      
      
    
    
      
        ScheduledThreadPoolExecutor
        
      
    
    
      
       ScheduledThreadPoolExecutor
    
    
      是一个很实用的类
    
    
      ,
    
    
      它的实现核心是基于
    
    
      DelayedWorkQueue.
    
    
      从
    
    
      ScheduledThreadPoolExecutor
    
    
      的继承结构上来看
    
    
      ,
    
    
      各位应该能够看出些端倪来
    
    
      ,
    
    
      就是
    
    
      ScheduledThreadPoolExecutor
    
    
      将
    
    
      ThreadPoolExecutor
    
    
      中的任务队列设置成了
    
    
      DelayedWorkQueue,
    
    
      这也就是说
    
    
      ,
    
    
      线程池
    
    
      Worker
    
    
      从任务队列中取的一个任务
    
    
      ,
    
    
      需要等待这个队列中最短超时任务的超时
    
    
      ,
    
    
      也就是实现定时的效果
    
    
      .
    
    
      所以
    
    
      ScheduledThreadPoolExecutor
    
    
      所做的工作其实是比较少的
    
    
      .
    
    
      主要就是实现任务的实例化并加入工作队列
    
    
      ,
    
    
      以及支持
    
    
      scheduleAtFixedRate
    
    
      和
    
    
      scheduleAtFixedDelay
    
    
      这种周期性任务执行
    
    
      .
      
    
  
 public
    
    
       ScheduledThreadPoolExecutor(
    
    
      int
    
    
       corePoolSize,ThreadFactory threadFactory) 
    
    
    
      
        {
    
    
      public
    
    
       ScheduledThreadPoolExecutor(
    
    
      int
    
    
       corePoolSize,ThreadFactory threadFactory) 
    
    
    
      
        {
         super
      
      
        (corePoolSize, Integer.MAX_VALUE, 
      
      
        0
      
      
        , TimeUnit.NANOSECONDS,
      
      
        new
      
      
         DelayedWorkQueue(), threadFactory);
                    
      
      
        super
      
      
        (corePoolSize, Integer.MAX_VALUE, 
      
      
        0
      
      
        , TimeUnit.NANOSECONDS,
      
      
        new
      
      
         DelayedWorkQueue(), threadFactory);
         }
         }
      
    
    
       
    
   private
    
    
       
    
    
      void
    
    
       runPeriodic() 
    
    
    
      
        {
    
    
      private
    
    
       
    
    
      void
    
    
       runPeriodic() 
    
    
    
      
        {
         boolean
      
      
         ok 
      
      
        =
      
      
         ScheduledFutureTask.
      
      
        super
      
      
        .runAndReset();
                   
      
      
        boolean
      
      
         ok 
      
      
        =
      
      
         ScheduledFutureTask.
      
      
        super
      
      
        .runAndReset();
         boolean
      
      
         down 
      
      
        =
      
      
         isShutdown();
                   
      
      
        boolean
      
      
         down 
      
      
        =
      
      
         isShutdown();
         //
      
      
         Reschedule if not cancelled and not shutdown or policy allows
                   
      
      
        //
      
      
         Reschedule if not cancelled and not shutdown or policy allows
      
      
         if
      
      
         (ok 
      
      
        &&
      
      
         (
      
      
        !
      
      
        down 
      
      
        ||
      
      
        (getContinueExistingPeriodicTasksAfterShutdownPolicy() 
      
      
        &&
      
      
         
      
      
        !
      
      
        isStopped()))) 
      
      
      
        
          {
        
      
      
              
      
      
        if
      
      
         (ok 
      
      
        &&
      
      
         (
      
      
        !
      
      
        down 
      
      
        ||
      
      
        (getContinueExistingPeriodicTasksAfterShutdownPolicy() 
      
      
        &&
      
      
         
      
      
        !
      
      
        isStopped()))) 
      
      
      
        
          {
           long
        
        
           p 
        
        
          =
        
        
           period;
                           
        
        
          long
        
        
           p 
        
        
          =
        
        
           period;
           if
        
        
           (p 
        
        
          >
        
        
           
        
        
          0
        
        
          )
                           
        
        
          if
        
        
           (p 
        
        
          >
        
        
           
        
        
          0
        
        
          )
           time 
        
        
          +=
        
        
           p;
                                 time 
        
        
          +=
        
        
           p;
           else
                           
        
        
          else
        
        
           time 
        
        
          =
        
        
           triggerTime(
        
        
          -
        
        
          p);
                                 time 
        
        
          =
        
        
           triggerTime(
        
        
          -
        
        
          p);
           
                
           ScheduledThreadPoolExecutor.
        
        
          super
        
        
          .getQueue().add(
        
        
          this
        
        
          );
                           ScheduledThreadPoolExecutor.
        
        
          super
        
        
          .getQueue().add(
        
        
          this
        
        
          );
           }
                    }
        
      
      
         //
      
      
         This might have been the final executed delayed
                  
      
      
        //
      
      
         This might have been the final executed delayed
         //
      
      
         task.  Wake up threads to check.
                 
      
      
        //
      
      
         task.  Wake up threads to check.
      
      
         else
      
      
         
      
      
        if
      
      
         (down)
      
      
                
      
      
        else
      
      
         
      
      
        if
      
      
         (down)
         interruptIdleWorkers();
                       interruptIdleWorkers();
         }
         }
      
    
    
       
    
  2. CompletionService
 
  
  ExecutorCompletionService
CompletionService 定义了线程池执行任务集 , 可以依次拿到任务执行完毕的 Future,ExecutorCompletionService 是其实现类 , 先举个例子 , 如下代码 , 这个例子中 , 需要注意 ThreadPoolExecutor 核心池一定保证能够让任务提交并且马上执行 , 而不是放到等待队列中去 , 那样次序将会无法控制 ,CompletionService 也将失去效果 ( 其实核心池中的任务完成顺序还是准确的 ).
 public
    
    
       
    
    
      static
    
    
       
    
    
      void
    
    
       main(String[] args) 
    
    
      throws
    
    
       InterruptedException, ExecutionException
    
    
    
      
        {
    
    
      public
    
    
       
    
    
      static
    
    
       
    
    
      void
    
    
       main(String[] args) 
    
    
      throws
    
    
       InterruptedException, ExecutionException
    
    
    
      
        {
         ThreadPoolExecutor es
      
      
        =
      
      
        new
      
      
         ThreadPoolExecutor(
      
      
        10
      
      
        , 
      
      
        15
      
      
        , 
      
      
        2000
      
      
        , TimeUnit.MILLISECONDS, 
      
      
        new
      
      
         ArrayBlockingQueue
      
      
        <
      
      
        Runnable
      
      
        >
      
      
        (
      
      
        10
      
      
        ),
      
      
        new
      
      
         ThreadPoolExecutor.AbortPolicy());
             ThreadPoolExecutor es
      
      
        =
      
      
        new
      
      
         ThreadPoolExecutor(
      
      
        10
      
      
        , 
      
      
        15
      
      
        , 
      
      
        2000
      
      
        , TimeUnit.MILLISECONDS, 
      
      
        new
      
      
         ArrayBlockingQueue
      
      
        <
      
      
        Runnable
      
      
        >
      
      
        (
      
      
        10
      
      
        ),
      
      
        new
      
      
         ThreadPoolExecutor.AbortPolicy());
         CompletionService
      
      
        <
      
      
        String
      
      
        >
      
      
         cs
      
      
        =
      
      
        new
      
      
         ExecutorCompletionService
      
      
        <
      
      
        String
      
      
        >
      
      
        (es);
             CompletionService
      
      
        <
      
      
        String
      
      
        >
      
      
         cs
      
      
        =
      
      
        new
      
      
         ExecutorCompletionService
      
      
        <
      
      
        String
      
      
        >
      
      
        (es);    
         cs.submit(
      
      
        new
      
      
         Callable
      
      
        <
      
      
        String
      
      
        >
      
      
        () 
      
      
      
        
          {
        
             cs.submit(
      
      
        new
      
      
         Callable
      
      
        <
      
      
        String
      
      
        >
      
      
        () 
      
      
      
        
          {
           @Override
                @Override
           public
        
        
           String call() 
        
        
          throws
        
        
           Exception 
        
        
          Codehi
          
                
        
        
          public
        
        
           String call() 
        
        
          throws
        
        
           Exception 
        
        
          Codehi 
        
      
    
  发表评论


 
     
             
       
       
       
       
					 
					 
		
评论