Pipes, Channels, and Perl-Win32 by Jean-Louis Leroy Listing One # ifndef channel_defined # define channel_defined # include # include # include template class channel { public: class lock { public: lock(CRITICAL_SECTION& critsec) : critsec(critsec) { EnterCriticalSection(&critsec); } ~lock() { LeaveCriticalSection(&critsec); } private: CRITICAL_SECTION& critsec; }; channel(); ~channel(); void open(); void write(const T& item); int read(T& item); int blocking_read(T& item); BOOL wait(DWORD timeout = INFINITE); int pending() const; BOOL is_open() const; BOOL is_closed() const; void close(); void write(const T* iter, const T* last); void write(const T* iter, int n); T* read(T* iter, T* last); int read(T* iter, int n); T* blocking_read(T* iter, T* last); int blocking_read(T* iter, int n); private: mutable CRITICAL_SECTION critsec; HANDLE event; queue >, allocator > buf; BOOL opened; friend class lock; }; template inline BOOL channel::is_open() const { lock access(critsec); return opened; } template inline BOOL channel::is_closed() const { lock access(critsec); return !opened; } template inline int channel::pending() const { lock access(critsec); return buf.size(); } template channel::channel() : opened(FALSE) { // do inits in ctor (not in open) because other thread might // wait on this channel after it's been closed InitializeCriticalSection(&critsec); event = CreateEvent(NULL, TRUE, FALSE, NULL); } template void channel::open() { lock access(critsec); while (buf.size()) buf.pop(); ResetEvent(event); opened = TRUE; } template channel::~channel() { DeleteCriticalSection(&critsec); if (event) CloseHandle(event); } template void channel::write(const T& item) { lock access(critsec); assert(opened); int prev = buf.size(); buf.push(item); if (!prev); SetEvent(event); } template void channel::write(const T* iter, const T* last) { if (iter == last) return; lock access(critsec); assert(opened); int prev = buf.size(); while (iter != last) { buf.push(*iter); ++iter; } if (!prev); SetEvent(event); } template void channel::write(const T* iter, int n) { if (!n) return; lock access(critsec); assert(opened); int prev = buf.size(); while (n) { buf.push(*iter); ++iter; --n; } if (!prev); SetEvent(event); } template int channel::read(T& item) { lock access(critsec); if (!buf.size()) return 0; item = buf.front(); buf.pop(); if (!buf.size() && opened) ResetEvent(event); return 1; } template T* channel::read(T* iter, T* last) { lock access(critsec); while (buf.size() && iter != last) { *iter = buf.front(); ++iter; buf.pop(); } if (!buf.size() && opened) ResetEvent(event); return iter; } template int channel::read(T* iter, int n) { lock access(critsec); int i = 0; while (buf.size() && i < n) { *iter = buf.front(); ++iter; ++i; buf.pop(); } if (!buf.size() && opened) ResetEvent(event); return i; } template int channel::blocking_read(T& item) { while (!read(item)) if (!wait()) return 0; return 1; } template T* channel::blocking_read(T* iter, T* last) { do { iter = read(iter, last); } while (iter != last && wait()); return iter; } template int channel::blocking_read(T* iter, int n) { int res = 0; do { int r = read(iter, n); iter += r; res += r; n -= r; } while (n && wait()); return res; } template BOOL channel::wait(DWORD timeout) { // channel can be closed at this point; however it may still contain data WaitForSingleObject(event, timeout); lock access(critsec); return buf.size() || opened; } template void channel::close() { // critical section { // signal we're closed lock access(critsec); opened = FALSE; } // there is 'data' in the channel; i.e. information that there's no more SetEvent(event); } # endif Listing Two # ifndef channelstream_defined # define channelstream_defined # include # include # include "channel.h" template class basic_channelbuf : public basic_streambuf > { public: basic_channelbuf(); ~basic_channelbuf(); void open(channel& c, BOOL autoclose); void close(); void close_channel(); BOOL is_open() const { return pc && pc->is_open(); } protected: virtual int_type underflow(); virtual int_type overflow(int_type c = EOF); virtual int_type sync(); private: channel* pc; enum { ibufsize = 1024, obufsize = 1024 }; Char ibuf[ibufsize]; Char obuf[obufsize]; BOOL autoclose; }; typedef basic_channelbuf channelbuf; typedef basic_channelbuf wchannelbuf; template class channelstream_ : public Base { public: channelstream_() : Base(&buf) { } channelstream_(channel& c,BOOL autoclose=Autoclose):Base(&buf) { open(c, autoclose); } void open(channel& c, BOOL autoclose = Autoclose) { buf.open(c, autoclose); } BOOL is_open() const { return buf.is_open(); } void close() { buf.close(); } void close_channel() { buf.close_channel(); } private: basic_channelbuf buf; }; typedef channelstream_ ochannelstream; typedef channelstream_ ichannelstream; typedef channelstream_ channelstream; typedef channelstream_ wochannelstream; typedef channelstream_ wichannelstream; typedef channelstream_ wchannelstream; template basic_channelbuf::basic_channelbuf() : pc(NULL) { } template basic_channelbuf::~basic_channelbuf() { if (pc) close(); } template void basic_channelbuf::open(channel& c, BOOL ac) { pc = &c; if (!c.is_open()) c.open(); autoclose = ac; setp(obuf, obuf + obufsize); } template void basic_channelbuf::close() { sync(); if (autoclose && pc->is_open()) pc->close(); pc = NULL; } template void basic_channelbuf::close_channel() { sync(); if (pc->is_open()) pc->close(); pc = NULL; } template basic_channelbuf::int_type basic_channelbuf::underflow() { if (!pc->blocking_read(ibuf, 1)) return traits_type::eof(); setg(ibuf, ibuf, pc->read(ibuf + 1, ibuf + ibufsize)); return traits_type::to_int_type(*gptr()); } template basic_channelbuf::int_type basic_channelbuf::sync() { return overflow(); } template basic_channelbuf::int_type basic_channelbuf::overflow(int_type c) { if (!is_open()) return traits_type::eof(); pc->write(pbase(), pptr()); if (c != traits_type::eof()) { Char tmp = traits_type::to_char_type(c); pc->write(tmp); } setp(obuf, obuf + obufsize); return 1; } # endif 7