Main Page | Modules | Namespace List | Class Hierarchy | Alphabetical List | Data Structures | File List | Namespace Members | Data Fields | Related Pages

Archiver.cpp

00001 //#include "Sct/isstream_bugfix.h"
00002 #include "Archiver.h"
00003 #include "ArchivingWorkerGroup.h"
00004 #include "Sct/Archive/IOManagerArchiveFile.h" 
00005 #include "Sct/SctNames.h"
00006 #include "Sct/IpcObjectException.h"
00007 #include "Sct/UnsupportedOperationError.h"
00008 #include "Sct/StdExceptionWrapper.h"
00009 #include "Sct/ISStreamerWrapper.h"
00010 #include "is/isinfo.h"
00011 #include <pmg/pmg_initSync.h>
00012 
00013 #include "unistd.h"
00014 #include <memory>
00015 
00016 #include "TransferCommand.h"
00017 #include "IsGetCommand.h"
00018 #include "IsPutCommand.h"
00019 #include "ArchiveGetCommand.h"
00020 #include "ArchivePutCommand.h"
00021 #include "Sct/Archive/IOParamsArchive.h"
00022 
00023 using namespace std;
00024 
00025 using namespace Sct;
00026 using namespace Sct::Archive;
00027 using namespace Sct::IS;
00028 
00029 using boost::mutex;
00030 
00031 void pmgSynch(void *) {
00032     pmg_initSync();
00033 }
00034 
00035 int main(int argc, char** argv) {
00036     using namespace SctArchiving;
00037     setExceptionHandlers(argv[0]);
00038 
00039     // renice this job!
00040     nice(10);
00041 
00042     bool multiThreaded = true; //Choose single/multi-threaded
00043     IPCCore::init(multiThreaded);
00044     
00045     try {
00046       Archiver& s=Archiver::instance();
00047       if (!s.publish()){
00048     throw IpcObjectException("ArchivingService failed to publish", __FILE__, __LINE__);
00049       }
00050       s.go(1);
00051       s.getArchiverServer().doSoon(pmgSynch, NULL);
00052       s.getArchiverServer().run();
00053     } catch (Throwable& e) {
00054       e.sendToMrs(MRS_FATAL);
00055       terminate();
00056     }
00057 }
00058 
00059 namespace SctArchiving {
00060 
00061 Archiver* Archiver::archiver = 0;
00062 
00063 Archiver& Archiver::instance() {
00064   if(!archiver) archiver = new Archiver();
00065   return *archiver;
00066 }
00067 
00068   Archiver::Archiver() throw(ConfigurationException) : IPCObject(ArchivingServiceI_C_ArchivingServiceInterface_instanceName, &getArchiverServer()),
00069      nArchived(), nRetrieved(0), nValidated(0), isTimeTaken(0.), fileTimeTaken(0.) {
00070   
00071   auto_ptr<ISInfoReceiver> ir(new ISInfoReceiver(SctNames::getPartition()));
00072   if (!ir.get()) throw ConfigurationException("Archiver::initialize can't make infoReceiver ", __FILE__, __LINE__) ;
00073   infoReceiver = ir;
00074   workergroup = new ArchivingWorkerGroup();
00075   cout << "constructed" << endl;
00076   cout << "Persistent dir:" << SctNames::getPersistentDir() << endl;
00077   cout << "Retrieval is server : " << SctNames::getRetrievedDataName() << endl;
00078   m_archive_manager = &IOManagerArchiveFile::instance();
00079   m_retrieval_is_server=Sct::SctNames::getRetrievedDataName();
00080   m_suspend_callback=false;
00081 }
00082 
00083 Archiver::~Archiver() throw() {
00084   cout << "ArchivingService waiting for queue to finish " << endl;
00085   while (workergroup->busy() || workergroup->queueSize() ) {
00086     sleep(1);
00087   }
00088   workergroup->join();
00089 }
00090 
00091 void Archiver::addCommand(boost::shared_ptr<ArchivingCommand> command)const {
00092   workergroup->push(command);
00093 }
00094 
00095 IPCServer& Archiver::getArchiverServer() throw() {
00096   static IPCServer archiverServer(ArchivingServiceI_C_ArchivingServiceInterface_serverName, SctNames::getPartition());
00097   return archiverServer;
00098 }
00099 
/// Tells whether an IS object name refers to control-type data.
///
/// @param name  the IS object name to inspect.
/// @return true when the name contains "TestData" or "ControlData" anywhere.
///
/// The name is taken by const reference so no string copy is made per call;
/// callers passing a C string still work through the implicit conversion.
inline bool isControl(const std::string& name) {
  return name.find("TestData") != std::string::npos
      || name.find("ControlData") != std::string::npos;
}
00103 
00104 void Archiver::archive_callback(ISCallbackInfo *isc){
00105   try{
00106     bool over_write=isControl(isc->name());
00107     if (isc->reason() == ISInfoUpdated && !over_write ) {
00108       throw IoException(string("That is odd ... ") + isc->name() +  " has been modified! I'm not going to overwrite the data."
00109             , __FILE__, __LINE__);
00110     }
00111     if (isc->reason() != ISInfoCreated && isc->reason() != ISInfoUpdated) return;
00112     if (Archiver::instance().m_suspend_callback){
00113     SctNames::Mrs() << MRS_TEXT("ArchivingService being suspended from callbacks") << "ARCHIVE_SUSPEND" << MRS_INFORMATION << ENDM;
00114         return;
00115     }
00116     cout << "Archiving " << isc->name() << endl;
00117 
00118     shared_ptr<IONameIS> name ( new IONameIS(isc->name()));
00119     shared_ptr<GetCommand> get (new IsGetCommand(name));
00120     shared_ptr<PutCommand> put (new ArchivePutCommand());
00121 
00122     if (over_write){
00123       shared_ptr<Sct::Archive::IOParamsArchive> params;
00124       params = shared_ptr<Sct::Archive::IOParamsArchive>(new IOParamsArchive());
00125       params->setOverWrite();
00126       put->setParams(params);
00127     }
00128 
00129     shared_ptr<TransferCommand> command( new TransferCommand(get, put));
00130     Archiver::instance().workergroup->push( command );
00131     
00132   } catch (Sct::Throwable& e){
00133     e.sendToMrs();
00134   }catch(std::exception& e){
00135     StdExceptionWrapper(e).sendToMrs();
00136   }
00137 }
00138 
00139 IOManagerArchive& Archiver::getIOManagerArchive() const {
00140   return *m_archive_manager;
00141 }
00142 
00143 string Archiver::getRetrieveIsServer() const throw(){
00144   return  m_retrieval_is_server;
00145 }
00146 
00147 void  Archiver::setRetrieveIsServer (ArchivingServiceIStatus *_status, ilu_T_CString serverName){
00148   _status->returnCode = ArchivingServiceIReply_Success;
00149   m_retrieval_is_server=serverName;
00150 }
00151 
00152 void Archiver::suspendCallbacks (ArchivingServiceIStatus *_status, ilu_Boolean doSuspend){
00153    _status->returnCode = ArchivingServiceIReply_Success;
00154    m_suspend_callback=doSuspend;
00155 }
00156 
00157 
00158 void Archiver::subscribe(const string& servername, const string& regexp, ISCallbackInfo::Callback callback){
00159   if (servername==SctNames::getRetrievedDataName()){
00160     ostringstream oss;
00161     oss << "ArchivingService is not allowed to subscribe to " << servername 
00162     << " since this could set up a loop";
00163     IoException(oss.str(), __FILE__, __LINE__).sendToMrs();
00164   }
00165   ISInfo::Status s=infoReceiver->subscribe(servername.c_str(), regexp.c_str(), callback);
00166   if (s!=ISInfo::Success) {
00167     ostringstream os;
00168     os <<"Could not subscribe to "<<servername<< " to retrieve " << regexp;
00169     IsException(s, os.str(), __FILE__, __LINE__).sendToMrs();
00170   } else {
00171     cout << "Subscribed to " << servername << " to retrieve " << regexp << endl;
00172   }
00173 }
00174   
00175 void Archiver::go(unsigned nWorker) throw(IsException) {
00176   subscribe(SctNames::getEventDataName(), "SctData::RawScanResult.*", archive_callback);
00177   subscribe(SctNames::getFittedDataName(), "SctData::FitScanResult.*", archive_callback);
00178   subscribe(SctNames::getTestDataName(), "SctData::.*TestResult.*", archive_callback);
00179   subscribe(SctNames::getControlDataName(), "TestData.*", archive_callback);
00180   subscribe(SctNames::getControlDataName(), "SequenceData.*", archive_callback);
00181   workergroup->go(nWorker);
00182 }
00183 
00184   void Archiver::incrimentNArchived() throw(){
00185     mutex::scoped_lock scoped_lock(counterMutex);
00186     ++nArchived;
00187   }
00188   
00189   void Archiver::incrimentNRetrieved() throw(){
00190     mutex::scoped_lock scoped_lock(counterMutex);
00191     ++nRetrieved;
00192   }
00193   
00194   void Archiver::incrimentNValidated() throw(){
00195     mutex::scoped_lock scoped_lock(counterMutex);
00196     ++nValidated;
00197   }
00198   
00199   void Archiver::addISTime(double time) throw(){
00200     mutex::scoped_lock scoped_lock(counterMutex);
00201     isTimeTaken += time;
00202   }
00203 
00204   void Archiver::addFileTime(double time) throw(){
00205     mutex::scoped_lock scoped_lock(counterMutex);
00206     fileTimeTaken += time;
00207   }
00208 
00209 const char* Archiver::getStatus() const throw(){
00210   ostringstream os;
00211   os << "pid = " << getpid() << endl;
00212   os << "Workers=" << workergroup->nWorkers();
00213   os << ", (Busy=" << workergroup->busy() << ")" 
00214      << ". Queue = " << workergroup->queueSize() << endl
00215      << "# Archived = " << nArchived
00216      << " ; # Retrieved = " << nRetrieved  
00217       << " ; # Validated = " << nValidated << endl
00218      << " Approx average time / command / thread = " << getAverageTime(fileTimeTaken)
00219      << " (Archive) + " << getAverageTime(isTimeTaken) << " (IS/ISProxy)" << endl;
00220   os << "Compression level = " << getIOManagerArchive().getCompressionLevel() << endl;
00221   os << "IOManagerArchive Information: \n"
00222      << getIOManagerArchive().status();
00223   return os.str().c_str();
00224 }
00225 
00226   double Archiver::getAverageTime(double time) const throw(){
00227     long ncommands =  nArchived + nRetrieved + nValidated;
00228     return (ncommands>0) ? time/ncommands : 0.;
00229   }
00230 
00231   char* Archiver::status(ArchivingServiceIStatus* status){
00232     status->returnCode = ArchivingServiceIReply_Success;
00233     char *statusMessage = new char[300];
00234     strcpy(statusMessage, getStatus());
00235     return statusMessage;
00236   }
00237   
00238   void Archiver::archiveISName(ArchivingServiceIStatus *status, char* ioNameIS){
00239     try {
00240       status->returnCode = ArchivingServiceIReply_Success;
00241       shared_ptr<IONameIS> name( new IONameIS(ioNameIS));
00242       shared_ptr<GetCommand> get (new IsGetCommand(name));
00243       shared_ptr<PutCommand> put (new ArchivePutCommand());
00244 
00245       if (isControl(ioNameIS)){
00246     shared_ptr<Sct::Archive::IOParamsArchive> params;
00247     params = shared_ptr<Sct::Archive::IOParamsArchive>(new IOParamsArchive());
00248     params->setOverWrite();
00249     put->setParams(params);
00250       }
00251 
00252       shared_ptr<TransferCommand> command( new TransferCommand(get, put));
00253       workergroup->push( command );
00254     }catch(Throwable& e){
00255       e.sendToMrs();
00256     }
00257   }
00258 
00259   void Archiver::retrieveISName (ArchivingServiceIStatus *_status, char* ioNameIS){
00260     try{ 
00261       IONameIS nameis(ioNameIS);
00262       IONameArchiveFile archfile( nameis.getUniqueID(), nameis.getClassName());
00264       char* blah = const_cast<char*> (archfile.getIOName().c_str());
00265       retrieveArchName (_status, blah);
00266     }catch(Throwable& e){
00267       e.sendToMrs();
00268     }
00269   }
00270 
00271   void Archiver::retrieve(ArchivingServiceIStatus *_status, char* runNumber, char* scanNumber, char* className, char* specifier){
00272     try{ 
00273       std::ostringstream oss;
00274       oss << SctNames::getPersistentDir() << "/" << className 
00275       << "." << runNumber << "." << scanNumber 
00276       << "." << specifier << ".gz";
00277       IONameArchiveFile arch(oss.str());
00278       char* blah = const_cast<char*>(arch.getIOName().c_str());
00279       retrieveArchName(_status, blah);
00280     }catch(Throwable& e){
00281       e.sendToMrs();
00282     }
00283   }
00284 
00285   void Archiver::retrieveArchName (ArchivingServiceIStatus *status, char* archivingName){
00286     try{
00287       status->returnCode = ArchivingServiceIReply_Success;
00288       shared_ptr<IONameArchiveFile> name(new IONameArchiveFile(archivingName));
00289       shared_ptr<GetCommand> get (new ArchiveGetCommand(name));
00290       shared_ptr<IsPutCommand> put (new IsPutCommand());
00291       put->setServer(getRetrieveIsServer());
00292       shared_ptr<TransferCommand> command( new TransferCommand(get, put));
00293       workergroup->push( command );
00294     }catch(Throwable& e){
00295       e.sendToMrs();
00296     }
00297   }
00298 
00299   
00300   ilu_ShortInteger Archiver::busy(ArchivingServiceIStatus* status){
00301     status->returnCode = ArchivingServiceIReply_Success;
00302     return workergroup->busy();
00303   }
00304 
00305   ilu_ShortInteger Archiver::queueLength(ArchivingServiceIStatus* status){
00306     status->returnCode = ArchivingServiceIReply_Success;
00307     return workergroup->queueSize();
00308   }
00309 
00310   ilu_ShortInteger Archiver::getCompressionLevel (ArchivingServiceIStatus *status) {
00311     status->returnCode = ArchivingServiceIReply_Success;
00312     return getIOManagerArchive().getCompressionLevel();
00313   }
00314   
00315   void Archiver::setCompressionLevel ( ArchivingServiceIStatus *status, ilu_ShortInteger level) {
00316     status->returnCode = ArchivingServiceIReply_Success;
00317     getIOManagerArchive().setCompressionLevel(level);
00318   }
00319 } // end of namespace SctArchiving

Generated on Thu Jul 15 09:50:42 2004 for SCT DAQ/DCS Software - C++ by doxygen 1.3.5