00001 #ifndef WORKERGROUP_H
00002 #define WORKERGROUP_H
00003 #include <list>
00004 #include <vector>
00005 #include <algorithm>
00006
00007 #include <boost/thread.hpp>
00008 #include <boost/timer.hpp>
00009
00010 #include "Sct/LogicErrors.h"
00011 #include "Sct/ISUtilities.h"
00012 #include "Sct/SctNames.h"
00013 #include "IS/ServiceStatus.h"
00014
00015 namespace SctService{
00016
00027 template <class T>
00028 class WorkerGroup : private boost::noncopyable {
00029 public:
00030 typedef T DataType;
00031
00033 void go(unsigned nWorker=1);
00034
00039 unsigned queueSize() const;
00043 unsigned nWorkers() const;
00047 unsigned long nDone() const;
00052 unsigned busy() const;
00056 void stop();
00057
00058
00059
00060
00061 void setPaused(bool paused=true);
00065 void join();
00069 unsigned paused() const;
00074 void setSleepTime(unsigned msec);
00078 unsigned getSleepTime() const {return m_sleeptime;}
00079
00080
00081
00082
00083 DataType pop();
00087 void push(DataType ob);
00088
00089
00090
00091
00092
00093
00094 void setFifo(bool val);
00095
00096
00097
00098 bool isFifo() const;
00100 virtual ~WorkerGroup() {;}
00101
00103 WorkerGroup() : m_run(true), m_pause(false), m_sleeptime(1), m_reportInterval(1000),
00104 m_nDone(0), m_fifo(true) {}
00105
00107 void reportTo(std::string isname);
00109 void reportInterval(int msec);
00110 protected:
00112 class Worker {
00113 public:
00114 Worker(WorkerGroup<DataType>& group) : group(group), m_busy(false), m_paused(false) {
00115 }
00116
00117 Worker(const Worker& w) : group(w.group), m_busy(false), m_paused(false) {}
00118
00119 void operator() () {
00120 group.threadMain(*this);
00121 }
00122 WorkerGroup<DataType>& group;
00124 bool busy();
00126 bool paused();
00128 void setBusy(bool v);
00130 void setPaused(bool v);
00132
00133 private:
00134 boost::mutex m_statusMutex;
00135 volatile bool m_busy;
00136 volatile bool m_paused;
00137 };
00138
00140 class Reporter {
00141 public:
00142 Reporter(WorkerGroup<DataType>& group) : group(group){
00143 }
00144 void operator() (){
00145 group.reportThreadMain(*this);
00146 }
00147 WorkerGroup<DataType>& group;
00148 ServiceStatus last;
00149 };
00150
00152 virtual void reportThreadMain(Reporter& reporter) throw();
00153
00159 virtual void threadMain(Worker& worker) throw();
00164 virtual bool popAndWork(Worker& worker);
00168 virtual void waitForData();
00172 virtual void work(DataType data) throw() = 0;
00176 void wait(int msec);
00180 void addWorker(Worker& worker);
00181
00182 typedef typename std::vector<Worker*> WorkerStore;
00183 typedef typename WorkerStore::iterator Iterator;
00184 typedef typename WorkerStore::const_iterator ConstIterator;
00185
00191 WorkerStore m_workers;
00192
00197 std::list<DataType> m_queue;
00198
00199
00201 boost::thread_group m_thread_group;
00202
00203 volatile bool m_run;
00204 volatile bool m_pause;
00205 volatile unsigned m_sleeptime;
00206 std::string m_reportTo;
00207 volatile int m_reportInterval;
00208
00209 mutable boost::mutex m_queueAccess;
00210 mutable boost::mutex m_vectorAccess;
00211 mutable boost::recursive_mutex m_reportAccess;
00212 volatile unsigned long m_nDone;
00213 volatile double m_time;
00214 friend class Worker;
00215 friend class Reporter;
00216 void addTime(double t);
00217 bool m_fifo;
00218
00221 DataType unlockedPopFrontOrBack();
00222 };
00223
00224
00225 template <class T>
00226 inline void WorkerGroup<T>::go(unsigned nWorker){
00227
00228 for (unsigned i=0; i<nWorker; ++i) {
00229 m_thread_group.create_thread(Worker(*this));
00230 }
00231 m_thread_group.create_thread(Reporter(*this));
00232 }
00233
00234 template <class T>
00235 inline void WorkerGroup<T>::threadMain(Worker& worker) throw() {
00236
00237 addWorker(worker);
00238
00239 while (m_run) {
00240 while (m_pause && m_run) {
00241 worker.setPaused(true);
00242 wait(m_sleeptime);
00243 }
00244 worker.setPaused(false);
00245 if (!m_run) return;
00246
00247 if (!popAndWork(worker)) {
00248 waitForData();
00249 }
00250 }
00251 return;
00252 }
00253
00254 template <class T>
00255 inline void WorkerGroup<T>::reportThreadMain(Reporter& reporter) throw() {
00256 bool problemsToMrs=false;
00257 bool first=true;
00258 std::cout << "Status to: [" << m_reportTo << "]" << std::endl;
00259 while (m_run) {
00260 while (m_pause && m_run) {
00261 wait(m_reportInterval);
00262 }
00263 if (!m_run) return;
00264 wait(m_reportInterval);
00265 {
00266 boost::recursive_mutex::scoped_lock lock(m_reportAccess);
00267 if (!lock.locked()) {
00268 continue;
00269 }
00270 bool modified = (reporter.last.queue != queueSize()||
00271 reporter.last.done != nDone() || first);
00272 if (first) first=false;
00273 if (modified){
00274 reporter.last.averageTimeEach=0.;
00275 reporter.last.timeLeft=0.;
00276 reporter.last.queue = queueSize();
00277 reporter.last.done = nDone();
00278 if (reporter.last.done!=0) {
00279 reporter.last.averageTimeEach = static_cast<double>(m_time)/static_cast<double>(reporter.last.done);
00280 }
00281 unsigned nw=nWorkers();
00282 if (nw!=0) {
00283 reporter.last.timeLeft = reporter.last.queue*reporter.last.averageTimeEach / static_cast<double>(nw);
00284 }
00285 if (m_reportTo!="") {
00286 try {Sct::ISUtilities::addOrUpdateOrThrow(m_reportTo, reporter.last,
00287 __FILE__, __LINE__, MRS_DIAGNOSTIC);
00288 }catch(Sct::Throwable& e){
00289 if (!problemsToMrs){
00290 problemsToMrs=true;
00291 e.sendToMrs(MRS_DIAGNOSTIC);
00292 }
00293 }
00294 }
00295 }
00296 }
00297 }
00298 return;
00299 }
00300
00301 template<class T>
00302 inline void WorkerGroup<T>::addWorker(Worker& worker) {
00303 boost::mutex::scoped_lock lock(m_vectorAccess);
00304 m_workers.push_back(&worker);
00305 }
00306
00307 template <class T>
00308 inline void WorkerGroup<T>::push(DataType ob){
00309 boost::mutex::scoped_lock lock(m_queueAccess);
00310 if (lock.locked()) m_queue.push_back(ob);
00311 else throw Sct::IllegalStateError("Couldn't get queue lock", __FILE__, __LINE__);
00312 }
00313
00314 template <class T>
00315 inline T WorkerGroup<T>::unlockedPopFrontOrBack(){
00316 if (m_queue.empty()) throw Sct::IllegalStateError("Queue is emtpy!", __FILE__, __LINE__);
00317 if (m_fifo){
00318 DataType data=m_queue.front();
00319 m_queue.pop_front();
00320 return data;
00321 }else{
00322 DataType data=m_queue.back();
00323 m_queue.pop_back();
00324 return data;
00325 }
00326 }
00327
00328 template <class T>
00329 inline T WorkerGroup<T>::pop(){
00330 boost::mutex::scoped_lock lock(m_queueAccess);
00331 if (lock.locked()) {
00332 return unlockedPopFrontOrBack();
00333 } else throw Sct::IllegalStateError("Couldn't get queue lock", __FILE__, __LINE__);
00334 }
00335
00336 template <class W>
00337 inline bool WorkerGroup<W>::popAndWork(Worker& worker){
00338 boost::mutex::scoped_lock lock(m_queueAccess);
00339 if (lock.locked()) {
00340 if (!m_queue.empty()) {
00341 DataType data=unlockedPopFrontOrBack();
00342 lock.unlock();
00343 worker.setBusy(true);
00344 boost::timer t;
00345 work(data);
00346 addTime(t.elapsed());
00347 worker.setBusy(false);
00348 return true;
00349 } else return false;
00350 } else throw Sct::IllegalStateError("Couldn't get queue lock", __FILE__, __LINE__);
00351 }
00352
00353 template <class W>
00354 inline unsigned WorkerGroup<W>::queueSize() const{
00355 return m_queue.size();
00356 }
00357
00358 template <class W>
00359 inline unsigned long WorkerGroup<W>::nDone() const{
00360 return m_nDone;
00361 }
00362
00363 template <class W>
00364 inline unsigned WorkerGroup<W>::nWorkers() const{
00365 return m_workers.size();
00366 }
00367
00368 template <class W>
00369 inline void WorkerGroup<W>::stop(){
00370 m_run = false;
00371 join();
00372 }
00373
00374 template <class W>
00375 inline void WorkerGroup<W>::join(){
00376 m_thread_group.join_all();
00377 }
00378
00379 template <class W>
00380 inline void WorkerGroup<W>::setPaused(bool p){
00381 m_pause = p;
00382 }
00383
00384 template <class W>
00385 inline unsigned WorkerGroup<W>::paused() const{
00386 unsigned npaused=0;
00387 for (ConstIterator it=m_workers.begin(); it!=m_workers.end(); ++it)
00388 if ( (*it)->paused() ) ++npaused;
00389
00390 return npaused;
00391 }
00392
00393 template <class W>
00394 inline unsigned WorkerGroup<W>::busy() const{
00395 unsigned nbusy=0;
00396 for (ConstIterator it=m_workers.begin(); it!=m_workers.end(); ++it) {
00397 if ( (*it)->busy() ) ++nbusy;
00398 }
00399 return nbusy;
00400 }
00401
00402 template <class W>
00403 inline void WorkerGroup<W>::setSleepTime(unsigned t){
00404 m_sleeptime = t;
00405 }
00406
00407 template <class W>
00408 inline void WorkerGroup<W>::wait(int wait_time) {
00409 boost::xtime xt;
00410 boost::xtime_get(&xt, boost::TIME_UTC);
00411 xt.nsec += wait_time*1000000;
00412 boost::thread::sleep(xt);
00413 }
00414
00415 template <class W>
00416 inline void WorkerGroup<W>::waitForData(){
00417 wait(getSleepTime());
00418 }
00419
00420 template <class T>
00421 void WorkerGroup<T>::reportTo(std::string isname){
00422 boost::recursive_mutex::scoped_lock lock(m_reportAccess);
00423 m_reportTo=isname;
00424 }
00425
00426 template <class T>
00427 inline void WorkerGroup<T>::reportInterval(int msec){
00428 boost::recursive_mutex::scoped_lock lock(m_reportAccess);
00429 if (msec>100){
00430 m_reportInterval=msec;
00431 }
00432 }
00433
00434 template <class W>
00435 inline void WorkerGroup<W>::addTime(double t){
00436 boost::recursive_mutex::scoped_lock lock(m_reportAccess);
00437 m_time+=t;
00438 ++m_nDone;
00439 }
00440
00441 template <class W>
00442 inline void WorkerGroup<W>::setFifo(bool val){
00443 boost::mutex::scoped_lock lock(m_queueAccess);
00444 if (!lock.locked()) throw Sct::IllegalStateError("Could not get status lock",__FILE__, __LINE__);
00445 m_fifo=val;
00446 }
00447
00448 template <class W>
00449 inline bool WorkerGroup<W>::isFifo()const{
00450 boost::mutex::scoped_lock lock(m_queueAccess);
00451 if (!lock.locked()) throw Sct::IllegalStateError("Could not get status lock",__FILE__, __LINE__);
00452 return m_fifo;
00453 }
00454
00455
00456
00457 template <class W>
00458 inline bool WorkerGroup<W>::Worker::busy(){
00459 boost::mutex::scoped_lock lock(m_statusMutex);
00460 if (!lock.locked()) throw Sct::IllegalStateError("Could not get status lock",__FILE__, __LINE__);
00461 return m_busy;
00462 }
00463
00464 template <class W>
00465 inline bool WorkerGroup<W>::Worker::paused(){
00466 boost::mutex::scoped_lock lock(m_statusMutex);
00467 if (!lock.locked()) throw Sct::IllegalStateError("Could not get status lock",__FILE__, __LINE__);
00468 return m_paused;
00469 }
00470
00471 template <class W>
00472 void WorkerGroup<W>::Worker::setBusy(bool val){
00473 boost::mutex::scoped_lock lock(m_statusMutex);
00474 if (!lock.locked()) throw Sct::IllegalStateError("Could not get status lock",__FILE__, __LINE__);
00475 m_busy=val;
00476 }
00477
00478 template <class W>
00479 inline void WorkerGroup<W>::Worker::setPaused(bool val){
00480 boost::mutex::scoped_lock lock(m_statusMutex);
00481 if (!lock.locked()) throw Sct::IllegalStateError("Could not get status lock",__FILE__, __LINE__);
00482 m_paused=val;
00483 }
00484
00485 }
00486 #endif //#ifndef WORKERGROUP_H