Archiver.cpp

00001 #include "Archiver.h"
00002 #include "ArchivingWorkerGroup.h"
00003 #include "Sct/Archive/IOManagerArchiveFile.h" 
00004 #include "Sct/SctNames.h"
00005 #include "Sct/IpcObjectException.h"
00006 #include "Sct/UnsupportedOperationError.h"
00007 #include "Sct/StdExceptionWrapper.h"
00008 #include "is/info.h"
00009 #include <pmg/pmg_initSync.h>
00010 
00011 #include "unistd.h"
00012 #include <memory>
00013 
00014 #include "TransferCommand.h"
00015 #include "IsGetCommand.h"
00016 #include "IsPutCommand.h"
00017 #include "ArchiveGetCommand.h"
00018 #include "ArchivePutCommand.h"
00019 #include "Sct/Archive/IOParamsArchive.h"
00020 #include "Sct/OmniMarshalling.h"
00021 #include "Sct/ApplicationStartupDebugTools.h"
00022 
00023 #include "ipc/core.h"
00024 
00025 using namespace std;
00026 
00027 using namespace Sct;
00028 using namespace Sct::Archive;
00029 using namespace Sct::IS;
00030 
00031 using boost::recursive_mutex;
00032 
00033 /* Obsolete:
00034 void pmgSynch(void *) {
00035     pmg_initSync();
00036 }
00037 */
00038 
00039 int main(int argc, char** argv) {
00040     using namespace SctArchiving;
00041     setExceptionHandlers(argv[0]);
00042 
00043     IPCCore::init(argc,argv);
00044     Sct::ApplicationStartupDebugTools::announceStartOfMain(argc, argv, __FILE__, __LINE__);
00045  
00046     // renice this job!
00047     nice(10);
00048     
00049     ArchiverArguments args(argc, argv);
00050     args.print(std::cout);
00051 
00052     try {
00053       Archiver& s=Archiver::initialize(args);
00054       if (!s.publish()){
00055     throw IpcObjectException("ArchivingService failed to publish", __FILE__, __LINE__);
00056       }
00057       s.go(1);
00058       pmg_initSync();
00059       s.getArchiverServer().run();
00060     } catch (Throwable& e) {
00061       e.sendToMrs(MRS_FATAL);
00062       terminate();
00063     }
00064 }
00065 
00066 namespace SctArchiving {
00067 
00068 Archiver* Archiver::archiver = 0;
00069 
00070 Archiver& Archiver::instance() {
00071   if(!archiver) throw IllegalStateError("Attempt to use uninitialised Archiver", __FILE__, __LINE__);
00072   return *archiver;
00073 }
00074 
00075 Archiver& Archiver::initialize(ArchiverArguments args){
00076   archiver = new Archiver(args);
00077   return *archiver;
00078 }
00079 
00080 Archiver::Archiver(ArchiverArguments args) : 
00081     IPCNamedObject<POA_ArchivingServiceI::ArchivingServiceInterface>(SctNames::getPartition(), args.instanceName()),
00082     nArchived(),     nRetrieved(0), 
00083     nValidated(0),   isTimeTaken(0.), 
00084     fileTimeTaken(0.), m_args(args) {
00085   
00086   auto_ptr<ISInfoReceiver> ir(new ISInfoReceiver(SctNames::getPartition()));
00087   if (!ir.get()) throw ConfigurationException("Archiver::initialize can't make infoReceiver ", __FILE__, __LINE__) ;
00088   infoReceiver = ir;
00089   workergroup = new ArchivingWorkerGroup();
00090   workergroup->reportTo(args.getISStatusName());
00091   ostringstream oss;
00092   oss << "Started with persistent directory=" << m_args.getPersistentDirectory()
00093       << " and retrieval server = " << m_args.getOutputISServer();
00094 
00095   SctNames::Mrs() << "ARCHIVING_SERVICE" << MRS_TEXT(oss.str()) 
00096           << MRS_INFORMATION << ENDM;
00097   m_archive_manager = &IOManagerArchiveFile::instance();
00098   m_retrieval_is_server=Sct::SctNames::getRetrievedDataName();
00099   m_suspend_callback=false;
00100   m_archive_manager->setCompressionLevel(m_args.getCompressionLevel());
00101 }
00102 
00103 Archiver::~Archiver() throw() {
00104   while (workergroup->busy() || workergroup->queueSize() ) {
00105     sleep(1);
00106   }
00107   workergroup->join();
00108 }
00109 
00110 void Archiver::addCommand(boost::shared_ptr<ArchivingCommand> command)const {
00111   workergroup->push(command);
00112 }
00113 
00114 IPCServer& Archiver::getArchiverServer() throw() {
00115   static IPCServer archiverServer;
00116   return archiverServer;
00117 }
00118 
/**
 * Tell whether an object name denotes test or control data.
 *
 * The callers in this file use the result to decide whether an existing
 * archive entry may be overwritten.
 *
 * @param name object name to inspect.
 * @return true if the name contains "TestData" or "ControlData" (case sensitive).
 */
inline bool isControl(const std::string& name) {
  return name.find("TestData") != std::string::npos
      || name.find("ControlData") != std::string::npos;
}
00122 
00123 void Archiver::archive_callback(ISCallbackInfo *isc){
00124   try{
00125     bool over_write=isControl(isc->name());
00126     if (isc->reason() == is::Updated && !over_write ) {
00127       throw IoException(string("That is odd ... ") + isc->name() +  " has been modified! I'm not going to overwrite the data."
00128             , __FILE__, __LINE__);
00129     }
00130     if (isc->reason() != is::Created && isc->reason() != is::Updated) return;
00131     if (Archiver::instance().m_suspend_callback){
00132         return;
00133     }
00134 
00135     shared_ptr<IONameIS> name ( new IONameIS(isc->name()));
00136     shared_ptr<GetCommand> get (new IsGetCommand(name));
00137     shared_ptr<PutCommand> put (new ArchivePutCommand());
00138 
00139     if (over_write){
00140       shared_ptr<Sct::Archive::IOParamsArchive> params;
00141       params = shared_ptr<Sct::Archive::IOParamsArchive>(new IOParamsArchive());
00142       params->setOverWrite();
00143       put->setParams(params);
00144     }
00145 
00146     shared_ptr<TransferCommand> command( new TransferCommand(get, put));
00147     Archiver::instance().workergroup->push( command );
00148     
00149   } catch (Sct::Throwable& e){
00150     e.sendToMrs();
00151   }catch(std::exception& e){
00152     StdExceptionWrapper(e).sendToMrs();
00153   }
00154 }
00155 
00156 IOManagerArchive& Archiver::getIOManagerArchive() const {
00157   return *m_archive_manager;
00158 }
00159 
00160 string Archiver::internal_getRetrieveIsServer() const throw(){
00161   return  m_retrieval_is_server;
00162 }
00163 
00164   
00165 char * Archiver::getRetrieveIsServer() throw() {
00166   return copyStringToCorba(internal_getRetrieveIsServer());
00167 };
00168 
00169 void  Archiver::setRetrieveIsServer (const char * serverName){
00170   m_retrieval_is_server=serverName;
00171 }
00172 
00173 void Archiver::setPersistentDirectory(const char* newDirectory) {
00174   SctNames::setPersistentDir(newDirectory);
00175 }
00176 
00177 void Archiver::suspendCallbacks (CORBA::Boolean doSuspend){
00178    if (doSuspend) SctNames::Mrs() << MRS_TEXT("ArchivingService being suspended from callbacks") 
00179                   << "ARCHIVE_SUSPEND" << MRS_INFORMATION << ENDM;
00180    m_suspend_callback=doSuspend;
00181 }
00182 
00183 
00184 void Archiver::subscribe(const string& servername, const string& regexp, ISCallbackInfo::Callback callback){
00185   if (servername==m_args.getOutputISServer()){
00186     ostringstream oss;
00187     oss << "ArchivingService is not allowed to subscribe to [" << servername 
00188     << "] since it is publishing there and this could set up a loop";
00189     IoException(oss.str(), __FILE__, __LINE__).sendToMrs();
00190   }
00191   ISInfo::Status s=infoReceiver->subscribe(servername.c_str(), regexp.c_str(), callback);
00192   if (s!=ISInfo::Success) {
00193     ostringstream os;
00194     os <<"Archiving service could not subscribe to [" << servername
00195        << "] to retrieve [" << regexp << "]";
00196     IsException(s, os.str(), __FILE__, __LINE__).sendToMrs();
00197   } else {
00198     ostringstream oss;
00199     oss << "Archiving service subscribed to IS server [" << servername
00200     << "] to retrieve [" << regexp << "]";
00201     SctNames::Mrs() << "ARCHIVE_SUBSCRIBE" << MRS_TEXT(oss.str())
00202             << MRS_INFORMATION << ENDM;
00203   }
00204 }
00205   
00206   void Archiver::go(unsigned nWorker) throw(IsException) {
00207     const list<SctService::Arguments::Subscription> theList = m_args.getInputISServers();
00208     for (list<SctService::Arguments::Subscription>::const_iterator i=theList.begin(); 
00209      i!= theList.end(); ++i){
00210       subscribe((*i).server, (*i).regexp, archive_callback);
00211     }
00212     workergroup->go(m_args.getNWorkers());
00213     if (m_args.recoveryMode()) recover();
00214   }
00215 
00216   void Archiver::incrimentNArchived() throw(){
00217     recursive_mutex::scoped_lock scoped_lock(counterMutex);
00218     ++nArchived;
00219   }
00220   
00221   void Archiver::incrimentNRetrieved() throw(){
00222     recursive_mutex::scoped_lock scoped_lock(counterMutex);
00223     ++nRetrieved;
00224   }
00225   
00226   void Archiver::incrimentNValidated() throw(){
00227     recursive_mutex::scoped_lock scoped_lock(counterMutex);
00228     ++nValidated;
00229   }
00230   
00231   void Archiver::addISTime(double time) throw(){
00232     recursive_mutex::scoped_lock scoped_lock(counterMutex);
00233     isTimeTaken += time;
00234   }
00235 
00236   void Archiver::addFileTime(double time) throw(){
00237     recursive_mutex::scoped_lock scoped_lock(counterMutex);
00238     fileTimeTaken += time;
00239   }
00240 
00241 const char* Archiver::getStatus() const throw(){
00242   recursive_mutex::scoped_lock scoped_lock(counterMutex);
00243   ostringstream os;
00244   os << "\t=== Archiving Service ===" << endl
00245      << "nWorkers=" << workergroup->nWorkers() << endl
00246      << "\t(Busy=" << workergroup->busy() << ")" << endl
00247      << "Queue = " << workergroup->queueSize();
00248   if (workergroup->isFifo()) {
00249     os << " [FIFO]\n";
00250   }else{
00251     os << " [FILO]\n";
00252   }
00253   os<< "# Archived = " << nArchived << endl
00254      << "# Retrieved = " << nRetrieved  <<endl
00255      << "# Validated = " << nValidated << endl
00256      << " Approx average time / command / thread :" << endl 
00257      << "\tArchive = " << getAverageTime(fileTimeTaken) << endl
00258      << "\tIS/ISProxy = " << getAverageTime(isTimeTaken) << endl;
00259   os << "Compression level = " << getIOManagerArchive().getCompressionLevel() << endl;
00260   os << "IOManagerArchive Information: \n"
00261      << getIOManagerArchive().status() << endl;
00262   return os.str().c_str();
00263 }
00264 
00265   double Archiver::getAverageTime(double time) const throw(){
00266     recursive_mutex::scoped_lock scoped_lock(counterMutex);
00267     long ncommands =  nArchived + nRetrieved + nValidated;
00268     return (ncommands>0) ? time/ncommands : 0.;
00269   }
00270 
00271   char* Archiver::status(){
00272     recursive_mutex::scoped_lock scoped_lock(counterMutex);
00273     std::string status=getStatus();
00274     char *statusMessage = new char[status.size()+1];
00275     strcpy(statusMessage, status.c_str());
00276     return statusMessage;
00277   }
00278   
00279   void Archiver::archiveISName(const char* ioNameIS){
00280     try {
00281       shared_ptr<IONameIS> name( new IONameIS(ioNameIS));
00282       shared_ptr<GetCommand> get (new IsGetCommand(name));
00283       shared_ptr<PutCommand> put (new ArchivePutCommand());
00284 
00285       if (isControl(ioNameIS)){
00286     shared_ptr<Sct::Archive::IOParamsArchive> params;
00287     params = shared_ptr<Sct::Archive::IOParamsArchive>(new IOParamsArchive());
00288     params->setOverWrite();
00289     put->setParams(params);
00290       }
00291 
00292       shared_ptr<TransferCommand> command( new TransferCommand(get, put));
00293       workergroup->push( command );
00294     }catch(Throwable& e){
00295       e.sendToMrs();
00296     }
00297   }
00298 
00299   void Archiver::retrieveISName (const char* ioNameIS){
00300     try{ 
00301       IONameIS nameis(ioNameIS);
00302       IONameArchiveFile archfile( nameis.getUniqueID(), nameis.getClassName());
00304       char* blah = const_cast<char*> (archfile.getIOName().c_str());
00305       retrieveArchName (blah);
00306     }catch(Throwable& e){
00307       e.sendToMrs();
00308     }
00309   }
00310 
00311   void Archiver::retrieve(const char* runNumber, const char* scanNumber, const char* className, const char* specifier){
00312     try{ 
00313       std::ostringstream oss;
00314       oss << m_args.getPersistentDirectory() << "/" << className 
00315       << "." << runNumber << "." << scanNumber 
00316       << "." << specifier << ".gz";
00317       IONameArchiveFile arch(oss.str());
00318       char* blah = const_cast<char*>(arch.getIOName().c_str());
00319       retrieveArchName(blah);
00320     }catch(Throwable& e){
00321       e.sendToMrs();
00322     }
00323   }
00324 
00325   void Archiver::retrieveArchName (const char* archivingName){
00326     try{
00327       shared_ptr<IONameArchiveFile> name(new IONameArchiveFile(archivingName));
00328       shared_ptr<GetCommand> get (new ArchiveGetCommand(name));
00329       shared_ptr<IsPutCommand> put (new IsPutCommand());
00330       put->setServer(internal_getRetrieveIsServer());
00331       shared_ptr<TransferCommand> command( new TransferCommand(get, put));
00332       workergroup->push( command );
00333     }catch(Throwable& e){
00334       e.sendToMrs();
00335     }
00336   }
00337 
00338   
00339   CORBA::Short Archiver::busy(){
00340     return workergroup->busy();
00341   }
00342 
00343   CORBA::Short Archiver::queueLength(){
00344     return workergroup->queueSize();
00345   }
00346 
00347   CORBA::Short Archiver::getCompressionLevel () {
00348     return getIOManagerArchive().getCompressionLevel();
00349   }
00350   
00351   bool Archiver::isFifo(){
00352     return workergroup->isFifo();
00353   }
00354 
00355   void Archiver::setFifo(bool val){
00356     workergroup->setFifo(val);
00357   }
00358 
00359   void Archiver::setCompressionLevel (CORBA::Short level) {
00360     getIOManagerArchive().setCompressionLevel(level);
00361   }
00362   void Archiver::recover(){
00363     SctNames::Mrs() << MRS_TEXT("Sorry - recovery not yet implimented") 
00364             << "ANALYSIS_RECOVER" << MRS_DIAGNOSTIC << ENDM;
00365   }
00366 } // end of namespace SctArchiving

Generated on Mon Feb 6 14:01:16 2006 for SCT DAQ/DCS Software - C++ by  doxygen 1.4.6