00001 #ifndef WORKERGROUP_H
00002 #define WORKERGROUP_H
00003 #include <queue>
00004 #include <vector>
00005 #include <algorithm>
00006 #include <boost/thread.hpp>
00007
00008 #include "Sct/LogicErrors.h"
00009
00010 namespace SctService{
00011
00022 template <class T>
00023 class WorkerGroup : private boost::noncopyable {
00024 public:
00025 typedef T DataType;
00026
00028 void go(unsigned nWorker=1);
00029
00034 unsigned queueSize();
00038 unsigned nWorkers();
00043 unsigned busy();
00047 void stop();
00048
00049
00050
00051
00052 void setPaused(bool paused=true);
00056 void join();
00060 unsigned paused();
00065 void setSleepTime(unsigned msec);
00069 unsigned getSleepTime() {return m_sleeptime;}
00070
00071
00072
00073
00074 DataType pop();
00078 void push(DataType ob);
00079
00081 virtual ~WorkerGroup() {;}
00082
00084 WorkerGroup() : m_run(true), m_pause(false), m_sleeptime(1) {}
00085
00086 protected:
00087
00088 class Worker {
00089 public:
00090 Worker(WorkerGroup<DataType>& group) : group(group), m_busy(false), m_paused(false) {}
00091 void operator() () {
00092 group.threadMain(*this);
00093 }
00094 WorkerGroup<DataType>& group;
00095 volatile bool m_busy;
00096 volatile bool m_paused;
00097 };
00098
00104 virtual void threadMain(Worker& worker) throw();
00109 virtual bool popAndWork(Worker& worker);
00113 virtual void waitForData();
00117 virtual void work(DataType data) throw() = 0;
00121 void wait(int msec);
00125 void addWorker(Worker& worker);
00126
00127 typedef typename std::vector<Worker*> WorkerStore;
00128 typedef typename WorkerStore::iterator Iterator;
00129
00135 WorkerStore m_workers;
00136
00141 std::queue<DataType> m_queue;
00142
00143
00145 boost::thread_group m_thread_group;
00146
00147 volatile bool m_run;
00148 volatile bool m_pause;
00149 volatile unsigned m_sleeptime;
00150
00151 boost::mutex m_queueAccess;
00152 boost::mutex m_vectorAccess;
00153 friend class Worker;
00154 };
00155
00156
00157 template <class T>
00158 void WorkerGroup<T>::go(unsigned nWorker){
00159
00160 for (unsigned i=0; i<nWorker; ++i) {
00161 m_thread_group.create_thread(Worker(*this));
00162 }
00163 }
00164
00165 template <class T>
00166 void WorkerGroup<T>::threadMain(Worker& worker) throw() {
00167
00168 addWorker(worker);
00169
00170 while (m_run) {
00171 while (m_pause && m_run) {
00172 worker.m_paused = true;
00173 wait(m_sleeptime);
00174 }
00175 worker.m_paused = false;
00176 if (!m_run) return;
00177
00178 if (!popAndWork(worker)) {
00179 waitForData();
00180 }
00181 }
00182 return;
00183 }
00184
00185 template<class T>
00186 void WorkerGroup<T>::addWorker(Worker& worker) {
00187 boost::mutex::scoped_lock lock(m_vectorAccess);
00188 m_workers.push_back(&worker);
00189 }
00190
00191 template <class T>
00192 void WorkerGroup<T>::push(DataType ob){
00193 boost::mutex::scoped_lock lock(m_queueAccess);
00194 if (lock.locked()) m_queue.push(ob);
00195 else throw Sct::IllegalStateError("Couldn't get queue lock", __FILE__, __LINE__);
00196 }
00197
00198 template <class T>
00199 T WorkerGroup<T>::pop(){
00200 boost::mutex::scoped_lock lock(m_queueAccess);
00201 if (lock.locked()) {
00202 if (m_queue.size()!=0) {
00203 DataType a=m_queue.front();
00204 m_queue.pop();
00205 return a;
00206 }
00207 return DataType();
00208 } else throw Sct::IllegalStateError("Couldn't get queue lock", __FILE__, __LINE__);
00209 }
00210
00211 template <class W>
00212 bool WorkerGroup<W>::popAndWork(Worker& worker){
00213 boost::mutex::scoped_lock lock(m_queueAccess);
00214 if (lock.locked()) {
00215 if (m_queue.size()!=0) {
00216 DataType data=m_queue.front();
00217 m_queue.pop();
00218 lock.unlock();
00219 worker.m_busy = true;
00220 work(data);
00221 worker.m_busy = false;
00222 return true;
00223 } else return false;
00224 } else throw Sct::IllegalStateError("Couldn't get queue lock", __FILE__, __LINE__);
00225 }
00226
00227 template <class W>
00228 unsigned WorkerGroup<W>::queueSize(){
00229 return m_queue.size();
00230 }
00231
00232 template <class W>
00233 unsigned WorkerGroup<W>::nWorkers(){
00234 return m_workers.size();
00235 }
00236
00237 template <class W>
00238 void WorkerGroup<W>::stop(){
00239 m_run = false;
00240 join();
00241 }
00242
00243 template <class W>
00244 void WorkerGroup<W>::join(){
00245 m_thread_group.join_all();
00246 }
00247
00248 template <class W>
00249 void WorkerGroup<W>::setPaused(bool p){
00250 m_pause = p;
00251 }
00252
00253 template <class W>
00254 unsigned WorkerGroup<W>::paused(){
00255 unsigned npaused=0;
00256 for (Iterator it=m_workers.begin(); it!=m_workers.end(); ++it)
00257 if ( (*it)->m_paused ) ++npaused;
00258
00259 return npaused;
00260 }
00261
00262 template <class W>
00263 unsigned WorkerGroup<W>::busy(){
00264 unsigned nbusy=0;
00265 for (Iterator it=m_workers.begin(); it!=m_workers.end(); ++it) {
00266 if ( (*it)->m_busy ) ++nbusy;
00267 }
00268 return nbusy;
00269 }
00270
00271 template <class W>
00272 void WorkerGroup<W>::setSleepTime(unsigned t){
00273 m_sleeptime = t;
00274 }
00275
00276 template <class W>
00277 void WorkerGroup<W>::wait(int wait_time) {
00278 boost::xtime xt;
00279 boost::xtime_get(&xt, boost::TIME_UTC);
00280 xt.nsec += wait_time*1000000;
00281 boost::thread::sleep(xt);
00282 }
00283
00284 template <class W>
00285 void WorkerGroup<W>::waitForData(){
00286 wait(getSleepTime());
00287 }
00288
00289
00290 }
00291 #endif //#ifndef WORKERGROUP_H