Main Page   Modules   Namespace List   Class Hierarchy   Data Structures   File List   Namespace Members   Data Fields   Globals   Related Pages  

AnalysisWorkerGroup.cpp

Go to the documentation of this file.
00001 #include "AnalysisWorkerGroup.h"
00002 #include "AnalysisAlgorithm.h"
00003 #include "AnalysisAlgorithmMap.h"
00004 #include "Sct/IS/IOManagerIS.h"
00005 #include "Sct/ISProxy/IOManagerISProxy.h"
00006 #include "Sct/IS/IONameIS.h"
00007 #include "Sct/StdExceptionWrapper.h"
00008 
00009 using namespace SctService;
00010 using namespace boost;
00011 
00012 namespace SctAnalysis {
00013 
00014 //TestAlgs methods
00015 
00016 
00017 AnalysisWorkerGroup::TestAlgs::~TestAlgs() {
00018     boost::recursive_mutex::scoped_lock lock(m_access);
00019 
00020     //We need to explicitly delete the list here so the lock is in force
00021     //Note that this just removes the pointer..actual deletion will only occur
00022     //if noone else has grabbed a reference to the object
00023     m_algorithms.clear();
00024 }
00025 
00026 void AnalysisWorkerGroup::TestAlgs::removeAlgorithm(shared_ptr<AnalysisAlgorithm> alg) {
00027     boost::recursive_mutex::scoped_lock lock(m_access);
00028     list<shared_ptr<AnalysisAlgorithm> >::iterator it =
00029         find(m_algorithms.begin(), m_algorithms.end(), alg);
00030 
00031     if ( it != m_algorithms.end() )
00032         m_algorithms.erase(it);
00033 }
00034 
00035 ostream& AnalysisWorkerGroup::TestAlgs::printStatus(ostream& os) const throw() {
00036     boost::recursive_mutex::scoped_lock lock(m_access);
00037     os << "   Test name: "  << getTest().testName 
00038        << ", run number = " << getTest().runNumber
00039        << ", first scan number = " << getTest().startScanNumber
00040        << ", nscans = " << getTest().nScans 
00041        <<  endl;
00042 
00043     for ( list<shared_ptr<AnalysisAlgorithm> >::const_iterator it = m_algorithms.begin();
00044             it != m_algorithms.end(); ++it ) {
00045         os << "      " << (*it)->getModuleName()
00046         << "  # of scans: " << (*it)->getTestResult()->getNScans() << " of " << getTest().nScans << endl;
00047     }
00048     return os;
00049 }
00050 
00051 shared_ptr<AnalysisAlgorithm> AnalysisWorkerGroup::TestAlgs::findAlgorithm(const string& modulename) {
00052     boost::recursive_mutex::scoped_lock lock(m_access);
00053     shared_ptr<AnalysisAlgorithm> alg;
00054     for (list<shared_ptr<AnalysisAlgorithm> >::const_iterator it=m_algorithms.begin();
00055             it != m_algorithms.end();
00056             ++it) {
00057         if ( (*it)->getModuleName()==modulename ) {
00058             alg=(*it);
00059             break;
00060         }
00061     }
00062     if (!alg) {
00063         alg=addAlgorithm(modulename);
00064     }
00065     return alg;
00066 }
00067 
00068 shared_ptr<AnalysisAlgorithm> AnalysisWorkerGroup::TestAlgs::addAlgorithm(const string& modulename) {
00069     boost::recursive_mutex::scoped_lock lock(m_access);
00070     shared_ptr<AnalysisAlgorithm> alg = AnalysisAlgorithmMap::instance().getAlgorithm(getTest(), modulename);
00071     m_algorithms.push_back(alg);
00072     return alg;
00073 }
00074 
00075 
00076 
00077 //AnalysisWorkerGroup methods
00078 
00079 
00080 ostream&  AnalysisWorkerGroup::printStatus(ostream& os) const throw() {
00081     cout << "Getting tests lock" << endl;
00082     boost::recursive_mutex::scoped_lock lock(m_tests_access);
00083     cout << "got tests lock" << endl;
00084     os << "===============================================\n" << endl;
00085     os << "AnalysisWorkerGroup tests:" << endl;
00086     unsigned ntest=0;
00087     for (list<shared_ptr<TestAlgs> >::const_iterator it = m_tests.begin();
00088             it != m_tests.end() ; ++it ) {
00089         (*it)->printStatus(os);
00090         os << "-------------------------------------------------"<<endl;
00091         ++ntest;
00092     }
00093     os << endl << "Total number of tests = " << ntest << endl;
00094     cout << "done ostreaming" << endl;
00095     return os;
00096 }
00097 
00098 void AnalysisWorkerGroup::addTest(shared_ptr<const TestData> testdata) {
00099     boost::recursive_mutex::scoped_lock lock(m_tests_access);
00100     m_tests.push_back(shared_ptr<TestAlgs>(new TestAlgs(testdata) ) );
00101 }
00102 
00103 void AnalysisWorkerGroup::removeTestsUpTo(shared_ptr<const TestData> testdata) {
00104     boost::recursive_mutex::scoped_lock lock(m_tests_access);
00105     list<shared_ptr<TestAlgs> >::iterator it = m_tests.begin();
00106 
00107     while( it != m_tests.end() ) { // no ++it here since we need to delete it in the function!
00108         if ((*it)->getTest().runNumber==testdata->runNumber
00109                 && (*it)->getTest().startScanNumber<=testdata->startScanNumber) {
00110             
00111         cout << "Deleting old test for run "
00112          << (*it)->getTest().runNumber << ", start scan "
00113                  << (*it)->getTest().startScanNumber
00114                  << endl;
00115             it=m_tests.erase(it);   // erase returns ++it
00116         } else {
00117             ++it;                   // increment ourselves
00118         }
00119     }
00120 }
00121 
00122 shared_ptr<AnalysisWorkerGroup::TestAlgs> AnalysisWorkerGroup::findTest(const TestData& testdata) const throw() {
00123     return findTest(testdata.runNumber, testdata.startScanNumber);
00124 }
00125 
00126 shared_ptr<AnalysisWorkerGroup::TestAlgs> AnalysisWorkerGroup::findTest(const unsigned long runno, const unsigned long scanno) const throw() {
00127     boost::recursive_mutex::scoped_lock lock(m_tests_access);
00128     shared_ptr<AnalysisWorkerGroup::TestAlgs> talgs;
00129     for (list<shared_ptr<TestAlgs> >::const_iterator it = m_tests.begin();
00130             it != m_tests.end();
00131             ++it ) {
00132         const TestData& td=(*it)->getTest();
00133         if ( runno == td.runNumber &&
00134                 scanno >= td.startScanNumber &&
00135                 scanno < td.startScanNumber + td.nScans) {
00136             talgs=*it;
00137             //      cout << "found test for run " << runno<< ", scan " << scanno << endl;
00138         }
00139     }
00140     return talgs;
00141 }
00142 
00143 
00144 
00145 
00146 void AnalysisWorkerGroup::purge() throw() {
00147     //Stop the workers from trying to get on with things
00148     this->setPaused(true);
00149     while (paused() != nWorkers()) sleep(1);
00150     
00151     // Clear the queue.
00152     std::string s;
00153     do {s=pop(); } while( s.length() != 0 );
00154     boost::recursive_mutex::scoped_lock lock(m_tests_access);
00155 
00156     // Clear the test data.
00157     cout << "Clearing tests ... " << endl;
00158     m_tests.clear();
00159     cout << " ... done clearing tests" << endl;
00160     this->setPaused(false);    
00161 }
00162 
00163 void AnalysisWorkerGroup::work(std::string name) throw() {
00164     using namespace SctData;
00165     try {
00166         //      cout << "AnalysisWorker: name = "<< *name << " Q="<<wg->queueSize()<<", busy="<< wg->busy() << endl;
00167 
00168         Sct::IS::IONameIS isname(name);
00169         const unsigned runno = isname.getRunNumber();
00170         const unsigned scanno = isname.getScanNumber();
00171         const string modulename= isname.getModuleName();
00172 
00173         //      cout << "run="<< runno << ", scan="<< scanno << ", modulename="<<modulename
00174         //       << ", classname="<<isname.getClassName()<<endl;
00175 
00176         shared_ptr<AnalysisWorkerGroup::TestAlgs> talgs = findTest(runno, scanno);
00177 
00178         //      cout << "Found test" << endl;
00179 
00180         if (!talgs)
00181             throw InvalidArgumentError("AnalysisWorker::work. No test found for scan", __FILE__, __LINE__);
00182 
00183         // Get a testresult. Add a new testresult if one dosen't exist for this module
00184         shared_ptr<AnalysisAlgorithm> algorithm = talgs->findAlgorithm( modulename );
00185 
00186 
00187         //      cout << "algorithm.get()=" << algorithm.get() << endl;
00188 
00189         bool isfit=(isname.getClassName()==string("SctData::FitScanResult"));
00190 
00191         // lock the approprate result and do your stuff, analysis-algorithm.
00192         boost::recursive_mutex::scoped_lock lock (algorithm->getMutex());
00193         if (isfit) {
00194             //      cout << "got fit from queue" << endl;
00195             algorithm->canAddFitScanResult(name);
00196         } else {
00197             //      cout << "got raw from queue" << endl;
00198             algorithm->canAddRawScanResult(name);
00199         }
00200         if ( algorithm->isDone() ) {
00201             //      cout << "Removing algorithm" << endl;
00202         talgs->removeAlgorithm(algorithm);
00203         }
00204     } catch (InvalidArgumentError& e) {
00205     e.sendToMrs();
00206     } catch(Throwable& e) {
00207         e.sendToMrs(MRS_ERROR);
00208     } catch(std::exception& e) {
00209     StdExceptionWrapper sew(e);
00210         sew.sendToMrs(MRS_ERROR);        
00211     } catch(...) {
00212     Error e("uncaught unknown exception", __FILE__, __LINE__);
00213     e.sendToMrs(MRS_ERROR);
00214     }
00215     //  cout << "Work done" << endl;
00216 }
00217 
00218 
00219 }// end of namespace SctAnalysis

Generated on Mon Dec 15 19:35:55 2003 for SCT DAQ/DCS Software by doxygen1.3-rc3