Main Page   Modules   Namespace List   Class Hierarchy   Data Structures   File List   Namespace Members   Data Fields   Globals   Related Pages  

Fitter.cpp

Go to the documentation of this file.
00001 #include "Fitter.h"
00002 #include "../src/FitStrategy.h"
00003 #include "FitterWorkerGroup.h"
00004 #include "ThresholdFitAlgorithm.h"
00005 #include "ThresholdFitAlgorithm2.h"
00006 
00007 #include "SctData/RawScanResult.h"
00008 #include "Sct/IS/IOManagerIS.h"
00009 #include "Sct/SctNames.h"
00010 #include "Sct/SctParameters.h"
00011 #include "Sct/Timer.h"
00012 
00013 #include <iostream>
00014 #include <vector>
00015 #include <cassert>
00016 #include <ctime>
00017 #include <TMath.h>
00018 
00019 using namespace std;
00020 using namespace Sct;
00021 using namespace Sct::IS;
00022 using namespace SctData;
00023 using namespace boost;
00024 
00025 namespace SctFitter {
// Forward declaration of a namespace-scope doFits() taking the IS callback info.
// NOTE(review): no matching free-function definition is visible in this listing
// (only the member Fitter::doFits is defined) - confirm this is still needed.
00026 void doFits(ISCallbackInfo * isc);
00027 
// Storage for the singleton pointer. It starts out null and is filled in by
// Fitter::initialize() the first time an instance is created.
00028 Fitter* Fitter::fitter = 0;
00029 
00030 Fitter& Fitter::instance() {
00031     if (!fitter)
00032         return initialize();
00033     return *fitter;
00034 }
00035 
00036 Fitter& Fitter::initialize(const string& fitStrategyName) throw(ConfigurationException) {
00037     if (fitter) {
00038         fitter->setFitStrategy(fitStrategyName);
00039         return *fitter;
00040     }
00041     fitter = new Fitter(fitStrategyName);
00042     return *fitter;
00043 }
00044 
00045 Fitter::Fitter(const string& fitStrategyName) throw(ConfigurationException) : IPCObject(FitterI_C_FitterInterface_instanceName, &getFitterServer())
00046     ,nFitsDone(0), nFitErrors(0), nScans(0), scanTimeTaken(0), ioTimeTaken(0), m_scan("No last scan"), fitStrategy(0) {
00047 
00048     auto_ptr<ISInfoReceiver> ir(new ISInfoReceiver(SctNames::getPartition()));
00049     if (!ir.get())
00050         throw ConfigurationException("Fitter::initialize can't make infoReceiver ", __FILE__, __LINE__) ;
00051     infoReceiver  = ir;
00052 
00053     setFitStrategy(fitStrategyName);
00054     workergroup = new FitterWorkerGroup();
00055 }
00056 
00057 Fitter::~Fitter() throw() {}
00058 
00059 IPCServer& Fitter::getFitterServer() throw() {
00060     static IPCServer fitterServer(FitterI_C_FitterInterface_serverName, SctNames::getPartition());
00061     return fitterServer;
00062 }
00063 
00064 
00065 
00066 void Fitter::go(unsigned nWorker) throw(IsException) {
00068     ISInfo::Status s=infoReceiver->subscribe(SctNames::getEventDataName().c_str(),"SctData::RawScanResult.*", doFits);
00069     if (s!=ISInfo::Success) {
00070         ostringstream os;
00071         os <<"Fitter::go() Could not subscribe to "<<SctNames::getEventDataName();
00072         throw IsException(s, os.str(), __FILE__, __LINE__);
00073     }
00074     workergroup->go(nWorker);
00075 
00076     //Not done since Fitter is an IPC server, and so IPCServer.run() is called elsewhere.
00077     //infoReceiver->run();
00078 }
00079 
00080 // The infoReceiver callback function must take a static function as an argument:
00081 void Fitter::doFits(ISCallbackInfo * isc) throw(IsException, LogicError) {
00082     using namespace SctData;
00083     try {
00084     double start = timer();
00085         // find out why the callback function has been called:
00086         if (isc->reason() != ISInfoCreated && isc->reason() != ISInfoUpdated )
00087             return;
00088 
00089         shared_ptr<const Serializable> ob ( IOManagerIS::instance().read(*isc) );
00090         shared_ptr<const RawScanResult> raw = dynamic_pointer_cast<const RawScanResult>(ob);
00091         if (!raw)
00092             throw InvalidArgumentError("Fitter::doFits() not a RawScanResult", __FILE__, __LINE__);
00093         // put it in the queue
00094         Fitter::instance().workergroup->push(raw);
00095     Fitter::instance().addIOTime(timer() - start);
00096     } catch (Throwable& e) {
00097         e.sendToMrs(MRS_ERROR);
00098     }
00099 }
00100 
00101 void Fitter::ipcFit(FitterIStatus* status, char* name) throw() {
00102     try {
00103     double start = timer();
00104         shared_ptr<const Serializable> ob = IOManagerIS::instance().read(string(name));
00105         shared_ptr<const RawScanResult> raw = dynamic_pointer_cast<const RawScanResult>(ob);
00106         if (!raw)
00107             throw InvalidArgumentError("Fitter::ipcFitAll() not a RawScanResult", __FILE__, __LINE__);
00108         // put it in the queue
00109         workergroup->push(raw);
00110         status->returnCode=FitterIReply_Success;
00111     addIOTime(timer() - start);
00112     } catch (Throwable& e) {
00113         e.sendToMrs(MRS_ERROR);
00114         return;
00115     }
00116 }
00117 void Fitter::ipcFitAll(FitterIStatus* status) throw() {
00118 
00119     ISInfoIterator iter(SctNames::getPartition(), SctNames::getEventDataName().c_str(), "SctData::RawScanResult.*");
00120     while (iter()) {
00121         try {
00122         double start = timer();
00123             shared_ptr<const Serializable> ob = IOManagerIS::instance().read(iter);
00124             shared_ptr<const RawScanResult> raw = dynamic_pointer_cast<const RawScanResult>(ob);
00125             if (!raw)
00126                 throw InvalidArgumentError("Fitter::ipcFitAll() not a RawScanResult", __FILE__, __LINE__);
00127             // put it in the queue
00128             workergroup->push(raw);
00129             status->returnCode=FitterIReply_Success;
00130         addIOTime(timer() - start);
00131         } catch (Throwable& e) {
00132             e.sendToMrs(MRS_ERROR);
00133             return;
00134         }
00135     }
00136 }
00137 
00138 char* Fitter::ipcGetFitOptions(FitterIStatus* status) throw() {
00139     string options = "";
00140     try {
00141         options = getFitStrategy().getOptions();        
00142     } catch (LogicError &e) {
00143     //Um don't do anything!
00144     }
00145     status->returnCode = FitterIReply_Success;
00146     char *newOpt=new char[options.length()+1];
00147     strcpy(newOpt, options.c_str());
00148     return newOpt;
00149 }
00150 
00151 char* Fitter::ipcFitterStatus(FitterIStatus* status) throw() {
00152     status->returnCode = FitterIReply_Success;
00153     // Have to make a new char* because IPC wants to delete it after use!
00154     string msg = getStatus();
00155     char *statusMessage = new char[msg.length()+1];
00156     strcpy(statusMessage, msg.c_str());
00157     return statusMessage;
00158 }
00159 
00160 void Fitter::setFitOptions(string opt) throw (LogicError) {
00161     getFitStrategy().setOptions(opt);
00162 }
00163 
00164 void Fitter::ipcSetFitOptions(FitterIStatus* status, char *n) throw() {
00165     string name( n );
00166     try {
00167         setFitOptions(n);
00168     } catch (LogicError &e) {
00169         return;
00170     }
00171     status->returnCode = FitterIReply_Success;
00172 }
00173 
00174 void Fitter::ipcUseAnalyticAlgorithm (FitterIStatus *_status, ilu_Boolean use) {
00175     if (use) {
00176     ThresholdFitAlgorithm2::putInMap();
00177     } else {
00178     ThresholdFitAlgorithm::putInMap();
00179     }
00180 }
00181 
00182 bool Fitter::isUsingAnalyticAlgorithm() const {
00183     return ThresholdFitAlgorithm2::isInMap();
00184 }
00185 
00186 ilu_Boolean Fitter::ipcIsUsingAnalyticAlgorithm(FitterIStatus *_status) {
00187     _status->returnCode = FitterIReply_Success;
00188     return isUsingAnalyticAlgorithm();
00189 }
00190 
00191 char* Fitter::ipcGetFitStrategy(FitterIStatus* status) throw() {
00192     string strat;
00193     try {
00194         strat = getFitStrategy().getName();
00195     } catch (LogicError &e) {
00196     //Do nothing!
00197     }    
00198     status->returnCode = FitterIReply_Success;
00199     char *newOpt=new char[strat.length()+1];
00200     strcpy(newOpt, strat.c_str());
00201     return newOpt;
00202 }
00203 
00204 void Fitter::ipcSetFitStrategy(FitterIStatus* status, char *n) throw() {
00205     string name( n );
00206     cout <<"trying to set fit strategy "<<name<<endl;
00207     try {
00208         setFitStrategy(name);
00209     } catch (LogicError& e) {
00210         cout <<"failed  to set fit strategy "<<name<<endl;
00211         return;
00212     }
00213     cout <<"set fit strategy "<<name<<endl;
00214     status->returnCode = FitterIReply_Success;
00215 }
00216 
00217 const char* Fitter::getStatus() const throw() {
00218     string strategyName;
00219     try {
00220         strategyName=getFitStrategy().getName();
00221     } catch (LogicError &e) {
00222         strategyName="NONE";
00223     }
00224     ostringstream oss;
00225     oss << "Fitter Strategy=" << strategyName <<  " Fit Options=" << getFitStrategy().getOptions()
00226     << " Using analytic fits=" << isUsingAnalyticAlgorithm() << endl;
00227     oss << "Workers=" << workergroup->nWorkers();
00228     oss << ", (Busy=" << workergroup->busy() << "), Fits Done=" << getNFitsDone() << ", Errors=";
00229     oss << getNFitErrors() << ", Scans Done=" << nScans << "\nLast scan: " << lastScan(); 
00230     oss << "\n Timing.  Read I/O time: " << ioTimeTaken << " Scan time: "; 
00231     oss << scanTimeTaken << " Approx average time/scan/thread: " << getAverageTimePerScan() << endl;
00232     status = oss.str();
00233     return status.c_str();
00234 }
00235 
00236 void Fitter::incrementFitErrors() throw() {
00237     mutex::scoped_lock scoped_lock(counterMutex);
00238     ++nFitErrors;
00239 }
00240 
00241 void Fitter::incrementFitsDone() throw() {
00242     mutex::scoped_lock scoped_lock(counterMutex);
00243     ++nFitsDone;
00244 }
00245 
00246 void Fitter::scanDone(double time) throw() {
00247     mutex::scoped_lock scoped_lock(counterMutex);
00248     ++nScans;
00249     scanTimeTaken += time;
00250 }
00251 
00252 void Fitter::addIOTime(double time) throw() {
00253     mutex::scoped_lock scoped_lock(counterMutex);
00254     ioTimeTaken += time;
00255 }
00256 
00257 
00258 double Fitter::getAverageTimePerScan() const throw() {
00259     double time = ioTimeTaken/(1+workergroup->nWorkers()) + scanTimeTaken/workergroup->nWorkers();
00260     if (nScans > 0) time /= nScans;     
00261     return time;
00262 }
00263 
00264 long Fitter::getNFitsDone() const throw() {
00265     return nFitsDone;
00266 }
00267 
00268 long Fitter::getNFitErrors() const throw() {
00269     return nFitErrors;
00270 }
00271 
00272 long Fitter::ipcNFitsDone(FitterIStatus* status) throw() {
00273     status->returnCode = FitterIReply_Success;
00274     return this->getNFitsDone();
00275 }
00276 
00277 long Fitter::ipcNFitErrors(FitterIStatus* status) throw() {
00278     status->returnCode = FitterIReply_Success;
00279     return this->getNFitErrors();
00280 }
00281 
00282 const char* Fitter::lastScan() const throw() {
00283     return m_scan.c_str();
00284 }
00285 char* Fitter::ipcLastScan(FitterIStatus* status) throw() {    
00286     char * message = new char[m_scan.length()+1];
00287     strcpy(message, m_scan.c_str() );
00288     status->returnCode = FitterIReply_Success;
00289     return message;
00290 }
00291 
00292 FitStrategy& Fitter::getFitStrategy() const throw(LogicError) {
00293     if (!fitStrategy)
00294         throw InvariantViolatedError("Fitter::getStrategy() no fit strategy defined", __FILE__, __LINE__);
00295     return *fitStrategy;
00296 }
00297 
00298 void Fitter::setFitStrategy(string name) throw(LogicError) {
00299     fitStrategy = FitStrategyFactory::instance().getStrategy(name);
00300 }
00301 
00302 } // end of namespace SctFitter

Generated on Mon Dec 15 19:36:02 2003 for SCT DAQ/DCS Software by doxygen1.3-rc3