00001 #include "Fitter.h"
00002 #include "../src/FitStrategy.h"
00003 #include "FitterWorkerGroup.h"
00004 #include "ThresholdFitAlgorithm.h"
00005 #include "ThresholdFitAlgorithm2.h"
00006
00007 #include "Sct/SctNames.h"
00008 #include "Sct/SctParameters.h"
00009
00010 #include <iostream>
00011 #include <vector>
00012 #include <cassert>
00013 #include <ctime>
00014 #include <TMath.h>
00015 #include <boost/timer.hpp>
00016
00017 using namespace std;
00018 using namespace Sct;
00019 using namespace SctData;
00020 using namespace boost;
00021
00022 namespace SctFitter {
// Declares a namespace-scope doFits(); no definition of it is visible in this part of the file.
00023 void doFits(ISCallbackInfo * isc);
00024
// Storage for the single Fitter instance; stays null until Fitter::initialize() allocates it.
00025 Fitter* Fitter::fitter = 0;
00026
00027 Fitter& Fitter::instance() {
00028 if (!fitter)
00029 return initialize();
00030 return *fitter;
00031 }
00032
00033 Fitter& Fitter::initialize(const string& fitStrategyName) throw(ConfigurationException) {
00034 if (fitter) {
00035 fitter->setFitStrategy(fitStrategyName);
00036 return *fitter;
00037 }
00038 fitter = new Fitter(fitStrategyName);
00039 return *fitter;
00040 }
00041
00042 Fitter::Fitter(const string& fitStrategyName) throw(ConfigurationException) : IPCObject(FitterI_C_FitterInterface_instanceName, &getFitterServer())
00043 ,m_nFitsDone(0), m_nFitErrors(0), m_nScans(0), m_scanTimeTaken(0), m_ioTimeTaken(0), m_scan("No last scan"), m_fitStrategy(0) {
00044
00045 auto_ptr<ISInfoReceiver> ir(new ISInfoReceiver(SctNames::getPartition()));
00046 if (!ir.get())
00047 throw ConfigurationException("Fitter::initialize can't make infoReceiver ", __FILE__, __LINE__) ;
00048 m_infoReceiver = ir;
00049
00050 setFitStrategy(fitStrategyName);
00051 workergroup = new FitterWorkerGroup();
00052 }
00053
00054 Fitter::~Fitter() throw() {}
00055
00056 IPCServer& Fitter::getFitterServer() throw() {
00057 static IPCServer fitterServer(FitterI_C_FitterInterface_serverName, SctNames::getPartition());
00058 return fitterServer;
00059 }
00060
00061
00062
00063 void Fitter::go(unsigned nWorker) throw(IsException) {
00065 ISInfo::Status s=m_infoReceiver->subscribe(SctNames::getEventDataName().c_str(),"SctData::RawScanResult.*", doFits);
00066 if (s!=ISInfo::Success) {
00067 ostringstream os;
00068 os <<"Fitter::go() Could not subscribe to "<<SctNames::getEventDataName();
00069 throw IsException(s, os.str(), __FILE__, __LINE__);
00070 }
00071 workergroup->go(nWorker);
00072
00073
00074
00075 }
00076
00077
00078 void Fitter::doFits(ISCallbackInfo * isc) throw(IsException, LogicError) {
00079
00080 if (isc->reason() != ISInfoCreated && isc->reason() != ISInfoUpdated )
00081 return;
00082
00083
00084 Fitter::instance().workergroup->push(isc->name());
00085 }
00086
00087 void Fitter::fit(FitterIStatus* status, char* name) throw() {
00088
00089 workergroup->push(name);
00090 status->returnCode=FitterIReply_Success;
00091 }
00092
00093 void Fitter::fitAll(FitterIStatus* status) throw() {
00094
00095 ISInfoIterator iter(SctNames::getPartition(), SctNames::getEventDataName().c_str(), "SctData::RawScanResult.*");
00096 while (iter()) {
00097
00098 workergroup->push(iter.name());
00099 status->returnCode=FitterIReply_Success;
00100 }
00101 }
00102
00103 char* Fitter::getFitOptions(FitterIStatus* status) throw() {
00104 recursive_mutex::scoped_lock scoped_lock(counterMutex);
00105 string options = "";
00106 try {
00107 options = getFitStrategy().getOptions();
00108 } catch (LogicError &e) {
00109
00110 }
00111 status->returnCode = FitterIReply_Success;
00112 char *newOpt=new char[options.length()+1];
00113 strcpy(newOpt, options.c_str());
00114 return newOpt;
00115 }
00116
00117 char* Fitter::status(FitterIStatus* stat) throw() {
00118 recursive_mutex::scoped_lock scoped_lock(counterMutex);
00119 stat->returnCode = FitterIReply_Success;
00120
00121 string msg = status();
00122 char *statusMessage = new char[msg.length()+1];
00123 strcpy(statusMessage, msg.c_str());
00124 return statusMessage;
00125 }
00126
00127 void Fitter::setFitOptions(string opt) throw (LogicError) {
00128 recursive_mutex::scoped_lock scoped_lock(counterMutex);
00129 getFitStrategy().setOptions(opt);
00130 }
00131
00132 void Fitter::setFitOptions(FitterIStatus* status, char *n) throw() {
00133 recursive_mutex::scoped_lock scoped_lock(counterMutex);
00134 string name( n );
00135 try {
00136 setFitOptions(n);
00137 } catch (LogicError &e) {
00138 return;
00139 }
00140 status->returnCode = FitterIReply_Success;
00141 }
00142
00143 void Fitter::useAnalyticAlgorithm (FitterIStatus *_status, ilu_Boolean use) {
00144 recursive_mutex::scoped_lock scoped_lock(counterMutex);
00145 if (use) {
00146 ThresholdFitAlgorithm2::putInMap();
00147 } else {
00148 ThresholdFitAlgorithm::putInMap();
00149 }
00150 }
00151
00152 bool Fitter::isUsingAnalyticAlgorithm() const {
00153 recursive_mutex::scoped_lock scoped_lock(counterMutex);
00154 return ThresholdFitAlgorithm2::isInMap();
00155 }
00156
00157 ilu_Boolean Fitter::isUsingAnalyticAlgorithm(FitterIStatus *_status) {
00158 recursive_mutex::scoped_lock scoped_lock(counterMutex);
00159 _status->returnCode = FitterIReply_Success;
00160 return isUsingAnalyticAlgorithm();
00161 }
00162
00163 char* Fitter::getFitStrategy(FitterIStatus* status) throw() {
00164 recursive_mutex::scoped_lock scoped_lock(counterMutex);
00165 string strat;
00166 try {
00167 strat = getFitStrategy().getName();
00168 } catch (LogicError &e) {
00169
00170 }
00171 status->returnCode = FitterIReply_Success;
00172 char *newOpt=new char[strat.length()+1];
00173 strcpy(newOpt, strat.c_str());
00174 return newOpt;
00175 }
00176
00177 void Fitter::setFitStrategy(FitterIStatus* status, char *n) throw() {
00178 recursive_mutex::scoped_lock scoped_lock(counterMutex);
00179 string name( n );
00180 cout <<"trying to set fit strategy "<<name<<endl;
00181 try {
00182 setFitStrategy(name);
00183 } catch (LogicError& e) {
00184 cout <<"failed to set fit strategy "<<name<<endl;
00185 return;
00186 }
00187 cout <<"set fit strategy "<<name<<endl;
00188 status->returnCode = FitterIReply_Success;
00189 }
00190
00191 const char* Fitter::status() const throw() {
00192 recursive_mutex::scoped_lock scoped_lock(counterMutex);
00193 string strategyName;
00194 try {
00195 strategyName=getFitStrategy().getName();
00196 } catch (LogicError &e) {
00197 strategyName="NONE";
00198 }
00199 ostringstream oss;
00200 oss << "Fitter Strategy=" << strategyName << " Fit Options=" << getFitStrategy().getOptions()
00201 << " Using analytic fits=" << isUsingAnalyticAlgorithm() << endl;
00202 oss << "Workers=" << workergroup->nWorkers();
00203 oss << ", (Busy=" << workergroup->busy() << "), Fits Done=" << nFitsDone() << ", Errors=";
00204 oss << nFitErrors() << ", Scans Done=" << m_nScans << "\nLast scan: " << lastScan();
00205 oss << "\n Timing. I/O time: " << m_ioTimeTaken << " Scan time: ";
00206 oss << m_scanTimeTaken << " Approx average time/scan/thread: " << getAverageTimePerScan() << endl;
00207 m_status = oss.str();
00208 return m_status.c_str();
00209 }
00210
00211 void Fitter::incrementFitErrors() throw() {
00212 recursive_mutex::scoped_lock scoped_lock(counterMutex);
00213 ++m_nFitErrors;
00214 }
00215
00216 void Fitter::incrementFitsDone() throw() {
00217 recursive_mutex::scoped_lock scoped_lock(counterMutex);
00218 ++m_nFitsDone;
00219 }
00220
00221 void Fitter::scanDone(double time) throw() {
00222 recursive_mutex::scoped_lock scoped_lock(counterMutex);
00223 ++m_nScans;
00224 m_scanTimeTaken += time;
00225 }
00226
00227 void Fitter::addIOTime(double time) throw() {
00228 recursive_mutex::scoped_lock scoped_lock(counterMutex);
00229 m_ioTimeTaken += time;
00230 }
00231
00232
00233 double Fitter::getAverageTimePerScan() const throw() {
00234 recursive_mutex::scoped_lock scoped_lock(counterMutex);
00235 double time = (m_ioTimeTaken+m_scanTimeTaken)/workergroup->nWorkers();
00236 if (m_nScans > 0) time /= m_nScans;
00237 return time;
00238 }
00239
00240 long Fitter::nFitsDone() const throw() {
00241 recursive_mutex::scoped_lock scoped_lock(counterMutex);
00242 return m_nFitsDone;
00243 }
00244
00245 long Fitter::nFitErrors() const throw() {
00246 recursive_mutex::scoped_lock scoped_lock(counterMutex);
00247 return m_nFitErrors;
00248 }
00249
00250 long Fitter::nFitsDone(FitterIStatus* status) throw() {
00251 recursive_mutex::scoped_lock scoped_lock(counterMutex);
00252 status->returnCode = FitterIReply_Success;
00253 return this->nFitsDone();
00254 }
00255
00256 long Fitter::nFitErrors(FitterIStatus* status) throw() {
00257 recursive_mutex::scoped_lock scoped_lock(counterMutex);
00258 status->returnCode = FitterIReply_Success;
00259 return this->nFitErrors();
00260 }
00261
00262 long Fitter::queueLength(FitterIStatus* status) {
00263 recursive_mutex::scoped_lock scoped_lock(counterMutex);
00264 status->returnCode = FitterIReply_Success;
00265 return workergroup->queueSize();
00266 }
00267
00268 long Fitter::busy(FitterIStatus* status) {
00269 recursive_mutex::scoped_lock scoped_lock(counterMutex);
00270 status->returnCode = FitterIReply_Success;
00271 return workergroup->busy();
00272 }
00273
00274 const char* Fitter::lastScan() const throw() {
00275 recursive_mutex::scoped_lock scoped_lock(counterMutex);
00276 return m_scan.c_str();
00277 }
00278
00279 char* Fitter::lastScan(FitterIStatus* status) throw() {
00280 recursive_mutex::scoped_lock scoped_lock(counterMutex);
00281 char * message = new char[m_scan.length()+1];
00282 strcpy(message, m_scan.c_str() );
00283 status->returnCode = FitterIReply_Success;
00284 return message;
00285 }
00286
00287 FitStrategy& Fitter::getFitStrategy() const throw(LogicError) {
00288 recursive_mutex::scoped_lock scoped_lock(counterMutex);
00289 if (!m_fitStrategy)
00290 throw InvariantViolatedError("Fitter::getStrategy() no fit strategy defined", __FILE__, __LINE__);
00291 return *m_fitStrategy;
00292 }
00293
00294 void Fitter::setFitStrategy(string name) throw(LogicError) {
00295 recursive_mutex::scoped_lock scoped_lock(counterMutex);
00296 m_fitStrategy = FitStrategyFactory::instance().getStrategy(name);
00297 }
00298
00299 }