00001 #include "AnalysisService.h"
00002 #include "AnalysisAlgorithmMap.h"
00003 #include "AnalysisAlgorithm.h"
00004 #include "AnalysisWorkerGroup.h"
00005
00006 #include "Sct/SctNames.h"
00007 #include "Sct/IpcObjectException.h"
00008 #include "Sct/IS/IOManagerIS.h"
00009 #include "SctData/TestResult.h"
00010
00011 #include <boost/scoped_ptr.hpp>
00012
00013 using namespace Sct;
00014 using namespace SctData;
00015 using boost::scoped_ptr;
00016
00017 int main(int argc, char** argv) {
00018 using namespace SctAnalysis;
00019 setExceptionHandlers(argv[0]);
00020
00021 bool multiThreaded = false;
00022 IPCCore::init(multiThreaded);
00023 AnalysisService& s=AnalysisService::instance();
00024
00025 try {
00026 s.setFitStrategy("NagFitStrategy");
00027 } catch(LogicError& e) {
00028
00029 }
00030
00031 try {
00032 s.getFitStrategy().setOptions("NQR");
00033
00034 if (!s.publish()) {
00035 IpcObjectException e("Failed to publish Analysis Service", __FILE__, __LINE__);
00036 e.sendToMrs(MRS_ERROR);
00037 }
00038 s.run();
00039 } catch (Throwable& e) {
00040 e.sendToMrs(MRS_FATAL);
00041 terminate();
00042 }
00043 }
00044
00045 namespace SctAnalysis {
00046
00047 AnalysisService::AnalysisService(const string& fitStrategyName) : IPCObject(AnalysisServiceI_C_AnalysisServiceInterface_instanceName, &getServer()) {
00048 setFitStrategy(fitStrategyName);
00049 workergroup = new AnalysisWorkerGroup();
00050 }
00051
00052 IPCServer& AnalysisService::getServer() throw() {
00053 static IPCServer server(AnalysisServiceI_C_AnalysisServiceInterface_serverName, SctNames::getPartition());
00054 return server;
00055 }
00056
00057 void AnalysisService::run() {
00058 ISInfoReceiver r(SctNames::getPartition());
00059
00060
00061 workergroup->go(1);
00062
00063
00064 r.subscribe(SctNames::getControlDataName().c_str(), ".*TestData.*", testDataCallback, this);
00065 r.subscribe(SctNames::getFittedDataName().c_str(), ".*FitScanResult.*", scanResultCallback, this);
00066 r.subscribe(SctNames::getEventDataName().c_str(), ".*RawScanResult.*", scanResultCallback, this);
00067 r.run();
00068 }
00069
00070 char* AnalysisService::ping(AnalysisServiceIStatus* status) throw() {
00071 status->returnCode = AnalysisServiceIReply_Success;
00072
00073 char *message = new char[60];
00074 strcpy(message,"Hello from Analysis Service");
00075 return message;
00076 }
00077
00078 char* AnalysisService::ipcStatus(AnalysisServiceIStatus* status) throw() {
00079 status->returnCode = AnalysisServiceIReply_Success;
00080
00081 char *statusMessage = new char[10000];
00082 ostringstream os;
00083 workergroup->printStatus(os);
00084 strcpy(statusMessage, os.str().c_str());
00085 return statusMessage;
00086 }
00087
00088 void AnalysisService::ipcAnalyzeModule(AnalysisServiceIStatus* status, char* testname, char* modulename) throw() {
00089 try {
00090 shared_ptr<TestData> testdata( new TestData() );
00091 ISInfoDictionary& id = SctNames::getISDictionary();
00092 ISInfo::Status result = id.findValue(testname, *testdata);
00093
00094 if (result != ISInfo::Success) {
00095 string os = "Error reading from IS server. Couldn't get: ";
00096 os += testname;
00097 throw IsException(result, os, __FILE__, __LINE__);
00098 }
00099 cout << "read " << testname << " which has #scans=" << testdata->nScans << endl;
00100 if (workergroup->findTest(*testdata).get() == 0 ) {
00101 workergroup->addTest(testdata);
00102 }
00103
00104 for (unsigned iscan=testdata->startScanNumber;
00105 iscan<(testdata->startScanNumber + testdata->nScans);
00106 ++iscan ) {
00107 {
00108 ostringstream name_raw;
00110 name_raw << "SctData::RawScanResult." << testdata->runNumber << "." << iscan << "." << modulename;
00111 cout << "Looking for " << name_raw.str() << endl;
00112 ISInfoIterator rawIter(SctNames::getPartition(), SctNames::getEventDataName().c_str(), name_raw.str().c_str());
00113 while ( rawIter() ) {
00114 try {
00115 cout << "trying to add " << rawIter.name() << endl;
00116 workergroup->push( rawIter.name());
00117 } catch ( Sct::Throwable& e) {
00118 e.sendToMrs(MRS_ERROR);
00119 }
00120 }
00121 } {
00122 ostringstream name_fit;
00124 name_fit << "SctData::FitScanResult." << testdata->runNumber << "." << iscan << "." << modulename;
00125 cout << "Looking for " << name_fit.str() << endl;
00126 ISInfoIterator fitIter(SctNames::getPartition(), SctNames::getFittedDataName().c_str(), name_fit.str().c_str());
00127 while ( fitIter() ) {
00128 try {
00129 cout << "trying to add " << fitIter.name() << endl;
00130 workergroup->push(fitIter.name());
00131 } catch ( Sct::Throwable& e) {
00132 e.sendToMrs(MRS_ERROR);
00133 }
00134 }
00135 }
00136 }
00137 status->returnCode = AnalysisServiceIReply_Success;
00138 } catch (Throwable& e) {
00139 e.sendToMrs(MRS_ERROR);
00140 return;
00141 }
00142 }
00143
00144 void AnalysisService::ipcPurge(AnalysisServiceIStatus* status) throw() {
00145 try {
00146 cout << "Doing purge" << endl;
00147 workergroup->purge();
00148 cout << "Memory purge completed" << endl;
00149 status->returnCode = AnalysisServiceIReply_Success;
00150 } catch (Throwable& e) {
00151 e.sendToMrs(MRS_ERROR);
00152 return;
00153 }
00154 }
00155
00156 void AnalysisService::ipcAnalyze(AnalysisServiceIStatus* status, char* name) throw() {
00157 ipcAnalyzeModule(status, name, "*");
00158 }
00159
00160 void AnalysisService::scanResultCallback(ISCallbackInfo * isc) {
00161 try {
00162 if (isc->reason() != ISInfoCreated )
00163 return;
00165 cout << "AnalysisService scanResultCallback on " << string(isc->name())<<endl;
00166
00167
00168
00169 instance().workergroup->push(isc->name());
00170
00171 } catch(Sct::Throwable& e) {
00172 e.sendToMrs(MRS_ERROR);
00173 }
00174 }
00175
00176 void AnalysisService::testDataCallback(ISCallbackInfo * isc) {
00177 try {
00178 shared_ptr<TestData> testdata( new TestData() );
00179 isc->value(*testdata);
00180
00181 if (isc->reason() == ISInfoCreated ) {
00182 cout << "Callback for new testdata object "<<endl;
00183 if (instance().workergroup->findTest(*testdata).get() == 0 ) {
00184
00185 instance().workergroup->addTest(testdata);
00186
00187 } else {
00188 throw Sct::InvalidArgumentError(string("AnalysisService::TestCallback Test already exists: ")+string(isc->name()), __FILE__, __LINE__);
00189 }
00190
00191 } else if (isc->reason() == ISInfoUpdated && testdata->status == TestData::ABORTED) {
00192 instance().workergroup->removeTestsUpTo(testdata);
00193
00194 } else if (isc->reason() == ISInfoDeleted ) {
00195 instance().workergroup->removeTestsUpTo(testdata);
00196 }
00197 } catch (Sct::Throwable& e) {
00198 e.sendToMrs(MRS_ERROR);
00199 }
00200 }
00201
00202 void AnalysisService::setFitStrategy(const string& name) throw(LogicError) {
00203 fitStrategy = SctFitter::FitStrategyFactory::instance().getStrategy(name);
00204 }
00205
00206 SctFitter::FitStrategy& AnalysisService::getFitStrategy() const throw(LogicError) {
00207 if (!fitStrategy)
00208 throw InvariantViolatedError("Fitter::getStrategy() no fit strategy defined", __FILE__, __LINE__);
00209 return *fitStrategy;
00210 }
00211
00212 AnalysisService& AnalysisService::instance() {
00213 if (!service)
00214 return initialize();
00215 return *service;
00216 }
00217
00218 AnalysisService& AnalysisService::initialize(const string& fitStrategyName) {
00219 if (service) {
00220 service->setFitStrategy(fitStrategyName);
00221 } else {
00222 service = new AnalysisService(fitStrategyName);
00223 }
00224 return *service;
00225 }
00226
00227 AnalysisService* AnalysisService::service = 0;
00228
00229 }