00001 #include "AnalysisService.h"
00002 #include "AnalysisAlgorithmMap.h"
00003 #include "AnalysisAlgorithm.h"
00004 #include "AnalysisWorkerGroup.h"
00005 #include "AnalysisArguments.h"
00006 #include "DcsInterface.h"
00007 #include "ConfigurationInterface.h"
00008
00009 #include "Sct/SctNames.h"
00010 #include "Sct/StdExceptionWrapper.h"
00011 #include "Sct/IpcObjectException.h"
00012 #include "Sct/IS/IOManagerIS.h"
00013 #include "Sct/IS/IONameIS.h"
00014 #include "Sct/ConfigurationException.h"
00015 #include "Sct/DestroyingDeleter.h"
00016 #include "Sct/UnsupportedOperationError.h"
00017 #include "Sct/OmniMarshalling.h"
00018 #include "sctConf/configuration.h"
00019 #include "SctData/TestResult.h"
00020 #include "sctConfIPC/configipc.h"
00021 #include "CalibrationController/IS/TestData.h"
00022
00023 #include <ipc/core.h>
00024 #include <boost/scoped_ptr.hpp>
00025 #include <is/info.h>
00026 #include <is/infodictionary.h>
00027 #include <is/infoiterator.h>
00028 #include <is/inforeceiver.h>
00029 #include <is/infoT.h>
00030
00031 using namespace Sct;
00032 using namespace Sct::IS;
00033 using namespace SctData;
00034 using boost::scoped_ptr;
00035
00036 namespace SctAnalysis {
00037
00038 AnalysisService::~AnalysisService(){
00039 }
00040
00041 void AnalysisService::shutdown(){
00042 if (workergroup) workergroup->stop();
00043 getServer().stop();
00044 }
00045
00046 AnalysisService::AnalysisService(AnalysisArguments args) :
00047 IPCNamedObject<POA_AnalysisServiceI::AnalysisServiceInterface,
00048 ipc::multi_thread>(
00049 SctNames::getPartition(),
00050 args.instanceName()),
00051 dcsinterface(0), infoReceiver(new ISInfoReceiver(SctNames::getPartition())), m_args(args) {
00052 if (!infoReceiver.get())
00053 throw ConfigurationException("AnalysisService::AnalysisService can't make infoReceiver ", __FILE__, __LINE__) ;
00054 std::cout << "Making DcsInterface ... " << std::endl;
00055 try{
00056 boost::shared_ptr<SctConfiguration::Configuration> configipc(new SctConfiguration::ConfigIPC);
00057 s_configuration=configipc;
00058 boost::shared_ptr<ConfigurationInterface> config_local (new ConfigurationInterface(configipc));
00059 dcsinterface = new DcsInterface(config_local);
00060 config_local->update();
00061 }catch(std::exception &e){
00062 SctNames::Mrs() << MRS_TEXT("I wont be able to save DCS information as I dont know the configuration (std::exception)")
00063 << "ANALYSIS_NO_DCS" << MRS_WARNING << ENDM;
00064 }catch(CORBA::Exception &e){
00065 SctNames::Mrs() << MRS_TEXT("I wont be able to save DCS information as I dont know the configuration (CORBA::Exception)")
00066 << "ANALYSIS_NO_DCS" << MRS_WARNING << ENDM;
00067 }catch(...){
00068 SctNames::Mrs() << MRS_TEXT("I wont be able to save DCS information as I dont know the configuration (unknown exception)")
00069 << "ANALYSIS_NO_DCS" << MRS_WARNING << ENDM;
00070 }
00071 try{
00072 setFitStrategy(m_args.getStrategyName());
00073 }catch(Throwable& e){
00074 e.sendToMrs();
00075 setFitStrategy("RootFitStrategy");
00076 }
00077 getFitStrategy().setOptions(m_args.getFitOptions());
00078 workergroup = new AnalysisWorkerGroup();
00079 workergroup->reportTo(m_args.getISStatusName());
00080 }
00081
00082 IPCServer& AnalysisService::getServer() throw() {
00083 static IPCServer server;
00084 return server;
00085 }
00086
00087 const std::string AnalysisService::getOutputISServerName(){
00088 return m_args.getOutputISServer();
00089 }
00090
00091 boost::shared_ptr<SctConfiguration::Configuration> AnalysisService::getConfiguration(){
00092 return s_configuration;
00093 }
00094
00095 void AnalysisService::run() {
00096
00097 workergroup->go(m_args.getNWorkers());
00098
00099 if (m_args.recoveryMode()) recover();
00100 const list<SctService::Arguments::Subscription>& theList=m_args.getInputISServers();
00101
00102
00103 for (list<SctService::Arguments::Subscription>::const_iterator i=theList.begin();
00104 i!=theList.end(); ++i){
00105 ISInfo::Status s = infoReceiver->subscribe((*i).server.c_str(),
00106 (*i).regexp.c_str(), generalCallback);
00107 if (s!=ISInfo::Success){
00108 std::ostringstream oss;
00109 oss << "Analysis service could not subscribed to [" << (*i).server
00110 << "] to receive [" << (*i).regexp << "]";
00111 SctNames::Mrs() << "ANALYSIS_SUBSCRIBE" << MRS_TEXT(oss.str()) << MRS_ERROR << ENDM;
00112 }else{
00113 std::ostringstream oss;
00114 oss << "Analysis service subscribed to [" << (*i).server
00115 << "] to receive [" << (*i).regexp << "]";
00116 SctNames::Mrs() << "ANALYSIS_SUBSCRIBE" << MRS_TEXT(oss.str()) << MRS_INFORMATION << ENDM;
00117 }
00118 }
00119 }
00120
00121 CORBA::Short AnalysisService::busy() {
00122 boost::recursive_mutex::scoped_lock lock(m_status_access);
00123 return workergroup->busy();
00124 }
00125
00126 CORBA::Short AnalysisService::queueLength() {
00127 boost::recursive_mutex::scoped_lock lock(m_status_access);
00128 return workergroup->busy();
00129 }
00130
00131 char* AnalysisService::status() throw() {
00132 boost::recursive_mutex::scoped_lock lock(m_status_access);
00133 std::ostringstream os;
00134 workergroup->printStatus(os);
00135 os << "\n" << AnalysisAlgorithmMap::instance().getAllStatus() << std::endl;
00136 if (dcsinterface){
00137 os << "AnalysisService : DCS INFORMATION" << std::endl;
00138 dcsinterface->printStatus(os);
00139 }
00140
00141 const unsigned length = os.str().length()+1;
00142 char *statusMessage = new char[length];
00143
00144 strcpy(statusMessage, os.str().c_str());
00145 return statusMessage;
00146 }
00147
00148 void AnalysisService::analyzeModule(const char* testname, const char* modulename) throw() {
00149 try {
00150 shared_ptr<TestData> testdata( new TestData() );
00151 ISInfoDictionary& id = SctNames::getISDictionary();
00152 ISInfo::Status result = id.findValue(testname, *testdata);
00153
00154 if (result != ISInfo::Success) {
00155 string os = "Error reading from IS server. Couldn't get: ";
00156 os += testname;
00157 throw IsException(result, os, __FILE__, __LINE__);
00158 }
00159 if (workergroup->debug()) std::cout << "read " << testname << " which has #scans=" << testdata->nScans << std::endl;
00160 if (workergroup->findTest(*testdata).get() == 0 ) {
00161 workergroup->addTest(testdata);
00162 }
00163
00164 for (unsigned iscan=testdata->startScanNumber;
00165 iscan<(testdata->startScanNumber + testdata->nScans);
00166 ++iscan ) {
00167 {
00168 std::ostringstream name_raw;
00170 name_raw << "SctData::RawScanResult." << testdata->runNumber << "." << iscan << "." << modulename;
00171 if (workergroup->debug()) std::cout << "Looking for " << name_raw.str() << std::endl;
00172 ISInfoIterator rawIter(SctNames::getPartition(), SctNames::getEventDataName().c_str(), name_raw.str().c_str());
00173 while ( rawIter() ) {
00174 try {
00175 if (workergroup->debug())std::cout << "trying to add " << rawIter.name() << std::endl;
00176 workergroup->push(shared_ptr<IOName>(new IONameIS(rawIter.name())));
00177 } catch ( Sct::Throwable& e) {
00178 e.sendToMrs(MRS_ERROR);
00179 }
00180 }
00181 } {
00182 std::ostringstream name_fit;
00183 name_fit << "SctData::FitScanResult." << testdata->runNumber << "." << iscan << "." << modulename;
00184 if(workergroup->debug())std::cout << "Looking for " << name_fit.str() << std::endl;
00185 ISInfoIterator fitIter(SctNames::getPartition(), SctNames::getFittedDataName().c_str(), name_fit.str().c_str());
00186 while ( fitIter() ) {
00187 try {
00188 if (workergroup->debug()) std::cout << "trying to add " << fitIter.name() << std::endl;
00189 workergroup->push(shared_ptr<IOName>(new IONameIS(fitIter.name())));
00190 } catch ( Sct::Throwable& e) {
00191 e.sendToMrs(MRS_ERROR);
00192 }
00193 }
00194 }
00195 }
00196 } catch (Throwable& e) {
00197 e.sendToMrs(MRS_ERROR);
00198 return;
00199 }
00200 }
00201
00202 void AnalysisService::purge() throw() {
00203 try {
00204 std::cout << "Doing purge" << std::endl;
00205 workergroup->purge();
00206 std::cout << "Memory purge completed" << std::endl;
00207 } catch (Throwable& e) {
00208 e.sendToMrs(MRS_ERROR);
00209 return;
00210 }
00211 }
00212
00213 void AnalysisService::analyze(const char* name) throw() {
00214 analyzeModule(name, "*");
00215 }
00216
00217 AnalysisServiceI::StringList* AnalysisService::listAlgorithms(){
00218 return Sct::copyStringListToCorba<AnalysisServiceI::StringList>(AnalysisAlgorithmMap::instance().listAlgorithms());
00219 }
00220
00221 void AnalysisService::generalCallback(ISCallbackInfo * isc){
00222 try{
00223 IONameIS name(isc->name());
00224 if (name.getClassName()=="TestData"){
00225 testDataCallback(isc);
00226 }else if (name.getClassName()=="SctData::RawScanResult"
00227 || name.getClassName()=="SctData::FitScanResult") {
00228 scanResultCallback(isc);
00229 }
00230 }catch(Sct::Throwable& e){
00231 e.sendToMrs();
00232 }
00233 }
00234
00235 void AnalysisService::scanResultCallback(ISCallbackInfo * isc) {
00236 try {
00237 if (isc->reason() != is::Created )
00238 return;
00239 instance().workergroup->push(shared_ptr<IOName>(new IONameIS(isc->name())));
00240 } catch(Sct::Throwable& e) {
00241 e.sendToMrs(MRS_ERROR);
00242 }
00243 }
00244
00245 void AnalysisService::testDataCallback(ISCallbackInfo * isc) {
00246 try {
00247 shared_ptr<TestData> testdata( new TestData() );
00248 isc->value(*testdata);
00249
00250 if (isc->reason() == is::Created ) {
00251 std::cout << "adding " << isc->name() << std::endl;
00252 instance().workergroup->addTest(testdata);
00253 } else if (isc->reason() == is::Updated && testdata->status == TestData::ABORTED) {
00254 std::cout << "removing aborted test " << isc->name() << std::endl;
00255 instance().workergroup->removeTestsUpTo(testdata);
00256 } else if (isc->reason() == is::Updated) {
00257 std::cout << "replacing " << isc->name() << std::endl;
00258 instance().workergroup->replaceTest(testdata);
00259 } else if (isc->reason() == is::Deleted ) {
00260 std::cout << "removing " << isc->name() << std::endl;
00261 instance().workergroup->removeTestsUpTo(testdata);
00262 }
00263 } catch (Sct::Throwable& e) {
00264 e.sendToMrs(MRS_ERROR);
00265 }
00266 }
00267
00268 void AnalysisService::setFitStrategy(const string& name) throw(LogicError) {
00269 SctNames::Mrs() << "ANALYSIS_FIT_STRATEGY" << MRS_PARAM<const char*>("Strategy", name.c_str())
00270 << MRS_INFORMATION << MRS_TEXT("Fitting method is being set") << ENDM;
00271 fitStrategy = SctFitter::FitStrategyFactory::instance().getStrategy(name);
00272 }
00273
00274 SctFitter::FitStrategy& AnalysisService::getFitStrategy() const throw(LogicError) {
00275 if (!fitStrategy)
00276 throw InvariantViolatedError("Fitter::getStrategy() no fit strategy defined", __FILE__, __LINE__);
00277 return *fitStrategy;
00278 }
00279
00280 AnalysisService& AnalysisService::instance() {
00281 if (!service) throw Sct::IllegalStateError("Attempt to use uninitialised analysis", __FILE__, __LINE__);
00282 return *service;
00283 }
00284
00285 bool AnalysisService::isFifo(){
00286 return instance().workergroup->isFifo();
00287 }
00288
00289 void AnalysisService::setFifo(bool val){
00290 instance().workergroup->setFifo(val);
00291 }
00292
00293 AnalysisService& AnalysisService::initialize(AnalysisArguments args) {
00294 service = new AnalysisService(args);
00295 return *service;
00296 }
00297
00298 DcsInterface& AnalysisService::getDcsInterface(){
00299 if (!dcsinterface){
00300 throw ConfigurationException("Cant return dcsinterface",__FILE__, __LINE__);
00301 }else{
00302 return *dcsinterface;
00303 }
00304 }
00305
00306 void AnalysisService::recover(){
00307 SctNames::Mrs() << "ANALYSIS_RECOVERY" << MRS_TEXT("Analysis recovery started in recovery mode") << ENDM;
00308 Sct::UnsupportedOperationError("Recovery not implimented", __FILE__, __LINE__).sendToMrs(MRS_DIAGNOSTIC);
00309 }
00310
00311
00312 AnalysisService* AnalysisService::service=0;
00313
00314 }