Main Page   Modules   Namespace List   Class Hierarchy   Data Structures   File List   Namespace Members   Data Fields   Globals   Related Pages  

AnalysisService.cpp

Go to the documentation of this file.
00001 #include "AnalysisService.h"
00002 #include "AnalysisAlgorithmMap.h"
00003 #include "AnalysisAlgorithm.h"
00004 #include "AnalysisWorkerGroup.h"
00005 
00006 #include "Sct/SctNames.h"
00007 #include "Sct/IpcObjectException.h"
00008 #include "Sct/IS/IOManagerIS.h"
00009 #include "SctData/TestResult.h"
00010 
00011 #include <boost/scoped_ptr.hpp>
00012 
00013 using namespace Sct;
00014 using namespace SctData;
00015 using boost::scoped_ptr;
00016 
00017 int main(int argc, char** argv) {
00018     using namespace SctAnalysis;
00019     setExceptionHandlers(argv[0]);
00020 
00021     bool multiThreaded = false; //Choose single/multi-threaded
00022     IPCCore::init(multiThreaded);
00023     AnalysisService& s=AnalysisService::instance();
00024 
00025     try {
00026         s.setFitStrategy("NagFitStrategy");
00027     } catch(LogicError& e) {
00028         //No need to do anything - stick with default
00029     }
00030 
00031     try {
00032         s.getFitStrategy().setOptions("NQR");
00033         
00034         if (!s.publish()) {
00035             IpcObjectException e("Failed to publish Analysis Service", __FILE__, __LINE__);
00036             e.sendToMrs(MRS_ERROR);
00037         }
00038         s.run();
00039     } catch (Throwable& e) {
00040         e.sendToMrs(MRS_FATAL);
00041         terminate();
00042     }
00043 }
00044 
00045 namespace SctAnalysis {
00046 
00047 AnalysisService::AnalysisService(const string& fitStrategyName) : IPCObject(AnalysisServiceI_C_AnalysisServiceInterface_instanceName, &getServer()) {
00048     setFitStrategy(fitStrategyName);
00049     workergroup = new AnalysisWorkerGroup();
00050 }
00051 
00052 IPCServer& AnalysisService::getServer() throw() {
00053     static IPCServer server(AnalysisServiceI_C_AnalysisServiceInterface_serverName, SctNames::getPartition());
00054     return server;
00055 }
00056 
00057 void AnalysisService::run() {
00058     ISInfoReceiver r(SctNames::getPartition());
00059 
00060     // start up worker thread.
00061     workergroup->go(1);
00062 
00063     // subscribe to various IS stuff.
00064     r.subscribe(SctNames::getControlDataName().c_str(), ".*TestData.*", testDataCallback, this);
00065     r.subscribe(SctNames::getFittedDataName().c_str(),  ".*FitScanResult.*", scanResultCallback, this);
00066     r.subscribe(SctNames::getEventDataName().c_str(), ".*RawScanResult.*", scanResultCallback, this);
00067     r.run();
00068 }
00069 
00070 char* AnalysisService::ping(AnalysisServiceIStatus* status) throw() {
00071     status->returnCode = AnalysisServiceIReply_Success;
00072     // Have to make a new char* because IPC wants to delete it after use!
00073     char *message = new char[60];
00074     strcpy(message,"Hello from Analysis Service");
00075     return message;
00076 }
00077 
00078 char* AnalysisService::ipcStatus(AnalysisServiceIStatus* status) throw() {
00079     status->returnCode = AnalysisServiceIReply_Success;
00080     // Have to make a new char* because IPC wants to delete it after use!
00081     char *statusMessage = new char[10000];
00082     ostringstream os;
00083     workergroup->printStatus(os);
00084     strcpy(statusMessage, os.str().c_str());
00085     return statusMessage;
00086 }
00087 
00088 void AnalysisService::ipcAnalyzeModule(AnalysisServiceIStatus* status, char* testname, char* modulename) throw() {
00089     try {
00090         shared_ptr<TestData> testdata( new TestData() );
00091         ISInfoDictionary& id = SctNames::getISDictionary();
00092         ISInfo::Status result = id.findValue(testname, *testdata);
00093 
00094         if (result != ISInfo::Success) {
00095             string os = "Error reading from IS server.  Couldn't get: ";
00096             os += testname;
00097             throw IsException(result, os, __FILE__, __LINE__);
00098         }
00099         cout << "read " << testname << " which has #scans=" <<  testdata->nScans << endl;
00100         if (workergroup->findTest(*testdata).get() == 0 ) {
00101             workergroup->addTest(testdata);
00102         }
00103 
00104         for (unsigned iscan=testdata->startScanNumber;
00105                 iscan<(testdata->startScanNumber + testdata->nScans);
00106                 ++iscan ) {
00107             {
00108                 ostringstream name_raw;
00110                 name_raw << "SctData::RawScanResult." <<  testdata->runNumber << "." << iscan << "." << modulename;
00111                 cout << "Looking for " << name_raw.str() << endl;
00112                 ISInfoIterator rawIter(SctNames::getPartition(), SctNames::getEventDataName().c_str(), name_raw.str().c_str());
00113                 while ( rawIter() ) {
00114                     try {
00115                         cout << "trying to add " << rawIter.name() << endl;
00116                         workergroup->push( rawIter.name());
00117                     } catch ( Sct::Throwable& e) {
00118                         e.sendToMrs(MRS_ERROR);
00119                     }
00120                 }
00121             } {
00122                 ostringstream name_fit;
00124                 name_fit << "SctData::FitScanResult." <<  testdata->runNumber << "." << iscan << "." << modulename;
00125                 cout << "Looking for " << name_fit.str() << endl;
00126                 ISInfoIterator fitIter(SctNames::getPartition(), SctNames::getFittedDataName().c_str(), name_fit.str().c_str());
00127                 while ( fitIter() ) {
00128                     try {
00129                         cout << "trying to add " << fitIter.name() << endl;
00130                         workergroup->push(fitIter.name());
00131                     } catch ( Sct::Throwable& e) {
00132                         e.sendToMrs(MRS_ERROR);
00133                     }
00134                 }
00135             }
00136         }
00137         status->returnCode = AnalysisServiceIReply_Success;
00138     } catch (Throwable& e) {
00139         e.sendToMrs(MRS_ERROR);
00140         return;
00141     }
00142 }
00143 
00144 void AnalysisService::ipcPurge(AnalysisServiceIStatus* status) throw() {
00145     try {
00146         cout << "Doing purge" << endl;
00147         workergroup->purge();
00148         cout << "Memory purge completed" << endl;
00149         status->returnCode = AnalysisServiceIReply_Success;
00150     } catch (Throwable& e) {
00151         e.sendToMrs(MRS_ERROR);
00152         return;
00153     }
00154 }
00155 
00156 void AnalysisService::ipcAnalyze(AnalysisServiceIStatus* status, char* name) throw() {
00157     ipcAnalyzeModule(status, name, "*");
00158 }
00159 
00160 void AnalysisService::scanResultCallback(ISCallbackInfo * isc) {
00161     try {
00162         if (isc->reason() != ISInfoCreated )
00163             return;
00165         cout << "AnalysisService scanResultCallback on " << string(isc->name())<<endl;
00166         //cout << "AnalysisService scanResultCallback busy=" << AnalysisWorkerGroupFactory::pointer()->busy()
00167         //<< " Q=" << AnalysisWorkerGroupFactory::pointer()->queueSize()
00168         //<< " Nworker=" << AnalysisWorkerGroupFactory::pointer()->nWorkers() << endl;
00169         instance().workergroup->push(isc->name());
00170         //cout << " scanResultCallback done " << endl;
00171     } catch(Sct::Throwable& e) {
00172         e.sendToMrs(MRS_ERROR);
00173     }
00174 }
00175 
00176 void AnalysisService::testDataCallback(ISCallbackInfo * isc) {
00177     try {
00178         shared_ptr<TestData> testdata( new TestData() );
00179         isc->value(*testdata);
00180 
00181         if (isc->reason() == ISInfoCreated ) {
00182             cout << "Callback for new testdata object "<<endl;
00183             if (instance().workergroup->findTest(*testdata).get() == 0 ) {
00184                 //cout << "Listener adding test" << endl;
00185                 instance().workergroup->addTest(testdata);
00186                 //cout << "Listener added test" << endl;
00187             } else {
00188                 throw Sct::InvalidArgumentError(string("AnalysisService::TestCallback Test already exists: ")+string(isc->name()), __FILE__, __LINE__);
00189             }
00190 
00191     } else if (isc->reason() == ISInfoUpdated && testdata->status == TestData::ABORTED) {
00192         instance().workergroup->removeTestsUpTo(testdata);
00193         
00194     } else if (isc->reason() == ISInfoDeleted ) {
00195             instance().workergroup->removeTestsUpTo(testdata);
00196         }
00197     } catch (Sct::Throwable& e) {
00198         e.sendToMrs(MRS_ERROR);
00199     }
00200 }
00201 
00202 void AnalysisService::setFitStrategy(const string& name) throw(LogicError) {
00203     fitStrategy = SctFitter::FitStrategyFactory::instance().getStrategy(name);
00204 }
00205 
00206 SctFitter::FitStrategy& AnalysisService::getFitStrategy() const throw(LogicError) {
00207     if (!fitStrategy)
00208         throw InvariantViolatedError("Fitter::getStrategy() no fit strategy defined", __FILE__, __LINE__);
00209     return *fitStrategy;
00210 }
00211 
00212 AnalysisService& AnalysisService::instance() {
00213     if (!service)
00214         return initialize();
00215     return *service;
00216 }
00217 
00218 AnalysisService& AnalysisService::initialize(const string& fitStrategyName) {
00219     if (service) {
00220         service->setFitStrategy(fitStrategyName);
00221     } else {
00222         service = new AnalysisService(fitStrategyName);
00223     }
00224     return *service;
00225 }
00226 
00227 AnalysisService* AnalysisService::service = 0;
00228 
00229 }// end of namespace SctAnalysis

Generated on Mon Dec 15 19:35:55 2003 for SCT DAQ/DCS Software by doxygen 1.3-rc3