Main Page | Modules | Namespace List | Class Hierarchy | Alphabetical List | Data Structures | File List | Namespace Members | Data Fields | Related Pages

Fitter.cpp

00001 #include "Fitter.h"
00002 #include "../src/FitStrategy.h"
00003 #include "FitterWorkerGroup.h"
00004 #include "ThresholdFitAlgorithm.h"
00005 #include "ThresholdFitAlgorithm2.h"
00006 
00007 #include "Sct/SctNames.h"
00008 #include "Sct/SctParameters.h"
00009 
00010 #include <iostream>
00011 #include <vector>
00012 #include <cassert>
00013 #include <ctime>
00014 #include <TMath.h>
00015 #include <boost/timer.hpp>
00016 
00017 using namespace std;
00018 using namespace Sct;
00019 using namespace SctData;
00020 using namespace boost;
00021 
00022 namespace SctFitter {
00023 void doFits(ISCallbackInfo * isc);
00024 
00025 Fitter* Fitter::fitter = 0;
00026 
00027 Fitter& Fitter::instance() {
00028     if (!fitter)
00029         return initialize();
00030     return *fitter;
00031 }
00032 
00033 Fitter& Fitter::initialize(const string& fitStrategyName) throw(ConfigurationException) {
00034     if (fitter) {
00035         fitter->setFitStrategy(fitStrategyName);
00036         return *fitter;
00037     }
00038     fitter = new Fitter(fitStrategyName);
00039     return *fitter;
00040 }
00041 
00042 Fitter::Fitter(const string& fitStrategyName) throw(ConfigurationException) : IPCObject(FitterI_C_FitterInterface_instanceName, &getFitterServer())
00043     ,m_nFitsDone(0), m_nFitErrors(0), m_nScans(0), m_scanTimeTaken(0), m_ioTimeTaken(0), m_scan("No last scan"), m_fitStrategy(0) {
00044 
00045     auto_ptr<ISInfoReceiver> ir(new ISInfoReceiver(SctNames::getPartition()));
00046     if (!ir.get())
00047         throw ConfigurationException("Fitter::initialize can't make infoReceiver ", __FILE__, __LINE__) ;
00048     m_infoReceiver  = ir;
00049 
00050     setFitStrategy(fitStrategyName);
00051     workergroup = new FitterWorkerGroup();
00052 }
00053 
00054 Fitter::~Fitter() throw() {}
00055 
00056 IPCServer& Fitter::getFitterServer() throw() {
00057     static IPCServer fitterServer(FitterI_C_FitterInterface_serverName, SctNames::getPartition());
00058     return fitterServer;
00059 }
00060 
00061 
00062 
00063 void Fitter::go(unsigned nWorker) throw(IsException) {
00065     ISInfo::Status s=m_infoReceiver->subscribe(SctNames::getEventDataName().c_str(),"SctData::RawScanResult.*", doFits);
00066     if (s!=ISInfo::Success) {
00067         ostringstream os;
00068         os <<"Fitter::go() Could not subscribe to "<<SctNames::getEventDataName();
00069         throw IsException(s, os.str(), __FILE__, __LINE__);
00070     }
00071     workergroup->go(nWorker);
00072 
00073     //Not done since Fitter is an IPC server, and so IPCServer.run() is called elsewhere.
00074     //infoReceiver->run();
00075 }
00076 
00077 // The infoReceiver callback function must take a static function as an argument:
00078 void Fitter::doFits(ISCallbackInfo * isc) throw(IsException, LogicError) {
00079     // find out why the callback function has been called:
00080     if (isc->reason() != ISInfoCreated && isc->reason() != ISInfoUpdated )
00081     return;
00082     
00083     // put it in the queue
00084     Fitter::instance().workergroup->push(isc->name());
00085 }
00086 
00087 void Fitter::fit(FitterIStatus* status, char* name) throw() {
00088     // put it in the queue
00089     workergroup->push(name);
00090     status->returnCode=FitterIReply_Success;
00091 }
00092 
00093 void Fitter::fitAll(FitterIStatus* status) throw() {
00094 
00095     ISInfoIterator iter(SctNames::getPartition(), SctNames::getEventDataName().c_str(), "SctData::RawScanResult.*");
00096     while (iter()) {
00097     // put it in the queue
00098     workergroup->push(iter.name());
00099     status->returnCode=FitterIReply_Success;
00100     }
00101 }
00102 
00103 char* Fitter::getFitOptions(FitterIStatus* status) throw() {
00104     recursive_mutex::scoped_lock scoped_lock(counterMutex);
00105     string options = "";
00106     try {
00107         options = getFitStrategy().getOptions();        
00108     } catch (LogicError &e) {
00109     //Um don't do anything!
00110     }
00111     status->returnCode = FitterIReply_Success;
00112     char *newOpt=new char[options.length()+1];
00113     strcpy(newOpt, options.c_str());
00114     return newOpt;
00115 }
00116 
00117 char* Fitter::status(FitterIStatus* stat) throw() {
00118     recursive_mutex::scoped_lock scoped_lock(counterMutex);
00119     stat->returnCode = FitterIReply_Success;
00120     // Have to make a new char* because IPC wants to delete it after use!
00121     string msg = status();
00122     char *statusMessage = new char[msg.length()+1];
00123     strcpy(statusMessage, msg.c_str());
00124     return statusMessage;
00125 }
00126 
00127 void Fitter::setFitOptions(string opt) throw (LogicError) {
00128     recursive_mutex::scoped_lock scoped_lock(counterMutex);
00129     getFitStrategy().setOptions(opt);
00130 }
00131 
00132 void Fitter::setFitOptions(FitterIStatus* status, char *n) throw() {
00133     recursive_mutex::scoped_lock scoped_lock(counterMutex);
00134     string name( n );
00135     try {
00136         setFitOptions(n);
00137     } catch (LogicError &e) {
00138         return;
00139     }
00140     status->returnCode = FitterIReply_Success;
00141 }
00142 
00143 void Fitter::useAnalyticAlgorithm (FitterIStatus *_status, ilu_Boolean use) {
00144     recursive_mutex::scoped_lock scoped_lock(counterMutex);
00145     if (use) {
00146     ThresholdFitAlgorithm2::putInMap();
00147     } else {
00148     ThresholdFitAlgorithm::putInMap();
00149     }
00150 }
00151 
00152 bool Fitter::isUsingAnalyticAlgorithm() const {
00153     recursive_mutex::scoped_lock scoped_lock(counterMutex);
00154     return ThresholdFitAlgorithm2::isInMap();
00155 }
00156 
00157 ilu_Boolean Fitter::isUsingAnalyticAlgorithm(FitterIStatus *_status) {
00158     recursive_mutex::scoped_lock scoped_lock(counterMutex);
00159     _status->returnCode = FitterIReply_Success;
00160     return isUsingAnalyticAlgorithm();
00161 }
00162 
00163 char* Fitter::getFitStrategy(FitterIStatus* status) throw() {
00164     recursive_mutex::scoped_lock scoped_lock(counterMutex);
00165     string strat;
00166     try {
00167         strat = getFitStrategy().getName();
00168     } catch (LogicError &e) {
00169     //Do nothing!
00170     }    
00171     status->returnCode = FitterIReply_Success;
00172     char *newOpt=new char[strat.length()+1];
00173     strcpy(newOpt, strat.c_str());
00174     return newOpt;
00175 }
00176 
00177 void Fitter::setFitStrategy(FitterIStatus* status, char *n) throw() {
00178     recursive_mutex::scoped_lock scoped_lock(counterMutex);
00179     string name( n );
00180     cout <<"trying to set fit strategy "<<name<<endl;
00181     try {
00182         setFitStrategy(name);
00183     } catch (LogicError& e) {
00184         cout <<"failed  to set fit strategy "<<name<<endl;
00185         return;
00186     }
00187     cout <<"set fit strategy "<<name<<endl;
00188     status->returnCode = FitterIReply_Success;
00189 }
00190 
00191 const char* Fitter::status() const throw() {
00192     recursive_mutex::scoped_lock scoped_lock(counterMutex);
00193     string strategyName;
00194     try {
00195         strategyName=getFitStrategy().getName();
00196     } catch (LogicError &e) {
00197         strategyName="NONE";
00198     }
00199     ostringstream oss;
00200     oss << "Fitter Strategy=" << strategyName <<  " Fit Options=" << getFitStrategy().getOptions()
00201     << " Using analytic fits=" << isUsingAnalyticAlgorithm() << endl;
00202     oss << "Workers=" << workergroup->nWorkers();
00203     oss << ", (Busy=" << workergroup->busy() << "), Fits Done=" << nFitsDone() << ", Errors=";
00204     oss << nFitErrors() << ", Scans Done=" << m_nScans << "\nLast scan: " << lastScan(); 
00205     oss << "\n Timing.  I/O time: " << m_ioTimeTaken << " Scan time: "; 
00206     oss << m_scanTimeTaken << " Approx average time/scan/thread: " << getAverageTimePerScan() << endl;
00207     m_status = oss.str();
00208     return m_status.c_str();
00209 }
00210 
00211 void Fitter::incrementFitErrors() throw() {
00212     recursive_mutex::scoped_lock scoped_lock(counterMutex);
00213     ++m_nFitErrors;
00214 }
00215 
00216 void Fitter::incrementFitsDone() throw() {
00217     recursive_mutex::scoped_lock scoped_lock(counterMutex);
00218     ++m_nFitsDone;
00219 }
00220 
00221 void Fitter::scanDone(double time) throw() {
00222     recursive_mutex::scoped_lock scoped_lock(counterMutex);
00223     ++m_nScans;
00224     m_scanTimeTaken += time;
00225 }
00226 
00227 void Fitter::addIOTime(double time) throw() {
00228     recursive_mutex::scoped_lock scoped_lock(counterMutex);
00229     m_ioTimeTaken += time;
00230 }
00231 
00232 
00233 double Fitter::getAverageTimePerScan() const throw() {
00234     recursive_mutex::scoped_lock scoped_lock(counterMutex);
00235     double time = (m_ioTimeTaken+m_scanTimeTaken)/workergroup->nWorkers();
00236     if (m_nScans > 0) time /= m_nScans;     
00237     return time;
00238 }
00239 
00240 long Fitter::nFitsDone() const throw() {
00241     recursive_mutex::scoped_lock scoped_lock(counterMutex);
00242     return m_nFitsDone;
00243 }
00244 
00245 long Fitter::nFitErrors() const throw() {
00246     recursive_mutex::scoped_lock scoped_lock(counterMutex);
00247     return m_nFitErrors;
00248 }
00249 
00250 long Fitter::nFitsDone(FitterIStatus* status) throw() {
00251     recursive_mutex::scoped_lock scoped_lock(counterMutex);
00252     status->returnCode = FitterIReply_Success;
00253     return this->nFitsDone();
00254 }
00255 
00256 long Fitter::nFitErrors(FitterIStatus* status) throw() {
00257     recursive_mutex::scoped_lock scoped_lock(counterMutex);
00258     status->returnCode = FitterIReply_Success;
00259     return this->nFitErrors();
00260 }
00261 
00262 long Fitter::queueLength(FitterIStatus* status) {
00263     recursive_mutex::scoped_lock scoped_lock(counterMutex);
00264     status->returnCode = FitterIReply_Success;
00265     return workergroup->queueSize();
00266 }
00267 
00268 long Fitter::busy(FitterIStatus* status) {
00269     recursive_mutex::scoped_lock scoped_lock(counterMutex);
00270     status->returnCode = FitterIReply_Success;
00271     return workergroup->busy();
00272 }
00273 
00274 const char* Fitter::lastScan() const throw() {
00275     recursive_mutex::scoped_lock scoped_lock(counterMutex);
00276     return m_scan.c_str();
00277 }
00278 
00279 char* Fitter::lastScan(FitterIStatus* status) throw() {    
00280     recursive_mutex::scoped_lock scoped_lock(counterMutex);
00281     char * message = new char[m_scan.length()+1];
00282     strcpy(message, m_scan.c_str() );
00283     status->returnCode = FitterIReply_Success;
00284     return message;
00285 }
00286 
00287 FitStrategy& Fitter::getFitStrategy() const throw(LogicError) {
00288     recursive_mutex::scoped_lock scoped_lock(counterMutex);
00289     if (!m_fitStrategy)
00290         throw InvariantViolatedError("Fitter::getStrategy() no fit strategy defined", __FILE__, __LINE__);
00291     return *m_fitStrategy;
00292 }
00293 
00294 void Fitter::setFitStrategy(string name) throw(LogicError) {
00295     recursive_mutex::scoped_lock scoped_lock(counterMutex);
00296     m_fitStrategy = FitStrategyFactory::instance().getStrategy(name);
00297 }
00298 
00299 } // end of namespace SctFitter

Generated on Thu Jul 15 09:50:46 2004 for SCT DAQ/DCS Software - C++ by doxygen 1.3.5