WorkerGroup.h

00001 #ifndef WORKERGROUP_H
00002 #define WORKERGROUP_H
00003 #include <list>
00004 #include <vector>
00005 #include <algorithm>
00006 
00007 #include <boost/thread.hpp>
00008 #include <boost/timer.hpp>
00009 
00010 #include "Sct/LogicErrors.h"
00011 #include "Sct/ISUtilities.h"
00012 #include "Sct/SctNames.h"
00013 #include "IS/ServiceStatus.h"
00014 
00015 namespace SctService{
00016 
00027 template <class T>
00028 class WorkerGroup : private boost::noncopyable {
00029 public:
00030     typedef T DataType;
00031     
00033     void go(unsigned nWorker=1);    
00034 
00039     unsigned queueSize() const;
00043     unsigned nWorkers() const;
00047     unsigned long nDone() const;
00052     unsigned busy() const;
00056     void stop();
00057     /*
00058      * Raise flag to pause all worker threads.
00059      * @param true=pause, false=unpause/play/restart 
00060      */
00061     void setPaused(bool paused=true);
00065     void join();
00069     unsigned paused() const;
00074     void setSleepTime(unsigned msec);
00078     unsigned getSleepTime() const {return m_sleeptime;}
00079     /*
00080      * Pop an object off the front of the queue. @return T to the object
00081      * if the queue isn't empty, otherwise, T()
00082      */
00083     DataType pop();
00087     void push(DataType ob);    
00088 
00089     /*
00090       Set First In First Out (if true) or First In Last Out (if false).
00091       FILO is more performant, since less disk swap is necessary, 
00092       but a bit less intuitive for the user!
00093     */
00094     void setFifo(bool val);
00095     /*
00096       get FIFO (true) or FILO (false) state.
00097     */
00098     bool isFifo() const;
00100     virtual ~WorkerGroup() {;} 
00101     
00103     WorkerGroup() : m_run(true), m_pause(false), m_sleeptime(1), m_reportInterval(1000), 
00104       m_nDone(0), m_fifo(true) {}
00105 
00107     void reportTo(std::string isname);
00109     void reportInterval(int msec);
00110 protected:
00112     class Worker {
00113     public:
00114     Worker(WorkerGroup<DataType>& group) : group(group), m_busy(false), m_paused(false) {
00115     }
00116     
00117     Worker(const Worker& w) : group(w.group), m_busy(false), m_paused(false) {}
00118 
00119     void operator() () {
00120         group.threadMain(*this);
00121     }
00122     WorkerGroup<DataType>& group;
00124     bool busy();
00126     bool paused();
00128     void setBusy(bool v);
00130     void setPaused(bool v);
00132     //Worker& operator=(const Worker& w)
00133     private:
00134     boost::mutex m_statusMutex;      
00135     volatile bool m_busy;            
00136     volatile bool m_paused;          
00137     };
00138 
00140     class Reporter {
00141     public:
00142       Reporter(WorkerGroup<DataType>& group) : group(group){ 
00143       }
00144       void operator() (){
00145     group.reportThreadMain(*this);
00146       }
00147       WorkerGroup<DataType>& group;
00148       ServiceStatus last;
00149     };
00150 
00152     virtual void reportThreadMain(Reporter& reporter) throw();
00153     
00159     virtual void threadMain(Worker& worker) throw();
00164     virtual bool popAndWork(Worker& worker);
00168     virtual void waitForData();
00172     virtual void work(DataType data) throw() = 0;
00176     void wait(int msec);
00180     void addWorker(Worker& worker);
00181     
00182     typedef typename std::vector<Worker*> WorkerStore;
00183     typedef typename WorkerStore::iterator Iterator;
00184     typedef typename WorkerStore::const_iterator ConstIterator;
00185     
00191      WorkerStore m_workers;
00192 
00197     std::list<DataType> m_queue;
00198     
00199 
00201     boost::thread_group m_thread_group;
00202     
00203     volatile bool m_run;  
00204     volatile bool m_pause; 
00205     volatile unsigned m_sleeptime; 
00206     std::string m_reportTo; 
00207     volatile int m_reportInterval;   
00208     
00209     mutable boost::mutex m_queueAccess;  
00210     mutable boost::mutex m_vectorAccess; 
00211     mutable boost::recursive_mutex m_reportAccess; 
00212     volatile unsigned long m_nDone;   
00213     volatile double m_time;   
00214     friend class Worker;
00215     friend class Reporter;
00216     void addTime(double t);            
00217     bool m_fifo;                       
00218 
00221     DataType unlockedPopFrontOrBack();
00222 };
00223 
00224 
00225 template <class T>
00226 inline void WorkerGroup<T>::go(unsigned nWorker){    
00227     // create nWorker new threads
00228     for (unsigned i=0; i<nWorker; ++i) {
00229     m_thread_group.create_thread(Worker(*this));
00230     }
00231     m_thread_group.create_thread(Reporter(*this));
00232 }
00233 
00234 template <class T>
00235 inline void WorkerGroup<T>::threadMain(Worker& worker) throw() { 
00236     //As the worker is owned by some boost thing, we just need to add it.
00237     addWorker(worker);
00238     
00239     while (m_run) {                        // if !m_run, thread will finish.
00240     while (m_pause && m_run) {     // while paused, and not stopped, wait in sleep loop.
00241         worker.setPaused(true);
00242         wait(m_sleeptime);
00243     }
00244     worker.setPaused(false);
00245     if (!m_run) return;                     // if no longer running, stop.
00246     
00247     if (!popAndWork(worker)) {
00248         waitForData();    // if queue is empty, sleep.
00249     }
00250     }
00251     return;
00252 }
00253 
00254 template <class T>
00255 inline void WorkerGroup<T>::reportThreadMain(Reporter& reporter) throw() { 
00256   bool problemsToMrs=false;
00257   bool first=true;
00258   std::cout << "Status to: [" << m_reportTo << "]" << std::endl;
00259   while (m_run) {
00260     while (m_pause && m_run) {
00261       wait(m_reportInterval);
00262     }
00263     if (!m_run) return; 
00264     wait(m_reportInterval);
00265     {
00266       boost::recursive_mutex::scoped_lock lock(m_reportAccess);
00267       if (!lock.locked()) {
00268     continue;
00269       }
00270       bool modified = (reporter.last.queue != queueSize()||
00271                reporter.last.done != nDone() || first);
00272       if (first) first=false;
00273       if (modified){
00274     reporter.last.averageTimeEach=0.;
00275     reporter.last.timeLeft=0.;
00276     reporter.last.queue = queueSize();
00277     reporter.last.done  = nDone();
00278     if (reporter.last.done!=0) {
00279       reporter.last.averageTimeEach = static_cast<double>(m_time)/static_cast<double>(reporter.last.done);
00280     }
00281     unsigned nw=nWorkers();
00282     if (nw!=0) {
00283       reporter.last.timeLeft = reporter.last.queue*reporter.last.averageTimeEach / static_cast<double>(nw);
00284     }
00285     if (m_reportTo!="") {
00286       try {Sct::ISUtilities::addOrUpdateOrThrow(m_reportTo, reporter.last,
00287                             __FILE__, __LINE__, MRS_DIAGNOSTIC);
00288       }catch(Sct::Throwable& e){
00289         if (!problemsToMrs){
00290           problemsToMrs=true;
00291           e.sendToMrs(MRS_DIAGNOSTIC);
00292         }
00293       }
00294     }
00295       }
00296     }
00297   }
00298   return;
00299 }
00300 
00301 template<class T>
00302 inline void WorkerGroup<T>::addWorker(Worker& worker) {
00303     boost::mutex::scoped_lock lock(m_vectorAccess);
00304     m_workers.push_back(&worker);
00305 }
00306 
00307 template <class T>
00308 inline void WorkerGroup<T>::push(DataType ob){
00309     boost::mutex::scoped_lock lock(m_queueAccess);
00310     if (lock.locked()) m_queue.push_back(ob);
00311     else throw Sct::IllegalStateError("Couldn't get queue lock", __FILE__, __LINE__);
00312 }
00313 
00314 template <class T>
00315 inline T WorkerGroup<T>::unlockedPopFrontOrBack(){
00316   if (m_queue.empty()) throw Sct::IllegalStateError("Queue is emtpy!", __FILE__, __LINE__);
00317   if (m_fifo){
00318     DataType data=m_queue.front();
00319     m_queue.pop_front();
00320     return data;
00321   }else{
00322     DataType data=m_queue.back();
00323     m_queue.pop_back();
00324     return data;
00325   }
00326 }
00327 
00328 template <class T>
00329 inline T WorkerGroup<T>::pop(){
00330     boost::mutex::scoped_lock lock(m_queueAccess);
00331     if (lock.locked()) {
00332       return unlockedPopFrontOrBack();
00333     } else throw Sct::IllegalStateError("Couldn't get queue lock", __FILE__, __LINE__);
00334 }
00335 
00336 template <class W>
00337 inline bool WorkerGroup<W>::popAndWork(Worker& worker){
00338     boost::mutex::scoped_lock lock(m_queueAccess);
00339     if (lock.locked()) {    
00340     if (!m_queue.empty()) {
00341             DataType data=unlockedPopFrontOrBack();
00342         lock.unlock();           //Don't hold onto the lock for too long!
00343         worker.setBusy(true);        // set the busy flag
00344         boost::timer t;
00345         work(data);                  // call the function to do work.
00346         addTime(t.elapsed());
00347         worker.setBusy(false);       // unset the busy flag 
00348         return true;
00349     } else return false;
00350     } else throw Sct::IllegalStateError("Couldn't get queue lock", __FILE__, __LINE__);
00351 }
00352 
00353 template <class W>    
00354 inline unsigned WorkerGroup<W>::queueSize() const{
00355     return m_queue.size();
00356 }
00357 
00358 template <class W>    
00359 inline unsigned long WorkerGroup<W>::nDone() const{
00360     return m_nDone;
00361 }
00362 
00363 template <class W>
00364 inline unsigned WorkerGroup<W>::nWorkers() const{
00365     return m_workers.size();
00366 }
00367 
00368 template <class W>
00369 inline void WorkerGroup<W>::stop(){
00370     m_run = false;
00371     join();
00372 }
00373 
00374 template <class W>
00375 inline void WorkerGroup<W>::join(){
00376     m_thread_group.join_all();
00377 }
00378 
00379 template <class W>
00380 inline void WorkerGroup<W>::setPaused(bool p){
00381     m_pause = p;
00382 }
00383 
00384 template <class W>
00385 inline unsigned WorkerGroup<W>::paused() const{
00386     unsigned npaused=0;
00387     for (ConstIterator it=m_workers.begin(); it!=m_workers.end(); ++it) 
00388     if ( (*it)->paused() ) ++npaused;
00389 
00390     return npaused;
00391 }
00392 
00393 template <class W>
00394 inline unsigned WorkerGroup<W>::busy() const{
00395     unsigned nbusy=0;
00396     for (ConstIterator it=m_workers.begin(); it!=m_workers.end(); ++it) {
00397     if ( (*it)->busy() ) ++nbusy;
00398     }
00399     return nbusy;
00400 }
00401 
00402 template <class W>
00403 inline void WorkerGroup<W>::setSleepTime(unsigned t){
00404     m_sleeptime = t;
00405 }
00406 
00407 template <class W>
00408 inline void WorkerGroup<W>::wait(int wait_time) {
00409     boost::xtime xt;
00410     boost::xtime_get(&xt, boost::TIME_UTC);
00411     xt.nsec += wait_time*1000000;
00412     boost::thread::sleep(xt);
00413 }
00414 
00415 template <class W>
00416 inline void WorkerGroup<W>::waitForData(){
00417     wait(getSleepTime());
00418 }
00419 
00420 template <class T>
00421 void WorkerGroup<T>::reportTo(std::string isname){
00422   boost::recursive_mutex::scoped_lock lock(m_reportAccess);
00423   m_reportTo=isname;
00424 }
00425 
00426 template <class T>
00427 inline void WorkerGroup<T>::reportInterval(int msec){
00428   boost::recursive_mutex::scoped_lock lock(m_reportAccess);
00429   if (msec>100){
00430     m_reportInterval=msec;
00431   }
00432 }
00433 
00434 template <class W>
00435 inline void WorkerGroup<W>::addTime(double t){
00436   boost::recursive_mutex::scoped_lock lock(m_reportAccess);
00437   m_time+=t;
00438   ++m_nDone;
00439 }
00440 
00441 template <class W>
00442 inline void WorkerGroup<W>::setFifo(bool val){
00443   boost::mutex::scoped_lock lock(m_queueAccess);
00444   if (!lock.locked()) throw Sct::IllegalStateError("Could not get status lock",__FILE__, __LINE__);
00445   m_fifo=val;
00446 }
00447 
00448 template <class W>
00449 inline bool WorkerGroup<W>::isFifo()const{
00450   boost::mutex::scoped_lock lock(m_queueAccess);
00451   if (!lock.locked()) throw Sct::IllegalStateError("Could not get status lock",__FILE__, __LINE__);
00452   return m_fifo;
00453 }
00454 
00455 
00456 
00457 template <class W>
00458 inline bool WorkerGroup<W>::Worker::busy(){
00459   boost::mutex::scoped_lock lock(m_statusMutex);
00460   if (!lock.locked()) throw Sct::IllegalStateError("Could not get status lock",__FILE__, __LINE__);
00461   return m_busy;
00462 }
00463 
00464 template <class W>
00465 inline bool WorkerGroup<W>::Worker::paused(){
00466   boost::mutex::scoped_lock lock(m_statusMutex);
00467   if (!lock.locked()) throw Sct::IllegalStateError("Could not get status lock",__FILE__, __LINE__);
00468   return m_paused;
00469 }
00470 
00471 template <class W>
00472 void WorkerGroup<W>::Worker::setBusy(bool val){
00473   boost::mutex::scoped_lock lock(m_statusMutex);
00474   if (!lock.locked()) throw Sct::IllegalStateError("Could not get status lock",__FILE__, __LINE__);
00475   m_busy=val;
00476 }
00477 
00478 template <class W>
00479 inline void WorkerGroup<W>::Worker::setPaused(bool val){
00480   boost::mutex::scoped_lock lock(m_statusMutex);
00481   if (!lock.locked()) throw Sct::IllegalStateError("Could not get status lock",__FILE__, __LINE__);
00482   m_paused=val;
00483 }
00484 
00485 } // end of namepace SctService
00486 #endif //#ifndef WORKERGROUP_H

Generated on Mon Feb 6 14:01:37 2006 for SCT DAQ/DCS Software - C++ by  doxygen 1.4.6