00001 #include "AnalysisWorkerGroup.h"
00002 #include "AnalysisAlgorithm.h"
00003 #include "AnalysisAlgorithmMap.h"
00004 #include "AnalysisService.h"
00005 #include "DcsInterface.h"
00006
00007 #include "Sct/IS/IOManagerIS.h"
00008 #include "Sct/ISProxy/IOManagerISProxy.h"
00009 #include "Sct/IS/IONameIS.h"
00010 #include "Sct/StdExceptionWrapper.h"
00011 #include "Sct/SctNames.h"
00012 #include "Sct/UniqueID.h"
00013 #include "SctData/TestResult.h"
00014 #include "SctData/DcsData.h"
00015
00016 #include "CalibrationController/IS/TestData.h"
00017 #include <boost/timer.hpp>
00018 #include <iomanip>
00019 #include <is/info.h>
00020 #include <is/infodictionary.h>
00021 #include <is/infoiterator.h>
00022 #include <is/inforeceiver.h>
00023 #include <is/infoT.h>
00024
00025 using namespace SctService;
00026 using namespace boost;
00027 using namespace std;
00028 using namespace Sct;
00029 using namespace Sct::IS;
00030 using namespace SctData;
00031
00032 namespace SctAnalysis {
00033
00034
00035 AnalysisWorkerGroup::TestAlgs::TestAlgs(boost::shared_ptr<const TestData> testdata) : m_testdata(testdata){
00036 bool dcsProblemsReported=false;
00037 for (unsigned imodule=0; imodule<m_testdata->modules_size; ++imodule){
00038
00039 try{
00040 const std::string& module = m_testdata->modules[imodule];
00041 boost::shared_ptr<AnalysisAlgorithm> alg = findAlgorithm(module);
00042 if (!alg.get()) continue;
00043 alg->checkTestResultExists();
00044
00045
00046 try{
00047 if (!AnalysisService::instance().getDcsInterface().getConfigurationInterface().empty()) {
00048 shared_ptr<SctData::DcsData> dcs (AnalysisService::instance().getDcsInterface().getData(module) );
00049
00050 alg->getTestResult()->setDcsData(dcs);
00051 }
00052 }catch(Sct::Throwable& e){
00053 if (!dcsProblemsReported) {
00054 e.sendToMrs(MRS_DIAGNOSTIC);
00055 dcsProblemsReported=true;
00056 }
00057 }catch(std::exception& e){
00058 if (!dcsProblemsReported) {
00059 StdExceptionWrapper(e,__FILE__,__LINE__).sendToMrs(MRS_DIAGNOSTIC);
00060 dcsProblemsReported=true;
00061 }
00062 }
00063 }catch(Sct::Throwable& e){
00064 e.sendToMrs();
00065 }
00066 }
00067 }
00068
00069 AnalysisWorkerGroup::TestAlgs::~TestAlgs() {
00070 boost::recursive_mutex::scoped_lock lock(m_access);
00071
00072
00073
00074
00075 m_algorithms.clear();
00076 }
00077
00078 void AnalysisWorkerGroup::TestAlgs::removeAlgorithm(shared_ptr<AnalysisAlgorithm> alg) {
00079 boost::recursive_mutex::scoped_lock lock(m_access);
00080 list<shared_ptr<AnalysisAlgorithm> >::iterator it =
00081 find(m_algorithms.begin(), m_algorithms.end(), alg);
00082
00083 if ( it != m_algorithms.end() )
00084 m_algorithms.erase(it);
00085 }
00086
00087 shared_ptr<const TestData>AnalysisWorkerGroup::TestAlgs::getTest() const{
00088 if (!m_testdata.get()) throw Sct::InvariantViolatedError("Attempt to retrieve null TestData - object is not properly constructed",__FILE__,__LINE__);
00089 return m_testdata;
00090 }
00091
00092 void AnalysisWorkerGroup::TestAlgs::replaceTest(shared_ptr<const TestData> testdata){
00093 boost::recursive_mutex::scoped_lock test_lock(m_access);
00094 m_testdata=testdata;
00095
00096 for (std::list<shared_ptr<AnalysisAlgorithm> >::iterator it = m_algorithms.begin();
00097 it != m_algorithms.end();
00098 ++it){
00099
00100 boost::recursive_mutex::scoped_lock alg_lock((*it)->getMutex());
00101 (*it)->setTestData(testdata);
00102 }
00103 }
00104
00105 boost::recursive_mutex& AnalysisWorkerGroup::TestAlgs::getMutex() const {
00106 return m_access;
00107 }
00108
00109 ostream& AnalysisWorkerGroup::TestAlgs::printStatus(ostream& os) const throw() {
00110 boost::recursive_mutex::scoped_lock lock(m_access);
00111 os << setw(26) << getTest()->testName
00112 << setw(8) << getTest()->runNumber
00113 << setw(8) << getTest()->startScanNumber
00114 << setw(8) << getTest()->nScans
00115 << setw(26) << getTest()->analysisAlgorithm
00116 << "\n\t options = [" << getTest()->options << "]"
00117 << endl;
00118
00119 return os;
00120 }
00121
00122 shared_ptr<AnalysisAlgorithm> AnalysisWorkerGroup::TestAlgs::findAlgorithm(const string& modulename) {
00123 boost::recursive_mutex::scoped_lock lock(m_access);
00124 shared_ptr<AnalysisAlgorithm> alg;
00125 for (list<shared_ptr<AnalysisAlgorithm> >::const_iterator it=m_algorithms.begin();
00126 it != m_algorithms.end();
00127 ++it) {
00128 if ( (*it)->getModuleName()==modulename ) {
00129 alg=(*it);
00130 break;
00131 }
00132 }
00133 if (!alg) {
00134 alg=addAlgorithm(modulename);
00135 }
00136 return alg;
00137 }
00138
00139 shared_ptr<AnalysisAlgorithm> AnalysisWorkerGroup::TestAlgs::addAlgorithm(const string& modulename) {
00140 boost::recursive_mutex::scoped_lock lock(m_access);
00141 shared_ptr<AnalysisAlgorithm> alg = AnalysisAlgorithmMap::instance().getAlgorithm(getTest(), modulename);
00142 m_algorithms.push_back(alg);
00143 return alg;
00144 }
00145
00146
00147 AnalysisWorkerGroup::AnalysisWorkerGroup() : m_debug(false){
00148 }
00149
00150 AnalysisWorkerGroup::~AnalysisWorkerGroup() {
00151 }
00152
00153 bool AnalysisWorkerGroup::debug() {
00154 return m_debug;
00155 }
00156
00157 ostream& AnalysisWorkerGroup::printStatus(ostream& os) const throw() {
00158 boost::recursive_mutex::scoped_lock lock(m_tests_access);
00159 os << "\t === Analysis Service ===\n" << endl;
00160 os << "AnalysisWorkerGroup tests:" << endl;
00161 unsigned ntest=0;
00162 os << setw(26) << "TestName"
00163 << setw(8) << "Run"
00164 << setw(8) << "Scan"
00165 << setw(8) << "NScans"
00166 << setw(26) << "AnalysisAlg"
00167 << endl;
00168 os << "----------------------------------------------------------------"<<endl;
00169 for (list<shared_ptr<TestAlgs> >::const_iterator it = m_tests.begin();
00170 it != m_tests.end() ; ++it ) {
00171 (*it)->printStatus(os);
00172 ++ntest;
00173 }
00174 os << "----------------------------------------------------------------"<<endl;
00175 os << endl;
00176 os << "\tWorkers=" << nWorkers() << "\t (Busy=" << busy() << ")"
00177 << "\tQueue =" << queueSize();
00178 if (isFifo()) {
00179 os << " [FIFO]\n";
00180 }else{
00181 os << " [FILO]\n";
00182 }
00183 os <<"Total number of tests = " << ntest << endl;
00184 return os;
00185 }
00186
00187 void AnalysisWorkerGroup::addTest(shared_ptr<const TestData> testdata) {
00188 boost::recursive_mutex::scoped_lock lock(m_tests_access);
00189 boost::shared_ptr<TestAlgs> talgs;
00190 talgs=findTest(*testdata);
00191 if (talgs) {
00192 std::cout << "WARNING << adding rather than replacing test" << std::endl;
00193 replaceTest(testdata);
00194 }else{
00195 m_tests.push_back(shared_ptr<TestAlgs>(new TestAlgs(testdata) ) );
00196 }
00197 }
00198
00199 void AnalysisWorkerGroup::removeTestsUpTo(shared_ptr<const TestData> testdata) {
00200 boost::recursive_mutex::scoped_lock lock(m_tests_access);
00201 list<shared_ptr<TestAlgs> >::iterator it = m_tests.begin();
00202
00203 while( it != m_tests.end() ) {
00204
00205 boost::recursive_mutex::scoped_lock lock((*it)->getMutex());
00206 if ((*it)->getTest()->runNumber==testdata->runNumber
00207 && (*it)->getTest()->startScanNumber<=testdata->startScanNumber) {
00208
00209 std::cout << "Deleting old test for run "
00210 << (*it)->getTest()->runNumber << ", start scan "
00211 << (*it)->getTest()->startScanNumber
00212 << endl;
00213 it=m_tests.erase(it);
00214 } else {
00215 ++it;
00216 }
00217 }
00218 }
00219
00220 shared_ptr<AnalysisWorkerGroup::TestAlgs> AnalysisWorkerGroup::findTest(const TestData& testdata) const throw() {
00221 return findTest(testdata.runNumber, testdata.startScanNumber);
00222 }
00223
00224 void AnalysisWorkerGroup::replaceTest(shared_ptr<const TestData> testdata){
00225 boost::recursive_mutex::scoped_lock lock(m_tests_access);
00226 if (!testdata) throw Sct::IllegalStateError("Couldnt replace non-existant test",__FILE__,__LINE__);
00227 shared_ptr<AnalysisWorkerGroup::TestAlgs> talgs = findTest(*testdata);
00228 if (!talgs) {
00229 std::cout << "WARNING - adding rather than replacing test!" << std::endl;
00230 addTest(testdata);
00231 talgs = findTest(*testdata);
00232 }
00233 if (!talgs) throw Sct::IllegalStateError("Couldn't find or add test",__FILE__,__LINE__);
00234 talgs->replaceTest(testdata);
00235 }
00236
00237 shared_ptr<AnalysisWorkerGroup::TestAlgs> AnalysisWorkerGroup::findTest(const unsigned long runno, const unsigned long scanno) const throw() {
00238 boost::recursive_mutex::scoped_lock lock(m_tests_access);
00239 shared_ptr<AnalysisWorkerGroup::TestAlgs> talgs;
00240 for (list<shared_ptr<TestAlgs> >::const_iterator it = m_tests.begin();
00241 it != m_tests.end();
00242 ++it ) {
00243 boost::recursive_mutex::scoped_lock lock((*it)->getMutex());
00244 shared_ptr<const TestData> td=(*it)->getTest();
00245
00246 if ( runno == td->runNumber ){
00247
00248 #warning "Second number below should be 3!"
00249 int results_per_scan = (td->options.find("full")==std::string::npos) ? 1 : 3;
00250 if (scanno >= td->startScanNumber &&
00251 scanno < td->startScanNumber + td->nScans * results_per_scan) talgs=*it;
00252 }
00253 }
00254 return talgs;
00255 }
00256
00257
00258 void AnalysisWorkerGroup::purge() throw() {
00259
00260 this->setPaused(true);
00261 while (paused() != nWorkers()) sleep(1);
00262
00263
00264 shared_ptr<IOName> s;
00265 do {s=pop(); } while(s);
00266 boost::recursive_mutex::scoped_lock lock(m_tests_access);
00267
00268
00269 std::cout << "Clearing tests ... " << endl;
00270 m_tests.clear();
00271 std::cout << " ... done clearing tests" << endl;
00272 this->setPaused(false);
00273 }
00274
00275 void AnalysisWorkerGroup::work(shared_ptr<IOName> name) throw() {
00276 using namespace SctData;
00277
00278 try {
00279 timer t;
00280 UniqueID id (name->getUniqueID());
00281 const unsigned runno = id.getRunNumber();
00282 const unsigned scanno = id.getScanNumber();
00283 const string modulename= id.getModule();
00284
00285 if (debug()) std::cout << "run="<< runno << ", scan="<< scanno << ", modulename="<<modulename << std::endl;
00286
00287 shared_ptr<AnalysisWorkerGroup::TestAlgs> talgs = findTest(runno, scanno);
00288
00289 if (debug()) std::cout << "Found test" << endl;
00290 if (!talgs) {
00291 std::ostringstream oss; oss << "AnalysisWorker::work. No test found for run "
00292 << runno << " scan" << scanno;
00293 throw IllegalStateError(oss.str(), __FILE__, __LINE__);
00294 }
00295
00296
00297 shared_ptr<AnalysisAlgorithm> algorithm;
00298
00299 boost::recursive_mutex::scoped_lock talgs_lock(talgs->getMutex());
00300 algorithm = talgs->findAlgorithm(modulename);
00301
00302 if (!algorithm.get()) return;
00303
00304 if (debug()) std::cout << "algorithm.get()=" << algorithm.get() << endl;
00305
00306 bool isfit=(name->getClassName()=="SctData::FitScanResult");
00307
00308
00309 boost::recursive_mutex::scoped_lock algorithm_lock (algorithm->getMutex());
00310 if (isfit) {
00311 if (debug()) std::cout << "got fit from queue" << endl;
00312 boost::recursive_mutex::scoped_lock lock(m_tests_access);
00313 algorithm->addFitScanResult(name);
00314 } else {
00315 if (debug()) std::cout << "got raw from queue" << endl;
00316 boost::recursive_mutex::scoped_lock lock(m_tests_access);
00317 algorithm->addRawScanResult(name);
00318 }
00319 if (debug()) std::cout << "checking if I can analyse algorithm" << endl;
00320 if (algorithm->canAnalyze()) {
00321 if (algorithm->isDone()) throw IllegalStateError("AnalysisWorker::work. Algorithm has additional, unnecessary Scan - analysis already done", __FILE__, __LINE__);
00322 if (debug()) std::cout << "checking existance" << std::endl;
00323 algorithm->checkTestResultExists();
00324 algorithm->addOverheadTime(t.elapsed());
00325
00326 t.restart();
00327 if (debug()) std::cout << "loading data" << std::endl;
00328 algorithm->loadData();
00329 algorithm->addIOTime(t.elapsed());
00330
00331 t.restart();
00332 if (debug()) std::cout << "analyzing" << std::endl;
00333 algorithm->analyze();
00334 algorithm->addAnalysisTime(t.elapsed());
00335
00336 t.restart();
00337 if (debug()) std::cout << "finishing" << std::endl;
00338 algorithm->finish();
00339 algorithm->addIOTime(t.elapsed());
00340 if (debug()) std::cout << "removing algorithm" << std::endl;
00341 talgs->removeAlgorithm(algorithm);
00342 } else {
00343 algorithm->addOverheadTime(t.elapsed());
00344 }
00345
00346 } catch (InvalidArgumentError& e) {
00347 e.sendToMrs();
00348 } catch(Throwable& e) {
00349 e.sendToMrs(MRS_ERROR);
00350 } catch(std::exception& e) {
00351 StdExceptionWrapper sew(e);
00352 sew.sendToMrs(MRS_ERROR);
00353 } catch(...) {
00354 Error e("uncaught unknown exception", __FILE__, __LINE__);
00355 e.sendToMrs(MRS_ERROR);
00356 }
00357 }
00358
00359
00360 }