Main Page   Modules   Namespace List   Class Hierarchy   Data Structures   File List   Namespace Members   Data Fields   Globals   Related Pages  

WorkerGroup.h

Go to the documentation of this file.
00001 #ifndef WORKERGROUP_H
00002 #define WORKERGROUP_H
00003 #include <queue>
00004 #include <vector>
00005 #include <algorithm>
00006 #include <boost/thread.hpp>
00007 
00008 #include "Sct/LogicErrors.h"
00009 
00010 namespace SctService{
00011 
00022 template <class T>
00023 class WorkerGroup : private boost::noncopyable {
00024 public:
00025     typedef T DataType;
00026     
00028     void go(unsigned nWorker=1);    
00029 
00034     unsigned queueSize();
00038     unsigned nWorkers();
00043     unsigned busy();
00047     void stop();
00048     /*
00049      * Raise flag to pause all worker threads.
00050      * @param true=pause, false=unpause/play/restart 
00051      */
00052     void setPaused(bool paused=true);
00056     void join();
00060     unsigned paused();
00065     void setSleepTime(unsigned msec);
00069     unsigned getSleepTime() {return m_sleeptime;}
00070     /*
00071      * Pop an object off the front of the queue. @return T to the object
00072      * if the queue isn't empty, otherwise, T()
00073      */
00074     DataType pop();
00078     void push(DataType ob);    
00079     
00081     virtual ~WorkerGroup() {;} 
00082     
00084     WorkerGroup() : m_run(true), m_pause(false), m_sleeptime(1) {}
00085 
00086 protected:
00087     //Inner class to hold thread specific stuff
00088     class Worker {
00089     public:
00090     Worker(WorkerGroup<DataType>& group) : group(group), m_busy(false), m_paused(false) {}
00091     void operator() () {
00092         group.threadMain(*this);
00093     }
00094     WorkerGroup<DataType>& group;
00095     volatile bool m_busy; 
00096     volatile bool m_paused; 
00097     };
00098     
00104     virtual void threadMain(Worker& worker) throw();
00109     virtual bool popAndWork(Worker& worker);
00113     virtual void waitForData();
00117     virtual void work(DataType data) throw() = 0;
00121     void wait(int msec);
00125     void addWorker(Worker& worker);
00126     
00127     typedef typename std::vector<Worker*> WorkerStore;
00128     typedef typename WorkerStore::iterator Iterator;    
00129     
00135      WorkerStore m_workers;
00136 
00141     std::queue<DataType> m_queue;
00142     
00143 
00145     boost::thread_group m_thread_group;
00146     
00147     volatile bool m_run;  
00148     volatile bool m_pause; 
00149     volatile unsigned m_sleeptime; 
00150     
00151     boost::mutex m_queueAccess; 
00152     boost::mutex m_vectorAccess; 
00153     friend class Worker;
00154 };
00155 
00156 
00157 template <class T>
00158 void WorkerGroup<T>::go(unsigned nWorker){    
00159     // create nWorker new threads
00160     for (unsigned i=0; i<nWorker; ++i) {
00161     m_thread_group.create_thread(Worker(*this));
00162     }
00163 }
00164 
00165 template <class T>
00166 void WorkerGroup<T>::threadMain(Worker& worker) throw() { 
00167     //As the worker is owned by some boost thing, we just need to add it.
00168     addWorker(worker);
00169     
00170     while (m_run) {                        // if !m_run, thread will finish.
00171     while (m_pause && m_run) {     // while paused, and not stopped, wait in sleep loop.
00172         worker.m_paused = true;
00173         wait(m_sleeptime);
00174     }
00175     worker.m_paused = false;
00176     if (!m_run) return;                     // if no longer running, stop.
00177     
00178     if (!popAndWork(worker)) {
00179         waitForData();    // if queue is empty, sleep.
00180     }
00181     }
00182     return;
00183 }
00184 
00185 template<class T>
00186 void WorkerGroup<T>::addWorker(Worker& worker) {
00187     boost::mutex::scoped_lock lock(m_vectorAccess);
00188     m_workers.push_back(&worker);
00189 }
00190 
00191 template <class T>
00192 void WorkerGroup<T>::push(DataType ob){
00193     boost::mutex::scoped_lock lock(m_queueAccess);
00194     if (lock.locked()) m_queue.push(ob);
00195     else throw Sct::IllegalStateError("Couldn't get queue lock", __FILE__, __LINE__);
00196 }
00197 
00198 template <class T>
00199 T WorkerGroup<T>::pop(){
00200     boost::mutex::scoped_lock lock(m_queueAccess);
00201     if (lock.locked()) {
00202     if (m_queue.size()!=0) {
00203         DataType a=m_queue.front();
00204         m_queue.pop();
00205         return a;
00206     }
00207     return DataType();
00208     } else throw Sct::IllegalStateError("Couldn't get queue lock", __FILE__, __LINE__);
00209 }
00210 
00211 template <class W>
00212 bool WorkerGroup<W>::popAndWork(Worker& worker){
00213     boost::mutex::scoped_lock lock(m_queueAccess);
00214     if (lock.locked()) {    
00215     if (m_queue.size()!=0) {
00216         DataType data=m_queue.front();
00217         m_queue.pop();
00218         lock.unlock();          //Don't hold onto the lock for too long!
00219         worker.m_busy = true;            // set the busy flag
00220         work(data);                  // call the function to do work.
00221         worker.m_busy = false;           // unset the busy flag 
00222         return true;
00223     } else return false;
00224     } else throw Sct::IllegalStateError("Couldn't get queue lock", __FILE__, __LINE__);
00225 }
00226 
00227 template <class W>    
00228 unsigned WorkerGroup<W>::queueSize(){
00229     return m_queue.size();
00230 }
00231 
00232 template <class W>
00233 unsigned WorkerGroup<W>::nWorkers(){
00234     return m_workers.size();
00235 }
00236 
00237 template <class W>
00238 void WorkerGroup<W>::stop(){
00239     m_run = false;
00240     join();
00241 }
00242 
00243 template <class W>
00244 void WorkerGroup<W>::join(){
00245     m_thread_group.join_all();
00246 }
00247 
00248 template <class W>
00249 void WorkerGroup<W>::setPaused(bool p){
00250     m_pause = p;
00251 }
00252 
00253 template <class W>
00254 unsigned WorkerGroup<W>::paused(){
00255     unsigned npaused=0;
00256     for (Iterator it=m_workers.begin(); it!=m_workers.end(); ++it) 
00257     if ( (*it)->m_paused ) ++npaused;
00258 
00259     return npaused;
00260 }
00261 
00262 template <class W>
00263 unsigned WorkerGroup<W>::busy(){
00264     unsigned nbusy=0;
00265     for (Iterator it=m_workers.begin(); it!=m_workers.end(); ++it) {
00266     if ( (*it)->m_busy ) ++nbusy;
00267     }
00268     return nbusy;
00269 }
00270 
00271 template <class W>
00272 void WorkerGroup<W>::setSleepTime(unsigned t){
00273     m_sleeptime = t;
00274 }
00275 
00276 template <class W>
00277 void WorkerGroup<W>::wait(int wait_time) {
00278     boost::xtime xt;
00279     boost::xtime_get(&xt, boost::TIME_UTC);
00280     xt.nsec += wait_time*1000000;
00281     boost::thread::sleep(xt);
00282 }
00283 
00284 template <class W>
00285 void WorkerGroup<W>::waitForData(){
00286     wait(getSleepTime());
00287 }
00288 
00289 
00290 } // end of namepace SctService
00291 #endif //#ifndef WORKERGROUP_H

Generated on Mon Dec 15 19:36:24 2003 for SCT DAQ/DCS Software by doxygen1.3-rc3