00001 #include "AnalysisWorkerGroup.h"
00002 #include "AnalysisAlgorithm.h"
00003 #include "AnalysisAlgorithmMap.h"
00004 #include "Sct/IS/IOManagerIS.h"
00005 #include "Sct/ISProxy/IOManagerISProxy.h"
00006 #include "Sct/IS/IONameIS.h"
00007 #include "Sct/StdExceptionWrapper.h"
00008
00009 using namespace SctService;
00010 using namespace boost;
00011
00012 namespace SctAnalysis {
00013
00014
00015
00016
00017 AnalysisWorkerGroup::TestAlgs::~TestAlgs() {
00018 boost::recursive_mutex::scoped_lock lock(m_access);
00019
00020
00021
00022
00023 m_algorithms.clear();
00024 }
00025
00026 void AnalysisWorkerGroup::TestAlgs::removeAlgorithm(shared_ptr<AnalysisAlgorithm> alg) {
00027 boost::recursive_mutex::scoped_lock lock(m_access);
00028 list<shared_ptr<AnalysisAlgorithm> >::iterator it =
00029 find(m_algorithms.begin(), m_algorithms.end(), alg);
00030
00031 if ( it != m_algorithms.end() )
00032 m_algorithms.erase(it);
00033 }
00034
00035 ostream& AnalysisWorkerGroup::TestAlgs::printStatus(ostream& os) const throw() {
00036 boost::recursive_mutex::scoped_lock lock(m_access);
00037 os << " Test name: " << getTest().testName
00038 << ", run number = " << getTest().runNumber
00039 << ", first scan number = " << getTest().startScanNumber
00040 << ", nscans = " << getTest().nScans
00041 << endl;
00042
00043 for ( list<shared_ptr<AnalysisAlgorithm> >::const_iterator it = m_algorithms.begin();
00044 it != m_algorithms.end(); ++it ) {
00045 os << " " << (*it)->getModuleName()
00046 << " # of scans: " << (*it)->getTestResult()->getNScans() << " of " << getTest().nScans << endl;
00047 }
00048 return os;
00049 }
00050
00051 shared_ptr<AnalysisAlgorithm> AnalysisWorkerGroup::TestAlgs::findAlgorithm(const string& modulename) {
00052 boost::recursive_mutex::scoped_lock lock(m_access);
00053 shared_ptr<AnalysisAlgorithm> alg;
00054 for (list<shared_ptr<AnalysisAlgorithm> >::const_iterator it=m_algorithms.begin();
00055 it != m_algorithms.end();
00056 ++it) {
00057 if ( (*it)->getModuleName()==modulename ) {
00058 alg=(*it);
00059 break;
00060 }
00061 }
00062 if (!alg) {
00063 alg=addAlgorithm(modulename);
00064 }
00065 return alg;
00066 }
00067
00068 shared_ptr<AnalysisAlgorithm> AnalysisWorkerGroup::TestAlgs::addAlgorithm(const string& modulename) {
00069 boost::recursive_mutex::scoped_lock lock(m_access);
00070 shared_ptr<AnalysisAlgorithm> alg = AnalysisAlgorithmMap::instance().getAlgorithm(getTest(), modulename);
00071 m_algorithms.push_back(alg);
00072 return alg;
00073 }
00074
00075
00076
00077
00078
00079
00080 ostream& AnalysisWorkerGroup::printStatus(ostream& os) const throw() {
00081 cout << "Getting tests lock" << endl;
00082 boost::recursive_mutex::scoped_lock lock(m_tests_access);
00083 cout << "got tests lock" << endl;
00084 os << "===============================================\n" << endl;
00085 os << "AnalysisWorkerGroup tests:" << endl;
00086 unsigned ntest=0;
00087 for (list<shared_ptr<TestAlgs> >::const_iterator it = m_tests.begin();
00088 it != m_tests.end() ; ++it ) {
00089 (*it)->printStatus(os);
00090 os << "-------------------------------------------------"<<endl;
00091 ++ntest;
00092 }
00093 os << endl << "Total number of tests = " << ntest << endl;
00094 cout << "done ostreaming" << endl;
00095 return os;
00096 }
00097
00098 void AnalysisWorkerGroup::addTest(shared_ptr<const TestData> testdata) {
00099 boost::recursive_mutex::scoped_lock lock(m_tests_access);
00100 m_tests.push_back(shared_ptr<TestAlgs>(new TestAlgs(testdata) ) );
00101 }
00102
00103 void AnalysisWorkerGroup::removeTestsUpTo(shared_ptr<const TestData> testdata) {
00104 boost::recursive_mutex::scoped_lock lock(m_tests_access);
00105 list<shared_ptr<TestAlgs> >::iterator it = m_tests.begin();
00106
00107 while( it != m_tests.end() ) {
00108 if ((*it)->getTest().runNumber==testdata->runNumber
00109 && (*it)->getTest().startScanNumber<=testdata->startScanNumber) {
00110
00111 cout << "Deleting old test for run "
00112 << (*it)->getTest().runNumber << ", start scan "
00113 << (*it)->getTest().startScanNumber
00114 << endl;
00115 it=m_tests.erase(it);
00116 } else {
00117 ++it;
00118 }
00119 }
00120 }
00121
00122 shared_ptr<AnalysisWorkerGroup::TestAlgs> AnalysisWorkerGroup::findTest(const TestData& testdata) const throw() {
00123 return findTest(testdata.runNumber, testdata.startScanNumber);
00124 }
00125
00126 shared_ptr<AnalysisWorkerGroup::TestAlgs> AnalysisWorkerGroup::findTest(const unsigned long runno, const unsigned long scanno) const throw() {
00127 boost::recursive_mutex::scoped_lock lock(m_tests_access);
00128 shared_ptr<AnalysisWorkerGroup::TestAlgs> talgs;
00129 for (list<shared_ptr<TestAlgs> >::const_iterator it = m_tests.begin();
00130 it != m_tests.end();
00131 ++it ) {
00132 const TestData& td=(*it)->getTest();
00133 if ( runno == td.runNumber &&
00134 scanno >= td.startScanNumber &&
00135 scanno < td.startScanNumber + td.nScans) {
00136 talgs=*it;
00137
00138 }
00139 }
00140 return talgs;
00141 }
00142
00143
00144
00145
00146 void AnalysisWorkerGroup::purge() throw() {
00147
00148 this->setPaused(true);
00149 while (paused() != nWorkers()) sleep(1);
00150
00151
00152 std::string s;
00153 do {s=pop(); } while( s.length() != 0 );
00154 boost::recursive_mutex::scoped_lock lock(m_tests_access);
00155
00156
00157 cout << "Clearing tests ... " << endl;
00158 m_tests.clear();
00159 cout << " ... done clearing tests" << endl;
00160 this->setPaused(false);
00161 }
00162
00163 void AnalysisWorkerGroup::work(std::string name) throw() {
00164 using namespace SctData;
00165 try {
00166
00167
00168 Sct::IS::IONameIS isname(name);
00169 const unsigned runno = isname.getRunNumber();
00170 const unsigned scanno = isname.getScanNumber();
00171 const string modulename= isname.getModuleName();
00172
00173
00174
00175
00176 shared_ptr<AnalysisWorkerGroup::TestAlgs> talgs = findTest(runno, scanno);
00177
00178
00179
00180 if (!talgs)
00181 throw InvalidArgumentError("AnalysisWorker::work. No test found for scan", __FILE__, __LINE__);
00182
00183
00184 shared_ptr<AnalysisAlgorithm> algorithm = talgs->findAlgorithm( modulename );
00185
00186
00187
00188
00189 bool isfit=(isname.getClassName()==string("SctData::FitScanResult"));
00190
00191
00192 boost::recursive_mutex::scoped_lock lock (algorithm->getMutex());
00193 if (isfit) {
00194
00195 algorithm->canAddFitScanResult(name);
00196 } else {
00197
00198 algorithm->canAddRawScanResult(name);
00199 }
00200 if ( algorithm->isDone() ) {
00201
00202 talgs->removeAlgorithm(algorithm);
00203 }
00204 } catch (InvalidArgumentError& e) {
00205 e.sendToMrs();
00206 } catch(Throwable& e) {
00207 e.sendToMrs(MRS_ERROR);
00208 } catch(std::exception& e) {
00209 StdExceptionWrapper sew(e);
00210 sew.sendToMrs(MRS_ERROR);
00211 } catch(...) {
00212 Error e("uncaught unknown exception", __FILE__, __LINE__);
00213 e.sendToMrs(MRS_ERROR);
00214 }
00215
00216 }
00217
00218
00219 }