AnalysisWorkerGroup.cpp

00001 #include "AnalysisWorkerGroup.h"
00002 #include "AnalysisAlgorithm.h"
00003 #include "AnalysisAlgorithmMap.h"
00004 #include "AnalysisService.h"
00005 #include "DcsInterface.h"
00006 
00007 #include "Sct/IS/IOManagerIS.h"
00008 #include "Sct/ISProxy/IOManagerISProxy.h"
00009 #include "Sct/IS/IONameIS.h"
00010 #include "Sct/StdExceptionWrapper.h"
00011 #include "Sct/SctNames.h"
00012 #include "Sct/UniqueID.h"
00013 #include "SctData/TestResult.h"
00014 #include "SctData/DcsData.h"
00015 
00016 #include "CalibrationController/IS/TestData.h"
00017 #include <boost/timer.hpp>
00018 #include <iomanip>
00019 #include <is/info.h>
00020 #include <is/infodictionary.h>
00021 #include <is/infoiterator.h>
00022 #include <is/inforeceiver.h>
00023 #include <is/infoT.h>
00024 
00025 using namespace SctService;
00026 using namespace boost;
00027 using namespace std;
00028 using namespace Sct;
00029 using namespace Sct::IS;
00030 using namespace SctData;
00031 
00032 namespace SctAnalysis {
00033 
00034 //TestAlgs methods
00035 AnalysisWorkerGroup::TestAlgs::TestAlgs(boost::shared_ptr<const TestData> testdata) : m_testdata(testdata){
00036   bool dcsProblemsReported=false;
00037   for (unsigned imodule=0; imodule<m_testdata->modules_size; ++imodule){
00038     // make dcs snapshots of all modules in test
00039     try{
00040       const std::string& module = m_testdata->modules[imodule];
00041       boost::shared_ptr<AnalysisAlgorithm> alg = findAlgorithm(module);
00042       if (!alg.get()) continue; 
00043       alg->checkTestResultExists();
00044 
00045       // try to get DcsInformation for this module
00046       try{
00047     if (!AnalysisService::instance().getDcsInterface().getConfigurationInterface().empty()) {  
00048       shared_ptr<SctData::DcsData> dcs (AnalysisService::instance().getDcsInterface().getData(module) );
00049       
00050       alg->getTestResult()->setDcsData(dcs);
00051     }
00052       }catch(Sct::Throwable& e){
00053     if (!dcsProblemsReported) {
00054       e.sendToMrs(MRS_DIAGNOSTIC);
00055       dcsProblemsReported=true;
00056     }
00057       }catch(std::exception& e){
00058     if (!dcsProblemsReported) {
00059       StdExceptionWrapper(e,__FILE__,__LINE__).sendToMrs(MRS_DIAGNOSTIC);
00060       dcsProblemsReported=true;
00061     }
00062       }
00063     }catch(Sct::Throwable& e){
00064       e.sendToMrs();
00065     }
00066   }
00067 }
00068 
00069 AnalysisWorkerGroup::TestAlgs::~TestAlgs() {
00070     boost::recursive_mutex::scoped_lock lock(m_access);
00071 
00072     //We need to explicitly delete the list here so the lock is in force
00073     //Note that this just removes the pointer..actual deletion will only occur
00074     //if noone else has grabbed a reference to the object
00075     m_algorithms.clear();
00076 }
00077 
00078 void AnalysisWorkerGroup::TestAlgs::removeAlgorithm(shared_ptr<AnalysisAlgorithm> alg) {
00079     boost::recursive_mutex::scoped_lock lock(m_access);
00080     list<shared_ptr<AnalysisAlgorithm> >::iterator it =
00081         find(m_algorithms.begin(), m_algorithms.end(), alg);
00082 
00083     if ( it != m_algorithms.end() )
00084         m_algorithms.erase(it);
00085 }
00086 
00087 shared_ptr<const TestData>AnalysisWorkerGroup::TestAlgs::getTest() const{
00088   if (!m_testdata.get()) throw Sct::InvariantViolatedError("Attempt to retrieve null TestData - object is not properly constructed",__FILE__,__LINE__);
00089   return m_testdata;
00090 }
00091 
00092 void AnalysisWorkerGroup::TestAlgs::replaceTest(shared_ptr<const TestData> testdata){
00093   boost::recursive_mutex::scoped_lock test_lock(m_access);
00094   m_testdata=testdata;
00095   // also need to replace the TestData in the Algorithms!
00096   for (std::list<shared_ptr<AnalysisAlgorithm> >::iterator it = m_algorithms.begin();
00097        it != m_algorithms.end();
00098        ++it){
00099     // lock the algorithm
00100     boost::recursive_mutex::scoped_lock alg_lock((*it)->getMutex());
00101     (*it)->setTestData(testdata);
00102   }
00103 }
00104 
00105 boost::recursive_mutex& AnalysisWorkerGroup::TestAlgs::getMutex() const {
00106   return m_access;
00107 }
00108 
00109 ostream& AnalysisWorkerGroup::TestAlgs::printStatus(ostream& os) const throw() {
00110     boost::recursive_mutex::scoped_lock lock(m_access);
00111     os << setw(26) << getTest()->testName 
00112        << setw(8) << getTest()->runNumber
00113        << setw(8) << getTest()->startScanNumber
00114        << setw(8) << getTest()->nScans 
00115        << setw(26) << getTest()->analysisAlgorithm
00116        << "\n\t options = [" << getTest()->options << "]"
00117        << endl;
00118 
00119     return os;
00120 }
00121 
00122 shared_ptr<AnalysisAlgorithm> AnalysisWorkerGroup::TestAlgs::findAlgorithm(const string& modulename) {
00123     boost::recursive_mutex::scoped_lock lock(m_access);
00124     shared_ptr<AnalysisAlgorithm> alg;
00125     for (list<shared_ptr<AnalysisAlgorithm> >::const_iterator it=m_algorithms.begin();
00126             it != m_algorithms.end();
00127             ++it) {
00128         if ( (*it)->getModuleName()==modulename ) {
00129             alg=(*it);
00130             break;
00131         }
00132     }
00133     if (!alg) {
00134         alg=addAlgorithm(modulename);
00135     }
00136     return alg;
00137 }
00138 
00139 shared_ptr<AnalysisAlgorithm> AnalysisWorkerGroup::TestAlgs::addAlgorithm(const string& modulename) {
00140     boost::recursive_mutex::scoped_lock lock(m_access);
00141     shared_ptr<AnalysisAlgorithm> alg = AnalysisAlgorithmMap::instance().getAlgorithm(getTest(), modulename);
00142     m_algorithms.push_back(alg);
00143     return alg;
00144 }
00145 
00146 //AnalysisWorkerGroup methods
00147 AnalysisWorkerGroup::AnalysisWorkerGroup() : m_debug(false){
00148 }
00149 
00150 AnalysisWorkerGroup::~AnalysisWorkerGroup() {
00151 }
00152 
00153 bool AnalysisWorkerGroup::debug() {
00154   return m_debug;
00155 }
00156 
00157 ostream&  AnalysisWorkerGroup::printStatus(ostream& os) const throw() {
00158     boost::recursive_mutex::scoped_lock lock(m_tests_access);
00159     os << "\t === Analysis Service ===\n" << endl;
00160     os << "AnalysisWorkerGroup tests:" << endl;
00161     unsigned ntest=0;
00162     os << setw(26) << "TestName" 
00163        << setw(8) << "Run"
00164        << setw(8) << "Scan"
00165        << setw(8) << "NScans" 
00166        << setw(26) << "AnalysisAlg"
00167        <<  endl;
00168     os << "----------------------------------------------------------------"<<endl;
00169     for (list<shared_ptr<TestAlgs> >::const_iterator it = m_tests.begin();
00170             it != m_tests.end() ; ++it ) {
00171         (*it)->printStatus(os);
00172         ++ntest;
00173     }
00174     os << "----------------------------------------------------------------"<<endl;
00175     os << endl;
00176     os << "\tWorkers=" << nWorkers() << "\t  (Busy=" << busy() << ")"
00177        << "\tQueue =" << queueSize();
00178     if (isFifo()) {
00179       os << " [FIFO]\n";
00180     }else{
00181       os << " [FILO]\n";
00182     }
00183     os <<"Total number of tests = " << ntest << endl;
00184     return os;
00185 }
00186 
00187 void AnalysisWorkerGroup::addTest(shared_ptr<const TestData> testdata) {
00188     boost::recursive_mutex::scoped_lock lock(m_tests_access);
00189     boost::shared_ptr<TestAlgs> talgs;
00190     talgs=findTest(*testdata);
00191     if (talgs) {
00192       std::cout << "WARNING  << adding rather than replacing test" << std::endl;
00193       replaceTest(testdata);
00194     }else{
00195       m_tests.push_back(shared_ptr<TestAlgs>(new TestAlgs(testdata) ) );
00196     }
00197 }
00198 
00199 void AnalysisWorkerGroup::removeTestsUpTo(shared_ptr<const TestData> testdata) {
00200     boost::recursive_mutex::scoped_lock lock(m_tests_access);
00201     list<shared_ptr<TestAlgs> >::iterator it = m_tests.begin();
00202 
00203     while( it != m_tests.end() ) { // no ++it here since we need to delete it in the function!
00204       // lock the TestAlgs
00205       boost::recursive_mutex::scoped_lock lock((*it)->getMutex());
00206         if ((*it)->getTest()->runNumber==testdata->runNumber
00207                 && (*it)->getTest()->startScanNumber<=testdata->startScanNumber) {
00208             
00209       std::cout << "Deleting old test for run "
00210            << (*it)->getTest()->runNumber << ", start scan "
00211            << (*it)->getTest()->startScanNumber
00212            << endl;
00213             it=m_tests.erase(it);   // erase returns ++it
00214         } else {
00215             ++it;                   // increment ourselves
00216         }
00217     }
00218 }
00219 
00220 shared_ptr<AnalysisWorkerGroup::TestAlgs> AnalysisWorkerGroup::findTest(const TestData& testdata) const throw() {
00221     return findTest(testdata.runNumber, testdata.startScanNumber);
00222 }
00223 
00224 void AnalysisWorkerGroup::replaceTest(shared_ptr<const TestData> testdata){
00225   boost::recursive_mutex::scoped_lock lock(m_tests_access);
00226   if (!testdata) throw Sct::IllegalStateError("Couldnt replace non-existant test",__FILE__,__LINE__);
00227   shared_ptr<AnalysisWorkerGroup::TestAlgs> talgs = findTest(*testdata);
00228   if (!talgs) {
00229     std::cout << "WARNING - adding rather than replacing test!" << std::endl;
00230     addTest(testdata);
00231     talgs = findTest(*testdata);
00232   }
00233   if (!talgs) throw Sct::IllegalStateError("Couldn't find or add test",__FILE__,__LINE__);
00234   talgs->replaceTest(testdata);
00235 }
00236 
00237 shared_ptr<AnalysisWorkerGroup::TestAlgs> AnalysisWorkerGroup::findTest(const unsigned long runno, const unsigned long scanno) const throw() {
00238     boost::recursive_mutex::scoped_lock lock(m_tests_access);
00239     shared_ptr<AnalysisWorkerGroup::TestAlgs> talgs;
00240     for (list<shared_ptr<TestAlgs> >::const_iterator it = m_tests.begin();
00241             it != m_tests.end();
00242             ++it ) {
00243         boost::recursive_mutex::scoped_lock lock((*it)->getMutex());
00244         shared_ptr<const TestData> td=(*it)->getTest();
00245     
00246         if ( runno == td->runNumber ){
00247       // if full mode, there are three results per scan
00248 #warning "Second number below should be 3!"
00249       int results_per_scan = (td->options.find("full")==std::string::npos) ? 1 : 3;
00250       if (scanno >= td->startScanNumber &&
00251           scanno < td->startScanNumber + td->nScans * results_per_scan) talgs=*it;
00252     }
00253     }
00254     return talgs;
00255 }
00256 
00257 
00258 void AnalysisWorkerGroup::purge() throw() {
00259     //Stop the workers from trying to get on with things
00260     this->setPaused(true);
00261     while (paused() != nWorkers()) sleep(1);
00262     
00263     // Clear the queue.
00264     shared_ptr<IOName> s;
00265     do {s=pop(); } while(s);
00266     boost::recursive_mutex::scoped_lock lock(m_tests_access);
00267 
00268     // Clear the test data.
00269     std::cout << "Clearing tests ... " << endl;
00270     m_tests.clear();
00271     std::cout << " ... done clearing tests" << endl;
00272     this->setPaused(false);    
00273 }
00274 
00275 void AnalysisWorkerGroup::work(shared_ptr<IOName> name) throw() {
00276     using namespace SctData;
00277 
00278     try {
00279     timer t;
00280     UniqueID id (name->getUniqueID());
00281         const unsigned runno = id.getRunNumber();
00282         const unsigned scanno = id.getScanNumber();
00283         const string modulename= id.getModule();
00284 
00285     if (debug()) std::cout << "run="<< runno << ", scan="<< scanno << ", modulename="<<modulename << std::endl;
00286 
00287         shared_ptr<AnalysisWorkerGroup::TestAlgs> talgs = findTest(runno, scanno);
00288 
00289     if (debug()) std::cout << "Found test" << endl;   
00290     if (!talgs) {
00291       std::ostringstream oss; oss << "AnalysisWorker::work. No test found for run " 
00292                       << runno << " scan" << scanno;
00293       throw IllegalStateError(oss.str(), __FILE__, __LINE__);
00294     }
00295     // Get a testresult. Add a new testresult if one dosen't exist for this module
00296     
00297     shared_ptr<AnalysisAlgorithm> algorithm;
00298     
00299     boost::recursive_mutex::scoped_lock talgs_lock(talgs->getMutex());
00300     algorithm =  talgs->findAlgorithm(modulename);
00301 
00302     if (!algorithm.get()) return;
00303     
00304         if (debug()) std::cout << "algorithm.get()=" << algorithm.get() << endl;
00305 
00306         bool isfit=(name->getClassName()=="SctData::FitScanResult");
00307 
00308         // lock the approprate result and do your stuff, analysis-algorithm.
00309         boost::recursive_mutex::scoped_lock algorithm_lock (algorithm->getMutex());
00310         if (isfit) {
00311       if (debug()) std::cout << "got fit from queue" << endl;
00312       boost::recursive_mutex::scoped_lock lock(m_tests_access);
00313       algorithm->addFitScanResult(name);
00314         } else {
00315       if (debug()) std::cout << "got raw from queue" << endl;
00316       boost::recursive_mutex::scoped_lock lock(m_tests_access);
00317       algorithm->addRawScanResult(name);
00318         }
00319     if (debug()) std::cout << "checking if I can analyse algorithm" << endl;
00320     if (algorithm->canAnalyze()) {
00321         if (algorithm->isDone()) throw IllegalStateError("AnalysisWorker::work.  Algorithm has additional, unnecessary Scan - analysis already done", __FILE__, __LINE__);
00322         if (debug()) std::cout << "checking existance" << std::endl;
00323         algorithm->checkTestResultExists();
00324         algorithm->addOverheadTime(t.elapsed());
00325         
00326         t.restart();
00327         if (debug()) std::cout << "loading data" << std::endl;
00328         algorithm->loadData();
00329         algorithm->addIOTime(t.elapsed());
00330         
00331         t.restart();
00332         if (debug()) std::cout << "analyzing" << std::endl;
00333         algorithm->analyze();
00334         algorithm->addAnalysisTime(t.elapsed());
00335         
00336         t.restart();
00337         if (debug()) std::cout << "finishing" << std::endl;
00338         algorithm->finish();
00339         algorithm->addIOTime(t.elapsed());
00340         if (debug()) std::cout << "removing algorithm" << std::endl;
00341         talgs->removeAlgorithm(algorithm);
00342     } else {
00343         algorithm->addOverheadTime(t.elapsed());
00344     }
00345     
00346     } catch (InvalidArgumentError& e) {
00347     e.sendToMrs();
00348     } catch(Throwable& e) {
00349         e.sendToMrs(MRS_ERROR);
00350     } catch(std::exception& e) {
00351     StdExceptionWrapper sew(e);
00352         sew.sendToMrs(MRS_ERROR);        
00353     } catch(...) {
00354     Error e("uncaught unknown exception", __FILE__, __LINE__);
00355     e.sendToMrs(MRS_ERROR);
00356     }
00357 }
00358 
00359 
00360 }// end of namespace SctAnalysis

Generated on Mon Feb 6 14:01:16 2006 for SCT DAQ/DCS Software - C++ by  doxygen 1.4.6