Main Page | Modules | Namespace List | Class Hierarchy | Alphabetical List | Data Structures | File List | Namespace Members | Data Fields | Related Pages

AnalysisService.cpp

00001 #include "AnalysisService.h"
00002 #include "AnalysisAlgorithmMap.h"
00003 #include "AnalysisAlgorithm.h"
00004 #include "AnalysisWorkerGroup.h"
00005 
00006 #include "Sct/SctNames.h"
00007 #include "Sct/IpcObjectException.h"
00008 #include "Sct/IS/IOManagerIS.h"
00009 #include "Sct/IS/IONameIS.h"
00010 #include "Sct/ConfigurationException.h"
00011 #include "SctData/TestResult.h"
00012 #include "CalibrationController/IS/TestData.h"
00013 
00014 #include <pmg/pmg_initSync.h>
00015 #include <boost/scoped_ptr.hpp>
00016 
00017 using namespace Sct;
00018 using namespace Sct::IS;
00019 using namespace SctData;
00020 using boost::scoped_ptr;
00021 
00022 void pmgSynch(void *) {
00023     pmg_initSync();
00024 }
00025 
00026 int main(int argc, char** argv) {
00027     using namespace SctAnalysis;
00028     setExceptionHandlers(argv[0]);
00029 
00030     bool multiThreaded = true; //Choose single/multi-threaded
00031     IPCCore::init(multiThreaded);
00032     AnalysisService& s=AnalysisService::instance();
00033 
00034     try {
00035         s.setFitStrategy("NagFitStrategy");
00036     } catch(LogicError& e) {
00037         //No need to do anything - stick with default
00038     }
00039 
00040     try {
00041         s.getFitStrategy().setOptions("NQR");
00042         
00043         if (!s.publish()) {
00044             IpcObjectException e("Failed to publish Analysis Service", __FILE__, __LINE__);
00045             e.sendToMrs(MRS_ERROR);
00046         }
00047         s.getServer().doSoon(pmgSynch, NULL);
00048         s.run();
00049         s.getServer().run();
00050         s.withdraw();
00051     } catch (Throwable& e) {
00052         e.sendToMrs(MRS_FATAL);
00053         terminate();
00054     }
00055 }
00056 
00057 namespace SctAnalysis {
00058 
00059 AnalysisService::AnalysisService(const string& fitStrategyName) : IPCObject(AnalysisServiceI_C_AnalysisServiceInterface_instanceName, &getServer()),
00060     infoReceiver(new ISInfoReceiver(SctNames::getPartition())) {
00061     if (!infoReceiver.get())
00062         throw ConfigurationException("AnalysisService::AnalysisService can't make infoReceiver ", __FILE__, __LINE__) ;
00063     setFitStrategy(fitStrategyName);
00064     workergroup = new AnalysisWorkerGroup();
00065 }
00066 
00067 IPCServer& AnalysisService::getServer() throw() {
00068     static IPCServer server(AnalysisServiceI_C_AnalysisServiceInterface_serverName, SctNames::getPartition());
00069     return server;
00070 }
00071 
00072 void AnalysisService::run() {
00073     // start up worker thread.
00074     workergroup->go(1);
00075 
00076     // subscribe to various IS stuff.
00077     infoReceiver->subscribe(SctNames::getControlDataName().c_str(), ".*TestData.*", testDataCallback, this);
00078     infoReceiver->subscribe(SctNames::getFittedDataName().c_str(),  ".*FitScanResult.*", scanResultCallback, this);
00079     infoReceiver->subscribe(SctNames::getEventDataName().c_str(), ".*RawScanResult.*", scanResultCallback, this);
00080 }
00081 
00082   ilu_ShortInteger AnalysisService::busy(AnalysisServiceIStatus* status) {
00083     boost::recursive_mutex::scoped_lock lock(m_status_access);
00084      status->returnCode = AnalysisServiceIReply_Success;
00085     return workergroup->busy();
00086   }
00087 
00088   ilu_ShortInteger AnalysisService::queueLength(AnalysisServiceIStatus* status) {
00089   boost::recursive_mutex::scoped_lock lock(m_status_access);
00090      status->returnCode = AnalysisServiceIReply_Success;
00091     return workergroup->busy();
00092   }
00093 
00094 char* AnalysisService::status(AnalysisServiceIStatus* status) throw() {
00095   boost::recursive_mutex::scoped_lock lock(m_status_access);
00096     status->returnCode = AnalysisServiceIReply_Success;
00097     // Have to make a new char* because IPC wants to delete it after use!
00098     ostringstream os;
00099     workergroup->printStatus(os);
00100     os << "\n" << AnalysisAlgorithmMap::instance().getAllStatus() << endl;
00101     const unsigned length = os.str().length()+1;
00102     char *statusMessage = new char[length];
00103     strcpy(statusMessage, os.str().c_str());
00104     return statusMessage;
00105 }
00106 
00107 void AnalysisService::analyzeModule(AnalysisServiceIStatus* status, char* testname, char* modulename) throw() {
00108     try {
00109         shared_ptr<TestData> testdata( new TestData() );
00110         ISInfoDictionary& id = SctNames::getISDictionary();
00111         ISInfo::Status result = id.findValue(testname, *testdata);
00112 
00113         if (result != ISInfo::Success) {
00114             string os = "Error reading from IS server.  Couldn't get: ";
00115             os += testname;
00116             throw IsException(result, os, __FILE__, __LINE__);
00117         }
00118         cout << "read " << testname << " which has #scans=" <<  testdata->nScans << endl;
00119         if (workergroup->findTest(*testdata).get() == 0 ) {
00120             workergroup->addTest(testdata);
00121         }
00122 
00123         for (unsigned iscan=testdata->startScanNumber;
00124                 iscan<(testdata->startScanNumber + testdata->nScans);
00125                 ++iscan ) {
00126             {
00127                 ostringstream name_raw;
00129                 name_raw << "SctData::RawScanResult." <<  testdata->runNumber << "." << iscan << "." << modulename;
00130                 cout << "Looking for " << name_raw.str() << endl;
00131                 ISInfoIterator rawIter(SctNames::getPartition(), SctNames::getEventDataName().c_str(), name_raw.str().c_str());
00132                 while ( rawIter() ) {
00133                     try {
00134                         cout << "trying to add " << rawIter.name() << endl;
00135                         workergroup->push(shared_ptr<IOName>(new IONameIS(rawIter.name())));
00136                     } catch ( Sct::Throwable& e) {
00137                         e.sendToMrs(MRS_ERROR);
00138                     }
00139                 }
00140             } {
00141                 ostringstream name_fit;
00143                 name_fit << "SctData::FitScanResult." <<  testdata->runNumber << "." << iscan << "." << modulename;
00144                 cout << "Looking for " << name_fit.str() << endl;
00145                 ISInfoIterator fitIter(SctNames::getPartition(), SctNames::getFittedDataName().c_str(), name_fit.str().c_str());
00146                 while ( fitIter() ) {
00147                     try {
00148                         cout << "trying to add " << fitIter.name() << endl;
00149                         workergroup->push(shared_ptr<IOName>(new IONameIS(fitIter.name())));
00150                     } catch ( Sct::Throwable& e) {
00151                         e.sendToMrs(MRS_ERROR);
00152                     }
00153                 }
00154             }
00155         }
00156         status->returnCode = AnalysisServiceIReply_Success;
00157     } catch (Throwable& e) {
00158         e.sendToMrs(MRS_ERROR);
00159         return;
00160     }
00161 }
00162 
00163 void AnalysisService::purge(AnalysisServiceIStatus* status) throw() {
00164     try {
00165         cout << "Doing purge" << endl;
00166         workergroup->purge();
00167         cout << "Memory purge completed" << endl;
00168         status->returnCode = AnalysisServiceIReply_Success;
00169     } catch (Throwable& e) {
00170         e.sendToMrs(MRS_ERROR);
00171         return;
00172     }
00173 }
00174 
00175 void AnalysisService::analyze(AnalysisServiceIStatus* status, char* name) throw() {
00176     analyzeModule(status, name, "*");
00177 }
00178 
00179 void AnalysisService::scanResultCallback(ISCallbackInfo * isc) {
00180     try {
00181         if (isc->reason() != ISInfoCreated )
00182             return;
00184         cout << "AnalysisService scanResultCallback on " << string(isc->name())<<endl;
00185         //cout << "AnalysisService scanResultCallback busy=" << AnalysisWorkerGroupFactory::pointer()->busy()
00186         //<< " Q=" << AnalysisWorkerGroupFactory::pointer()->queueSize()
00187         //<< " Nworker=" << AnalysisWorkerGroupFactory::pointer()->nWorkers() << endl;
00188         instance().workergroup->push(shared_ptr<IOName>(new IONameIS(isc->name())));
00189         //cout << " scanResultCallback done " << endl;
00190     } catch(Sct::Throwable& e) {
00191         e.sendToMrs(MRS_ERROR);
00192     }
00193 }
00194 
00195 void AnalysisService::testDataCallback(ISCallbackInfo * isc) {
00196     try {
00197         shared_ptr<TestData> testdata( new TestData() );
00198         isc->value(*testdata);
00199 
00200         if (isc->reason() == ISInfoCreated ) {
00201             cout << "Callback for new testdata object "<<endl;
00202             if (instance().workergroup->findTest(*testdata).get() == 0 ) {
00203                 //cout << "Listener adding test" << endl;
00204                 instance().workergroup->addTest(testdata);
00205                 //cout << "Listener added test" << endl;
00206             } else {
00207                 throw Sct::InvalidArgumentError(string("AnalysisService::TestCallback Test already exists: ")+string(isc->name()), __FILE__, __LINE__);
00208             }
00209 
00210     } else if (isc->reason() == ISInfoUpdated && testdata->status == TestData::ABORTED) {
00211         instance().workergroup->removeTestsUpTo(testdata);
00212         
00213     } else if (isc->reason() == ISInfoDeleted ) {
00214             instance().workergroup->removeTestsUpTo(testdata);
00215         }
00216     } catch (Sct::Throwable& e) {
00217         e.sendToMrs(MRS_ERROR);
00218     }
00219 }
00220 
00221 void AnalysisService::setFitStrategy(const string& name) throw(LogicError) {
00222     fitStrategy = SctFitter::FitStrategyFactory::instance().getStrategy(name);
00223 }
00224 
00225 SctFitter::FitStrategy& AnalysisService::getFitStrategy() const throw(LogicError) {
00226     if (!fitStrategy)
00227         throw InvariantViolatedError("Fitter::getStrategy() no fit strategy defined", __FILE__, __LINE__);
00228     return *fitStrategy;
00229 }
00230 
00231 AnalysisService& AnalysisService::instance() {
00232     if (!service)
00233         return initialize();
00234     return *service;
00235 }
00236 
00237 AnalysisService& AnalysisService::initialize(const string& fitStrategyName) {
00238     if (service) {
00239         service->setFitStrategy(fitStrategyName);
00240     } else {
00241         service = new AnalysisService(fitStrategyName);
00242     }
00243     return *service;
00244 }
00245 
00246 AnalysisService* AnalysisService::service = 0;
00247 
00248 }// end of namespace SctAnalysis

Generated on Thu Jul 15 09:50:42 2004 for SCT DAQ/DCS Software - C++ by doxygen 1.3.5