Main Page | Modules | Namespace List | Class Hierarchy | Alphabetical List | Data Structures | File List | Namespace Members | Data Fields | Related Pages

WorkerGroup.h

00001 #ifndef WORKERGROUP_H
00002 #define WORKERGROUP_H
00003 #include <queue>
00004 #include <vector>
00005 #include <algorithm>
00006 #include <boost/thread.hpp>
00007 
00008 #include "Sct/LogicErrors.h"
00009 
00010 namespace SctService{
00011 
00022 template <class T>
00023 class WorkerGroup : private boost::noncopyable {
00024 public:
00025     typedef T DataType;
00026     
00028     void go(unsigned nWorker=1);    
00029 
00034     unsigned queueSize() const;
00038     unsigned nWorkers() const;
00043     unsigned busy() const;
00047     void stop();
00048     /*
00049      * Raise flag to pause all worker threads.
00050      * @param true=pause, false=unpause/play/restart 
00051      */
00052     void setPaused(bool paused=true);
00056     void join();
00060     unsigned paused() const;
00065     void setSleepTime(unsigned msec);
00069     unsigned getSleepTime() const {return m_sleeptime;}
00070     /*
00071      * Pop an object off the front of the queue. @return T to the object
00072      * if the queue isn't empty, otherwise, T()
00073      */
00074     DataType pop();
00078     void push(DataType ob);    
00079     
00081     virtual ~WorkerGroup() {;} 
00082     
00084     WorkerGroup() : m_run(true), m_pause(false), m_sleeptime(1) {}
00085 
00086 protected:
00087     //Inner class to hold thread specific stuff
00088     class Worker {
00089     public:
00090     Worker(WorkerGroup<DataType>& group) : group(group), m_busy(false), m_paused(false) {}
00091     void operator() () {
00092         group.threadMain(*this);
00093     }
00094     WorkerGroup<DataType>& group;
00095     volatile bool m_busy; 
00096     volatile bool m_paused; 
00097     };
00098     
00104     virtual void threadMain(Worker& worker) throw();
00109     virtual bool popAndWork(Worker& worker);
00113     virtual void waitForData();
00117     virtual void work(DataType data) throw() = 0;
00121     void wait(int msec);
00125     void addWorker(Worker& worker);
00126     
00127     typedef typename std::vector<Worker*> WorkerStore;
00128     typedef typename WorkerStore::iterator Iterator;
00129     typedef typename WorkerStore::const_iterator ConstIterator;
00130     
00136      WorkerStore m_workers;
00137 
00142     std::queue<DataType> m_queue;
00143     
00144 
00146     boost::thread_group m_thread_group;
00147     
00148     volatile bool m_run;  
00149     volatile bool m_pause; 
00150     volatile unsigned m_sleeptime; 
00151     
00152     boost::mutex m_queueAccess; 
00153     boost::mutex m_vectorAccess; 
00154     friend class Worker;
00155 };
00156 
00157 
00158 template <class T>
00159 void WorkerGroup<T>::go(unsigned nWorker){    
00160     // create nWorker new threads
00161     for (unsigned i=0; i<nWorker; ++i) {
00162     m_thread_group.create_thread(Worker(*this));
00163     }
00164 }
00165 
00166 template <class T>
00167 void WorkerGroup<T>::threadMain(Worker& worker) throw() { 
00168     //As the worker is owned by some boost thing, we just need to add it.
00169     addWorker(worker);
00170     
00171     while (m_run) {                        // if !m_run, thread will finish.
00172     while (m_pause && m_run) {     // while paused, and not stopped, wait in sleep loop.
00173         worker.m_paused = true;
00174         wait(m_sleeptime);
00175     }
00176     worker.m_paused = false;
00177     if (!m_run) return;                     // if no longer running, stop.
00178     
00179     if (!popAndWork(worker)) {
00180         waitForData();    // if queue is empty, sleep.
00181     }
00182     }
00183     return;
00184 }
00185 
00186 template<class T>
00187 void WorkerGroup<T>::addWorker(Worker& worker) {
00188     boost::mutex::scoped_lock lock(m_vectorAccess);
00189     m_workers.push_back(&worker);
00190 }
00191 
00192 template <class T>
00193 void WorkerGroup<T>::push(DataType ob){
00194     boost::mutex::scoped_lock lock(m_queueAccess);
00195     if (lock.locked()) m_queue.push(ob);
00196     else throw Sct::IllegalStateError("Couldn't get queue lock", __FILE__, __LINE__);
00197 }
00198 
00199 template <class T>
00200 T WorkerGroup<T>::pop(){
00201     boost::mutex::scoped_lock lock(m_queueAccess);
00202     if (lock.locked()) {
00203     if (m_queue.size()!=0) {
00204         DataType a=m_queue.front();
00205         m_queue.pop();
00206         return a;
00207     }
00208     return DataType();
00209     } else throw Sct::IllegalStateError("Couldn't get queue lock", __FILE__, __LINE__);
00210 }
00211 
00212 template <class W>
00213 bool WorkerGroup<W>::popAndWork(Worker& worker){
00214     boost::mutex::scoped_lock lock(m_queueAccess);
00215     if (lock.locked()) {    
00216     if (m_queue.size()!=0) {
00217         DataType data=m_queue.front();
00218         m_queue.pop();
00219         lock.unlock();          //Don't hold onto the lock for too long!
00220         worker.m_busy = true;            // set the busy flag
00221         work(data);                  // call the function to do work.
00222         worker.m_busy = false;           // unset the busy flag 
00223         return true;
00224     } else return false;
00225     } else throw Sct::IllegalStateError("Couldn't get queue lock", __FILE__, __LINE__);
00226 }
00227 
00228 template <class W>    
00229 unsigned WorkerGroup<W>::queueSize() const{
00230     return m_queue.size();
00231 }
00232 
00233 template <class W>
00234 unsigned WorkerGroup<W>::nWorkers() const{
00235     return m_workers.size();
00236 }
00237 
00238 template <class W>
00239 void WorkerGroup<W>::stop(){
00240     m_run = false;
00241     join();
00242 }
00243 
00244 template <class W>
00245 void WorkerGroup<W>::join(){
00246     m_thread_group.join_all();
00247 }
00248 
00249 template <class W>
00250 void WorkerGroup<W>::setPaused(bool p){
00251     m_pause = p;
00252 }
00253 
00254 template <class W>
00255 unsigned WorkerGroup<W>::paused() const{
00256     unsigned npaused=0;
00257     for (ConstIterator it=m_workers.begin(); it!=m_workers.end(); ++it) 
00258     if ( (*it)->m_paused ) ++npaused;
00259 
00260     return npaused;
00261 }
00262 
00263 template <class W>
00264 unsigned WorkerGroup<W>::busy() const{
00265     unsigned nbusy=0;
00266     for (ConstIterator it=m_workers.begin(); it!=m_workers.end(); ++it) {
00267     if ( (*it)->m_busy ) ++nbusy;
00268     }
00269     return nbusy;
00270 }
00271 
00272 template <class W>
00273 void WorkerGroup<W>::setSleepTime(unsigned t){
00274     m_sleeptime = t;
00275 }
00276 
00277 template <class W>
00278 void WorkerGroup<W>::wait(int wait_time) {
00279     boost::xtime xt;
00280     boost::xtime_get(&xt, boost::TIME_UTC);
00281     xt.nsec += wait_time*1000000;
00282     boost::thread::sleep(xt);
00283 }
00284 
00285 template <class W>
00286 void WorkerGroup<W>::waitForData(){
00287     wait(getSleepTime());
00288 }
00289 
00290 
00291 } // end of namespace SctService
00292 #endif //#ifndef WORKERGROUP_H

Generated on Thu Jul 15 09:51:02 2004 for SCT DAQ/DCS Software - C++ by doxygen 1.3.5