// _JAVA Q&A_ by Cliff Berg
// Listing One

/* Scheduler.java -- A scheduler for scheduling asynchronous future events.
 * Provides for specifying event execution priority, and for recurring events.
 */

/** Callback object interface. */
interface Schedulable {
    /** Invoked on a pool thread once the scheduled delay has elapsed. */
    public void onEvent(Object arg);

    /**
     * Invoked when onEvent() is cut short by an Error (for example a
     * ThreadDeath on JVMs that still support Thread.stop()); the Error is
     * rethrown afterwards and terminates the pool thread.
     */
    public void onAbort();
}

/** A scheduler for scheduling asynchronous future events. For each scheduled
 * event, a thread is allocated from a thread pool, and then the thread waits
 * until the event time has arrived. The thread then invokes onEvent() on the
 * specified object. When onEvent() completes, the thread is returned to the
 * thread pool. There are also provisions for specifying an event execution
 * priority; and for recurring events. If an event is recurring, its thread
 * stays allocated until the event is rescheduled as non-recurring and fires.
 *
 * Locking: whenever both monitors are needed, the Scheduler's monitor is
 * taken before an EventThread's monitor. Callbacks run with neither held.
 */
class Scheduler {
    protected EventThread[] threads;   // the thread pool
    protected boolean[] allocated;     // allocation flag for each thread
    private boolean closed;            // true once shutdown() has run

    /* Constructor. Parameter specifies how large thread pool should be. */
    public Scheduler(int noOfThreads) {
        threads = new EventThread[noOfThreads];
        allocated = new boolean[noOfThreads];

        // No start-up handshake is needed: every wait in EventThread re-checks
        // its condition, so a request posted before a thread first blocks is
        // still seen by that thread.
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new EventThread(this);
            threads[i].start();
        }
    }

    /**
     * Gracefully stop the threads in the thread pool. Pending events are
     * dropped; a callback that is currently running is interrupted and the
     * thread exits once it returns. Later schedule()/reschedule() calls throw
     * IllegalStateException.
     */
    public synchronized void shutdown() {
        closed = true;
        if (threads == null) {
            return;  // constructor failed before the pool was created
        }
        for (int i = 0; i < threads.length; i++) {
            EventThread thread = threads[i];
            if (thread != null && thread.isAlive()) {
                thread.shutdown();
            }
        }
    }

    /**
     * Gracefully stop the threads in the thread pool. Each live pool thread
     * keeps a reference to this Scheduler, so do not rely on the garbage
     * collector to run this; call shutdown() explicitly.
     */
    public void finalize() {
        shutdown();
    }

    /** Schedule thread to run at a future time. The Schedulable.onEvent()
     * method will be called for object after a real-time delay of "delay"
     * milliseconds (0 fires as soon as possible). The thread's handle is
     * returned, which we can later use to reschedule the thread. If
     * "recurring" is true, then a new event will be scheduled automatically
     * each time onEvent() returns.
     * This method must be synchronized in case multiple threads in an
     * application call this method for this Scheduler object.
     *
     * @throws InsufficientThreadsException if every pool thread is in use
     * @throws IllegalArgumentException if object is null, delay is negative,
     *         or priority is outside Thread.MIN_PRIORITY..Thread.MAX_PRIORITY
     * @throws IllegalStateException if the scheduler has been shut down
     */
    public synchronized EventThread schedule(long delay, Schedulable object,
            Object arg, int priority, boolean recurring)
            throws InsufficientThreadsException {
        // Validate before allocating so a bad request cannot leak a pool slot
        EventThread.checkRequest(delay, object, priority);
        if (closed) {
            throw new IllegalStateException("scheduler has been shut down");
        }

        // Pick a thread from the thread pool
        EventThread thread = allocateAThread();
        if (thread == null) {
            throw new InsufficientThreadsException();
        }

        // Schedule an event for that thread
        thread.schedule(delay, object, arg, priority, recurring);

        // Return a unique identifier for the event
        return thread;
    }

    /**
     * Unschedule the future event for a thread, and reschedule it. If the
     * thread's event has already fired and the thread went back to the pool,
     * the thread is claimed again for the new event. A handle whose thread has
     * meanwhile been handed to another schedule() call cannot be told apart
     * from a live one; rescheduling it replaces that other event.
     *
     * @throws IllegalArgumentException if thread is not part of this pool, or
     *         the request is invalid (see schedule())
     * @throws IllegalStateException if the scheduler has been shut down
     */
    public synchronized void reschedule(EventThread thread, long delay,
            Schedulable object, Object arg, int priority, boolean recurring)
            throws InsufficientThreadsException {
        EventThread.checkRequest(delay, object, priority);
        if (closed) {
            throw new IllegalStateException("scheduler has been shut down");
        }
        int slot = indexOf(thread);
        if (slot < 0) {
            throw new IllegalArgumentException(
                "thread does not belong to this scheduler");
        }
        // Holding our monitor keeps the thread from releasing its slot between
        // this claim and the re-arm below.
        allocated[slot] = true;
        thread.reschedule(delay, object, arg, priority, recurring);
    }

    /** Allocate a thread from the thread pool; null if none is free. Callers
     * must hold this Scheduler's monitor. */
    protected EventThread allocateAThread() {
        for (int i = 0; i < threads.length; i++) {
            if (!allocated[i]) {
                allocated[i] = true;
                return threads[i];
            }
        }
        return null;
    }

    /** Return a thread to thread pool. This method must be synchronized
     * in case multiple threads in an application call this method for this
     * Scheduler object, and also because threads call this to return
     * themselves to the thread pool. Unknown threads are ignored. */
    public synchronized void deallocateThread(EventThread thread) {
        int slot = indexOf(thread);
        if (slot >= 0) {
            allocated[slot] = false;
        }
    }

    /** Position of thread in the pool, or -1 if it is not a pool member. */
    private int indexOf(EventThread thread) {
        for (int i = 0; i < threads.length; i++) {
            if (threads[i] == thread) {
                return i;
            }
        }
        return -1;
    }
}

/**
 * A schedulable thread. All mutable state is guarded by this object's
 * monitor. A request is posted by filling in the fields and setting "armed";
 * the thread waits for "armed", then waits for "deadline", then runs the
 * callback without holding the monitor. Replacing a pending request bumps
 * "generation", which makes the thread abandon the wait it is in.
 */
class EventThread extends Thread {
    protected Scheduler scheduler;   // the scheduler for this thread
    protected long delay;            // wait for this time
    protected Schedulable object;    // then call onEvent() for this object
    protected Object arg;            // a parameter for onEvent()
    protected int priority;          // execute onEvent() at this priority
    protected boolean recurring;     // reschedule the event every time

    private boolean armed;           // a request is installed and has not fired
    private long deadline;           // System.currentTimeMillis() value to fire at
    private int generation;          // incremented every time a request is installed
    private boolean stopping;        // shutdown() was called

    /** The constructor. */
    public EventThread(Scheduler scheduler) {
        this.scheduler = scheduler;
    }

    /**
     * Reject requests that would otherwise fail later inside the pool thread.
     *
     * @throws IllegalArgumentException if object is null, delay is negative,
     *         or priority is outside Thread.MIN_PRIORITY..Thread.MAX_PRIORITY
     */
    static void checkRequest(long delay, Schedulable object, int priority) {
        if (object == null) {
            throw new IllegalArgumentException("object must not be null");
        }
        if (delay < 0) {
            throw new IllegalArgumentException(
                "delay must not be negative: " + delay);
        }
        if (priority < Thread.MIN_PRIORITY || priority > Thread.MAX_PRIORITY) {
            throw new IllegalArgumentException(
                "priority out of range: " + priority);
        }
    }

    /** The thread's run method */
    public void run() {
        for (;;) { // ever
            Schedulable target;
            Object param;
            int level;

            synchronized (this) {
                // Wait to be called upon; blocks until schedule() arms us
                trace(" going to sleep");
                if (!awaitArmed()) {
                    return;  // shut down
                }
                trace(" awake now");

                // This is the real wait, requested by the scheduler
                trace(" waiting for " + delay + " milliseconds");
                if (!awaitDeadline()) {
                    if (stopping) {
                        return;
                    }
                    // We are being rescheduled: abort the current sleep and
                    // start over with the replacement request
                    trace(" unscheduled");
                    continue;
                }

                // Consume the request; copy it so that a reschedule arriving
                // while the callback runs cannot change what we invoke
                armed = false;
                target = object;
                param = arg;
                level = priority;
            }

            fire(target, param, level);
            finishEvent();
        }
    }

    /** Print a diagnostic line prefixed with the running thread's name. */
    private static void trace(String what) {
        System.out.println(Thread.currentThread().getName() + what);
    }

    /** Block until a request is installed. Caller holds the monitor.
     * Returns false if the thread was shut down instead. */
    private boolean awaitArmed() {
        while (!stopping && !armed) {
            try {
                wait();
            } catch (InterruptedException ex) {
                // Not a request by itself; loop and re-check the flags
            }
        }
        return !stopping;
    }

    /** Block until the deadline passes. Caller holds the monitor. Returns
     * false if the request was replaced or the thread shut down meanwhile. */
    private boolean awaitDeadline() {
        int mine = generation;
        for (;;) {
            if (stopping || generation != mine) {
                return false;
            }
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0) {
                return true;
            }
            try {
                wait(remaining);  // remaining > 0, so never the "forever" wait(0)
            } catch (InterruptedException ex) {
                // Loop and re-check the flags and the clock
            }
        }
    }

    /** Do the callback at the requested priority, then restore our own. */
    private void fire(Schedulable target, Object param, int level) {
        int savep = getPriority();
        try {
            setPriority(level);
            target.onEvent(param);
        } catch (Error fatal) {
            // The thread is about to die: give the callback a chance to clean up
            target.onAbort();
            throw fatal;
        } catch (RuntimeException ex) {
            // A failing callback must not take the pool thread down with it
            ex.printStackTrace();
        } finally {
            setPriority(savep);
        }
    }

    /** After a callback: re-arm a recurring event, or go back to the pool. */
    private void finishEvent() {
        // Scheduler monitor first, ours second: the same order that
        // Scheduler.schedule()/reschedule() use.
        Object poolLock = (scheduler != null) ? (Object) scheduler : (Object) this;
        synchronized (poolLock) {
            synchronized (this) {
                if (armed) {
                    return;  // a new request arrived while the callback ran
                }
                if (recurring) {
                    arm();   // next occurrence, "delay" after this one finished
                    return;
                }
                // Thread's work is done; return it to the pool
                object = null;
                arg = null;
                if (scheduler != null) {
                    scheduler.deallocateThread(this);
                }
            }
        }
    }

    /** Install the request held in the fields. Caller holds the monitor. */
    private void arm() {
        long now = System.currentTimeMillis();
        // Saturate instead of overflowing for very large delays
        deadline = (delay > Long.MAX_VALUE - now) ? Long.MAX_VALUE : now + delay;
        armed = true;
        generation++;
    }

    /**
     * Schedule a wakeup for this thread. Note that this is called from
     * other threads, including possibly from the main thread. A request that
     * is still pending on this thread is replaced.
     *
     * @throws IllegalArgumentException if object is null, delay is negative,
     *         or priority is outside Thread.MIN_PRIORITY..Thread.MAX_PRIORITY
     */
    public synchronized void schedule(long delay, Schedulable object,
            Object arg, int priority, boolean recurring) {
        checkRequest(delay, object, priority);
        this.delay = delay;
        this.object = object;
        this.arg = arg;
        this.priority = priority;
        this.recurring = recurring;
        arm();

        // Now, resume this thread to tell it how long to wait for
        System.out.println(
            Thread.currentThread().getName() + " waking up thread " + getName());
        notifyAll();
    }

    /**
     * Reschedule this thread: the pending request, if any, is dropped and the
     * given one takes its place. Does not block waiting for the thread.
     */
    public synchronized void reschedule(long delay, Schedulable object,
            Object arg, int priority, boolean recurring)
            throws InsufficientThreadsException {
        schedule(delay, object, arg, priority, recurring);
    }

    /**
     * Ask this thread to exit. A pending request is dropped. A running
     * callback is interrupted and the thread exits after it returns.
     */
    public synchronized void shutdown() {
        stopping = true;
        notifyAll();
        interrupt();
    }
}

/** If there are insufficient threads in the thread pool. */
class InsufficientThreadsException extends Exception {
    public InsufficientThreadsException() {
        super("InsufficientThreadsException");
    }

    public InsufficientThreadsException(String s) {
        super(s);
    }
}

// Listing Two

/** Test the thread Scheduler. */
public class test implements Schedulable {
    Scheduler scheduler = new Scheduler(2);

    public static void main(String args[]) {
        test t = new test();
        try {
            System.out.println("1:");
            t.scheduler.schedule(3000, t, "A", 8, false);
            System.out.println("2:");
            EventThread th = t.scheduler.schedule(3000, t, "B", 3, true);
            System.out.println("3:");
            t.scheduler.reschedule(th, 8000, t, "X", 5, false);
        } catch (InsufficientThreadsException ex) {
            System.out.println(ex);
        }
    }

    public void onEvent(Object arg) {
        System.out.println(Thread.currentThread().getName()
            + ":Hi there " + (String) arg);
    }

    public void onAbort() {
        System.out.println("Clean up whatever we were doing!");
    }
}