00001 #include "AnalysisService.h"
00002 #include "AnalysisAlgorithmMap.h"
00003 #include "AnalysisAlgorithm.h"
00004 #include "AnalysisWorkerGroup.h"
00005
00006 #include "Sct/SctNames.h"
00007 #include "Sct/IpcObjectException.h"
00008 #include "Sct/IS/IOManagerIS.h"
00009 #include "Sct/IS/IONameIS.h"
00010 #include "Sct/ConfigurationException.h"
00011 #include "SctData/TestResult.h"
00012 #include "CalibrationController/IS/TestData.h"
00013
00014 #include <pmg/pmg_initSync.h>
00015 #include <boost/scoped_ptr.hpp>
00016
00017 using namespace Sct;
00018 using namespace Sct::IS;
00019 using namespace SctData;
00020 using boost::scoped_ptr;
00021
00022 void pmgSynch(void *) {
00023 pmg_initSync();
00024 }
00025
00026 int main(int argc, char** argv) {
00027 using namespace SctAnalysis;
00028 setExceptionHandlers(argv[0]);
00029
00030 bool multiThreaded = true;
00031 IPCCore::init(multiThreaded);
00032 AnalysisService& s=AnalysisService::instance();
00033
00034 try {
00035 s.setFitStrategy("NagFitStrategy");
00036 } catch(LogicError& e) {
00037
00038 }
00039
00040 try {
00041 s.getFitStrategy().setOptions("NQR");
00042
00043 if (!s.publish()) {
00044 IpcObjectException e("Failed to publish Analysis Service", __FILE__, __LINE__);
00045 e.sendToMrs(MRS_ERROR);
00046 }
00047 s.getServer().doSoon(pmgSynch, NULL);
00048 s.run();
00049 s.getServer().run();
00050 s.withdraw();
00051 } catch (Throwable& e) {
00052 e.sendToMrs(MRS_FATAL);
00053 terminate();
00054 }
00055 }
00056
00057 namespace SctAnalysis {
00058
00059 AnalysisService::AnalysisService(const string& fitStrategyName) : IPCObject(AnalysisServiceI_C_AnalysisServiceInterface_instanceName, &getServer()),
00060 infoReceiver(new ISInfoReceiver(SctNames::getPartition())) {
00061 if (!infoReceiver.get())
00062 throw ConfigurationException("AnalysisService::AnalysisService can't make infoReceiver ", __FILE__, __LINE__) ;
00063 setFitStrategy(fitStrategyName);
00064 workergroup = new AnalysisWorkerGroup();
00065 }
00066
00067 IPCServer& AnalysisService::getServer() throw() {
00068 static IPCServer server(AnalysisServiceI_C_AnalysisServiceInterface_serverName, SctNames::getPartition());
00069 return server;
00070 }
00071
00072 void AnalysisService::run() {
00073
00074 workergroup->go(1);
00075
00076
00077 infoReceiver->subscribe(SctNames::getControlDataName().c_str(), ".*TestData.*", testDataCallback, this);
00078 infoReceiver->subscribe(SctNames::getFittedDataName().c_str(), ".*FitScanResult.*", scanResultCallback, this);
00079 infoReceiver->subscribe(SctNames::getEventDataName().c_str(), ".*RawScanResult.*", scanResultCallback, this);
00080 }
00081
00082 ilu_ShortInteger AnalysisService::busy(AnalysisServiceIStatus* status) {
00083 boost::recursive_mutex::scoped_lock lock(m_status_access);
00084 status->returnCode = AnalysisServiceIReply_Success;
00085 return workergroup->busy();
00086 }
00087
00088 ilu_ShortInteger AnalysisService::queueLength(AnalysisServiceIStatus* status) {
00089 boost::recursive_mutex::scoped_lock lock(m_status_access);
00090 status->returnCode = AnalysisServiceIReply_Success;
00091 return workergroup->busy();
00092 }
00093
00094 char* AnalysisService::status(AnalysisServiceIStatus* status) throw() {
00095 boost::recursive_mutex::scoped_lock lock(m_status_access);
00096 status->returnCode = AnalysisServiceIReply_Success;
00097
00098 ostringstream os;
00099 workergroup->printStatus(os);
00100 os << "\n" << AnalysisAlgorithmMap::instance().getAllStatus() << endl;
00101 const unsigned length = os.str().length()+1;
00102 char *statusMessage = new char[length];
00103 strcpy(statusMessage, os.str().c_str());
00104 return statusMessage;
00105 }
00106
00107 void AnalysisService::analyzeModule(AnalysisServiceIStatus* status, char* testname, char* modulename) throw() {
00108 try {
00109 shared_ptr<TestData> testdata( new TestData() );
00110 ISInfoDictionary& id = SctNames::getISDictionary();
00111 ISInfo::Status result = id.findValue(testname, *testdata);
00112
00113 if (result != ISInfo::Success) {
00114 string os = "Error reading from IS server. Couldn't get: ";
00115 os += testname;
00116 throw IsException(result, os, __FILE__, __LINE__);
00117 }
00118 cout << "read " << testname << " which has #scans=" << testdata->nScans << endl;
00119 if (workergroup->findTest(*testdata).get() == 0 ) {
00120 workergroup->addTest(testdata);
00121 }
00122
00123 for (unsigned iscan=testdata->startScanNumber;
00124 iscan<(testdata->startScanNumber + testdata->nScans);
00125 ++iscan ) {
00126 {
00127 ostringstream name_raw;
00129 name_raw << "SctData::RawScanResult." << testdata->runNumber << "." << iscan << "." << modulename;
00130 cout << "Looking for " << name_raw.str() << endl;
00131 ISInfoIterator rawIter(SctNames::getPartition(), SctNames::getEventDataName().c_str(), name_raw.str().c_str());
00132 while ( rawIter() ) {
00133 try {
00134 cout << "trying to add " << rawIter.name() << endl;
00135 workergroup->push(shared_ptr<IOName>(new IONameIS(rawIter.name())));
00136 } catch ( Sct::Throwable& e) {
00137 e.sendToMrs(MRS_ERROR);
00138 }
00139 }
00140 } {
00141 ostringstream name_fit;
00143 name_fit << "SctData::FitScanResult." << testdata->runNumber << "." << iscan << "." << modulename;
00144 cout << "Looking for " << name_fit.str() << endl;
00145 ISInfoIterator fitIter(SctNames::getPartition(), SctNames::getFittedDataName().c_str(), name_fit.str().c_str());
00146 while ( fitIter() ) {
00147 try {
00148 cout << "trying to add " << fitIter.name() << endl;
00149 workergroup->push(shared_ptr<IOName>(new IONameIS(fitIter.name())));
00150 } catch ( Sct::Throwable& e) {
00151 e.sendToMrs(MRS_ERROR);
00152 }
00153 }
00154 }
00155 }
00156 status->returnCode = AnalysisServiceIReply_Success;
00157 } catch (Throwable& e) {
00158 e.sendToMrs(MRS_ERROR);
00159 return;
00160 }
00161 }
00162
00163 void AnalysisService::purge(AnalysisServiceIStatus* status) throw() {
00164 try {
00165 cout << "Doing purge" << endl;
00166 workergroup->purge();
00167 cout << "Memory purge completed" << endl;
00168 status->returnCode = AnalysisServiceIReply_Success;
00169 } catch (Throwable& e) {
00170 e.sendToMrs(MRS_ERROR);
00171 return;
00172 }
00173 }
00174
00175 void AnalysisService::analyze(AnalysisServiceIStatus* status, char* name) throw() {
00176 analyzeModule(status, name, "*");
00177 }
00178
00179 void AnalysisService::scanResultCallback(ISCallbackInfo * isc) {
00180 try {
00181 if (isc->reason() != ISInfoCreated )
00182 return;
00184 cout << "AnalysisService scanResultCallback on " << string(isc->name())<<endl;
00185
00186
00187
00188 instance().workergroup->push(shared_ptr<IOName>(new IONameIS(isc->name())));
00189
00190 } catch(Sct::Throwable& e) {
00191 e.sendToMrs(MRS_ERROR);
00192 }
00193 }
00194
00195 void AnalysisService::testDataCallback(ISCallbackInfo * isc) {
00196 try {
00197 shared_ptr<TestData> testdata( new TestData() );
00198 isc->value(*testdata);
00199
00200 if (isc->reason() == ISInfoCreated ) {
00201 cout << "Callback for new testdata object "<<endl;
00202 if (instance().workergroup->findTest(*testdata).get() == 0 ) {
00203
00204 instance().workergroup->addTest(testdata);
00205
00206 } else {
00207 throw Sct::InvalidArgumentError(string("AnalysisService::TestCallback Test already exists: ")+string(isc->name()), __FILE__, __LINE__);
00208 }
00209
00210 } else if (isc->reason() == ISInfoUpdated && testdata->status == TestData::ABORTED) {
00211 instance().workergroup->removeTestsUpTo(testdata);
00212
00213 } else if (isc->reason() == ISInfoDeleted ) {
00214 instance().workergroup->removeTestsUpTo(testdata);
00215 }
00216 } catch (Sct::Throwable& e) {
00217 e.sendToMrs(MRS_ERROR);
00218 }
00219 }
00220
00221 void AnalysisService::setFitStrategy(const string& name) throw(LogicError) {
00222 fitStrategy = SctFitter::FitStrategyFactory::instance().getStrategy(name);
00223 }
00224
00225 SctFitter::FitStrategy& AnalysisService::getFitStrategy() const throw(LogicError) {
00226 if (!fitStrategy)
00227 throw InvariantViolatedError("Fitter::getStrategy() no fit strategy defined", __FILE__, __LINE__);
00228 return *fitStrategy;
00229 }
00230
00231 AnalysisService& AnalysisService::instance() {
00232 if (!service)
00233 return initialize();
00234 return *service;
00235 }
00236
00237 AnalysisService& AnalysisService::initialize(const string& fitStrategyName) {
00238 if (service) {
00239 service->setFitStrategy(fitStrategyName);
00240 } else {
00241 service = new AnalysisService(fitStrategyName);
00242 }
00243 return *service;
00244 }
00245
00246 AnalysisService* AnalysisService::service = 0;
00247
00248 }