AnalysisService.cpp

00001 #include "AnalysisService.h"
00002 #include "AnalysisAlgorithmMap.h"
00003 #include "AnalysisAlgorithm.h"
00004 #include "AnalysisWorkerGroup.h"
00005 #include "AnalysisArguments.h"
00006 #include "DcsInterface.h"
00007 #include "ConfigurationInterface.h"
00008 
00009 #include "Sct/SctNames.h"
00010 #include "Sct/StdExceptionWrapper.h"
00011 #include "Sct/IpcObjectException.h"
00012 #include "Sct/IS/IOManagerIS.h"
00013 #include "Sct/IS/IONameIS.h"
00014 #include "Sct/ConfigurationException.h"
00015 #include "Sct/DestroyingDeleter.h"
00016 #include "Sct/UnsupportedOperationError.h"
00017 #include "Sct/OmniMarshalling.h"
00018 #include "sctConf/configuration.h"
00019 #include "SctData/TestResult.h"
00020 #include "sctConfIPC/configipc.h"
00021 #include "CalibrationController/IS/TestData.h"
00022 
00023 #include <ipc/core.h>
00024 #include <boost/scoped_ptr.hpp>
00025 #include <is/info.h>
00026 #include <is/infodictionary.h>
00027 #include <is/infoiterator.h>
00028 #include <is/inforeceiver.h>
00029 #include <is/infoT.h>
00030 
00031 using namespace Sct;
00032 using namespace Sct::IS;
00033 using namespace SctData;
00034 using boost::scoped_ptr;
00035 
00036 namespace SctAnalysis {
00037 
00038 AnalysisService::~AnalysisService(){
00039 }
00040 
00041 void AnalysisService::shutdown(){
00042   if (workergroup) workergroup->stop();
00043   getServer().stop();
00044 }
00045 
00046 AnalysisService::AnalysisService(AnalysisArguments args) : 
00047       IPCNamedObject<POA_AnalysisServiceI::AnalysisServiceInterface,
00048              ipc::multi_thread>(
00049                     SctNames::getPartition(),
00050                     args.instanceName()),
00051       dcsinterface(0), infoReceiver(new ISInfoReceiver(SctNames::getPartition())), m_args(args) {
00052   if (!infoReceiver.get())
00053     throw ConfigurationException("AnalysisService::AnalysisService can't make infoReceiver ", __FILE__, __LINE__) ;
00054   std::cout << "Making DcsInterface ... " << std::endl;
00055     try{
00056       boost::shared_ptr<SctConfiguration::Configuration> configipc(new SctConfiguration::ConfigIPC);
00057       s_configuration=configipc;
00058       boost::shared_ptr<ConfigurationInterface> config_local (new ConfigurationInterface(configipc));
00059       dcsinterface = new DcsInterface(config_local);
00060       config_local->update();
00061     }catch(std::exception &e){
00062       SctNames::Mrs() << MRS_TEXT("I wont be able to save DCS information as I dont know the configuration (std::exception)")
00063               << "ANALYSIS_NO_DCS" << MRS_WARNING << ENDM;
00064     }catch(CORBA::Exception &e){
00065       SctNames::Mrs() << MRS_TEXT("I wont be able to save DCS information as I dont know the configuration (CORBA::Exception)")
00066               << "ANALYSIS_NO_DCS" << MRS_WARNING << ENDM;
00067     }catch(...){
00068       SctNames::Mrs() << MRS_TEXT("I wont be able to save DCS information as I dont know the configuration (unknown exception)")
00069               << "ANALYSIS_NO_DCS" << MRS_WARNING << ENDM;
00070     }
00071     try{
00072       setFitStrategy(m_args.getStrategyName());
00073     }catch(Throwable& e){
00074       e.sendToMrs();
00075       setFitStrategy("RootFitStrategy");
00076     }
00077     getFitStrategy().setOptions(m_args.getFitOptions());
00078     workergroup = new AnalysisWorkerGroup();
00079     workergroup->reportTo(m_args.getISStatusName());
00080 }
00081 
00082 IPCServer& AnalysisService::getServer() throw() {
00083     static IPCServer server;
00084     return server;
00085 }
00086 
00087 const std::string AnalysisService::getOutputISServerName(){
00088   return m_args.getOutputISServer();
00089 }
00090 
00091 boost::shared_ptr<SctConfiguration::Configuration> AnalysisService::getConfiguration(){
00092   return s_configuration;
00093 }
00094 
00095 void AnalysisService::run() {
00096     // start up worker thread.
00097     workergroup->go(m_args.getNWorkers());
00098     
00099     if (m_args.recoveryMode()) recover();
00100     const list<SctService::Arguments::Subscription>& theList=m_args.getInputISServers();
00101 
00102     // subscribe to various IS stuff.
00103     for (list<SctService::Arguments::Subscription>::const_iterator i=theList.begin(); 
00104      i!=theList.end(); ++i){
00105       ISInfo::Status s = infoReceiver->subscribe((*i).server.c_str(), 
00106                          (*i).regexp.c_str(), generalCallback);
00107       if (s!=ISInfo::Success){
00108     std::ostringstream oss;
00109     oss << "Analysis service could not subscribed to [" << (*i).server 
00110         << "] to receive [" << (*i).regexp << "]";
00111     SctNames::Mrs() << "ANALYSIS_SUBSCRIBE" << MRS_TEXT(oss.str()) << MRS_ERROR << ENDM;
00112       }else{
00113     std::ostringstream oss;
00114     oss << "Analysis service subscribed to [" << (*i).server 
00115         << "] to receive [" << (*i).regexp << "]";
00116     SctNames::Mrs() << "ANALYSIS_SUBSCRIBE" << MRS_TEXT(oss.str()) << MRS_INFORMATION << ENDM;
00117       }
00118     }
00119 }
00120 
00121   CORBA::Short AnalysisService::busy() {
00122     boost::recursive_mutex::scoped_lock lock(m_status_access);
00123     return workergroup->busy();
00124   }
00125 
00126   CORBA::Short AnalysisService::queueLength() {
00127   boost::recursive_mutex::scoped_lock lock(m_status_access);
00128     return workergroup->busy();
00129   }
00130 
00131 char* AnalysisService::status() throw() {
00132   boost::recursive_mutex::scoped_lock lock(m_status_access);
00133     std::ostringstream os;
00134     workergroup->printStatus(os);
00135     os << "\n" << AnalysisAlgorithmMap::instance().getAllStatus() << std::endl;
00136     if (dcsinterface){
00137       os << "AnalysisService : DCS INFORMATION" << std::endl;
00138       dcsinterface->printStatus(os);
00139     }
00140 
00141     const unsigned length = os.str().length()+1;
00142     char *statusMessage = new char[length];
00143     // Have to make a new char* because IPC wants to delete it after use!
00144     strcpy(statusMessage, os.str().c_str());
00145     return statusMessage;
00146 }
00147 
00148 void AnalysisService::analyzeModule(const char* testname, const char* modulename) throw() {
00149     try {
00150         shared_ptr<TestData> testdata( new TestData() );
00151         ISInfoDictionary& id = SctNames::getISDictionary();
00152         ISInfo::Status result = id.findValue(testname, *testdata);
00153 
00154         if (result != ISInfo::Success) {
00155             string os = "Error reading from IS server.  Couldn't get: ";
00156             os += testname;
00157             throw IsException(result, os, __FILE__, __LINE__);
00158         }
00159         if (workergroup->debug()) std::cout << "read " << testname << " which has #scans=" <<  testdata->nScans << std::endl;
00160         if (workergroup->findTest(*testdata).get() == 0 ) {
00161             workergroup->addTest(testdata);
00162         }
00163 
00164         for (unsigned iscan=testdata->startScanNumber;
00165                 iscan<(testdata->startScanNumber + testdata->nScans);
00166                 ++iscan ) {
00167             {
00168                 std::ostringstream name_raw;
00170                 name_raw << "SctData::RawScanResult." <<  testdata->runNumber << "." << iscan << "." << modulename;
00171                 if (workergroup->debug()) std::cout << "Looking for " << name_raw.str() << std::endl;
00172                 ISInfoIterator rawIter(SctNames::getPartition(), SctNames::getEventDataName().c_str(), name_raw.str().c_str());
00173                 while ( rawIter() ) {
00174                     try {
00175                         if (workergroup->debug())std::cout << "trying to add " << rawIter.name() << std::endl;
00176                         workergroup->push(shared_ptr<IOName>(new IONameIS(rawIter.name())));
00177                     } catch ( Sct::Throwable& e) {
00178                         e.sendToMrs(MRS_ERROR);
00179                     }
00180                 }
00181             } {
00182                 std::ostringstream name_fit;
00183                 name_fit << "SctData::FitScanResult." <<  testdata->runNumber << "." << iscan << "." << modulename;
00184                 if(workergroup->debug())std::cout << "Looking for " << name_fit.str() << std::endl;
00185                 ISInfoIterator fitIter(SctNames::getPartition(), SctNames::getFittedDataName().c_str(), name_fit.str().c_str());
00186                 while ( fitIter() ) {
00187                     try {
00188                         if (workergroup->debug()) std::cout << "trying to add " << fitIter.name() << std::endl;
00189                         workergroup->push(shared_ptr<IOName>(new IONameIS(fitIter.name())));
00190                     } catch ( Sct::Throwable& e) {
00191                         e.sendToMrs(MRS_ERROR);
00192                     }
00193                 }
00194             }
00195         }
00196     } catch (Throwable& e) {
00197         e.sendToMrs(MRS_ERROR);
00198         return;
00199     }
00200 }
00201 
00202 void AnalysisService::purge() throw() {
00203     try {
00204         std::cout << "Doing purge" << std::endl;
00205         workergroup->purge();
00206         std::cout << "Memory purge completed" << std::endl;
00207     } catch (Throwable& e) {
00208         e.sendToMrs(MRS_ERROR);
00209         return;
00210     }
00211 }
00212 
00213 void AnalysisService::analyze(const char* name) throw() {
00214     analyzeModule(name, "*");
00215 }
00216 
00217 AnalysisServiceI::StringList* AnalysisService::listAlgorithms(){
00218   return Sct::copyStringListToCorba<AnalysisServiceI::StringList>(AnalysisAlgorithmMap::instance().listAlgorithms());
00219 }
00220 
00221 void AnalysisService::generalCallback(ISCallbackInfo * isc){
00222   try{
00223     IONameIS name(isc->name());
00224     if (name.getClassName()=="TestData"){
00225       testDataCallback(isc);
00226     }else if (name.getClassName()=="SctData::RawScanResult"
00227           || name.getClassName()=="SctData::FitScanResult") {
00228       scanResultCallback(isc);
00229     }
00230   }catch(Sct::Throwable& e){
00231     e.sendToMrs();
00232   }
00233 }
00234 
00235 void AnalysisService::scanResultCallback(ISCallbackInfo * isc) {
00236     try {
00237         if (isc->reason() != is::Created )
00238             return;
00239         instance().workergroup->push(shared_ptr<IOName>(new IONameIS(isc->name())));
00240     } catch(Sct::Throwable& e) {
00241         e.sendToMrs(MRS_ERROR);
00242     }
00243 }
00244 
00245 void AnalysisService::testDataCallback(ISCallbackInfo * isc) {
00246     try {
00247         shared_ptr<TestData> testdata( new TestData() );
00248         isc->value(*testdata);
00249 
00250         if (isc->reason() == is::Created ) { 
00251             std::cout << "adding " << isc->name() << std::endl; 
00252             instance().workergroup->addTest(testdata);
00253     } else if (isc->reason() == is::Updated && testdata->status == TestData::ABORTED) {
00254             std::cout << "removing aborted test " << isc->name() << std::endl; 
00255       instance().workergroup->removeTestsUpTo(testdata);
00256     } else if (isc->reason() == is::Updated) {
00257             std::cout << "replacing " << isc->name() << std::endl; 
00258         instance().workergroup->replaceTest(testdata);
00259     } else if (isc->reason() == is::Deleted ) {
00260             std::cout << "removing " << isc->name() << std::endl; 
00261             instance().workergroup->removeTestsUpTo(testdata);
00262         }
00263     } catch (Sct::Throwable& e) {
00264         e.sendToMrs(MRS_ERROR);
00265     }
00266 }
00267 
00268 void AnalysisService::setFitStrategy(const string& name) throw(LogicError) {
00269     SctNames::Mrs() << "ANALYSIS_FIT_STRATEGY"  << MRS_PARAM<const char*>("Strategy", name.c_str()) 
00270             << MRS_INFORMATION << MRS_TEXT("Fitting method is being set") << ENDM;
00271     fitStrategy = SctFitter::FitStrategyFactory::instance().getStrategy(name);
00272 }
00273 
00274 SctFitter::FitStrategy& AnalysisService::getFitStrategy() const throw(LogicError) {
00275     if (!fitStrategy)
00276         throw InvariantViolatedError("Fitter::getStrategy() no fit strategy defined", __FILE__, __LINE__);
00277     return *fitStrategy;
00278 }
00279 
00280 AnalysisService& AnalysisService::instance() {
00281     if (!service) throw Sct::IllegalStateError("Attempt to use uninitialised analysis", __FILE__, __LINE__);
00282     return *service;
00283 }
00284 
00285 bool AnalysisService::isFifo(){
00286   return instance().workergroup->isFifo();
00287 }
00288 
00289 void AnalysisService::setFifo(bool val){
00290   instance().workergroup->setFifo(val);
00291 }
00292 
00293 AnalysisService& AnalysisService::initialize(AnalysisArguments args) {
00294     service = new AnalysisService(args);
00295     return *service;
00296 }
00297 
00298 DcsInterface& AnalysisService::getDcsInterface(){
00299   if (!dcsinterface){
00300     throw ConfigurationException("Cant return dcsinterface",__FILE__, __LINE__);
00301   }else{
00302     return *dcsinterface;
00303   }
00304 }
00305 
00306 void AnalysisService::recover(){
00307   SctNames::Mrs() << "ANALYSIS_RECOVERY" << MRS_TEXT("Analysis recovery started in recovery mode") << ENDM;
00308   Sct::UnsupportedOperationError("Recovery not implimented", __FILE__, __LINE__).sendToMrs(MRS_DIAGNOSTIC);
00309 }
00310 
00311 
00312 AnalysisService* AnalysisService::service=0;
00313 
00314 }// end of namespace SctAnalysis

Generated on Mon Feb 6 14:01:16 2006 for SCT DAQ/DCS Software - C++ by  doxygen 1.4.6