Home » concurrent-sources » EDU.oswego.cs.dl.util.concurrent » [javadoc | source]

    1   /*
    2     File: PooledExecutor.java
    3   
    4     Originally written by Doug Lea and released into the public domain.
    5     This may be used for any purposes whatsoever without acknowledgment.
    6     Thanks for the assistance and support of Sun Microsystems Labs,
    7     and everyone contributing, testing, and using this code.
    8   
    9     History:
   10     Date       Who                What
   11     19Jun1998  dl               Create public version
   12     29aug1998  dl               rely on ThreadFactoryUser, 
   13                                 remove ThreadGroup-based methods
   14                                 adjusted locking policies
   15      3mar1999  dl               Worker threads sense decreases in pool size
   16     31mar1999  dl               Allow supplied channel in constructor;
   17                                 add methods createThreads, drain
   18     15may1999  dl               Allow infinite keepalives
   19     21oct1999  dl               add minimumPoolSize methods
   20      7sep2000  dl               BlockedExecutionHandler now an interface,
   21                                 new DiscardOldestWhenBlocked policy
   22     12oct2000  dl               add shutdownAfterProcessingCurrentlyQueuedTasks
   23     13nov2000  dl               null out task ref after run 
   24     08apr2001  dl               declare inner class ctor protected 
   25     12nov2001  dl               Better shutdown support
   26                                 Blocked exec handlers can throw IE
   27                                 Simplify locking scheme
   28     25jan2001  dl               {get,set}BlockedExecutionHandler now public
   29     17may2002  dl               null out task var in worker run to enable GC.
   30   */
   31   
   32   package EDU.oswego.cs.dl.util.concurrent;
   33   import java.util;
   34   
   35   /**
   36    * A tunable, extensible thread pool class. The main supported public
   37    * method is <code>execute(Runnable command)</code>, which can be
   38    * called instead of directly creating threads to execute commands.
   39    *
   40    * <p>
   41    * Thread pools can be useful for several, usually intertwined
   42    * reasons:
   43    *
   44    * <ul>
   45    *
   46    *    <li> To bound resource use. A limit can be placed on the maximum
   47    *    number of simultaneously executing threads.
   48    *
   49    *    <li> To manage concurrency levels. A targeted number of threads
   50    *    can be allowed to execute simultaneously.
   51    *
   52    *    <li> To manage a set of threads performing related tasks.
   53    *
   54    *    <li> To minimize overhead, by reusing previously constructed
   55    *    Thread objects rather than creating new ones.  (Note however
   56    *    that pools are hardly ever cure-alls for performance problems
   57    *    associated with thread construction, especially on JVMs that
   58    *    themselves internally pool or recycle threads.)  
   59    *
   60    * </ul>
   61    *
   62    * These goals introduce a number of policy parameters that are
   63    * encapsulated in this class. All of these parameters have defaults
   64    * and are tunable, either via get/set methods, or, in cases where
   65    * decisions should hold across lifetimes, via methods that can be
   66    * easily overridden in subclasses.  The main, most commonly set
   67    * parameters can be established in constructors.  Policy choices
   68    * across these dimensions can and do interact.  Be careful, and
   69    * please read this documentation completely before using!  See also
   70    * the usage examples below.
   71    *
   72    * <dl>
   73    *   <dt> Queueing 
   74    *
   75    *   <dd> By default, this pool uses queueless synchronous channels to
   76    *   to hand off work to threads. This is a safe, conservative policy
   77    *   that avoids lockups when handling sets of requests that might
   78    *   have internal dependencies. (In these cases, queuing one task
   79    *   could lock up another that would be able to continue if the
   80    *   queued task were to run.)  If you are sure that this cannot
   81    *   happen, then you can instead supply a queue of some sort (for
   82    *   example, a BoundedBuffer or LinkedQueue) in the constructor.
   83    *   This will cause new commands to be queued in cases where all
   84    *   MaximumPoolSize threads are busy. Queues are sometimes
   85    *   appropriate when each task is completely independent of others,
   86    *   so tasks cannot affect each others execution.  For example, in an
   87    *   http server.  <p>
   88    *
   89    *   When given a choice, this pool always prefers adding a new thread
   90    *   rather than queueing if there are currently fewer than the
   91    *   current getMinimumPoolSize threads running, but otherwise always
   92    *   prefers queuing a request rather than adding a new thread. Thus,
   93    *   if you use an unbounded buffer, you will never have more than
   94    *   getMinimumPoolSize threads running. (Since the default
   95    *   minimumPoolSize is one, you will probably want to explicitly
   96    *   setMinimumPoolSize.)  <p>
   97    *
   98    *   While queuing can be useful in smoothing out transient bursts of
   99    *   requests, especially in socket-based services, it is not very
  100    *   well behaved when commands continue to arrive on average faster
  101    *   than they can be processed.  Using bounds for both the queue and
  102    *   the pool size, along with run-when-blocked policy is often a
  103    *   reasonable response to such possibilities.  <p>
  104    *
  105    *   Queue sizes and maximum pool sizes can often be traded off for
  106    *   each other. Using large queues and small pools minimizes CPU
  107    *   usage, OS resources, and context-switching overhead, but can lead
  108    *   to artifically low throughput. Especially if tasks frequently
  109    *   block (for example if they are I/O bound), a JVM and underlying
  110    *   OS may be able to schedule time for more threads than you
  111    *   otherwise allow. Use of small queues or queueless handoffs
  112    *   generally requires larger pool sizes, which keeps CPUs busier but
  113    *   may encounter unacceptable scheduling overhead, which also
  114    *   decreases throughput.  <p>
  115    *
  116    *   <dt> Maximum Pool size
  117    *
  118    *   <dd> The maximum number of threads to use, when needed.  The pool
  119    *   does not by default preallocate threads.  Instead, a thread is
  120    *   created, if necessary and if there are fewer than the maximum,
  121    *   only when an <code>execute</code> request arrives.  The default
  122    *   value is (for all practical purposes) infinite --
  123    *   <code>Integer.MAX_VALUE</code>, so should be set in the
  124    *   constructor or the set method unless you are just using the pool
  125    *   to minimize construction overhead.  Because task handoffs to idle
  126    *   worker threads require synchronization that in turn relies on JVM
  127    *   scheduling policies to ensure progress, it is possible that a new
  128    *   thread will be created even though an existing worker thread has
  129    *   just become idle but has not progressed to the point at which it
  130    *   can accept a new task. This phenomenon tends to occur on some
  131    *   JVMs when bursts of short tasks are executed.  <p>
  132    *
  133    *   <dt> Minimum Pool size
  134    *
  135    *   <dd> The minimum number of threads to use, when needed (default
  136    *   1).  When a new request is received, and fewer than the minimum
  137    *   number of threads are running, a new thread is always created to
  138    *   handle the request even if other worker threads are idly waiting
  139    *   for work. Otherwise, a new thread is created only if there are
  140    *   fewer than the maximum and the request cannot immediately be
  141    *   queued.  <p>
  142    *
  143    *   <dt> Preallocation
  144    *
  145    *   <dd> You can override lazy thread construction policies via
  146    *   method createThreads, which establishes a given number of warm
  147    *   threads. Be aware that these preallocated threads will time out
  148    *   and die (and later be replaced with others if needed) if not used
  149    *   within the keep-alive time window. If you use preallocation, you
  150    *   probably want to increase the keepalive time.  The difference
  151    *   between setMinimumPoolSize and createThreads is that
  152    *   createThreads immediately establishes threads, while setting the
  153    *   minimum pool size waits until requests arrive.  <p>
  154    *
  155    *   <dt> Keep-alive time
  156    *
  157    *   <dd> If the pool maintained references to a fixed set of threads
  158    *   in the pool, then it would impede garbage collection of otherwise
  159    *   idle threads. This would defeat the resource-management aspects
  160    *   of pools. One solution would be to use weak references.  However,
  161    *   this would impose costly and difficult synchronization issues.
  162    *   Instead, threads are simply allowed to terminate and thus be
  163    *   GCable if they have been idle for the given keep-alive time.  The
  164    *   value of this parameter represents a trade-off between GCability
  165    *   and construction time. In most current Java VMs, thread
  166    *   construction and cleanup overhead is on the order of
  167    *   milliseconds. The default keep-alive value is one minute, which
  168    *   means that the time needed to construct and then GC a thread is
  169    *   expended at most once per minute.  
  170    *   <p> 
  171    *
  172    *   To establish worker threads permanently, use a <em>negative</em>
  173    *   argument to setKeepAliveTime.  <p>
  174    *
  175    *   <dt> Blocked execution policy
  176    *
  177    *   <dd> If the maximum pool size or queue size is bounded, then it
  178    *   is possible for incoming <code>execute</code> requests to
  179    *   block. There are four supported policies for handling this
  180    *   problem, and mechanics (based on the Strategy Object pattern) to
  181    *   allow others in subclasses: <p>
  182    *
  183    *   <dl>
  184    *     <dt> Run (the default)
  185    *     <dd> The thread making the <code>execute</code> request
  186    *          runs the task itself. This policy helps guard against lockup. 
  187    *     <dt> Wait
  188    *     <dd> Wait until a thread becomes available.
  189    *     <dt> Abort
  190    *     <dd> Throw a RuntimeException
  191    *     <dt> Discard 
  192    *     <dd> Throw away the current request and return.
  193    *     <dt> DiscardOldest
  194    *     <dd> Throw away the oldest request and return.
  195    *   </dl>
  196    *
  197    *   Other plausible policies include raising the maximum pool size
  198    *   after checking with some other objects that this is OK.  <p>
  199    *
  200    *   These cases can never occur if the maximum pool size is unbounded
  201    *   or the queue is unbounded.  In these cases you instead face
  202    *   potential resource exhaustion.)  The execute method does not
  203    *   throw any checked exceptions in any of these cases since any
  204    *   errors associated with them must normally be dealt with via
  205    *   handlers or callbacks. (Although in some cases, these might be
  206    *   associated with throwing unchecked exceptions.)  You may wish to
  207    *   add special implementations even if you choose one of the listed
  208    *   policies. For example, the supplied Discard policy does not
  209    *   inform the caller of the drop. You could add your own version
  210    *   that does so.  Since choice of policies is normally a system-wide
  211    *   decision, selecting a policy affects all calls to
  212    *   <code>execute</code>.  If for some reason you would instead like
  213    *   to make per-call decisions, you could add variant versions of the
  214    *   <code>execute</code> method (for example,
  215    *   <code>executeIfWouldNotBlock</code>) in subclasses.  <p>
  216    *
  217    *   <dt> Thread construction parameters
  218    *
  219    *   <dd> A settable ThreadFactory establishes each new thread.  By
  220    *   default, it merely generates a new instance of class Thread, but
  221    *   can be changed to use a Thread subclass, to set priorities,
  222    *   ThreadLocals, etc.  <p>
  223    *
  224    *   <dt> Interruption policy
  225    *
  226    *   <dd> Worker threads check for interruption after processing each
  227    *   command, and terminate upon interruption.  Fresh threads will
  228    *   replace them if needed. Thus, new tasks will not start out in an
  229    *   interrupted state due to an uncleared interruption in a previous
  230    *   task. Also, unprocessed commands are never dropped upon
  231    *   interruption. It would conceptually suffice simply to clear
  232    *   interruption between tasks, but implementation characteristics of
  233    *   interruption-based methods are uncertain enough to warrant this
  234    *   conservative strategy. It is a good idea to be equally
  235    *   conservative in your code for the tasks running within pools.
  236    *   <p>
  237    *
  238    *   <dt> Shutdown policy
  239    *
  240    *   <dd> The interruptAll method interrupts, but does not disable the
  241    *   pool. Two different shutdown methods are supported for use when
  242    *   you do want to (permanently) stop processing tasks. Method
  243    *   shutdownAfterProcessingCurrentlyQueuedTasks waits until all
  244    *   current tasks are finished. The shutDownNow method interrupts
  245    *   current threads and leaves other queued requests unprocessed.
  246    *   <p>
  247    *
  248    *   <dt> Handling requests after shutdown
  249    *
  250    *   <dd> When the pool is shutdown, new incoming requests are handled
  251    *   by the blockedExecutionHandler. By default, the handler is set to
  252    *   discard new requests, but this can be set with an optional
  253    *   argument to method
  254    *   shutdownAfterProcessingCurrentlyQueuedTasks. <p> Also, if you are
  255    *   using some form of queuing, you may wish to call method drain()
  256    *   to remove (and return) unprocessed commands from the queue after
  257    *   shutting down the pool and its clients. If you need to be sure
  258    *   these commands are processed, you can then run() each of the
  259    *   commands in the list returned by drain().
  260    *
  261    * </dl>
  262    * <p>
  263    *
  264    * <b>Usage examples.</b>
  265    * <p>
  266    *
  267    * Probably the most common use of pools is in statics or singletons
  268    * accessible from a number of classes in a package; for example:
  269    *
  270    * <pre>
  271    * class MyPool {
  272    *   // initialize to use a maximum of 8 threads.
  273    *   static PooledExecutor pool = new PooledExecutor(8);
  274    * }
  275    * </pre>
  276    * Here are some sample variants in initialization:
  277    * <ol>
  278    *  <li> Using a bounded buffer of 10 tasks, at least 4 threads (started only
  279    *       when needed due to incoming requests), but allowing
  280    *       up to 100 threads if the buffer gets full.
  281    *     <pre>
  282    *        pool = new PooledExecutor(new BoundedBuffer(10), 100);
  283    *        pool.setMinimumPoolSize(4);
  284    *     </pre>
  285    *  <li> Same as (1), except pre-start 9 threads, allowing them to
  286    *        die if they are not used for five minutes.
  287    *     <pre>
  288    *        pool = new PooledExecutor(new BoundedBuffer(10), 100);
  289    *        pool.setMinimumPoolSize(4);
  290    *        pool.setKeepAliveTime(1000 * 60 * 5);
  291    *        pool.createThreads(9);
  292    *     </pre>
  293    *  <li> Same as (2) except clients block if both the buffer is full and
  294    *       all 100 threads are busy:
  295    *     <pre>
  296    *        pool = new PooledExecutor(new BoundedBuffer(10), 100);
  297    *        pool.setMinimumPoolSize(4);
  298    *        pool.setKeepAliveTime(1000 * 60 * 5);
  299    *        pool.waitWhenBlocked();
  300    *        pool.createThreads(9);
  301    *     </pre>
  302    *  <li> An unbounded queue serviced by exactly 5 threads:
  303    *     <pre>
  304    *        pool = new PooledExecutor(new LinkedQueue());
  305    *        pool.setKeepAliveTime(-1); // live forever
  306    *        pool.createThreads(5);
  307    *     </pre>
  308    *  </ol>
  309    *
  310    * <p>
  311    * <b>Usage notes.</b>
  312    * <p>
  313    *
  314    * Pools do not mesh well with using thread-specific storage via
  315    * java.lang.ThreadLocal.  ThreadLocal relies on the identity of a
  316    * thread executing a particular task. Pools use the same thread to
  317    * perform different tasks.  <p>
  318    *
  319    * If you need a policy not handled by the parameters in this class
  320    * consider writing a subclass.  <p>
  321    *
  322    * Version note: Previous versions of this class relied on
  323    * ThreadGroups for aggregate control. This has been removed, and the
  324    * method interruptAll added, to avoid differences in behavior across
  325    * JVMs.
  326    *
  327    * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>]
  328    **/
  329   
  330   public class PooledExecutor extends ThreadFactoryUser implements Executor {
  331   
  332     /** 
  333      * The maximum pool size; used if not otherwise specified.  Default
  334      * value is essentially infinite (Integer.MAX_VALUE)
  335      **/
  336     public static final int  DEFAULT_MAXIMUMPOOLSIZE = Integer.MAX_VALUE;
  337   
  338     /** 
  339      * The minimum pool size; used if not otherwise specified.  Default
  340      * value is 1.
  341      **/
  342     public static final int  DEFAULT_MINIMUMPOOLSIZE = 1;
  343   
  344     /**
  345      * The maximum time to keep worker threads alive waiting for new
  346      * tasks; used if not otherwise specified. Default value is one
  347      * minute (60000 milliseconds).
  348      **/
  349     public static final long DEFAULT_KEEPALIVETIME = 60 * 1000;
  350   
  351     /** The maximum number of threads allowed in pool. **/
  352     protected int maximumPoolSize_ = DEFAULT_MAXIMUMPOOLSIZE;
  353   
  354     /** The minumum number of threads to maintain in pool. **/
  355     protected int minimumPoolSize_ = DEFAULT_MINIMUMPOOLSIZE;
  356   
  357     /**  Current pool size.  **/
  358     protected int poolSize_ = 0;
  359   
  360     /** The maximum time for an idle thread to wait for new task. **/
  361     protected long keepAliveTime_ = DEFAULT_KEEPALIVETIME;
  362   
  363     /** 
  364      * Shutdown flag - latches true when a shutdown method is called 
  365      * in order to disable queuing/handoffs of new tasks.
  366      **/
  367     protected boolean shutdown_ = false;
  368   
  369     /**
  370      * The channel used to hand off the command to a thread in the pool.
  371      **/
  372     protected final Channel handOff_;
  373   
  374     /**
  375      * The set of active threads, declared as a map from workers to
  376      * their threads.  This is needed by the interruptAll method.  It
  377      * may also be useful in subclasses that need to perform other
  378      * thread management chores.
  379      **/
  380     protected final Map threads_;
  381   
  382     /** The current handler for unserviceable requests. **/
  383     protected BlockedExecutionHandler blockedExecutionHandler_;
  384   
  385     /** 
  386      * Create a new pool with all default settings
  387      **/
  388   
  389     public PooledExecutor() {
  390       this (new SynchronousChannel(), DEFAULT_MAXIMUMPOOLSIZE);
  391     }
  392   
  393     /** 
  394      * Create a new pool with all default settings except
  395      * for maximum pool size.
  396      **/
  397   
  398     public PooledExecutor(int maxPoolSize) {
  399       this(new SynchronousChannel(), maxPoolSize);
  400     }
  401   
  402     /** 
  403      * Create a new pool that uses the supplied Channel for queuing, and
  404      * with all default parameter settings.
  405      **/
  406   
  407     public PooledExecutor(Channel channel) {
  408       this(channel, DEFAULT_MAXIMUMPOOLSIZE);
  409     }
  410   
  411     /** 
  412      * Create a new pool that uses the supplied Channel for queuing, and
  413      * with all default parameter settings except for maximum pool size.
  414      **/
  415   
  416     public PooledExecutor(Channel channel, int maxPoolSize) {
  417       maximumPoolSize_ = maxPoolSize;
  418       handOff_ = channel;
  419       runWhenBlocked();
  420       threads_ = new HashMap();
  421     }
  422     
  423     /** 
  424      * Return the maximum number of threads to simultaneously execute
  425      * New unqueued requests will be handled according to the current
  426      * blocking policy once this limit is exceeded.
  427      **/
  428     public synchronized int getMaximumPoolSize() { 
  429       return maximumPoolSize_; 
  430     }
  431   
  432     /** 
  433      * Set the maximum number of threads to use. Decreasing the pool
  434      * size will not immediately kill existing threads, but they may
  435      * later die when idle.
  436      * @exception IllegalArgumentException if less or equal to zero.
  437      * (It is
  438      * not considered an error to set the maximum to be less than than
  439      * the minimum. However, in this case there are no guarantees
  440      * about behavior.)
  441      **/
  442     public synchronized void setMaximumPoolSize(int newMaximum) { 
  443       if (newMaximum <= 0) throw new IllegalArgumentException();
  444       maximumPoolSize_ = newMaximum; 
  445     }
  446   
  447     /** 
  448      * Return the minimum number of threads to simultaneously execute.
  449      * (Default value is 1).  If fewer than the mininum number are
  450      * running upon reception of a new request, a new thread is started
  451      * to handle this request.
  452      **/
  453     public synchronized int getMinimumPoolSize() { 
  454       return minimumPoolSize_; 
  455     }
  456   
  457     /** 
  458      * Set the minimum number of threads to use. 
  459      * @exception IllegalArgumentException if less than zero. (It is not
  460      * considered an error to set the minimum to be greater than the
  461      * maximum. However, in this case there are no guarantees about
  462      * behavior.)
  463      **/
  464     public synchronized void setMinimumPoolSize(int newMinimum) { 
  465       if (newMinimum < 0) throw new IllegalArgumentException();
  466       minimumPoolSize_ = newMinimum; 
  467     }
  468     
  469     /** 
  470      * Return the current number of active threads in the pool.  This
  471      * number is just a snaphot, and may change immediately upon
  472      * returning
  473      **/
  474     public synchronized int getPoolSize() { 
  475       return poolSize_; 
  476     }
  477   
  478     /** 
  479      * Return the number of milliseconds to keep threads alive waiting
  480      * for new commands. A negative value means to wait forever. A zero
  481      * value means not to wait at all.
  482      **/
  483     public synchronized long getKeepAliveTime() { 
  484       return keepAliveTime_; 
  485     }
  486   
  487     /** 
  488      * Set the number of milliseconds to keep threads alive waiting for
  489      * new commands. A negative value means to wait forever. A zero
  490      * value means not to wait at all.
  491      **/
  492     public synchronized void setKeepAliveTime(long msecs) { 
  493       keepAliveTime_ = msecs; 
  494     }
  495   
  496     /** Get the handler for blocked execution **/
  497     public synchronized BlockedExecutionHandler getBlockedExecutionHandler() {
  498       return blockedExecutionHandler_;
  499     }
  500   
  501     /** Set the handler for blocked execution **/
  502     public synchronized void setBlockedExecutionHandler(BlockedExecutionHandler h) {
  503       blockedExecutionHandler_ = h;
  504     }
  505   
  506     /**
  507      * Create and start a thread to handle a new command.  Call only
  508      * when holding lock.
  509      **/
  510     protected void addThread(Runnable command) {
  511       Worker worker = new Worker(command);
  512       Thread thread = getThreadFactory().newThread(worker);
  513       threads_.put(worker, thread);
  514       ++poolSize_;
  515       thread.start();
  516     }
  517   
  518     /**
  519      * Create and start up to numberOfThreads threads in the pool.
  520      * Return the number created. This may be less than the number
  521      * requested if creating more would exceed maximum pool size bound.
  522      **/
  523     public int createThreads(int numberOfThreads) {
  524       int ncreated = 0;
  525       for (int i = 0; i < numberOfThreads; ++i) {
  526         synchronized(this) { 
  527           if (poolSize_ < maximumPoolSize_) {
  528             addThread(null);
  529             ++ncreated;
  530           }
  531           else 
  532             break;
  533         }
  534       }
  535       return ncreated;
  536     }
  537   
  538     /**
  539      * Interrupt all threads in the pool, causing them all to
  540      * terminate. Assuming that executed tasks do not disable (clear)
  541      * interruptions, each thread will terminate after processing its
  542      * current task. Threads will terminate sooner if the executed tasks
  543      * themselves respond to interrupts.
  544      **/
  545     public synchronized void interruptAll() {
  546       for (Iterator it = threads_.values().iterator(); it.hasNext(); ) {
  547         Thread t = (Thread)(it.next());
  548         t.interrupt();
  549       }
  550     }
  551   
  552     /**
  553      * Interrupt all threads and disable construction of new
  554      * threads. Any tasks entered after this point will be discarded. A
  555      * shut down pool cannot be restarted.
  556      */
  557     public void shutdownNow() {
  558       shutdownNow(new DiscardWhenBlocked());
  559     }
  560   
  561     /**
  562      * Interrupt all threads and disable construction of new
  563      * threads. Any tasks entered after this point will be handled by
  564      * the given BlockedExecutionHandler.  A shut down pool cannot be
  565      * restarted.
  566      */
  567     public synchronized void shutdownNow(BlockedExecutionHandler handler) {
  568       setBlockedExecutionHandler(handler);
  569       shutdown_ = true; // don't allow new tasks
  570       minimumPoolSize_ = maximumPoolSize_ = 0; // don't make new threads
  571       interruptAll(); // interrupt all existing threads
  572     }
  573   
  574     /**
  575      * Terminate threads after processing all elements currently in
  576      * queue. Any tasks entered after this point will be discarded. A
  577      * shut down pool cannot be restarted.
  578      **/
  579     public void shutdownAfterProcessingCurrentlyQueuedTasks() {
  580       shutdownAfterProcessingCurrentlyQueuedTasks(new DiscardWhenBlocked());
  581     }
  582   
  583     /**
  584      * Terminate threads after processing all elements currently in
  585      * queue. Any tasks entered after this point will be handled by the
  586      * given BlockedExecutionHandler.  A shut down pool cannot be
  587      * restarted.
  588      **/
  589     public synchronized void shutdownAfterProcessingCurrentlyQueuedTasks(BlockedExecutionHandler handler) {
  590       setBlockedExecutionHandler(handler);
  591       shutdown_ = true;
  592       if (poolSize_ == 0) // disable new thread construction when idle
  593         minimumPoolSize_ = maximumPoolSize_ = 0;
  594     }
  595   
  596     /** 
  597      * Return true if a shutDown method has succeeded in terminating all
  598      * threads.
  599      */
  600     public synchronized boolean isTerminatedAfterShutdown() {
  601       return shutdown_ && poolSize_ == 0;
  602     }
  603   
  604     /**
  605      * Wait for a shutdown pool to fully terminate, or until the timeout
  606      * has expired. This method may only be called <em>after</em>
  607      * invoking shutdownNow or
  608      * shutdownAfterProcessingCurrentlyQueuedTasks.
  609      *
  610      * @param maxWaitTime  the maximum time in milliseconds to wait
  611      * @return true if the pool has terminated within the max wait period
  612      * @exception IllegalStateException if shutdown has not been requested
  613      * @exception InterruptedException if the current thread has been interrupted in the course of waiting
  614      */
  615     public synchronized boolean awaitTerminationAfterShutdown(long maxWaitTime) throws InterruptedException {
  616       if (!shutdown_)
  617         throw new IllegalStateException();
  618       if (poolSize_ == 0)
  619         return true;
  620       long waitTime = maxWaitTime;
  621       if (waitTime <= 0)
  622         return false;
  623       long start = System.currentTimeMillis();
  624       for (;;) {
  625         wait(waitTime);
  626         if (poolSize_ == 0)
  627           return true;
  628         waitTime = maxWaitTime - (System.currentTimeMillis() - start);
  629         if (waitTime <= 0) 
  630           return false;
  631       }
  632     }
  633   
  634     /**
  635      * Wait for a shutdown pool to fully terminate.  This method may
  636      * only be called <em>after</em> invoking shutdownNow or
  637      * shutdownAfterProcessingCurrentlyQueuedTasks.
  638      *
  639      * @exception IllegalStateException if shutdown has not been requested
  640      * @exception InterruptedException if the current thread has been interrupted in the course of waiting
  641      */
  642     public synchronized void awaitTerminationAfterShutdown() throws InterruptedException {
  643       if (!shutdown_)
  644         throw new IllegalStateException();
  645       while (poolSize_ > 0)
  646         wait();
  647     }
  648   
  649     /**
  650      * Remove all unprocessed tasks from pool queue, and return them in
  651      * a java.util.List. Thsi method should be used only when there are
  652      * not any active clients of the pool. Otherwise you face the
  653      * possibility that the method will loop pulling out tasks as
  654      * clients are putting them in.  This method can be useful after
  655      * shutting down a pool (via shutdownNow) to determine whether there
  656      * are any pending tasks that were not processed.  You can then, for
  657      * example execute all unprocessed commands via code along the lines
  658      * of:
  659      *
  660      * <pre>
  661      *   List tasks = pool.drain();
  662      *   for (Iterator it = tasks.iterator(); it.hasNext();) 
  663      *     ( (Runnable)(it.next()) ).run();
  664      * </pre>
  665      **/
  666     public List drain() {
  667       boolean wasInterrupted = false;
  668       Vector tasks = new Vector();
  669       for (;;) {
  670         try {
  671           Object x = handOff_.poll(0);
  672           if (x == null) 
  673             break;
  674           else
  675             tasks.addElement(x);
  676         }
  677         catch (InterruptedException ex) {
  678           wasInterrupted = true; // postpone re-interrupt until drained
  679         }
  680       }
  681       if (wasInterrupted) Thread.currentThread().interrupt();
  682       return tasks;
  683     }
  684     
  685     /** 
  686      * Cleanup method called upon termination of worker thread.
  687      **/
  688     protected synchronized void workerDone(Worker w) {
  689       threads_.remove(w);
  690       if (--poolSize_ == 0 && shutdown_) { 
  691         maximumPoolSize_ = minimumPoolSize_ = 0; // disable new threads
  692         notifyAll(); // notify awaitTerminationAfterShutdown
  693       }
  694     }
  695   
  696     /** 
  697      * Get a task from the handoff queue, or null if shutting down.
  698      **/
  699     protected Runnable getTask() throws InterruptedException {
  700       long waitTime;
  701       synchronized(this) {
  702         if (poolSize_ > maximumPoolSize_) // Cause to die if too many threads
  703           return null;
  704         waitTime = (shutdown_)? 0 : keepAliveTime_;
  705       }
  706       if (waitTime >= 0) 
  707         return (Runnable)(handOff_.poll(waitTime));
  708       else 
  709         return (Runnable)(handOff_.take());
  710     }
  711     
  712   
  713     /**
  714      * Class defining the basic run loop for pooled threads.
  715      **/
  716     protected class Worker implements Runnable {
  717       protected Runnable firstTask_;
  718   
  719       protected Worker(Runnable firstTask) { firstTask_ = firstTask; }
  720   
  721       public void run() {
  722         try {
  723           Runnable task = firstTask_;
  724           firstTask_ = null; // enable GC
  725   
  726           if (task != null) {
  727             task.run();
  728             task = null;
  729           }
  730           
  731           while ( (task = getTask()) != null) {
  732             task.run();
  733             task = null;
  734           }
  735         }
  736         catch (InterruptedException ex) { } // fall through
  737         finally {
  738           workerDone(this);
  739         }
  740       }
  741     }
  742   
  743     /**
  744      * Class for actions to take when execute() blocks. Uses Strategy
  745      * pattern to represent different actions. You can add more in
  746      * subclasses, and/or create subclasses of these. If so, you will
  747      * also want to add or modify the corresponding methods that set the
  748      * current blockedExectionHandler_.
  749      **/
  750     public interface BlockedExecutionHandler {
  751       /** 
  752        * Return true if successfully handled so, execute should
  753        * terminate; else return false if execute loop should be retried.
  754        **/
  755       boolean blockedAction(Runnable command) throws InterruptedException;
  756     }
  757   
  758     /** Class defining Run action. **/
  759     protected class RunWhenBlocked implements BlockedExecutionHandler {
  760       public boolean blockedAction(Runnable command) {
  761         command.run();
  762         return true;
  763       }
  764     }
  765   
  766     /** 
  767      * Set the policy for blocked execution to be that the current
  768      * thread executes the command if there are no available threads in
  769      * the pool.
  770      **/
  771     public void runWhenBlocked() {
  772       setBlockedExecutionHandler(new RunWhenBlocked());
  773     }
  774   
  775     /** Class defining Wait action. **/
  776     protected class WaitWhenBlocked implements BlockedExecutionHandler {
  777       public boolean blockedAction(Runnable command) throws InterruptedException{
  778         handOff_.put(command);
  779         return true;
  780       }
  781     }
  782   
  783     /** 
  784      * Set the policy for blocked execution to be to wait until a thread
  785      * is available.
  786      **/
  787     public void waitWhenBlocked() {
  788       setBlockedExecutionHandler(new WaitWhenBlocked());
  789     }
  790   
  791     /** Class defining Discard action. **/
  792     protected class DiscardWhenBlocked implements BlockedExecutionHandler {
  793       public boolean blockedAction(Runnable command) {
  794         return true;
  795       }
  796     }
  797   
  798     /** 
  799      * Set the policy for blocked execution to be to return without
  800      * executing the request.
  801      **/
  802     public void discardWhenBlocked() {
  803       setBlockedExecutionHandler(new DiscardWhenBlocked());
  804     }
  805   
  806   
  807     /** Class defining Abort action. **/
  808     protected class AbortWhenBlocked implements BlockedExecutionHandler {
  809       public boolean blockedAction(Runnable command) {
  810         throw new AbortException();
  811       }
  812     }
  813     public class AbortException extends RuntimeException {
  814       public AbortException() {
  815       }
  816     }
  817   
  818     /** 
  819      * Set the policy for blocked execution to be to
  820      * throw a RuntimeException.
  821      **/
  822     public void abortWhenBlocked() {
  823       setBlockedExecutionHandler(new AbortWhenBlocked());
  824     }
  825   
  826   
  827     /**
  828      * Class defining DiscardOldest action.  Under this policy, at most
  829      * one old unhandled task is discarded.  If the new task can then be
  830      * handed off, it is.  Otherwise, the new task is run in the current
  831      * thread (i.e., RunWhenBlocked is used as a backup policy.)
  832      **/
  833     protected class DiscardOldestWhenBlocked implements BlockedExecutionHandler {
  834       public boolean blockedAction(Runnable command) throws InterruptedException{
  835         handOff_.poll(0);
  836         if (!handOff_.offer(command, 0))
  837           command.run();
  838         return true;
  839       }
  840     }
  841   
  842     /** 
  843      * Set the policy for blocked execution to be to discard the oldest
  844      * unhandled request
  845      **/
  846     public void discardOldestWhenBlocked() {
  847       setBlockedExecutionHandler(new DiscardOldestWhenBlocked());
  848     }
  849   
  850     /**
  851      * Arrange for the given command to be executed by a thread in this
  852      * pool.  The method normally returns when the command has been
  853      * handed off for (possibly later) execution.
  854      **/
  855     public void execute(Runnable command) throws InterruptedException {
  856       for (;;) {
  857         synchronized(this) { 
  858           if (!shutdown_) {
  859             int size = poolSize_;
  860   
  861             // Ensure minimum number of threads
  862             if (size < minimumPoolSize_) {
  863               addThread(command);
  864               return;
  865             }
  866             
  867             // Try to give to existing thread
  868             if (handOff_.offer(command, 0)) { 
  869               return;
  870             }
  871             
  872             // If cannot handoff and still under maximum, create new thread
  873             if (size < maximumPoolSize_) {
  874               addThread(command);
  875               return;
  876             }
  877           }
  878         }
  879   
  880         // Cannot hand off and cannot create -- ask for help
  881         if (getBlockedExecutionHandler().blockedAction(command)) {
  882           return;
  883         }
  884       }
  885     }
  886   }

Home » concurrent-sources » EDU.oswego.cs.dl.util.concurrent » [javadoc | source]