Main Page | Modules | Namespace List | Class Hierarchy | Alphabetical List | Data Structures | File List | Namespace Members | Data Fields | Related Pages

AnalysisWorkerGroup.cpp

00001 #include "AnalysisWorkerGroup.h"
00002 #include "AnalysisAlgorithm.h"
00003 #include "AnalysisAlgorithmMap.h"
00004 #include "Sct/IS/IOManagerIS.h"
00005 #include "Sct/ISProxy/IOManagerISProxy.h"
00006 #include "Sct/IS/IONameIS.h"
00007 #include "Sct/StdExceptionWrapper.h"
00008 #include "SctData/UniqueID.h"
00009 #include "SctData/TestResult.h"
00010 #include "CalibrationController/IS/TestData.h"
00011 #include <boost/timer.hpp>
00012 
00013 using namespace SctService;
00014 using namespace boost;
00015 using namespace std;
00016 using namespace Sct;
00017 using namespace Sct::IS;
00018 using namespace SctData;
00019 
00020 namespace SctAnalysis {
00021 
00022 //TestAlgs methods
00023 
00024 
00025 AnalysisWorkerGroup::TestAlgs::~TestAlgs() {
00026     boost::recursive_mutex::scoped_lock lock(m_access);
00027 
00028     //We need to explicitly delete the list here so the lock is in force
00029     //Note that this just removes the pointer..actual deletion will only occur
00030     //if noone else has grabbed a reference to the object
00031     m_algorithms.clear();
00032 }
00033 
00034 void AnalysisWorkerGroup::TestAlgs::removeAlgorithm(shared_ptr<AnalysisAlgorithm> alg) {
00035     boost::recursive_mutex::scoped_lock lock(m_access);
00036     list<shared_ptr<AnalysisAlgorithm> >::iterator it =
00037         find(m_algorithms.begin(), m_algorithms.end(), alg);
00038 
00039     if ( it != m_algorithms.end() )
00040         m_algorithms.erase(it);
00041 }
00042 
00043 ostream& AnalysisWorkerGroup::TestAlgs::printStatus(ostream& os) const throw() {
00044     boost::recursive_mutex::scoped_lock lock(m_access);
00045     os << "   Test name: "  << getTest().testName 
00046        << ", run number = " << getTest().runNumber
00047        << ", first scan number = " << getTest().startScanNumber
00048        << ", nscans = " << getTest().nScans 
00049        <<  endl;
00050 
00051     return os;
00052 }
00053 
00054 shared_ptr<AnalysisAlgorithm> AnalysisWorkerGroup::TestAlgs::findAlgorithm(const string& modulename) {
00055     boost::recursive_mutex::scoped_lock lock(m_access);
00056     shared_ptr<AnalysisAlgorithm> alg;
00057     for (list<shared_ptr<AnalysisAlgorithm> >::const_iterator it=m_algorithms.begin();
00058             it != m_algorithms.end();
00059             ++it) {
00060         if ( (*it)->getModuleName()==modulename ) {
00061             alg=(*it);
00062             break;
00063         }
00064     }
00065     if (!alg) {
00066         alg=addAlgorithm(modulename);
00067     }
00068     return alg;
00069 }
00070 
00071 shared_ptr<AnalysisAlgorithm> AnalysisWorkerGroup::TestAlgs::addAlgorithm(const string& modulename) {
00072     boost::recursive_mutex::scoped_lock lock(m_access);
00073     shared_ptr<AnalysisAlgorithm> alg = AnalysisAlgorithmMap::instance().getAlgorithm(getTest(), modulename);
00074     m_algorithms.push_back(alg);
00075     return alg;
00076 }
00077 
00078 
00079 
00080 //AnalysisWorkerGroup methods
00081 
00082 
00083 ostream&  AnalysisWorkerGroup::printStatus(ostream& os) const throw() {
00084     cout << "Getting tests lock" << endl;
00085     boost::recursive_mutex::scoped_lock lock(m_tests_access);
00086     cout << "got tests lock" << endl;
00087     os << "===============================================\n" << endl;
00088     os << "AnalysisWorkerGroup tests:" << endl;
00089     unsigned ntest=0;
00090     for (list<shared_ptr<TestAlgs> >::const_iterator it = m_tests.begin();
00091             it != m_tests.end() ; ++it ) {
00092         (*it)->printStatus(os);
00093         os << "-------------------------------------------------"<<endl;
00094         ++ntest;
00095     }
00096     os << endl << "Total number of tests = " << ntest << endl;
00097     cout << "done ostreaming" << endl;
00098     return os;
00099 }
00100 
00101 void AnalysisWorkerGroup::addTest(shared_ptr<const TestData> testdata) {
00102     boost::recursive_mutex::scoped_lock lock(m_tests_access);
00103     m_tests.push_back(shared_ptr<TestAlgs>(new TestAlgs(testdata) ) );
00104 }
00105 
00106 void AnalysisWorkerGroup::removeTestsUpTo(shared_ptr<const TestData> testdata) {
00107     boost::recursive_mutex::scoped_lock lock(m_tests_access);
00108     list<shared_ptr<TestAlgs> >::iterator it = m_tests.begin();
00109 
00110     while( it != m_tests.end() ) { // no ++it here since we need to delete it in the function!
00111         if ((*it)->getTest().runNumber==testdata->runNumber
00112                 && (*it)->getTest().startScanNumber<=testdata->startScanNumber) {
00113             
00114         cout << "Deleting old test for run "
00115          << (*it)->getTest().runNumber << ", start scan "
00116                  << (*it)->getTest().startScanNumber
00117                  << endl;
00118             it=m_tests.erase(it);   // erase returns ++it
00119         } else {
00120             ++it;                   // increment ourselves
00121         }
00122     }
00123 }
00124 
00125 shared_ptr<AnalysisWorkerGroup::TestAlgs> AnalysisWorkerGroup::findTest(const TestData& testdata) const throw() {
00126     return findTest(testdata.runNumber, testdata.startScanNumber);
00127 }
00128 
00129 shared_ptr<AnalysisWorkerGroup::TestAlgs> AnalysisWorkerGroup::findTest(const unsigned long runno, const unsigned long scanno) const throw() {
00130     boost::recursive_mutex::scoped_lock lock(m_tests_access);
00131     shared_ptr<AnalysisWorkerGroup::TestAlgs> talgs;
00132     for (list<shared_ptr<TestAlgs> >::const_iterator it = m_tests.begin();
00133             it != m_tests.end();
00134             ++it ) {
00135         const TestData& td=(*it)->getTest();
00136         if ( runno == td.runNumber &&
00137                 scanno >= td.startScanNumber &&
00138                 scanno < td.startScanNumber + td.nScans) {
00139             talgs=*it;
00140             //      cout << "found test for run " << runno<< ", scan " << scanno << endl;
00141         }
00142     }
00143     return talgs;
00144 }
00145 
00146 
00147 
00148 
00149 void AnalysisWorkerGroup::purge() throw() {
00150     //Stop the workers from trying to get on with things
00151     this->setPaused(true);
00152     while (paused() != nWorkers()) sleep(1);
00153     
00154     // Clear the queue.
00155     shared_ptr<IOName> s;
00156     do {s=pop(); } while(s);
00157     boost::recursive_mutex::scoped_lock lock(m_tests_access);
00158 
00159     // Clear the test data.
00160     cout << "Clearing tests ... " << endl;
00161     m_tests.clear();
00162     cout << " ... done clearing tests" << endl;
00163     this->setPaused(false);    
00164 }
00165 
00166 void AnalysisWorkerGroup::work(shared_ptr<IOName> name) throw() {
00167     using namespace SctData;
00168     timer t;
00169     try {
00170         //      cout << "AnalysisWorker: name = "<< *name << " Q="<<wg->queueSize()<<", busy="<< wg->busy() << endl;        
00171     UniqueID id (name->getUniqueID());
00172         const unsigned runno = id.getRunNumber();
00173         const unsigned scanno = id.getScanNumber();
00174         const string modulename= id.getModule();
00175 
00176         //      cout << "run="<< runno << ", scan="<< scanno << ", modulename="<<modulename
00177         //       << ", classname="<<isname.getClassName()<<endl;
00178 
00179         shared_ptr<AnalysisWorkerGroup::TestAlgs> talgs = findTest(runno, scanno);
00180 
00181         //      cout << "Found test" << endl;
00182 
00183         if (!talgs) {
00184            std::ostringstream oss; oss << "AnalysisWorker::work. No test found for run " 
00185                                << runno << " scan" << scanno;
00186             throw IllegalStateError(oss.str(), __FILE__, __LINE__);
00187         }
00188         // Get a testresult. Add a new testresult if one dosen't exist for this module
00189         shared_ptr<AnalysisAlgorithm> algorithm = talgs->findAlgorithm(modulename);
00190 
00191         //      cout << "algorithm.get()=" << algorithm.get() << endl;
00192 
00193         bool isfit=(name->getClassName()=="SctData::FitScanResult");
00194 
00195         // lock the approprate result and do your stuff, analysis-algorithm.
00196         boost::recursive_mutex::scoped_lock lock (algorithm->getMutex());
00197         if (isfit) {
00198             //      cout << "got fit from queue" << endl;
00199             algorithm->addFitScanResult(name);
00200         } else {
00201             //      cout << "got raw from queue" << endl;
00202             algorithm->addRawScanResult(name);
00203         }
00204     if (algorithm->canAnalyze()) {
00205         if (algorithm->isDone()) throw IllegalStateError("AnalysisWorker::work.  Algorithm has additional, unnecessary Scan - analysis already done", __FILE__, __LINE__);
00206         algorithm->checkTestResultExists();
00207         algorithm->addOverheadTime(t.elapsed());
00208         
00209         t.restart();
00210         algorithm->loadData();
00211         algorithm->addIOTime(t.elapsed());
00212         
00213         t.restart();
00214         algorithm->analyze();
00215         algorithm->addAnalysisTime(t.elapsed());
00216         
00217         t.restart();
00218         algorithm->finish();
00219         algorithm->addIOTime(t.elapsed());
00220         talgs->removeAlgorithm(algorithm);
00221     } else {
00222         algorithm->addOverheadTime(t.elapsed());
00223     }
00224     
00225     } catch (InvalidArgumentError& e) {
00226     e.sendToMrs();
00227     } catch(Throwable& e) {
00228         e.sendToMrs(MRS_ERROR);
00229     } catch(std::exception& e) {
00230     StdExceptionWrapper sew(e);
00231         sew.sendToMrs(MRS_ERROR);        
00232     } catch(...) {
00233     Error e("uncaught unknown exception", __FILE__, __LINE__);
00234     e.sendToMrs(MRS_ERROR);
00235     }
00236     //  cout << "Work done" << endl;
00237 }
00238 
00239 
00240 }// end of namespace SctAnalysis

Generated on Thu Jul 15 09:50:42 2004 for SCT DAQ/DCS Software - C++ by doxygen 1.3.5