Fitter.cpp

00001 #include "Fitter.h"
00002 #include "../src/FitStrategy.h"
00003 #include "FitterWorkerGroup.h"
00004 #include "ThresholdFitAlgorithm.h"
00005 #include "ThresholdFitAlgorithm2.h"
00006 #include "FitAlgorithmMap.h"
00007 
00008 #include "Sct/SctNames.h"
00009 #include "Sct/SctParameters.h"
00010 #include "Sct/DestroyingDeleter.h"
00011 #include "Sct/StdExceptionWrapper.h"
00012 #include "Sct/IS/IONameIS.h"
00013 #include "Sct/OmniMarshalling.h"
00014 
00015 #include <iostream>
00016 #include <vector>
00017 #include <cassert>
00018 #include <ctime>
00019 #include <string>
00020 #include <TMath.h>
00021 #include <boost/timer.hpp>
00022 #include <is/info.h>
00023 #include <is/infodictionary.h>
00024 #include <is/infoiterator.h>
00025 #include <is/inforeceiver.h>
00026 #include <is/infoT.h>
00027 
00028 using namespace std;
00029 using namespace Sct;
00030 using namespace SctData;
00031 using namespace boost;
00032 
00033 namespace SctFitter {
// Forward declaration of a namespace-scope doFits(). No definition of this free
// function appears in this listing; the callback defined further down is the
// member function Fitter::doFits.
00034 void doFits(ISCallbackInfo * isc);
00035 
// Storage for the singleton pointer. It starts out null and is assigned by
// Fitter::initialize(); Fitter::instance() throws while it is still null.
00036 Fitter* Fitter::fitter=0;
00037 
00038 Fitter& Fitter::instance() {
00039     if (!fitter)
00040       throw Sct::IllegalStateError("Attempt to use uninitialised Fitter", __FILE__, __LINE__);
00041     return *fitter;
00042 }
00043 
00044 Fitter& Fitter::initialize(FitterArguments args) throw(ConfigurationException) {
00045     fitter = new Fitter(args);
00046     return *fitter;
00047 }
00048 
00049   void Fitter::shutdown(){
00050     if (workergroup) workergroup->stop();
00051     getFitterServer().stop();
00052   }
00053 
00054   Fitter::Fitter(FitterArguments args) throw(ConfigurationException) : 
00055     IPCNamedObject<POA_FitterI::FitterInterface>(
00056                          SctNames::getPartition(),
00057                          args.instanceName()),
00058     m_nFitsDone(0), m_nFitErrors(0), m_nScans(0), m_scanTimeTaken(0), m_ioTimeTaken(0), m_scan("No last scan"), 
00059     m_fitStrategy(0), workergroup(0), m_args(args)  
00060   {
00061     std::cout << "Fitting Service " << m_args.print() << endl;
00062     Sct::SctNames::Mrs() << "FITTER_START" << MRS_PARAM<const char*>("Name",m_args.instanceName().c_str()) 
00063              << MRS_INFORMATION << ENDM;
00064     auto_ptr<ISInfoReceiver> ir(new ISInfoReceiver(SctNames::getPartition()));
00065     if (!ir.get())
00066         throw ConfigurationException("Fitter::initialize can't make infoReceiver ", __FILE__, __LINE__) ;
00067     m_infoReceiver  = ir;
00068     try{
00069       internal_setFitStrategy(m_args.getStrategyName());
00070     }catch(Sct::Throwable& e){
00071       std::string defaultStrategy("RootFitStrategy");
00072       e.sendToMrs(MRS_DIAGNOSTIC);
00073       internal_setFitStrategy(defaultStrategy);
00074     }
00075     workergroup = new FitterWorkerGroup();
00076     workergroup->reportTo(m_args.getISStatusName());
00077     updateSubscriptions();
00078     internal_setFitOptions(m_args.getFitOptions());
00079   }
00080 
// Destructor with an empty body: nothing is released explicitly here. In
// particular the worker group allocated with `new` in the constructor is not
// deleted by this function.
00081 Fitter::~Fitter() throw() {}
00082 
00083 
00084 std::string Fitter::getOutputIsServer() const{
00085   return m_args.getOutputISServer();
00086 }
00087 
00088 IPCServer& Fitter::getFitterServer() throw() {
00089   static IPCServer fitterServer;
00090     return fitterServer;
00091 }
00092 
00093 
00094 
00095 void Fitter::go() throw(IsException) {
00096   workergroup->go(m_args.getNWorkers());
00097   if (m_args.recoveryMode()) fitAll();
00098 }
00099 
00100 void Fitter::updateSubscriptions(){
00101   if (workergroup) workergroup->setPaused(true);
00102   recursive_mutex::scoped_lock scoped_lock(counterMutex);
00103  
00104   const std::list<SctService::Arguments::Subscription>& input=m_args.getInputISServers();
00105   for (std::list<SctService::Arguments::Subscription>::const_iterator i=input.begin(); 
00106        i!=input.end(); ++i){
00107     try{
00108       ISInfo::Status s=m_infoReceiver->subscribe((*i).server.c_str(),
00109                          (*i).regexp.c_str(), doFits);
00110       if (s!=ISInfo::Success) {
00111     ostringstream os;
00112     os <<"Fitter Could not subscribe to IS server ["<<(*i).server 
00113        <<"] with regexp [" << (*i).regexp << "]";
00114     IsException(s, os.str(), __FILE__, __LINE__).sendToMrs();
00115       }else{
00116     ostringstream os;
00117     os <<"Fitter subscribed to IS server [" << (*i).server
00118        <<"] with regexp [" << (*i).regexp << "]";
00119     Sct::SctNames::Mrs() << "FIT_SUBSCRIBE" << MRS_TEXT(os.str()) << MRS_INFORMATION << ENDM;
00120       }
00121     }catch(Throwable & e){
00122       e.sendToMrs();
00123     }catch(std::exception & e){
00124       Sct::StdExceptionWrapper(e, __FILE__, __LINE__).sendToMrs();
00125     }
00126   }
00127   if (workergroup) workergroup->setPaused(false);
00128 }
00129 
00130   //infoReceiver callback function must take a static function as an argument:
00131 void Fitter::doFits(ISCallbackInfo * isc) throw(IsException, LogicError) {
00132     // find out why the callback function has been called:
00133     if (isc->reason() != is::Created && isc->reason() != is::Updated )
00134     return;
00135     
00136     // put it in the queue
00137     Fitter::instance().workergroup->push(isc->name());
00138 }
00139 
00140   //CORBA
00141 void Fitter::fit(const char* name) throw() {
00142     // put it in the queue
00143     workergroup->push(name);
00144 }
00145 
00146   //CORBA
00147 void Fitter::fitAll() throw() {
00148   std::list<string> alreadyDone;
00149   {
00150     ISInfoIterator iter(SctNames::getPartition(), getOutputIsServer(), "SctData::FitScanResult.*");
00151     while (iter()) {
00152       Sct::IS::IONameIS name(iter.name());
00153       alreadyDone.push_back(name.getUniqueID());
00154     }
00155   }
00156 
00157   const std::list<SctService::Arguments::Subscription>& input=m_args.getInputISServers();
00158   unsigned nToRecover=0;
00159   for (std::list<SctService::Arguments::Subscription>::const_iterator i=input.begin(); i!=input.end(); ++i){
00160     ISInfoIterator iter(SctNames::getPartition(), (*i).server.c_str(), 
00161             (*i).regexp.c_str());
00162     while (iter()) {
00163       Sct::IS::IONameIS name(iter.name());
00164       std::string id=name.getUniqueID();
00165       if (std::find(alreadyDone.begin(), alreadyDone.end(), id)==alreadyDone.end()){
00166     nToRecover++;
00167     workergroup->push(iter.name());
00168       }
00169     }
00170   }
00171   std::ostringstream msg;
00172   msg << "Need to fit total of " << nToRecover << " objects";
00173   SctNames::Mrs() << "FITTING_RECOVERY" << MRS_TEXT(msg.str()) << MRS_DIAGNOSTIC << ENDM;
00174 }
00175 
00176   //CORBA
00177 char* Fitter::getFitOptions() throw() {
00178     recursive_mutex::scoped_lock scoped_lock(counterMutex);
00179     return Sct::copyStringToCorba(internal_getFitStrategy().getOptions());
00180 }
00181 
00182   //CORBA
00183 char* Fitter::status() throw() {
00184     recursive_mutex::scoped_lock scoped_lock(counterMutex);
00185     return Sct::copyStringToCorba(internal_status());
00186 }
00187 
00188 void Fitter::internal_setFitOptions(const string & opt) throw (LogicError) {
00189   recursive_mutex::scoped_lock scoped_lock(counterMutex);
00190   internal_getFitStrategy().setOptions(opt);
00191 }
00192 
00193   //CORBA
00194 void Fitter::setFitOptions(const char *n) throw() {
00195     recursive_mutex::scoped_lock scoped_lock(counterMutex);
00196     string name( n );
00197     try {
00198         internal_setFitOptions(name);
00199     } catch (LogicError &e) {
00200         return;
00201     }
00202 }
00203 
00204 char* Fitter::getFitStrategy() throw() {
00205     recursive_mutex::scoped_lock scoped_lock(counterMutex);
00206     return Sct::copyStringToCorba(internal_getFitStrategy().getName());
00207 }
00208 
00209 FitterI::StringList* Fitter::listFitAlgorithms(){
00210   return Sct::copyStringListToCorba<FitterI::StringList>(FitAlgorithmMap::instance().listAlgorithms());
00211 }
00212 
00213 FitterI::StringList* Fitter::listFitStrategies(){
00214   return Sct::copyStringListToCorba<FitterI::StringList>(FitStrategyFactory::instance().listStrategies());
00215 }
00216 
00217   //CORBA
00218 void Fitter::setFitStrategy(const char *n) throw() {
00219     recursive_mutex::scoped_lock scoped_lock(counterMutex);
00220     string name( n );
00221     cout <<"trying to set fit strategy "<<name<<endl;
00222     try {
00223         internal_setFitStrategy(name);
00224     } catch (LogicError& e) {
00225       std::string message("failed  to set fit strategy ");
00226       message += name;
00227       Sct::InvalidArgumentError(message, __FILE__, __LINE__).sendToMrs();
00228       return;
00229     }
00230     cout <<"set fit strategy "<<name<<endl;
00231 }
00232 
00233 string Fitter::internal_status() const throw() {
00234     recursive_mutex::scoped_lock scoped_lock(counterMutex);
00235     string strategyName;
00236     try {
00237         strategyName=internal_getFitStrategy().getName();
00238     } catch (LogicError &e) {
00239         strategyName="NONE";
00240     }
00241 
00242     ostringstream oss;
00243     oss << "\t=== FittingService ===\n";
00244     oss << m_args.print()<<endl;
00245     oss << "Current: \n\tStrategy=" << strategyName << endl;
00246     oss << "\tOptions=" << internal_getFitStrategy().getOptions() << endl;
00247     oss << "\tWorkers=" << workergroup->nWorkers() << endl
00248     << "\t  (Busy=" << workergroup->busy() << ")" << endl
00249     << "\tFits Done=" << internal_nFitsDone() << endl
00250     << "\tErrors=" << internal_nFitErrors() << endl
00251     << "\tScans Done=" << m_nScans << endl 
00252     << "\tLast scan: " << internal_lastScan() << endl; 
00253     oss << "\tQueue=" << workergroup->queueSize();
00254     if (workergroup->isFifo()){
00255       oss << " [FIFO]\n";
00256     }else{
00257       oss << " [FILO]\n";
00258     }
00259     oss << "Debug : " << (workergroup->debug()?"yes":"no") << "\n";
00260     
00261     oss << "\nAvailable FitAlgorithms\n\t";
00262     std::list<string> algs = FitAlgorithmMap::instance().listAlgorithms();
00263     for (std::list<string>::const_iterator i=algs.begin(); i!=algs.end(); ++i){
00264       oss << *i << " ";
00265     }
00266     
00267     oss << "\n\nAvailable FitStrategies\n\t";
00268     std::list<string> strategies = FitStrategyFactory::instance().listStrategies();
00269     for (std::list<string>::const_iterator i=strategies.begin(); i!=strategies.end(); ++i){
00270       oss << *i << " ";
00271     }
00272     
00273     oss << "\n\nTests:\n";
00274     workergroup->printTests(oss);
00275     
00276     oss << "\n\nTiming:"<<endl
00277     << "\tI/O time: " << m_ioTimeTaken << endl
00278     << "\tScan time: "<< m_scanTimeTaken << endl 
00279     << "\tApprox average time/scan/thread: " << getAverageTimePerScan() << endl;
00280     return oss.str();
00281 }
00282 
00283 void Fitter::setDebug(CORBA::Boolean value){
00284   workergroup->setDebug(value);
00285 }
00286 
00287 bool Fitter::debug(){
00288   return workergroup->debug();
00289 }
00290 
00291 void Fitter::incrementFitErrors() throw() {
00292     recursive_mutex::scoped_lock scoped_lock(counterMutex);
00293     ++m_nFitErrors;
00294 }
00295 
00296 void Fitter::incrementFitsDone() throw() {
00297     recursive_mutex::scoped_lock scoped_lock(counterMutex);
00298     ++m_nFitsDone;
00299 }
00300 
00301 void Fitter::scanDone(double time) throw() {
00302     recursive_mutex::scoped_lock scoped_lock(counterMutex);
00303     ++m_nScans;
00304     m_scanTimeTaken += time;
00305 }
00306 
00307 void Fitter::addIOTime(double time) throw() {
00308     recursive_mutex::scoped_lock scoped_lock(counterMutex);
00309     m_ioTimeTaken += time;
00310 }
00311 
00312 
00313 double Fitter::getAverageTimePerScan() const throw() {
00314     recursive_mutex::scoped_lock scoped_lock(counterMutex);
00315     double time = (m_ioTimeTaken+m_scanTimeTaken)/workergroup->nWorkers();
00316     if (m_nScans > 0) time /= m_nScans;     
00317     return time;
00318 }
00319 
00320 
00321 long Fitter::internal_nFitsDone() const throw() {
00322     recursive_mutex::scoped_lock scoped_lock(counterMutex);
00323     return m_nFitsDone;
00324 }
00325 
00326 long Fitter::internal_nFitErrors() const throw() {
00327     recursive_mutex::scoped_lock scoped_lock(counterMutex);
00328     return m_nFitErrors;
00329 }
00330 
00331   //CORBA
00332 long Fitter::nFitsDone() throw() {
00333     recursive_mutex::scoped_lock scoped_lock(counterMutex);
00334     return internal_nFitsDone();
00335 }
00336 
00337   //CORBA
00338 long Fitter::nFitErrors() throw() {
00339     recursive_mutex::scoped_lock scoped_lock(counterMutex);
00340     return internal_nFitErrors();
00341 }
00342 
00343   //CORBA
00344 long Fitter::queueLength() {
00345     recursive_mutex::scoped_lock scoped_lock(counterMutex);
00346     return workergroup->queueSize();
00347 }
00348 
00349   //CORBA
00350 long Fitter::busy() {
00351     recursive_mutex::scoped_lock scoped_lock(counterMutex);
00352     return workergroup->busy();
00353 }
00354 
00355 std::string Fitter::internal_lastScan() const throw() {
00356     recursive_mutex::scoped_lock scoped_lock(counterMutex);
00357     return m_scan;
00358 }
00359 
00360   //CORBA
00361 char* Fitter::lastScan() throw() {    
00362     recursive_mutex::scoped_lock scoped_lock(counterMutex);
00363     return copyStringToCorba(m_scan);
00364 }
00365 
00366 bool Fitter::isFifo(){
00367   return workergroup->isFifo();
00368 }
00369 
00370 void Fitter::setFifo(bool val){
00371   workergroup->setFifo(val);
00372 }
00373 
00374 FitStrategy& Fitter::internal_getFitStrategy() const throw(LogicError) {
00375     recursive_mutex::scoped_lock scoped_lock(counterMutex);
00376     if (!m_fitStrategy)
00377         throw InvariantViolatedError("Fitter::getStrategy() no fit strategy defined", __FILE__, __LINE__);
00378     return *m_fitStrategy;
00379 }
00380 
00381 void Fitter::internal_setFitStrategy(const string & name) throw(LogicError) {
00382     recursive_mutex::scoped_lock scoped_lock(counterMutex);
00383     m_fitStrategy = FitStrategyFactory::instance().getStrategy(name);
00384     SctNames::Mrs() << "FIT_STRATEGY" << MRS_TEXT(string("set fit strategy to ")+name) 
00385             << MRS_INFORMATION << ENDM;
00386 }
00387 
00388 } // end of namespace SctFitter

Generated on Mon Feb 6 14:01:20 2006 for SCT DAQ/DCS Software - C++ by  doxygen 1.4.6