00001 #include "Fitter.h"
00002 #include "../src/FitStrategy.h"
00003 #include "FitterWorkerGroup.h"
00004 #include "ThresholdFitAlgorithm.h"
00005 #include "ThresholdFitAlgorithm2.h"
00006 #include "FitAlgorithmMap.h"
00007
00008 #include "Sct/SctNames.h"
00009 #include "Sct/SctParameters.h"
00010 #include "Sct/DestroyingDeleter.h"
00011 #include "Sct/StdExceptionWrapper.h"
00012 #include "Sct/IS/IONameIS.h"
00013 #include "Sct/OmniMarshalling.h"
00014
00015 #include <iostream>
00016 #include <vector>
00017 #include <cassert>
00018 #include <ctime>
00019 #include <string>
00020 #include <TMath.h>
00021 #include <boost/timer.hpp>
00022 #include <is/info.h>
00023 #include <is/infodictionary.h>
00024 #include <is/infoiterator.h>
00025 #include <is/inforeceiver.h>
00026 #include <is/infoT.h>
00027
00028 using namespace std;
00029 using namespace Sct;
00030 using namespace SctData;
00031 using namespace boost;
00032
00033 namespace SctFitter {
00034 void doFits(ISCallbackInfo * isc);
00035
00036 Fitter* Fitter::fitter=0;
00037
00038 Fitter& Fitter::instance() {
00039 if (!fitter)
00040 throw Sct::IllegalStateError("Attempt to use uninitialised Fitter", __FILE__, __LINE__);
00041 return *fitter;
00042 }
00043
00044 Fitter& Fitter::initialize(FitterArguments args) throw(ConfigurationException) {
00045 fitter = new Fitter(args);
00046 return *fitter;
00047 }
00048
00049 void Fitter::shutdown(){
00050 if (workergroup) workergroup->stop();
00051 getFitterServer().stop();
00052 }
00053
00054 Fitter::Fitter(FitterArguments args) throw(ConfigurationException) :
00055 IPCNamedObject<POA_FitterI::FitterInterface>(
00056 SctNames::getPartition(),
00057 args.instanceName()),
00058 m_nFitsDone(0), m_nFitErrors(0), m_nScans(0), m_scanTimeTaken(0), m_ioTimeTaken(0), m_scan("No last scan"),
00059 m_fitStrategy(0), workergroup(0), m_args(args)
00060 {
00061 std::cout << "Fitting Service " << m_args.print() << endl;
00062 Sct::SctNames::Mrs() << "FITTER_START" << MRS_PARAM<const char*>("Name",m_args.instanceName().c_str())
00063 << MRS_INFORMATION << ENDM;
00064 auto_ptr<ISInfoReceiver> ir(new ISInfoReceiver(SctNames::getPartition()));
00065 if (!ir.get())
00066 throw ConfigurationException("Fitter::initialize can't make infoReceiver ", __FILE__, __LINE__) ;
00067 m_infoReceiver = ir;
00068 try{
00069 internal_setFitStrategy(m_args.getStrategyName());
00070 }catch(Sct::Throwable& e){
00071 std::string defaultStrategy("RootFitStrategy");
00072 e.sendToMrs(MRS_DIAGNOSTIC);
00073 internal_setFitStrategy(defaultStrategy);
00074 }
00075 workergroup = new FitterWorkerGroup();
00076 workergroup->reportTo(m_args.getISStatusName());
00077 updateSubscriptions();
00078 internal_setFitOptions(m_args.getFitOptions());
00079 }
00080
00081 Fitter::~Fitter() throw() {}
00082
00083
00084 std::string Fitter::getOutputIsServer() const{
00085 return m_args.getOutputISServer();
00086 }
00087
00088 IPCServer& Fitter::getFitterServer() throw() {
00089 static IPCServer fitterServer;
00090 return fitterServer;
00091 }
00092
00093
00094
00095 void Fitter::go() throw(IsException) {
00096 workergroup->go(m_args.getNWorkers());
00097 if (m_args.recoveryMode()) fitAll();
00098 }
00099
00100 void Fitter::updateSubscriptions(){
00101 if (workergroup) workergroup->setPaused(true);
00102 recursive_mutex::scoped_lock scoped_lock(counterMutex);
00103
00104 const std::list<SctService::Arguments::Subscription>& input=m_args.getInputISServers();
00105 for (std::list<SctService::Arguments::Subscription>::const_iterator i=input.begin();
00106 i!=input.end(); ++i){
00107 try{
00108 ISInfo::Status s=m_infoReceiver->subscribe((*i).server.c_str(),
00109 (*i).regexp.c_str(), doFits);
00110 if (s!=ISInfo::Success) {
00111 ostringstream os;
00112 os <<"Fitter Could not subscribe to IS server ["<<(*i).server
00113 <<"] with regexp [" << (*i).regexp << "]";
00114 IsException(s, os.str(), __FILE__, __LINE__).sendToMrs();
00115 }else{
00116 ostringstream os;
00117 os <<"Fitter subscribed to IS server [" << (*i).server
00118 <<"] with regexp [" << (*i).regexp << "]";
00119 Sct::SctNames::Mrs() << "FIT_SUBSCRIBE" << MRS_TEXT(os.str()) << MRS_INFORMATION << ENDM;
00120 }
00121 }catch(Throwable & e){
00122 e.sendToMrs();
00123 }catch(std::exception & e){
00124 Sct::StdExceptionWrapper(e, __FILE__, __LINE__).sendToMrs();
00125 }
00126 }
00127 if (workergroup) workergroup->setPaused(false);
00128 }
00129
00130
00131 void Fitter::doFits(ISCallbackInfo * isc) throw(IsException, LogicError) {
00132
00133 if (isc->reason() != is::Created && isc->reason() != is::Updated )
00134 return;
00135
00136
00137 Fitter::instance().workergroup->push(isc->name());
00138 }
00139
00140
00141 void Fitter::fit(const char* name) throw() {
00142
00143 workergroup->push(name);
00144 }
00145
00146
00147 void Fitter::fitAll() throw() {
00148 std::list<string> alreadyDone;
00149 {
00150 ISInfoIterator iter(SctNames::getPartition(), getOutputIsServer(), "SctData::FitScanResult.*");
00151 while (iter()) {
00152 Sct::IS::IONameIS name(iter.name());
00153 alreadyDone.push_back(name.getUniqueID());
00154 }
00155 }
00156
00157 const std::list<SctService::Arguments::Subscription>& input=m_args.getInputISServers();
00158 unsigned nToRecover=0;
00159 for (std::list<SctService::Arguments::Subscription>::const_iterator i=input.begin(); i!=input.end(); ++i){
00160 ISInfoIterator iter(SctNames::getPartition(), (*i).server.c_str(),
00161 (*i).regexp.c_str());
00162 while (iter()) {
00163 Sct::IS::IONameIS name(iter.name());
00164 std::string id=name.getUniqueID();
00165 if (std::find(alreadyDone.begin(), alreadyDone.end(), id)==alreadyDone.end()){
00166 nToRecover++;
00167 workergroup->push(iter.name());
00168 }
00169 }
00170 }
00171 std::ostringstream msg;
00172 msg << "Need to fit total of " << nToRecover << " objects";
00173 SctNames::Mrs() << "FITTING_RECOVERY" << MRS_TEXT(msg.str()) << MRS_DIAGNOSTIC << ENDM;
00174 }
00175
00176
00177 char* Fitter::getFitOptions() throw() {
00178 recursive_mutex::scoped_lock scoped_lock(counterMutex);
00179 return Sct::copyStringToCorba(internal_getFitStrategy().getOptions());
00180 }
00181
00182
00183 char* Fitter::status() throw() {
00184 recursive_mutex::scoped_lock scoped_lock(counterMutex);
00185 return Sct::copyStringToCorba(internal_status());
00186 }
00187
00188 void Fitter::internal_setFitOptions(const string & opt) throw (LogicError) {
00189 recursive_mutex::scoped_lock scoped_lock(counterMutex);
00190 internal_getFitStrategy().setOptions(opt);
00191 }
00192
00193
00194 void Fitter::setFitOptions(const char *n) throw() {
00195 recursive_mutex::scoped_lock scoped_lock(counterMutex);
00196 string name( n );
00197 try {
00198 internal_setFitOptions(name);
00199 } catch (LogicError &e) {
00200 return;
00201 }
00202 }
00203
00204 char* Fitter::getFitStrategy() throw() {
00205 recursive_mutex::scoped_lock scoped_lock(counterMutex);
00206 return Sct::copyStringToCorba(internal_getFitStrategy().getName());
00207 }
00208
00209 FitterI::StringList* Fitter::listFitAlgorithms(){
00210 return Sct::copyStringListToCorba<FitterI::StringList>(FitAlgorithmMap::instance().listAlgorithms());
00211 }
00212
00213 FitterI::StringList* Fitter::listFitStrategies(){
00214 return Sct::copyStringListToCorba<FitterI::StringList>(FitStrategyFactory::instance().listStrategies());
00215 }
00216
00217
00218 void Fitter::setFitStrategy(const char *n) throw() {
00219 recursive_mutex::scoped_lock scoped_lock(counterMutex);
00220 string name( n );
00221 cout <<"trying to set fit strategy "<<name<<endl;
00222 try {
00223 internal_setFitStrategy(name);
00224 } catch (LogicError& e) {
00225 std::string message("failed to set fit strategy ");
00226 message += name;
00227 Sct::InvalidArgumentError(message, __FILE__, __LINE__).sendToMrs();
00228 return;
00229 }
00230 cout <<"set fit strategy "<<name<<endl;
00231 }
00232
00233 string Fitter::internal_status() const throw() {
00234 recursive_mutex::scoped_lock scoped_lock(counterMutex);
00235 string strategyName;
00236 try {
00237 strategyName=internal_getFitStrategy().getName();
00238 } catch (LogicError &e) {
00239 strategyName="NONE";
00240 }
00241
00242 ostringstream oss;
00243 oss << "\t=== FittingService ===\n";
00244 oss << m_args.print()<<endl;
00245 oss << "Current: \n\tStrategy=" << strategyName << endl;
00246 oss << "\tOptions=" << internal_getFitStrategy().getOptions() << endl;
00247 oss << "\tWorkers=" << workergroup->nWorkers() << endl
00248 << "\t (Busy=" << workergroup->busy() << ")" << endl
00249 << "\tFits Done=" << internal_nFitsDone() << endl
00250 << "\tErrors=" << internal_nFitErrors() << endl
00251 << "\tScans Done=" << m_nScans << endl
00252 << "\tLast scan: " << internal_lastScan() << endl;
00253 oss << "\tQueue=" << workergroup->queueSize();
00254 if (workergroup->isFifo()){
00255 oss << " [FIFO]\n";
00256 }else{
00257 oss << " [FILO]\n";
00258 }
00259 oss << "Debug : " << (workergroup->debug()?"yes":"no") << "\n";
00260
00261 oss << "\nAvailable FitAlgorithms\n\t";
00262 std::list<string> algs = FitAlgorithmMap::instance().listAlgorithms();
00263 for (std::list<string>::const_iterator i=algs.begin(); i!=algs.end(); ++i){
00264 oss << *i << " ";
00265 }
00266
00267 oss << "\n\nAvailable FitStrategies\n\t";
00268 std::list<string> strategies = FitStrategyFactory::instance().listStrategies();
00269 for (std::list<string>::const_iterator i=strategies.begin(); i!=strategies.end(); ++i){
00270 oss << *i << " ";
00271 }
00272
00273 oss << "\n\nTests:\n";
00274 workergroup->printTests(oss);
00275
00276 oss << "\n\nTiming:"<<endl
00277 << "\tI/O time: " << m_ioTimeTaken << endl
00278 << "\tScan time: "<< m_scanTimeTaken << endl
00279 << "\tApprox average time/scan/thread: " << getAverageTimePerScan() << endl;
00280 return oss.str();
00281 }
00282
00283 void Fitter::setDebug(CORBA::Boolean value){
00284 workergroup->setDebug(value);
00285 }
00286
00287 bool Fitter::debug(){
00288 return workergroup->debug();
00289 }
00290
00291 void Fitter::incrementFitErrors() throw() {
00292 recursive_mutex::scoped_lock scoped_lock(counterMutex);
00293 ++m_nFitErrors;
00294 }
00295
00296 void Fitter::incrementFitsDone() throw() {
00297 recursive_mutex::scoped_lock scoped_lock(counterMutex);
00298 ++m_nFitsDone;
00299 }
00300
00301 void Fitter::scanDone(double time) throw() {
00302 recursive_mutex::scoped_lock scoped_lock(counterMutex);
00303 ++m_nScans;
00304 m_scanTimeTaken += time;
00305 }
00306
00307 void Fitter::addIOTime(double time) throw() {
00308 recursive_mutex::scoped_lock scoped_lock(counterMutex);
00309 m_ioTimeTaken += time;
00310 }
00311
00312
00313 double Fitter::getAverageTimePerScan() const throw() {
00314 recursive_mutex::scoped_lock scoped_lock(counterMutex);
00315 double time = (m_ioTimeTaken+m_scanTimeTaken)/workergroup->nWorkers();
00316 if (m_nScans > 0) time /= m_nScans;
00317 return time;
00318 }
00319
00320
00321 long Fitter::internal_nFitsDone() const throw() {
00322 recursive_mutex::scoped_lock scoped_lock(counterMutex);
00323 return m_nFitsDone;
00324 }
00325
00326 long Fitter::internal_nFitErrors() const throw() {
00327 recursive_mutex::scoped_lock scoped_lock(counterMutex);
00328 return m_nFitErrors;
00329 }
00330
00331
00332 long Fitter::nFitsDone() throw() {
00333 recursive_mutex::scoped_lock scoped_lock(counterMutex);
00334 return internal_nFitsDone();
00335 }
00336
00337
00338 long Fitter::nFitErrors() throw() {
00339 recursive_mutex::scoped_lock scoped_lock(counterMutex);
00340 return internal_nFitErrors();
00341 }
00342
00343
00344 long Fitter::queueLength() {
00345 recursive_mutex::scoped_lock scoped_lock(counterMutex);
00346 return workergroup->queueSize();
00347 }
00348
00349
00350 long Fitter::busy() {
00351 recursive_mutex::scoped_lock scoped_lock(counterMutex);
00352 return workergroup->busy();
00353 }
00354
00355 std::string Fitter::internal_lastScan() const throw() {
00356 recursive_mutex::scoped_lock scoped_lock(counterMutex);
00357 return m_scan;
00358 }
00359
00360
00361 char* Fitter::lastScan() throw() {
00362 recursive_mutex::scoped_lock scoped_lock(counterMutex);
00363 return copyStringToCorba(m_scan);
00364 }
00365
00366 bool Fitter::isFifo(){
00367 return workergroup->isFifo();
00368 }
00369
00370 void Fitter::setFifo(bool val){
00371 workergroup->setFifo(val);
00372 }
00373
00374 FitStrategy& Fitter::internal_getFitStrategy() const throw(LogicError) {
00375 recursive_mutex::scoped_lock scoped_lock(counterMutex);
00376 if (!m_fitStrategy)
00377 throw InvariantViolatedError("Fitter::getStrategy() no fit strategy defined", __FILE__, __LINE__);
00378 return *m_fitStrategy;
00379 }
00380
00381 void Fitter::internal_setFitStrategy(const string & name) throw(LogicError) {
00382 recursive_mutex::scoped_lock scoped_lock(counterMutex);
00383 m_fitStrategy = FitStrategyFactory::instance().getStrategy(name);
00384 SctNames::Mrs() << "FIT_STRATEGY" << MRS_TEXT(string("set fit strategy to ")+name)
00385 << MRS_INFORMATION << ENDM;
00386 }
00387
00388 }