///////////////////////////////////////////////////////////////////// // threads.cpp - class supporting creation of threads // // ver 3.7 // // Language: Visual C++, ver 7.1, SP 2 // // Platform: Dell Dimension 8300, Win XP, SP2 // // Application: CSE687 - Object Oriented Design // // Author: Jim Fawcett, CST 2-187, Syracuse Univ // // (315) 443-3948, jfawcett@twcny.rr.com // // // ///////////////////////////////////////////////////////////////////// #include "threads.h" #include "locks.h" #include "blockingQueue.h" #include syncOut sout(std::cout); // globally available sout object // mimics cout //----< Win32 thread is created here >------------------------------- void thread::start() { if(th) CloseHandle(th); // in case we restart the thread _alive = true; th = (HANDLE)_beginthreadex( NULL, // default security attributes 0, // default stack size threadProc, // thread processing function (void*)this, // pointer to this thread object 0, // create thread in running state &threadID // thread identifier assigned by OS ); } //----< destroy thread >--------------------------------------------- thread::~thread() { _alive = false; CloseHandle(th); th = 0; delete _pProc; // std::cout << "\n I won't be back - I've been terminated"; // remove comment above to see self destruction } //----< thread proc wrapper for derived class's run function >------- unsigned int __stdcall thread::threadProc(void *pSelf) { ((thread*)pSelf)->_pProc->run(); ((thread*)pSelf)->_alive = false; if(((thread*)pSelf)->_suicidal) delete (thread*)pSelf; return 0; } // //----< wait until thread terminates >------------------------------- void thread::wait() { WaitForSingleObject(th,INFINITE); } //----< set thread priority >---------------------------------------- void thread::setPriority(thread::priority pr) { switch(pr) { case thread::low : ::SetThreadPriority(th,THREAD_PRIORITY_LOWEST); break; case thread::normal : ::SetThreadPriority(th,THREAD_PRIORITY_NORMAL); break; case thread::high : ::SetThreadPriority(th,THREAD_PRIORITY_HIGHEST); break; default : ::SetThreadPriority(th,THREAD_PRIORITY_NORMAL); } } //----< is this thread still alive? >-------------------------------- // // thread becomes alive when start() is called and becomes // not alive when its run() function completes // bool thread::alive() { return _alive; } //----< increment suspend count >------------------------------------ // // puts thread to sleep if not already asleep // bool thread::suspend() { if(_canSuspend) { ::SuspendThread(th); } return _canSuspend; } // //----< test stub >-------------------------------------------------- #ifdef TEST_THREADS #include #include using namespace std; /////////////////////////////////////////////////////////////// // Note: // You must create derived classes to define all your // application's processing that you want allocated to the // thread. Examples are given here and on the next page. // // Each derived class must provide a run function to supply // application specific process and a clone function to // allow the thread to make its own copy without having to // know the derived type. /////////////////////////////////////////////////////////////// // non-queued processing for first demonstration class appProc1 : public processing { public: processing* clone() { return new appProc1(*this); } void run() { sout << locker << "\n current thread id = " << thread::currentThread() << unlocker; for(int i=0; i<10; i++) { sout << locker << "\n thread #1: " << _str.c_str() << unlocker; ::Sleep(20); // relinquish control } } void setString(const string &s) { _str = s; } private: string _str; }; /////////////////////////////////////////////////////////////// // non-queued processing for first demonstration class appProc2 : public processing { public: processing* clone() { return new appProc2(*this); } void run() { for(int i=0; i<10; i++) { sout << locker << "\n thread #2: " << _str.c_str() << unlocker; ::Sleep(1); // relinquish control } } void setString(const string &s) { _str = s; } private: string _str; }; // /////////////////////////////////////////////////////////////// // queued processing for second demonstration class Qproc : public processing { public: Qproc(BQueue& Q) : _Q(Q) {} processing* clone() { return new Qproc(*this); } void run() { string temp; do { temp = _Q.deQ(); sout << locker << "\n child thread deQ'd: " << temp.c_str() << unlocker; } while(temp != "quit"); } private: BQueue& _Q; }; /////////////////////////////////////////////////////////////// // spawn new thread in threadproc class InProc : public processing { public: processing* clone() { return new InProc(*this); } void run() { sout << "\n inner thread processing with InProc object"; } }; class OutProc : public processing { public: processing* clone() { return new OutProc(*this); } void run() { sout << "\n outer thread processing with OutProc object"; sout << "\n creating child thread"; InProc inProc; thread t(inProc); t.start(); t.wait(); } }; // ///////////////////////////////////////////////////////////////////// // spawn new thread on heap in threadproc class OutProcOnHeap : public processing { public: processing* clone() { return new OutProcOnHeap(*this); } void run() { sout << "\n outer thread processing with OutProc object"; sout << "\n creating child thread"; InProc inProc; thread* pThread = new thread(inProc); pThread->willSelfDestruct(); pThread->start(); pThread->wait(); } }; //----< helper conversion function >--------------------------------- template std::string ToString(const T& t) { std::ostringstream converter; converter << t; return converter.str(); } // ///////////////////////////////////////////////////////////////////// // thread processing that shares a locked string class sharedString1 : public processing { public: sharedString1() : myCount(++count) {} processing* clone() { return new sharedString1(*this); } void run() { const string blanks(41,' '); for(int i=0; i<10; ++i) { gl.lock(); if(shared.length() > 10) { cout << "\n " << blanks; shared = ""; } shared += ToString(myCount); cout << "\n " << shared; gl.unlock(); } } private: static int count; int myCount; static std::string shared; GLock<1> gl; }; int sharedString1::count = 0; std::string sharedString1::shared = ""; // ///////////////////////////////////////////////////////////////////// // thread processing that shares a string using lockingPtr class sharedString2 : public processing { public: sharedString2() : myCount(++count) {} processing* clone() { return new sharedString2(*this); } void addString(const std::string& str) { shared += str; } void clearString() { shared = ""; } void run() { const string blanks(41,' '); for(int i=0; i<10; ++i) { if(shared.length() > 10) { sout << locker << "\n " << blanks << unlocker; lockingPtr< sharedString2, GLock<1> >(*this,gl)->clearString(); } lockingPtr< sharedString2, GLock<1> >(*this,gl) ->addString(ToString(myCount)); sout << locker << "\n " << shared << unlocker; } } private: static int count; int myCount; static std::string shared; GLock<1> gl; }; int sharedString2::count = 0; std::string sharedString2::shared = ""; // //----< test stub >-------------------------------------------------- void main() { cout << "\n Demonstrating thread with processing class " << "\n ============================================\n"; cout.flush(); GLock<1> l; sout << "\n main thread id = " << thread::currentThread(); appProc1 proc1; proc1.setString("a short message"); thread t1(proc1); t1.start(); appProc2 proc2; proc2.setString("a somewhat longer, and rather boring, message"); thread t2(proc2); t2.start(); t1.wait(); t2.wait(); sout << "\n\n"; cout << "\n Demonstrating thread using blocking Queue passed to proc " << "\n ---------------------------------------------------------\n"; // Create processing object, defined above, // then create and start thread. BQueue Q; Qproc proc3(Q); thread t3(proc3); t3.start(); for(int i=0; i<10; i++) { // make a message string string temp("message #"); temp += ToString(i+1); // Now enQ a message to be handled by the child thread. // Note that no synchronization is necessary. That's // handled by the interface enQ(...) and deQ() functions. Q.enQ(temp); sout << locker << "\n main thread enQ'd: " << temp << unlocker; // Sleep used to slow down main thread so enQs and deQs interleave. ::Sleep(15); } string temp("quit"); Q.enQ(temp); sout << locker << "\n main thread enQ'd: " << temp << unlocker; t3.wait(); sout << "\n\n"; sout << "\f\n"; // sout << locker << "\n restarting the thread with queue " << "\n ----------------------------------\n" << unlocker; t3.start(); Q.enQ("started again"); Q.enQ("now quitting"); Q.enQ("quit"); t3.wait(); sout << "\n\n"; sout << "\n running thread inside thread proc " << "\n -----------------------------------\n"; OutProc outer; thread t4(outer); t4.start(); t4.wait(); sout << "\n\n"; sout << locker << "\n running thread, created on heap, inside thread proc " << "\n -----------------------------------------------------\n" << unlocker; OutProcOnHeap hOuter; thread* pThread = new thread(hOuter); pThread->willSelfDestruct(); pThread->start(); pThread->wait(); sout << "\n\n"; sout << locker << "\n loop creating threads on the heap " << "\n -----------------------------------\n" << unlocker; for(int i=0; i<2; ++i) { appProc1 proc1; proc1.setString("Hi from heap"); pThread = new thread(proc1); pThread->willSelfDestruct(); pThread->start(); } pThread->wait(); sout << "\n\n"; // cout << "\n Demonstrating string sharing between threads " << "\n ---------------------------------------------\n"; // Create processing object, defined above, // then create and start thread. std::cout << "\n using Glock<1> for synchronization"; sharedString1 ss1, ss2; thread t5(ss1); thread t6(ss2); t5.start(); t6.start(); t5.wait(); t6.wait(); std::cout << "\n\n"; std::cout << "\n using lockingPtr for synchronization"; sharedString2 ss3, ss4; thread t7(ss3); thread t8(ss4); t7.start(); t8.start(); t7.wait(); t8.wait(); std::cout << "\n\n"; } #endif