00001 #include "AnalysisWorkerGroup.h"
00002 #include "AnalysisAlgorithm.h"
00003 #include "AnalysisAlgorithmMap.h"
00004 #include "Sct/IS/IOManagerIS.h"
00005 #include "Sct/ISProxy/IOManagerISProxy.h"
00006 #include "Sct/IS/IONameIS.h"
00007 #include "Sct/StdExceptionWrapper.h"
00008 #include "SctData/UniqueID.h"
00009 #include "SctData/TestResult.h"
00010 #include "CalibrationController/IS/TestData.h"
00011 #include <boost/timer.hpp>
00012
00013 using namespace SctService;
00014 using namespace boost;
00015 using namespace std;
00016 using namespace Sct;
00017 using namespace Sct::IS;
00018 using namespace SctData;
00019
00020 namespace SctAnalysis {
00021
00022
00023
00024
00025 AnalysisWorkerGroup::TestAlgs::~TestAlgs() {
00026 boost::recursive_mutex::scoped_lock lock(m_access);
00027
00028
00029
00030
00031 m_algorithms.clear();
00032 }
00033
00034 void AnalysisWorkerGroup::TestAlgs::removeAlgorithm(shared_ptr<AnalysisAlgorithm> alg) {
00035 boost::recursive_mutex::scoped_lock lock(m_access);
00036 list<shared_ptr<AnalysisAlgorithm> >::iterator it =
00037 find(m_algorithms.begin(), m_algorithms.end(), alg);
00038
00039 if ( it != m_algorithms.end() )
00040 m_algorithms.erase(it);
00041 }
00042
00043 ostream& AnalysisWorkerGroup::TestAlgs::printStatus(ostream& os) const throw() {
00044 boost::recursive_mutex::scoped_lock lock(m_access);
00045 os << " Test name: " << getTest().testName
00046 << ", run number = " << getTest().runNumber
00047 << ", first scan number = " << getTest().startScanNumber
00048 << ", nscans = " << getTest().nScans
00049 << endl;
00050
00051 return os;
00052 }
00053
00054 shared_ptr<AnalysisAlgorithm> AnalysisWorkerGroup::TestAlgs::findAlgorithm(const string& modulename) {
00055 boost::recursive_mutex::scoped_lock lock(m_access);
00056 shared_ptr<AnalysisAlgorithm> alg;
00057 for (list<shared_ptr<AnalysisAlgorithm> >::const_iterator it=m_algorithms.begin();
00058 it != m_algorithms.end();
00059 ++it) {
00060 if ( (*it)->getModuleName()==modulename ) {
00061 alg=(*it);
00062 break;
00063 }
00064 }
00065 if (!alg) {
00066 alg=addAlgorithm(modulename);
00067 }
00068 return alg;
00069 }
00070
00071 shared_ptr<AnalysisAlgorithm> AnalysisWorkerGroup::TestAlgs::addAlgorithm(const string& modulename) {
00072 boost::recursive_mutex::scoped_lock lock(m_access);
00073 shared_ptr<AnalysisAlgorithm> alg = AnalysisAlgorithmMap::instance().getAlgorithm(getTest(), modulename);
00074 m_algorithms.push_back(alg);
00075 return alg;
00076 }
00077
00078
00079
00080
00081
00082
00083 ostream& AnalysisWorkerGroup::printStatus(ostream& os) const throw() {
00084 cout << "Getting tests lock" << endl;
00085 boost::recursive_mutex::scoped_lock lock(m_tests_access);
00086 cout << "got tests lock" << endl;
00087 os << "===============================================\n" << endl;
00088 os << "AnalysisWorkerGroup tests:" << endl;
00089 unsigned ntest=0;
00090 for (list<shared_ptr<TestAlgs> >::const_iterator it = m_tests.begin();
00091 it != m_tests.end() ; ++it ) {
00092 (*it)->printStatus(os);
00093 os << "-------------------------------------------------"<<endl;
00094 ++ntest;
00095 }
00096 os << endl << "Total number of tests = " << ntest << endl;
00097 cout << "done ostreaming" << endl;
00098 return os;
00099 }
00100
00101 void AnalysisWorkerGroup::addTest(shared_ptr<const TestData> testdata) {
00102 boost::recursive_mutex::scoped_lock lock(m_tests_access);
00103 m_tests.push_back(shared_ptr<TestAlgs>(new TestAlgs(testdata) ) );
00104 }
00105
00106 void AnalysisWorkerGroup::removeTestsUpTo(shared_ptr<const TestData> testdata) {
00107 boost::recursive_mutex::scoped_lock lock(m_tests_access);
00108 list<shared_ptr<TestAlgs> >::iterator it = m_tests.begin();
00109
00110 while( it != m_tests.end() ) {
00111 if ((*it)->getTest().runNumber==testdata->runNumber
00112 && (*it)->getTest().startScanNumber<=testdata->startScanNumber) {
00113
00114 cout << "Deleting old test for run "
00115 << (*it)->getTest().runNumber << ", start scan "
00116 << (*it)->getTest().startScanNumber
00117 << endl;
00118 it=m_tests.erase(it);
00119 } else {
00120 ++it;
00121 }
00122 }
00123 }
00124
00125 shared_ptr<AnalysisWorkerGroup::TestAlgs> AnalysisWorkerGroup::findTest(const TestData& testdata) const throw() {
00126 return findTest(testdata.runNumber, testdata.startScanNumber);
00127 }
00128
00129 shared_ptr<AnalysisWorkerGroup::TestAlgs> AnalysisWorkerGroup::findTest(const unsigned long runno, const unsigned long scanno) const throw() {
00130 boost::recursive_mutex::scoped_lock lock(m_tests_access);
00131 shared_ptr<AnalysisWorkerGroup::TestAlgs> talgs;
00132 for (list<shared_ptr<TestAlgs> >::const_iterator it = m_tests.begin();
00133 it != m_tests.end();
00134 ++it ) {
00135 const TestData& td=(*it)->getTest();
00136 if ( runno == td.runNumber &&
00137 scanno >= td.startScanNumber &&
00138 scanno < td.startScanNumber + td.nScans) {
00139 talgs=*it;
00140
00141 }
00142 }
00143 return talgs;
00144 }
00145
00146
00147
00148
00149 void AnalysisWorkerGroup::purge() throw() {
00150
00151 this->setPaused(true);
00152 while (paused() != nWorkers()) sleep(1);
00153
00154
00155 shared_ptr<IOName> s;
00156 do {s=pop(); } while(s);
00157 boost::recursive_mutex::scoped_lock lock(m_tests_access);
00158
00159
00160 cout << "Clearing tests ... " << endl;
00161 m_tests.clear();
00162 cout << " ... done clearing tests" << endl;
00163 this->setPaused(false);
00164 }
00165
00166 void AnalysisWorkerGroup::work(shared_ptr<IOName> name) throw() {
00167 using namespace SctData;
00168 timer t;
00169 try {
00170
00171 UniqueID id (name->getUniqueID());
00172 const unsigned runno = id.getRunNumber();
00173 const unsigned scanno = id.getScanNumber();
00174 const string modulename= id.getModule();
00175
00176
00177
00178
00179 shared_ptr<AnalysisWorkerGroup::TestAlgs> talgs = findTest(runno, scanno);
00180
00181
00182
00183 if (!talgs) {
00184 std::ostringstream oss; oss << "AnalysisWorker::work. No test found for run "
00185 << runno << " scan" << scanno;
00186 throw IllegalStateError(oss.str(), __FILE__, __LINE__);
00187 }
00188
00189 shared_ptr<AnalysisAlgorithm> algorithm = talgs->findAlgorithm(modulename);
00190
00191
00192
00193 bool isfit=(name->getClassName()=="SctData::FitScanResult");
00194
00195
00196 boost::recursive_mutex::scoped_lock lock (algorithm->getMutex());
00197 if (isfit) {
00198
00199 algorithm->addFitScanResult(name);
00200 } else {
00201
00202 algorithm->addRawScanResult(name);
00203 }
00204 if (algorithm->canAnalyze()) {
00205 if (algorithm->isDone()) throw IllegalStateError("AnalysisWorker::work. Algorithm has additional, unnecessary Scan - analysis already done", __FILE__, __LINE__);
00206 algorithm->checkTestResultExists();
00207 algorithm->addOverheadTime(t.elapsed());
00208
00209 t.restart();
00210 algorithm->loadData();
00211 algorithm->addIOTime(t.elapsed());
00212
00213 t.restart();
00214 algorithm->analyze();
00215 algorithm->addAnalysisTime(t.elapsed());
00216
00217 t.restart();
00218 algorithm->finish();
00219 algorithm->addIOTime(t.elapsed());
00220 talgs->removeAlgorithm(algorithm);
00221 } else {
00222 algorithm->addOverheadTime(t.elapsed());
00223 }
00224
00225 } catch (InvalidArgumentError& e) {
00226 e.sendToMrs();
00227 } catch(Throwable& e) {
00228 e.sendToMrs(MRS_ERROR);
00229 } catch(std::exception& e) {
00230 StdExceptionWrapper sew(e);
00231 sew.sendToMrs(MRS_ERROR);
00232 } catch(...) {
00233 Error e("uncaught unknown exception", __FILE__, __LINE__);
00234 e.sendToMrs(MRS_ERROR);
00235 }
00236
00237 }
00238
00239
00240 }