_EVENT-DRIVEN THREADS IN C++_ by Dan Ford Listing One // Thread.h #if !defined(THREADS_INC) #define THREADS_INC //---------------------- Constants and Types ----------------------- const int THRDS_DEF_STACK = 8192; // default stack size typedef void FNTHREADPROC (VOID * ulArg); // thread procedure type typedef FNTHREADPROC* PFNTHREADPROC; //------------------------------ Class ----------------------------- class Thread { public: Thread( PFNTHREADPROC pfnThread, // Constructor ULONG ulStack=THRDS_DEF_STACK); virtual ~Thread(); // Destructor virtual VOID Start (ULONG arg=0L); VOID Stop() { DosKillThread(idThread); } VOID Resume() { DosResumeThread(idThread); } VOID Suspend() { DosSuspendThread(idThread); } TID GetTID() { return idThread; } private: ULONG ulStackSize; TID idThread; PFNTHREADPROC pfnThreadProc; }; #endif Listing Two // Thread.cpp //------------------------- Includes ------------------------------ #define INCL_DOS #include #include #include "Thread.h" //--------------------------- code -------------------------------- Thread::Thread(PFNTHREADPROC pfnThread, ULONG ulStack) { ulStackSize = ulStack; pfnThreadProc = pfnThread; } Thread::~Thread() {} // empty implementation VOID Thread::Start (ULONG arg) { idThread = _beginthread(pfnThreadProc, ulStackSize, (void*)arg); } Listing Three // QThread.h #if !defined(QTHREAD_INC) #define QTHREAD_INC //----------------------------- Includes --------------------------- #include "Thread.h" //----------------------------- defines ---------------------------- const int QTHRD_DEF_QSIZE = 0L; //------------------------------ Types ----------------------------- class QThread; // forward declaration typedef VOID FNQTHPROC (QThread *, ULONG); // QThread Procedure type typedef FNQTHPROC * PFNQTHPROC; //------------------------------ Class ----------------------------- class QThread : public Thread { public: QThread ( ULONG ulQueueSize=QTHRD_DEF_QSIZE, // Constructor ULONG ulStackSize=THRDS_DEF_STACK); ~QThread(); // Destructor VOID Start(ULONG ulArg=0L); virtual VOID SendMsg(ULONG objAddr, ULONG msg, MPARAM mp1, MPARAM mp2) = 0; protected: virtual VOID MsgLoop(); virtual BOOL GetMessage(QMSG & qmsg) = 0; // pure virtual virtual VOID DispatchMsg (QMSG & qmsg) = 0; // pure virtual virtual VOID Startup(ULONG ulArg) = 0; // pure virtual virtual BOOL Shutdown(ULONG ulArg) = 0; // pure virtual ULONG ulQSize; ULONG ulParam; // initial argument passed in when thread is started private: static VOID threadProc(QThread*); // static thread procedure }; #endif Listing Four // QThread.cpp //------------------------- Includes ------------------------------ #define INCL_WIN #define INCL_DOS #include #include "QThread.h" //--------------------------- code -------------------------------- QThread::QThread ( ULONG ulQueueSize, ULONG ulStack): Thread((PFNTHREADPROC)this->threadProc, ulStack), ulQSize(ulQueueSize) {} QThread::~QThread() {} VOID QThread::Start(ULONG ulArg) { ulParam = ulArg; this -> Thread::Start((ULONG)this); } VOID QThread::MsgLoop() { QMSG qmsg; while (this -> GetMessage(qmsg)) this -> DispatchMsg(qmsg); } VOID QThread::threadProc (QThread* pQThrd) { pQThrd->Startup(pQThrd->ulParam); pQThrd->MsgLoop(); pQThrd->Shutdown(pQThrd->ulParam); } Listing Five // MsgThrd.h #if !defined(MSGTHREAD_INC) #define MSGTHREAD_INC //----------------------------- Includes --------------------------- #include "QThread.h" #include "MsgQ.h" //----------------------------- defines ---------------------------- const USHORT MSG_DEF_QSIZE = 10; //---------------------------------------------------- // The following two values are reserved messages ID's. All MsgThread's must be // prepared to receive them. All other message ID's are user defined. const ULONG MSG_THRD_SHUTDOWN = 0; // Received during shutdown const ULONG MSG_THRD_STARTUP = 1; // Received at startup const ULONG MSG_THRD_USER = 2; // First user defined msg ID //------------------------------ Types ----------------------------- typedef VOID FNMSGTHRDPROC (ULONG objAddr, ULONG msgID, MPARAM mp1, MPARAM mp2, ULONG ulParam); typedef FNMSGTHRDPROC* PFNMSGTHRDPROC; //------------------------------ Class ----------------------------- class MsgThread : public QThread { public: MsgThread ( PFNMSGTHRDPROC pfn, USHORT usQSize=MSG_DEF_QSIZE, ULONG ulStack=THRDS_DEF_STACK); ~MsgThread (); VOID SendMsg (ULONG objAddr, ULONG msgID, MPARAM mp1, MPARAM mp2) { pMsgQ->PostMsg(objAddr, msgID, mp1, mp2); } protected: BOOL GetMessage (QMSG & qmsg) { return pMsgQ->WaitMsg(qmsg); } VOID DispatchMsg (QMSG & qmsg) { pfnMsg((ULONG)qmsg.hwnd,qmsg.msg,qmsg.mp1,qmsg.mp2,ulParam); } VOID Startup (ULONG ulArg) { pfnMsg((ULONG)this, MSG_THRD_STARTUP, (MPARAM)ulArg, (MPARAM)NULL,ulArg); } BOOL Shutdown(ULONG ulArg); private: MsgQueue* pMsgQ; // pointer to msg queue PFNMSGTHRDPROC pfnMsg; // pointer to client thread proc }; #endif Listing Six // MsgThrd.cpp //------------------------- Includes ------------------------------ #define INCL_WIN #define INCL_DOS #include #include "MsgThrd.h" //--------------------------- code -------------------------------- MsgThread::MsgThread ( PFNMSGTHRDPROC pfn, USHORT usQSize, ULONG ulStack) : QThread(usQSize,ulStack), pfnMsg(pfn) { pMsgQ = new MsgQueue(usQSize); } MsgThread::~MsgThread () { delete(pMsgQ); } BOOL MsgThread::Shutdown(ULONG ulArg) { pfnMsg((ULONG)NULL, MSG_THRD_SHUTDOWN, 0L, 0L, ulArg); return TRUE; } Listing Seven // Msgq.h #if !defined(MSGQUEUE_INC) #define MSGQUEUE_INC //-------------------------- defines ------------------------------- const USHORT MQ_DEF_QSIZE = 10; //------------------------------ Class ----------------------------- class MsgQueue { public: MsgQueue (USHORT usQSz=MQ_DEF_QSIZE); ~MsgQueue (); //-------------------------------------------------------------- // This method blocks until it acquires the mutual exclusion // semaphore for the queue. It then calls the private // method QPut to add the message to the queue. VOID PostMsg (ULONG hobj, ULONG msg, MPARAM mp1, MPARAM mp2); //-------------------------------------------------------------- // This method blocks until a message is available on the queue. // It then obtains the necessary mutual exclusion semaphores // before calling the private method QGet. BOOL WaitMsg(QMSG & qmsg); private: BOOL QEmpty(); // returns TRUE if queue is empty //-------------------------------------------------------------------- // This function puts a message in the queue. This function is private // because it assumes that the proper mutual exclusion semaphores have // already been acquired. If the queue is full it will automatically // grow, so it cannot overflow until memory is exhausted. VOID QPut( ULONG hobj, // hwnd or object handle ULONG msg, // msg ID MPARAM mp1, // parameter 1 MPARAM mp2); // parameter 2 //-------------------------------------------------------------------- // This function extracts a waiting message from the queue and fills // the QMSG structure. This is a private function because it does // no mutual exclusion and assumes a msg is indeed waiting at the // Front of the queue (it returns whatever is there, valid or not). // This function does not block. VOID QGet (QMSG & pqmsg); HEV hevItmRdy; // Semaphore to indicate item ready HMTX hmtx; // Mutual exclusion semaphore USHORT Front, Rear; // Queue pointers USHORT usQSize; // Maximum number of queue entries QMSG *msgs; // Array of QMSG structures }; #endif Listing Eight // MsgQ.cpp //------------------------- Includes ------------------------------ #define INCL_WIN #define INCL_DOS #include #include "MsgQ.h" //-------------------------- defines ------------------------------ const USHORT MQ_INCREMENT = 5; //--------------------------- code -------------------------------- MsgQueue::MsgQueue (USHORT usQSz) : usQSize(usQSz), Front(0), Rear(0) { msgs = new QMSG[usQSize]; DosCreateMutexSem (NULL, &hmtx, DC_SEM_SHARED, FALSE); DosCreateEventSem (NULL, &hevItmRdy, DC_SEM_SHARED, FALSE); } MsgQueue::~MsgQueue() { DosCloseEventSem (hevItmRdy); DosCloseMutexSem (hmtx); delete msgs; } VOID MsgQueue::PostMsg (ULONG hobj, ULONG msg, MPARAM mp1, MPARAM mp2) { DosRequestMutexSem (hmtx, SEM_INDEFINITE_WAIT); QPut(hobj, msg, mp1, mp2); DosReleaseMutexSem (hmtx); DosPostEventSem (hevItmRdy); // wake up whoever is waiting for msgs } BOOL MsgQueue::WaitMsg(QMSG & qmsg) { ULONG ulNPosts; DosWaitEventSem (hevItmRdy, SEM_INDEFINITE_WAIT); DosRequestMutexSem (hmtx, SEM_INDEFINITE_WAIT); QGet (qmsg); if (QEmpty()) DosResetEventSem (hevItmRdy, &ulNPosts); DosReleaseMutexSem (hmtx); return (qmsg.msg); } BOOL MsgQueue::QEmpty() { return (Front == Rear); } VOID MsgQueue::QPut(ULONG hobj, ULONG msg, MPARAM mp1,MPARAM mp2) { USHORT usNxtR, usNQSize, idxF, i; QMSG *p; msgs[Rear].hwnd = (HWND)hobj; msgs[Rear].msg = msg; msgs[Rear].mp1 = mp1; msgs[Rear].mp2 = mp2; // If queue has filled up, then reallocate a larger queue // and transfer the contents to the new queue usNxtR = (Rear+1) % usQSize; if (usNxtR == Front) { usNQSize = usQSize + MQ_INCREMENT; p = new QMSG[usNQSize]; idxF = Front; for (i=0; i < usQSize; i++) { p[i] = msgs[idxF++]; if (idxF == usQSize) idxF = 0; } Front = 0; Rear = usQSize; delete msgs; usQSize = usNQSize; msgs = p; } else Rear = usNxtR; } VOID MsgQueue::QGet (QMSG & qmsg) { qmsg.hwnd = msgs[Front].hwnd; qmsg.msg = msgs[Front].msg; qmsg.mp1 = msgs[Front].mp1; qmsg.mp2 = msgs[Front].mp2; Front = (++Front % usQSize); } Listing Nine // PMThread.h #if !defined(PMTHREAD_INC) #define PMTHREAD_INC //----------------------------- Includes --------------------------- #include "QThread.h" //----------------------------- defines ---------------------------- const ULONG PMTHRD_DEF_STACKSIZE = 8192; //--------------------------- Public Types ------------------------- // Type for the procedure that is supplied to perform initialization and // shutdown for the PM thread. Usually this proc registers user classes // and/or creates the main window or windows. class PMThread; // forward declaration typedef VOID FNPROC (BOOL start, ULONG ulArg, PMThread* pmThrd); typedef FNPROC* PFNPROC; //------------------------------ Class ----------------------------- class PMThread : public QThread { public: PMThread ( PFNPROC pfn, USHORT usQSize=0, ULONG ulStackSize=PMTHRD_DEF_STACKSIZE); ~PMThread (); VOID Startup (ULONG ulArg); BOOL Shutdown(ULONG ulArg); VOID SendMsg( ULONG objAddr, ULONG msg, MPARAM mp1, MPARAM mp2); BOOL GetMessage(QMSG & qmsg) { return WinGetMsg(hab, &qmsg, NULLHANDLE, 0,0); } VOID DispatchMsg (QMSG & qmsg) { WinDispatchMsg (hab, &qmsg); } HAB QueryHAB() { return hab; } HMQ QueryHMQ() { return hmq; } private: HAB hab; // PM Anchor block handle HMQ hmq; // Message Queue handle PFNPROC pfnProc; }; #endif Listing Ten // PMThread.cpp //------------------------- Includes ------------------------------ #define INCL_WIN #define INCL_DOS #include #include "PMThread.h" //--------------------------- code -------------------------------- PMThread::PMThread (PFNPROC pfn, USHORT usQSize, ULONG ulStackSize) : QThread (usQSize, ulStackSize), pfnProc(pfn) {} PMThread::~PMThread() {} VOID PMThread::Startup(ULONG ulArg) { hab = WinInitialize(0); hmq = WinCreateMsgQueue (hab, ulQSize); pfnProc(TRUE, ulArg, this); } BOOL PMThread::Shutdown(ULONG ulArg) { pfnProc(FALSE, ulArg, this); WinDestroyMsgQueue(hmq); WinTerminate(hab); return TRUE; } VOID PMThread::SendMsg( ULONG objAddr, ULONG msg, MPARAM mp1, MPARAM mp2) { if (objAddr) WinPostMsg ((HWND)objAddr, msg, mp1, mp2); else WinPostQueueMsg (hmq, msg, mp1, mp2); } Example 1: ThreadA() { // do some stuff block on semaphore(a) // wait for a specific event // do some more stuff } ThreadB() { // do some stuff clear semaphore(a) // signals ThreadA // do more stuff } Example 2: MsgThreadA() { while (TRUE) { msg = getMsgFromQueue(); // do something with msg } } Example 3: while (this->GetMessage(qmsg)) this->DispatchMsg(qmsg);