/* * BioJava development code * * This code may be freely distributed and modified under the * terms of the GNU Lesser General Public Licence. This should * be distributed with the code. If you do not have a copy, * see: * * http://www.gnu.org/copyleft/lesser.html * * Copyright for this code is held jointly by the individual * authors. These should be listed in @author doc comments. * * For more information on the BioJava project and its aims, * or to join the biojava-l mailing list, visit the home page * at: * * http://www.biojava.org/ * */ package org.biojava.utils; import java.util.LinkedList; /** *
SimpleThreadPool
is a basic implementation of
* ThreadPool
for use where we don't wish to introduce a
* dependency on a 3rd-party pool. In general, objects which require a
* pool should only use the interface and parameterize such that other
* implementations may be dropped in in place of this one, possibly
* using this one as a fallback.
This class offers a service for running Runnable
s
* using multiple threads, the number of which is specified in the
* constructor. Runnable
s are queued in a simple FIFO
* queue. The worker threads wait on the queue when it is empty and
* are notified when a new Runnable
is submitted.
This implementation will prevent an application from exiting
* until stopThreads()
is called unless the pool contains
* daemon threads.
SimpleThreadPool
containing 4
* non-daemon threads and starts them. The threads have priority
* Thread.NORM_PRIORITY. Because threads are non-deamon you will need
* to call stopThreads() to terminate them.
*/
public SimpleThreadPool()
{
this(4, false);
}
/**
* Creates a new SimpleThreadPool
containing the
* specified number of threads and starts them. The threads have
* priority Thread.NORM_PRIORITY.
*
* @param threadCount an int
thread count.
* @param daemon a boolean
indicating whether the
* threads should be daemons. If threads are non-deamon you will need
* to call stopThreads() to terminate them.
*/
public SimpleThreadPool(int threadCount, boolean daemon)
{
this(threadCount, daemon, Thread.NORM_PRIORITY);
}
/**
* Creates a new SimpleThreadPool
containing the
* specified number of threads and starts them.
*
* @param threadCount an int
thread count.
* @param daemon a boolean
indicating whether the
* threads should be daemons. If threads are non-deamon you will need
* to call stopThreads() to terminate them.
* @param priority an int
priority for the threads.
*/
public SimpleThreadPool(int threadCount, boolean daemon, int priority)
{
this.daemon = daemon;
this.priority = priority;
queue = new LinkedList();
threads = new PooledThread[threadCount];
stopped = true;
waiting = false;
startThreads();
}
public void addRequest(Runnable task)
{
if (waiting || stopped)
throw new IllegalStateException("Thread pool has been closed to new requests");
synchronized(queue)
{
queue.add(task);
// Notify threads blocked in nextRequest()
queue.notifyAll();
}
}
public void startThreads()
{
if (! stopped)
throw new IllegalStateException("Thread pool is already started");
stopped = false;
synchronized(threads)
{
for (int i = 0; i < threads.length; i++)
{
threads[i] = new PooledThread();
if (daemon)
threads[i].setDaemon(true);
threads[i].setPriority(priority);
threads[i].start();
}
}
}
/**
* Waits for all working threads to return and then stops them. If the
* thread pool contains non-daemon threads you will have to call this method
* to make your program return.
* @throws IllegalStateException if the pool is already stopped.
*/
public void stopThreads()
{
if (stopped)
throw new IllegalStateException("Thread pool has already been stopped");
stopped = true;
synchronized(queue)
{
// Ensure working threads return and die
while (threadsAlive() > 0)
{
try
{
queue.wait(500);
queue.notifyAll();
}
catch (InterruptedException ie) { }
}
}
}
public void waitForThreads()
{
if (stopped)
throw new IllegalStateException("Thread pool has been stopped");
waiting = true;
synchronized(threads)
{
// Ensure queue gets emptied and all work is done
while (! queue.isEmpty() || threadsWorking() > 0)
{
try
{
threads.wait();
}
catch (InterruptedException ie) { }
}
}
waiting = false;
}
/**
* threadsWorking
returns the number of threads
* currently performing work.
*
* @return an int
.
*/
public int threadsWorking()
{
int workingCount = 0;
synchronized(threads)
{
for (int i = 0; i < threads.length; i++)
if (threads[i].working)
workingCount++;
}
return workingCount;
}
/**
* threadsIdle
returns the number of threads
* currently waiting for work.
*
* @return an int
.
*/
public int threadsIdle()
{
return threads.length - threadsWorking();
}
/**
* requestsQueued
returns the number of
* Runnable
s currently queued.
*
* @return an int
.
*/
public int requestsQueued()
{
return queue.size();
}
/**
* threadsAlive
returns the number of threads
* currently alive.
*
* @return an int
.
*/
protected int threadsAlive()
{
int aliveCount = 0;
synchronized(threads)
{
for (int i = 0; i < threads.length; i++)
if (threads[i].isAlive())
aliveCount++;
}
return aliveCount;
}
/**
* nextRequest
gets the next Runnable
* from the queue. This method blocks if the queue is empty and
* the pool has not stopped. If the pool has stopped it returns
* null.
*
* @return a Runnable
or null if the pool has been
* stopped.
*/
protected Runnable nextRequest()
{
synchronized(queue)
{
try
{
while (! stopped && queue.isEmpty())
queue.wait();
}
catch (InterruptedException ie) { }
if (stopped)
return null;
else
return (Runnable) queue.removeFirst();
}
}
/**
* PooledThread
is a thread class which works within
* the pool. It sets its boolean flag true when working,
* synchronizing this on the array which contains all the
* PooledThread
s.
*/
private class PooledThread extends Thread
{
boolean working = false;
public void run()
{
while (true)
{
Runnable task = nextRequest();
// If the pool is stopped the queue returns null and
// the thread dies
if (task == null)
break;
// Synchronize on thread array to update state
synchronized(threads)
{
working = true;
}
task.run();
synchronized(threads)
{
working = false;
threads.notify();
}
}
}
}
}