00001 #include "Fitter.h"
00002 #include "../src/FitStrategy.h"
00003 #include "FitterWorkerGroup.h"
00004 #include "ThresholdFitAlgorithm.h"
00005 #include "ThresholdFitAlgorithm2.h"
00006
00007 #include "SctData/RawScanResult.h"
00008 #include "Sct/IS/IOManagerIS.h"
00009 #include "Sct/SctNames.h"
00010 #include "Sct/SctParameters.h"
00011 #include "Sct/Timer.h"
00012
00013 #include <iostream>
00014 #include <vector>
00015 #include <cassert>
00016 #include <ctime>
00017 #include <TMath.h>
00018
00019 using namespace std;
00020 using namespace Sct;
00021 using namespace Sct::IS;
00022 using namespace SctData;
00023 using namespace boost;
00024
00025 namespace SctFitter {
00026 void doFits(ISCallbackInfo * isc);
00027
00028 Fitter* Fitter::fitter = 0;
00029
00030 Fitter& Fitter::instance() {
00031 if (!fitter)
00032 return initialize();
00033 return *fitter;
00034 }
00035
00036 Fitter& Fitter::initialize(const string& fitStrategyName) throw(ConfigurationException) {
00037 if (fitter) {
00038 fitter->setFitStrategy(fitStrategyName);
00039 return *fitter;
00040 }
00041 fitter = new Fitter(fitStrategyName);
00042 return *fitter;
00043 }
00044
00045 Fitter::Fitter(const string& fitStrategyName) throw(ConfigurationException) : IPCObject(FitterI_C_FitterInterface_instanceName, &getFitterServer())
00046 ,nFitsDone(0), nFitErrors(0), nScans(0), scanTimeTaken(0), ioTimeTaken(0), m_scan("No last scan"), fitStrategy(0) {
00047
00048 auto_ptr<ISInfoReceiver> ir(new ISInfoReceiver(SctNames::getPartition()));
00049 if (!ir.get())
00050 throw ConfigurationException("Fitter::initialize can't make infoReceiver ", __FILE__, __LINE__) ;
00051 infoReceiver = ir;
00052
00053 setFitStrategy(fitStrategyName);
00054 workergroup = new FitterWorkerGroup();
00055 }
00056
00057 Fitter::~Fitter() throw() {}
00058
00059 IPCServer& Fitter::getFitterServer() throw() {
00060 static IPCServer fitterServer(FitterI_C_FitterInterface_serverName, SctNames::getPartition());
00061 return fitterServer;
00062 }
00063
00064
00065
00066 void Fitter::go(unsigned nWorker) throw(IsException) {
00068 ISInfo::Status s=infoReceiver->subscribe(SctNames::getEventDataName().c_str(),"SctData::RawScanResult.*", doFits);
00069 if (s!=ISInfo::Success) {
00070 ostringstream os;
00071 os <<"Fitter::go() Could not subscribe to "<<SctNames::getEventDataName();
00072 throw IsException(s, os.str(), __FILE__, __LINE__);
00073 }
00074 workergroup->go(nWorker);
00075
00076
00077
00078 }
00079
00080
00081 void Fitter::doFits(ISCallbackInfo * isc) throw(IsException, LogicError) {
00082 using namespace SctData;
00083 try {
00084 double start = timer();
00085
00086 if (isc->reason() != ISInfoCreated && isc->reason() != ISInfoUpdated )
00087 return;
00088
00089 shared_ptr<const Serializable> ob ( IOManagerIS::instance().read(*isc) );
00090 shared_ptr<const RawScanResult> raw = dynamic_pointer_cast<const RawScanResult>(ob);
00091 if (!raw)
00092 throw InvalidArgumentError("Fitter::doFits() not a RawScanResult", __FILE__, __LINE__);
00093
00094 Fitter::instance().workergroup->push(raw);
00095 Fitter::instance().addIOTime(timer() - start);
00096 } catch (Throwable& e) {
00097 e.sendToMrs(MRS_ERROR);
00098 }
00099 }
00100
00101 void Fitter::ipcFit(FitterIStatus* status, char* name) throw() {
00102 try {
00103 double start = timer();
00104 shared_ptr<const Serializable> ob = IOManagerIS::instance().read(string(name));
00105 shared_ptr<const RawScanResult> raw = dynamic_pointer_cast<const RawScanResult>(ob);
00106 if (!raw)
00107 throw InvalidArgumentError("Fitter::ipcFitAll() not a RawScanResult", __FILE__, __LINE__);
00108
00109 workergroup->push(raw);
00110 status->returnCode=FitterIReply_Success;
00111 addIOTime(timer() - start);
00112 } catch (Throwable& e) {
00113 e.sendToMrs(MRS_ERROR);
00114 return;
00115 }
00116 }
00117 void Fitter::ipcFitAll(FitterIStatus* status) throw() {
00118
00119 ISInfoIterator iter(SctNames::getPartition(), SctNames::getEventDataName().c_str(), "SctData::RawScanResult.*");
00120 while (iter()) {
00121 try {
00122 double start = timer();
00123 shared_ptr<const Serializable> ob = IOManagerIS::instance().read(iter);
00124 shared_ptr<const RawScanResult> raw = dynamic_pointer_cast<const RawScanResult>(ob);
00125 if (!raw)
00126 throw InvalidArgumentError("Fitter::ipcFitAll() not a RawScanResult", __FILE__, __LINE__);
00127
00128 workergroup->push(raw);
00129 status->returnCode=FitterIReply_Success;
00130 addIOTime(timer() - start);
00131 } catch (Throwable& e) {
00132 e.sendToMrs(MRS_ERROR);
00133 return;
00134 }
00135 }
00136 }
00137
00138 char* Fitter::ipcGetFitOptions(FitterIStatus* status) throw() {
00139 string options = "";
00140 try {
00141 options = getFitStrategy().getOptions();
00142 } catch (LogicError &e) {
00143
00144 }
00145 status->returnCode = FitterIReply_Success;
00146 char *newOpt=new char[options.length()+1];
00147 strcpy(newOpt, options.c_str());
00148 return newOpt;
00149 }
00150
00151 char* Fitter::ipcFitterStatus(FitterIStatus* status) throw() {
00152 status->returnCode = FitterIReply_Success;
00153
00154 string msg = getStatus();
00155 char *statusMessage = new char[msg.length()+1];
00156 strcpy(statusMessage, msg.c_str());
00157 return statusMessage;
00158 }
00159
00160 void Fitter::setFitOptions(string opt) throw (LogicError) {
00161 getFitStrategy().setOptions(opt);
00162 }
00163
00164 void Fitter::ipcSetFitOptions(FitterIStatus* status, char *n) throw() {
00165 string name( n );
00166 try {
00167 setFitOptions(n);
00168 } catch (LogicError &e) {
00169 return;
00170 }
00171 status->returnCode = FitterIReply_Success;
00172 }
00173
00174 void Fitter::ipcUseAnalyticAlgorithm (FitterIStatus *_status, ilu_Boolean use) {
00175 if (use) {
00176 ThresholdFitAlgorithm2::putInMap();
00177 } else {
00178 ThresholdFitAlgorithm::putInMap();
00179 }
00180 }
00181
00182 bool Fitter::isUsingAnalyticAlgorithm() const {
00183 return ThresholdFitAlgorithm2::isInMap();
00184 }
00185
00186 ilu_Boolean Fitter::ipcIsUsingAnalyticAlgorithm(FitterIStatus *_status) {
00187 _status->returnCode = FitterIReply_Success;
00188 return isUsingAnalyticAlgorithm();
00189 }
00190
00191 char* Fitter::ipcGetFitStrategy(FitterIStatus* status) throw() {
00192 string strat;
00193 try {
00194 strat = getFitStrategy().getName();
00195 } catch (LogicError &e) {
00196
00197 }
00198 status->returnCode = FitterIReply_Success;
00199 char *newOpt=new char[strat.length()+1];
00200 strcpy(newOpt, strat.c_str());
00201 return newOpt;
00202 }
00203
00204 void Fitter::ipcSetFitStrategy(FitterIStatus* status, char *n) throw() {
00205 string name( n );
00206 cout <<"trying to set fit strategy "<<name<<endl;
00207 try {
00208 setFitStrategy(name);
00209 } catch (LogicError& e) {
00210 cout <<"failed to set fit strategy "<<name<<endl;
00211 return;
00212 }
00213 cout <<"set fit strategy "<<name<<endl;
00214 status->returnCode = FitterIReply_Success;
00215 }
00216
00217 const char* Fitter::getStatus() const throw() {
00218 string strategyName;
00219 try {
00220 strategyName=getFitStrategy().getName();
00221 } catch (LogicError &e) {
00222 strategyName="NONE";
00223 }
00224 ostringstream oss;
00225 oss << "Fitter Strategy=" << strategyName << " Fit Options=" << getFitStrategy().getOptions()
00226 << " Using analytic fits=" << isUsingAnalyticAlgorithm() << endl;
00227 oss << "Workers=" << workergroup->nWorkers();
00228 oss << ", (Busy=" << workergroup->busy() << "), Fits Done=" << getNFitsDone() << ", Errors=";
00229 oss << getNFitErrors() << ", Scans Done=" << nScans << "\nLast scan: " << lastScan();
00230 oss << "\n Timing. Read I/O time: " << ioTimeTaken << " Scan time: ";
00231 oss << scanTimeTaken << " Approx average time/scan/thread: " << getAverageTimePerScan() << endl;
00232 status = oss.str();
00233 return status.c_str();
00234 }
00235
00236 void Fitter::incrementFitErrors() throw() {
00237 mutex::scoped_lock scoped_lock(counterMutex);
00238 ++nFitErrors;
00239 }
00240
00241 void Fitter::incrementFitsDone() throw() {
00242 mutex::scoped_lock scoped_lock(counterMutex);
00243 ++nFitsDone;
00244 }
00245
00246 void Fitter::scanDone(double time) throw() {
00247 mutex::scoped_lock scoped_lock(counterMutex);
00248 ++nScans;
00249 scanTimeTaken += time;
00250 }
00251
00252 void Fitter::addIOTime(double time) throw() {
00253 mutex::scoped_lock scoped_lock(counterMutex);
00254 ioTimeTaken += time;
00255 }
00256
00257
00258 double Fitter::getAverageTimePerScan() const throw() {
00259 double time = ioTimeTaken/(1+workergroup->nWorkers()) + scanTimeTaken/workergroup->nWorkers();
00260 if (nScans > 0) time /= nScans;
00261 return time;
00262 }
00263
00264 long Fitter::getNFitsDone() const throw() {
00265 return nFitsDone;
00266 }
00267
00268 long Fitter::getNFitErrors() const throw() {
00269 return nFitErrors;
00270 }
00271
00272 long Fitter::ipcNFitsDone(FitterIStatus* status) throw() {
00273 status->returnCode = FitterIReply_Success;
00274 return this->getNFitsDone();
00275 }
00276
00277 long Fitter::ipcNFitErrors(FitterIStatus* status) throw() {
00278 status->returnCode = FitterIReply_Success;
00279 return this->getNFitErrors();
00280 }
00281
00282 const char* Fitter::lastScan() const throw() {
00283 return m_scan.c_str();
00284 }
00285 char* Fitter::ipcLastScan(FitterIStatus* status) throw() {
00286 char * message = new char[m_scan.length()+1];
00287 strcpy(message, m_scan.c_str() );
00288 status->returnCode = FitterIReply_Success;
00289 return message;
00290 }
00291
00292 FitStrategy& Fitter::getFitStrategy() const throw(LogicError) {
00293 if (!fitStrategy)
00294 throw InvariantViolatedError("Fitter::getStrategy() no fit strategy defined", __FILE__, __LINE__);
00295 return *fitStrategy;
00296 }
00297
00298 void Fitter::setFitStrategy(string name) throw(LogicError) {
00299 fitStrategy = FitStrategyFactory::instance().getStrategy(name);
00300 }
00301
00302 }