00001 #ifndef WORKERGROUP_H
00002 #define WORKERGROUP_H
00003 #include <queue>
00004 #include <vector>
00005 #include <algorithm>
00006 #include <boost/thread.hpp>
00007
00008 #include "Sct/LogicErrors.h"
00009
00010 namespace SctService{
00011
00022 template <class T>
00023 class WorkerGroup : private boost::noncopyable {
00024 public:
00025 typedef T DataType;
00026
00028 void go(unsigned nWorker=1);
00029
00034 unsigned queueSize() const;
00038 unsigned nWorkers() const;
00043 unsigned busy() const;
00047 void stop();
00048
00049
00050
00051
00052 void setPaused(bool paused=true);
00056 void join();
00060 unsigned paused() const;
00065 void setSleepTime(unsigned msec);
00069 unsigned getSleepTime() const {return m_sleeptime;}
00070
00071
00072
00073
00074 DataType pop();
00078 void push(DataType ob);
00079
00081 virtual ~WorkerGroup() {;}
00082
00084 WorkerGroup() : m_run(true), m_pause(false), m_sleeptime(1) {}
00085
00086 protected:
00087
00088 class Worker {
00089 public:
00090 Worker(WorkerGroup<DataType>& group) : group(group), m_busy(false), m_paused(false) {}
00091 void operator() () {
00092 group.threadMain(*this);
00093 }
00094 WorkerGroup<DataType>& group;
00095 volatile bool m_busy;
00096 volatile bool m_paused;
00097 };
00098
00104 virtual void threadMain(Worker& worker) throw();
00109 virtual bool popAndWork(Worker& worker);
00113 virtual void waitForData();
00117 virtual void work(DataType data) throw() = 0;
00121 void wait(int msec);
00125 void addWorker(Worker& worker);
00126
00127 typedef typename std::vector<Worker*> WorkerStore;
00128 typedef typename WorkerStore::iterator Iterator;
00129 typedef typename WorkerStore::const_iterator ConstIterator;
00130
00136 WorkerStore m_workers;
00137
00142 std::queue<DataType> m_queue;
00143
00144
00146 boost::thread_group m_thread_group;
00147
00148 volatile bool m_run;
00149 volatile bool m_pause;
00150 volatile unsigned m_sleeptime;
00151
00152 boost::mutex m_queueAccess;
00153 boost::mutex m_vectorAccess;
00154 friend class Worker;
00155 };
00156
00157
00158 template <class T>
00159 void WorkerGroup<T>::go(unsigned nWorker){
00160
00161 for (unsigned i=0; i<nWorker; ++i) {
00162 m_thread_group.create_thread(Worker(*this));
00163 }
00164 }
00165
00166 template <class T>
00167 void WorkerGroup<T>::threadMain(Worker& worker) throw() {
00168
00169 addWorker(worker);
00170
00171 while (m_run) {
00172 while (m_pause && m_run) {
00173 worker.m_paused = true;
00174 wait(m_sleeptime);
00175 }
00176 worker.m_paused = false;
00177 if (!m_run) return;
00178
00179 if (!popAndWork(worker)) {
00180 waitForData();
00181 }
00182 }
00183 return;
00184 }
00185
00186 template<class T>
00187 void WorkerGroup<T>::addWorker(Worker& worker) {
00188 boost::mutex::scoped_lock lock(m_vectorAccess);
00189 m_workers.push_back(&worker);
00190 }
00191
00192 template <class T>
00193 void WorkerGroup<T>::push(DataType ob){
00194 boost::mutex::scoped_lock lock(m_queueAccess);
00195 if (lock.locked()) m_queue.push(ob);
00196 else throw Sct::IllegalStateError("Couldn't get queue lock", __FILE__, __LINE__);
00197 }
00198
00199 template <class T>
00200 T WorkerGroup<T>::pop(){
00201 boost::mutex::scoped_lock lock(m_queueAccess);
00202 if (lock.locked()) {
00203 if (m_queue.size()!=0) {
00204 DataType a=m_queue.front();
00205 m_queue.pop();
00206 return a;
00207 }
00208 return DataType();
00209 } else throw Sct::IllegalStateError("Couldn't get queue lock", __FILE__, __LINE__);
00210 }
00211
00212 template <class W>
00213 bool WorkerGroup<W>::popAndWork(Worker& worker){
00214 boost::mutex::scoped_lock lock(m_queueAccess);
00215 if (lock.locked()) {
00216 if (m_queue.size()!=0) {
00217 DataType data=m_queue.front();
00218 m_queue.pop();
00219 lock.unlock();
00220 worker.m_busy = true;
00221 work(data);
00222 worker.m_busy = false;
00223 return true;
00224 } else return false;
00225 } else throw Sct::IllegalStateError("Couldn't get queue lock", __FILE__, __LINE__);
00226 }
00227
00228 template <class W>
00229 unsigned WorkerGroup<W>::queueSize() const{
00230 return m_queue.size();
00231 }
00232
00233 template <class W>
00234 unsigned WorkerGroup<W>::nWorkers() const{
00235 return m_workers.size();
00236 }
00237
00238 template <class W>
00239 void WorkerGroup<W>::stop(){
00240 m_run = false;
00241 join();
00242 }
00243
00244 template <class W>
00245 void WorkerGroup<W>::join(){
00246 m_thread_group.join_all();
00247 }
00248
00249 template <class W>
00250 void WorkerGroup<W>::setPaused(bool p){
00251 m_pause = p;
00252 }
00253
00254 template <class W>
00255 unsigned WorkerGroup<W>::paused() const{
00256 unsigned npaused=0;
00257 for (ConstIterator it=m_workers.begin(); it!=m_workers.end(); ++it)
00258 if ( (*it)->m_paused ) ++npaused;
00259
00260 return npaused;
00261 }
00262
00263 template <class W>
00264 unsigned WorkerGroup<W>::busy() const{
00265 unsigned nbusy=0;
00266 for (ConstIterator it=m_workers.begin(); it!=m_workers.end(); ++it) {
00267 if ( (*it)->m_busy ) ++nbusy;
00268 }
00269 return nbusy;
00270 }
00271
00272 template <class W>
00273 void WorkerGroup<W>::setSleepTime(unsigned t){
00274 m_sleeptime = t;
00275 }
00276
00277 template <class W>
00278 void WorkerGroup<W>::wait(int wait_time) {
00279 boost::xtime xt;
00280 boost::xtime_get(&xt, boost::TIME_UTC);
00281 xt.nsec += wait_time*1000000;
00282 boost::thread::sleep(xt);
00283 }
00284
00285 template <class W>
00286 void WorkerGroup<W>::waitForData(){
00287 wait(getSleepTime());
00288 }
00289
00290
00291 }
00292 #endif //#ifndef WORKERGROUP_H