_Improving Server Performance_ by John Calcote Listing One /* Ping.cpp */ #define WIN32_EXTRA_LEAN #include #include #include #include #include "aes.h" #define SERVER_SOCKET 0x3200 SOCKET g_skt; BOOL g_serverUp; AES *g_aesp; AES::AESHandle g_hEvent; void ServerThread(void *) { if (!g_serverUp) return; fd_set rdSkts; FD_ZERO(&rdSkts); FD_SET(g_skt, &rdSkts); struct timeval tv = {5,0}; int rval = select(0, &rdSkts, 0, 0, &tv); // schedule another pool thread to listen g_hEvent = g_aesp->ScheduleEvent(0, ServerThread, 0); // use this thread to process client request to completion if (rval < 0) printf("Winsock: select returned %d.\n", WSAGetLastError()); else if (rval > 0) { SOCKADDR_IPX addr; int recvLen, addrLen = sizeof addr; char recvBuf[5]; if ((recvLen = recvfrom(g_skt, recvBuf, sizeof recvBuf, 0, (struct sockaddr *)&addr, &addrLen)) == SOCKET_ERROR) { printf("Winsock: recvfrom returned %d.\n",WSAGetLastError()); return; } char *resp = strncmp(recvBuf,"Ping",5)?"101 Invalid Request.":"Pong"; int respLen = strlen(resp) + 1; if (sendto(g_skt, resp, respLen, 0, (struct sockaddr *)&addr, sizeof addr) == SOCKET_ERROR) printf("Winsock: sendto returned %d.\n", WSAGetLastError()); } return; } BOOL StartServer(WORD wSocket) { int err; SOCKADDR_IPX addr; WSADATA wsaData; if (!(g_aesp = new AES)) { printf("Unable to create AES object.\n"); return FALSE; } if (err = WSAStartup(MAKEWORD(1,1), &wsaData)) { printf("WSAStartup returned %d.\n", err); goto errout1; } if (LOBYTE(wsaData.wVersion) < 1 || HIBYTE(wsaData.wVersion) < 1) { printf("Winsock version too low.\n"); goto errout2; } if ((g_skt = socket(AF_IPX, SOCK_DGRAM, NSPROTO_IPX)) == INVALID_SOCKET) { printf("Windsock: socket returned %d.\n", WSAGetLastError()); goto errout2; } memset(&addr, 0, sizeof(SOCKADDR_IPX)); addr.sa_family = AF_IPX; addr.sa_socket = htons(wSocket); if (bind(g_skt, (struct sockaddr *) &addr, sizeof(SOCKADDR_IPX)) != 0) { printf("Winsock: bind returned %d.\n", WSAGetLastError()); goto errout3; } g_serverUp = TRUE; if (!(g_hEvent = g_aesp->ScheduleEvent(0, ServerThread, 0))) { printf("AES: Unable to schedule event.\n"); g_serverUp = FALSE; goto errout3; } return TRUE; errout3: closesocket(g_skt); errout2: WSACleanup(); errout1: delete g_aesp; return FALSE; } void StopServer(void) { g_serverUp = FALSE; g_aesp->CancelEvent(g_hEvent); delete g_aesp; closesocket(g_skt); WSACleanup(); return; } void main(void) { if (StartServer(SERVER_SOCKET)) { printf("Server running, press any key to terminate...\n"); getch(); StopServer(); } printf("Server terminated, press any key to exit...\n"); getch(); return; } Listing Two /* AES.cpp */ #include "AES.h" #include #include #include unsigned __stdcall AES::PoolThread::Worker(void *ref) { PoolThread& This = *(PoolThread *)ref; while (!This.dying) { Event *event = This.event; This.event = 0; if (!event) WaitForSingleObject(This.sem, INFINITE); else { event->Work(); delete event; } } return 0; } AES::PoolThread::PoolThread(int stack /* = 0 */ ) : dying(FALSE), sem(0), handle(0), event(0), InitState(0) { if (!(sem = CreateSemaphore(0, 1, 0, 0)) || !(handle = (HANDLE)_beginthreadex(0, stack, Worker, 0, 0, 0))) InitState = ERR_CANT_CREATE_HANDLE; return; } AES::PoolThread::~PoolThread(void) { dying = TRUE; if (sem) { if (handle) { ReleaseSemaphore(sem, 1, 0); WaitForSingleObject(handle, INFINITE); CloseHandle(handle); } CloseHandle(sem); } return; } unsigned __stdcall AES::Monitor(void *ref) { AES &This = *(AES*)ref; _CrtSetReportMode(_CRT_WARN, _CRTDBG_MODE_DEBUG); while (!This.dying) { int eventsWaiting = 0; ULONG now = ULONG(clock() / CLOCKS_PER_SEC); ULONG nextEventTime = ULONG_MAX, lastComplaintTime = 0; EnterCriticalSection(&This.critSec); for (EventList::iterator eli = This.eventList.begin(); eli != This.eventList.end(); /**/ ) { for (int i = 0; i < This.size && This.pool[i].Busy(); i++) ; // find next available worker if (i < This.size && (*eli)->When() <= now) { Event *event = *eli; eli = This.eventList.erase(eli); This.pool[i].Alloc(event); } else { if ((*eli)->When() <= now) eventsWaiting++; else if ((*eli)->When() < nextEventTime) nextEventTime = (*eli)->When(); eli++; } } LeaveCriticalSection(&This.critSec); if (!eventsWaiting) WaitForSingleObject(This.sem, (nextEventTime - now) * 1000); else { if (lastComplaintTime + 3 < now) { _CrtDbgReport(_CRT_WARN, 0, 0, 0, "Thread pool overflow. %d events delayed.", eventsWaiting); lastComplaintTime = now; } Sleep(100); } } return 0; } AES::AESHandle AES::ScheduleEvent(ULONG ulDelay, void (*pFunc)(void*), void *pData) { Event *event; _ASSERT(pFunc); if (!(event = new Event(ulDelay, pFunc, pData))) return AESHandle(0); EnterCriticalSection(&critSec); eventList.push_front(event); LeaveCriticalSection(&critSec); ReleaseSemaphore(sem, 1, 0); return AESHandle(event); } BOOL AES::CancelEvent(AESHandle hEvent) { Event *event = (Event *)hEvent; BOOL cancelled = FALSE; EnterCriticalSection(&critSec); for (EventList::iterator eli = eventList.begin(); eli != eventList.end(); eli++) if (*eli == event) { eventList.erase(eli); delete event; cancelled = TRUE; break; } LeaveCriticalSection(&critSec); return cancelled; } AES::AES(int threads /* = 10 */) : dying(FALSE), sem(0), handle(0), size(threads), pool(0), InitState(0) { // Initialize event list critical section InitializeCriticalSection(&critSec); // Allocate thread pool if (!(pool = new PoolThread[size])) { InitState = ERR_INSUFFICIENT_MEMORY; return; } // Check init state of threads in pool for (int i = 0; i < size; i++) if (pool[i].InitState) { InitState = pool[i].InitState; return; } // Create monitor semaphore and thread if (!(sem = CreateSemaphore(0, 0, 1, 0)) || !(handle = (HANDLE)_beginthreadex(0, 0, Monitor, this, 0, 0))) InitState = ERR_CANT_CREATE_HANDLE; return; } AES::~AES(void) { dying = TRUE; if (sem) { if (handle) { ReleaseSemaphore(sem, 1, 0); WaitForSingleObject(handle, INFINITE); CloseHandle(handle); } CloseHandle(sem); } delete [] pool; DeleteCriticalSection(&critSec); return; }