00001 #include "Archiver.h"
00002 #include "ArchivingWorkerGroup.h"
00003 #include "Sct/Archive/IOManagerArchiveFile.h"
00004 #include "Sct/SctNames.h"
00005 #include "Sct/IpcObjectException.h"
00006 #include "Sct/UnsupportedOperationError.h"
00007 #include "Sct/StdExceptionWrapper.h"
00008 #include "is/info.h"
00009 #include <pmg/pmg_initSync.h>
00010
00011 #include "unistd.h"
00012 #include <memory>
00013
00014 #include "TransferCommand.h"
00015 #include "IsGetCommand.h"
00016 #include "IsPutCommand.h"
00017 #include "ArchiveGetCommand.h"
00018 #include "ArchivePutCommand.h"
00019 #include "Sct/Archive/IOParamsArchive.h"
00020 #include "Sct/OmniMarshalling.h"
00021 #include "Sct/ApplicationStartupDebugTools.h"
00022
00023 #include "ipc/core.h"
00024
00025 using namespace std;
00026
00027 using namespace Sct;
00028 using namespace Sct::Archive;
00029 using namespace Sct::IS;
00030
00031 using boost::recursive_mutex;
00032
00033
00034
00035
00036
00037
00038
00039 int main(int argc, char** argv) {
00040 using namespace SctArchiving;
00041 setExceptionHandlers(argv[0]);
00042
00043 IPCCore::init(argc,argv);
00044 Sct::ApplicationStartupDebugTools::announceStartOfMain(argc, argv, __FILE__, __LINE__);
00045
00046
00047 nice(10);
00048
00049 ArchiverArguments args(argc, argv);
00050 args.print(std::cout);
00051
00052 try {
00053 Archiver& s=Archiver::initialize(args);
00054 if (!s.publish()){
00055 throw IpcObjectException("ArchivingService failed to publish", __FILE__, __LINE__);
00056 }
00057 s.go(1);
00058 pmg_initSync();
00059 s.getArchiverServer().run();
00060 } catch (Throwable& e) {
00061 e.sendToMrs(MRS_FATAL);
00062 terminate();
00063 }
00064 }
00065
00066 namespace SctArchiving {
00067
00068 Archiver* Archiver::archiver = 0;
00069
00070 Archiver& Archiver::instance() {
00071 if(!archiver) throw IllegalStateError("Attempt to use uninitialised Archiver", __FILE__, __LINE__);
00072 return *archiver;
00073 }
00074
00075 Archiver& Archiver::initialize(ArchiverArguments args){
00076 archiver = new Archiver(args);
00077 return *archiver;
00078 }
00079
00080 Archiver::Archiver(ArchiverArguments args) :
00081 IPCNamedObject<POA_ArchivingServiceI::ArchivingServiceInterface>(SctNames::getPartition(), args.instanceName()),
00082 nArchived(), nRetrieved(0),
00083 nValidated(0), isTimeTaken(0.),
00084 fileTimeTaken(0.), m_args(args) {
00085
00086 auto_ptr<ISInfoReceiver> ir(new ISInfoReceiver(SctNames::getPartition()));
00087 if (!ir.get()) throw ConfigurationException("Archiver::initialize can't make infoReceiver ", __FILE__, __LINE__) ;
00088 infoReceiver = ir;
00089 workergroup = new ArchivingWorkerGroup();
00090 workergroup->reportTo(args.getISStatusName());
00091 ostringstream oss;
00092 oss << "Started with persistent directory=" << m_args.getPersistentDirectory()
00093 << " and retrieval server = " << m_args.getOutputISServer();
00094
00095 SctNames::Mrs() << "ARCHIVING_SERVICE" << MRS_TEXT(oss.str())
00096 << MRS_INFORMATION << ENDM;
00097 m_archive_manager = &IOManagerArchiveFile::instance();
00098 m_retrieval_is_server=Sct::SctNames::getRetrievedDataName();
00099 m_suspend_callback=false;
00100 m_archive_manager->setCompressionLevel(m_args.getCompressionLevel());
00101 }
00102
00103 Archiver::~Archiver() throw() {
00104 while (workergroup->busy() || workergroup->queueSize() ) {
00105 sleep(1);
00106 }
00107 workergroup->join();
00108 }
00109
00110 void Archiver::addCommand(boost::shared_ptr<ArchivingCommand> command)const {
00111 workergroup->push(command);
00112 }
00113
00114 IPCServer& Archiver::getArchiverServer() throw() {
00115 static IPCServer archiverServer;
00116 return archiverServer;
00117 }
00118
00119 inline bool isControl(std::string name){
00120 return ( name.find("TestData")!=string::npos || name.find("ControlData")!=string::npos);
00121 };
00122
00123 void Archiver::archive_callback(ISCallbackInfo *isc){
00124 try{
00125 bool over_write=isControl(isc->name());
00126 if (isc->reason() == is::Updated && !over_write ) {
00127 throw IoException(string("That is odd ... ") + isc->name() + " has been modified! I'm not going to overwrite the data."
00128 , __FILE__, __LINE__);
00129 }
00130 if (isc->reason() != is::Created && isc->reason() != is::Updated) return;
00131 if (Archiver::instance().m_suspend_callback){
00132 return;
00133 }
00134
00135 shared_ptr<IONameIS> name ( new IONameIS(isc->name()));
00136 shared_ptr<GetCommand> get (new IsGetCommand(name));
00137 shared_ptr<PutCommand> put (new ArchivePutCommand());
00138
00139 if (over_write){
00140 shared_ptr<Sct::Archive::IOParamsArchive> params;
00141 params = shared_ptr<Sct::Archive::IOParamsArchive>(new IOParamsArchive());
00142 params->setOverWrite();
00143 put->setParams(params);
00144 }
00145
00146 shared_ptr<TransferCommand> command( new TransferCommand(get, put));
00147 Archiver::instance().workergroup->push( command );
00148
00149 } catch (Sct::Throwable& e){
00150 e.sendToMrs();
00151 }catch(std::exception& e){
00152 StdExceptionWrapper(e).sendToMrs();
00153 }
00154 }
00155
00156 IOManagerArchive& Archiver::getIOManagerArchive() const {
00157 return *m_archive_manager;
00158 }
00159
00160 string Archiver::internal_getRetrieveIsServer() const throw(){
00161 return m_retrieval_is_server;
00162 }
00163
00164
00165 char * Archiver::getRetrieveIsServer() throw() {
00166 return copyStringToCorba(internal_getRetrieveIsServer());
00167 };
00168
00169 void Archiver::setRetrieveIsServer (const char * serverName){
00170 m_retrieval_is_server=serverName;
00171 }
00172
00173 void Archiver::setPersistentDirectory(const char* newDirectory) {
00174 SctNames::setPersistentDir(newDirectory);
00175 }
00176
00177 void Archiver::suspendCallbacks (CORBA::Boolean doSuspend){
00178 if (doSuspend) SctNames::Mrs() << MRS_TEXT("ArchivingService being suspended from callbacks")
00179 << "ARCHIVE_SUSPEND" << MRS_INFORMATION << ENDM;
00180 m_suspend_callback=doSuspend;
00181 }
00182
00183
00184 void Archiver::subscribe(const string& servername, const string& regexp, ISCallbackInfo::Callback callback){
00185 if (servername==m_args.getOutputISServer()){
00186 ostringstream oss;
00187 oss << "ArchivingService is not allowed to subscribe to [" << servername
00188 << "] since it is publishing there and this could set up a loop";
00189 IoException(oss.str(), __FILE__, __LINE__).sendToMrs();
00190 }
00191 ISInfo::Status s=infoReceiver->subscribe(servername.c_str(), regexp.c_str(), callback);
00192 if (s!=ISInfo::Success) {
00193 ostringstream os;
00194 os <<"Archiving service could not subscribe to [" << servername
00195 << "] to retrieve [" << regexp << "]";
00196 IsException(s, os.str(), __FILE__, __LINE__).sendToMrs();
00197 } else {
00198 ostringstream oss;
00199 oss << "Archiving service subscribed to IS server [" << servername
00200 << "] to retrieve [" << regexp << "]";
00201 SctNames::Mrs() << "ARCHIVE_SUBSCRIBE" << MRS_TEXT(oss.str())
00202 << MRS_INFORMATION << ENDM;
00203 }
00204 }
00205
00206 void Archiver::go(unsigned nWorker) throw(IsException) {
00207 const list<SctService::Arguments::Subscription> theList = m_args.getInputISServers();
00208 for (list<SctService::Arguments::Subscription>::const_iterator i=theList.begin();
00209 i!= theList.end(); ++i){
00210 subscribe((*i).server, (*i).regexp, archive_callback);
00211 }
00212 workergroup->go(m_args.getNWorkers());
00213 if (m_args.recoveryMode()) recover();
00214 }
00215
00216 void Archiver::incrimentNArchived() throw(){
00217 recursive_mutex::scoped_lock scoped_lock(counterMutex);
00218 ++nArchived;
00219 }
00220
00221 void Archiver::incrimentNRetrieved() throw(){
00222 recursive_mutex::scoped_lock scoped_lock(counterMutex);
00223 ++nRetrieved;
00224 }
00225
00226 void Archiver::incrimentNValidated() throw(){
00227 recursive_mutex::scoped_lock scoped_lock(counterMutex);
00228 ++nValidated;
00229 }
00230
00231 void Archiver::addISTime(double time) throw(){
00232 recursive_mutex::scoped_lock scoped_lock(counterMutex);
00233 isTimeTaken += time;
00234 }
00235
00236 void Archiver::addFileTime(double time) throw(){
00237 recursive_mutex::scoped_lock scoped_lock(counterMutex);
00238 fileTimeTaken += time;
00239 }
00240
00241 const char* Archiver::getStatus() const throw(){
00242 recursive_mutex::scoped_lock scoped_lock(counterMutex);
00243 ostringstream os;
00244 os << "\t=== Archiving Service ===" << endl
00245 << "nWorkers=" << workergroup->nWorkers() << endl
00246 << "\t(Busy=" << workergroup->busy() << ")" << endl
00247 << "Queue = " << workergroup->queueSize();
00248 if (workergroup->isFifo()) {
00249 os << " [FIFO]\n";
00250 }else{
00251 os << " [FILO]\n";
00252 }
00253 os<< "# Archived = " << nArchived << endl
00254 << "# Retrieved = " << nRetrieved <<endl
00255 << "# Validated = " << nValidated << endl
00256 << " Approx average time / command / thread :" << endl
00257 << "\tArchive = " << getAverageTime(fileTimeTaken) << endl
00258 << "\tIS/ISProxy = " << getAverageTime(isTimeTaken) << endl;
00259 os << "Compression level = " << getIOManagerArchive().getCompressionLevel() << endl;
00260 os << "IOManagerArchive Information: \n"
00261 << getIOManagerArchive().status() << endl;
00262 return os.str().c_str();
00263 }
00264
00265 double Archiver::getAverageTime(double time) const throw(){
00266 recursive_mutex::scoped_lock scoped_lock(counterMutex);
00267 long ncommands = nArchived + nRetrieved + nValidated;
00268 return (ncommands>0) ? time/ncommands : 0.;
00269 }
00270
00271 char* Archiver::status(){
00272 recursive_mutex::scoped_lock scoped_lock(counterMutex);
00273 std::string status=getStatus();
00274 char *statusMessage = new char[status.size()+1];
00275 strcpy(statusMessage, status.c_str());
00276 return statusMessage;
00277 }
00278
00279 void Archiver::archiveISName(const char* ioNameIS){
00280 try {
00281 shared_ptr<IONameIS> name( new IONameIS(ioNameIS));
00282 shared_ptr<GetCommand> get (new IsGetCommand(name));
00283 shared_ptr<PutCommand> put (new ArchivePutCommand());
00284
00285 if (isControl(ioNameIS)){
00286 shared_ptr<Sct::Archive::IOParamsArchive> params;
00287 params = shared_ptr<Sct::Archive::IOParamsArchive>(new IOParamsArchive());
00288 params->setOverWrite();
00289 put->setParams(params);
00290 }
00291
00292 shared_ptr<TransferCommand> command( new TransferCommand(get, put));
00293 workergroup->push( command );
00294 }catch(Throwable& e){
00295 e.sendToMrs();
00296 }
00297 }
00298
00299 void Archiver::retrieveISName (const char* ioNameIS){
00300 try{
00301 IONameIS nameis(ioNameIS);
00302 IONameArchiveFile archfile( nameis.getUniqueID(), nameis.getClassName());
00304 char* blah = const_cast<char*> (archfile.getIOName().c_str());
00305 retrieveArchName (blah);
00306 }catch(Throwable& e){
00307 e.sendToMrs();
00308 }
00309 }
00310
00311 void Archiver::retrieve(const char* runNumber, const char* scanNumber, const char* className, const char* specifier){
00312 try{
00313 std::ostringstream oss;
00314 oss << m_args.getPersistentDirectory() << "/" << className
00315 << "." << runNumber << "." << scanNumber
00316 << "." << specifier << ".gz";
00317 IONameArchiveFile arch(oss.str());
00318 char* blah = const_cast<char*>(arch.getIOName().c_str());
00319 retrieveArchName(blah);
00320 }catch(Throwable& e){
00321 e.sendToMrs();
00322 }
00323 }
00324
00325 void Archiver::retrieveArchName (const char* archivingName){
00326 try{
00327 shared_ptr<IONameArchiveFile> name(new IONameArchiveFile(archivingName));
00328 shared_ptr<GetCommand> get (new ArchiveGetCommand(name));
00329 shared_ptr<IsPutCommand> put (new IsPutCommand());
00330 put->setServer(internal_getRetrieveIsServer());
00331 shared_ptr<TransferCommand> command( new TransferCommand(get, put));
00332 workergroup->push( command );
00333 }catch(Throwable& e){
00334 e.sendToMrs();
00335 }
00336 }
00337
00338
00339 CORBA::Short Archiver::busy(){
00340 return workergroup->busy();
00341 }
00342
00343 CORBA::Short Archiver::queueLength(){
00344 return workergroup->queueSize();
00345 }
00346
00347 CORBA::Short Archiver::getCompressionLevel () {
00348 return getIOManagerArchive().getCompressionLevel();
00349 }
00350
00351 bool Archiver::isFifo(){
00352 return workergroup->isFifo();
00353 }
00354
00355 void Archiver::setFifo(bool val){
00356 workergroup->setFifo(val);
00357 }
00358
00359 void Archiver::setCompressionLevel (CORBA::Short level) {
00360 getIOManagerArchive().setCompressionLevel(level);
00361 }
00362 void Archiver::recover(){
00363 SctNames::Mrs() << MRS_TEXT("Sorry - recovery not yet implimented")
00364 << "ANALYSIS_RECOVER" << MRS_DIAGNOSTIC << ENDM;
00365 }
00366 }