00001
00002 #include "Archiver.h"
00003 #include "ArchivingWorkerGroup.h"
00004 #include "Sct/Archive/IOManagerArchiveFile.h"
00005 #include "Sct/SctNames.h"
00006 #include "Sct/IpcObjectException.h"
00007 #include "Sct/UnsupportedOperationError.h"
00008 #include "Sct/StdExceptionWrapper.h"
00009 #include "Sct/ISStreamerWrapper.h"
00010 #include "is/isinfo.h"
00011 #include <pmg/pmg_initSync.h>
00012
00013 #include "unistd.h"
00014 #include <memory>
00015
00016 #include "TransferCommand.h"
00017 #include "IsGetCommand.h"
00018 #include "IsPutCommand.h"
00019 #include "ArchiveGetCommand.h"
00020 #include "ArchivePutCommand.h"
00021 #include "Sct/Archive/IOParamsArchive.h"
00022
00023 using namespace std;
00024
00025 using namespace Sct;
00026 using namespace Sct::Archive;
00027 using namespace Sct::IS;
00028
00029 using boost::mutex;
00030
00031 void pmgSynch(void *) {
00032 pmg_initSync();
00033 }
00034
00035 int main(int argc, char** argv) {
00036 using namespace SctArchiving;
00037 setExceptionHandlers(argv[0]);
00038
00039
00040 nice(10);
00041
00042 bool multiThreaded = true;
00043 IPCCore::init(multiThreaded);
00044
00045 try {
00046 Archiver& s=Archiver::instance();
00047 if (!s.publish()){
00048 throw IpcObjectException("ArchivingService failed to publish", __FILE__, __LINE__);
00049 }
00050 s.go(1);
00051 s.getArchiverServer().doSoon(pmgSynch, NULL);
00052 s.getArchiverServer().run();
00053 } catch (Throwable& e) {
00054 e.sendToMrs(MRS_FATAL);
00055 terminate();
00056 }
00057 }
00058
00059 namespace SctArchiving {
00060
00061 Archiver* Archiver::archiver = 0;
00062
00063 Archiver& Archiver::instance() {
00064 if(!archiver) archiver = new Archiver();
00065 return *archiver;
00066 }
00067
00068 Archiver::Archiver() throw(ConfigurationException) : IPCObject(ArchivingServiceI_C_ArchivingServiceInterface_instanceName, &getArchiverServer()),
00069 nArchived(), nRetrieved(0), nValidated(0), isTimeTaken(0.), fileTimeTaken(0.) {
00070
00071 auto_ptr<ISInfoReceiver> ir(new ISInfoReceiver(SctNames::getPartition()));
00072 if (!ir.get()) throw ConfigurationException("Archiver::initialize can't make infoReceiver ", __FILE__, __LINE__) ;
00073 infoReceiver = ir;
00074 workergroup = new ArchivingWorkerGroup();
00075 cout << "constructed" << endl;
00076 cout << "Persistent dir:" << SctNames::getPersistentDir() << endl;
00077 cout << "Retrieval is server : " << SctNames::getRetrievedDataName() << endl;
00078 m_archive_manager = &IOManagerArchiveFile::instance();
00079 m_retrieval_is_server=Sct::SctNames::getRetrievedDataName();
00080 m_suspend_callback=false;
00081 }
00082
00083 Archiver::~Archiver() throw() {
00084 cout << "ArchivingService waiting for queue to finish " << endl;
00085 while (workergroup->busy() || workergroup->queueSize() ) {
00086 sleep(1);
00087 }
00088 workergroup->join();
00089 }
00090
00091 void Archiver::addCommand(boost::shared_ptr<ArchivingCommand> command)const {
00092 workergroup->push(command);
00093 }
00094
00095 IPCServer& Archiver::getArchiverServer() throw() {
00096 static IPCServer archiverServer(ArchivingServiceI_C_ArchivingServiceInterface_serverName, SctNames::getPartition());
00097 return archiverServer;
00098 }
00099
00100 inline bool isControl(std::string name){
00101 return ( name.find("TestData")!=string::npos || name.find("ControlData")!=string::npos);
00102 };
00103
00104 void Archiver::archive_callback(ISCallbackInfo *isc){
00105 try{
00106 bool over_write=isControl(isc->name());
00107 if (isc->reason() == ISInfoUpdated && !over_write ) {
00108 throw IoException(string("That is odd ... ") + isc->name() + " has been modified! I'm not going to overwrite the data."
00109 , __FILE__, __LINE__);
00110 }
00111 if (isc->reason() != ISInfoCreated && isc->reason() != ISInfoUpdated) return;
00112 if (Archiver::instance().m_suspend_callback){
00113 SctNames::Mrs() << MRS_TEXT("ArchivingService being suspended from callbacks") << "ARCHIVE_SUSPEND" << MRS_INFORMATION << ENDM;
00114 return;
00115 }
00116 cout << "Archiving " << isc->name() << endl;
00117
00118 shared_ptr<IONameIS> name ( new IONameIS(isc->name()));
00119 shared_ptr<GetCommand> get (new IsGetCommand(name));
00120 shared_ptr<PutCommand> put (new ArchivePutCommand());
00121
00122 if (over_write){
00123 shared_ptr<Sct::Archive::IOParamsArchive> params;
00124 params = shared_ptr<Sct::Archive::IOParamsArchive>(new IOParamsArchive());
00125 params->setOverWrite();
00126 put->setParams(params);
00127 }
00128
00129 shared_ptr<TransferCommand> command( new TransferCommand(get, put));
00130 Archiver::instance().workergroup->push( command );
00131
00132 } catch (Sct::Throwable& e){
00133 e.sendToMrs();
00134 }catch(std::exception& e){
00135 StdExceptionWrapper(e).sendToMrs();
00136 }
00137 }
00138
00139 IOManagerArchive& Archiver::getIOManagerArchive() const {
00140 return *m_archive_manager;
00141 }
00142
00143 string Archiver::getRetrieveIsServer() const throw(){
00144 return m_retrieval_is_server;
00145 }
00146
00147 void Archiver::setRetrieveIsServer (ArchivingServiceIStatus *_status, ilu_T_CString serverName){
00148 _status->returnCode = ArchivingServiceIReply_Success;
00149 m_retrieval_is_server=serverName;
00150 }
00151
00152 void Archiver::suspendCallbacks (ArchivingServiceIStatus *_status, ilu_Boolean doSuspend){
00153 _status->returnCode = ArchivingServiceIReply_Success;
00154 m_suspend_callback=doSuspend;
00155 }
00156
00157
00158 void Archiver::subscribe(const string& servername, const string& regexp, ISCallbackInfo::Callback callback){
00159 if (servername==SctNames::getRetrievedDataName()){
00160 ostringstream oss;
00161 oss << "ArchivingService is not allowed to subscribe to " << servername
00162 << " since this could set up a loop";
00163 IoException(oss.str(), __FILE__, __LINE__).sendToMrs();
00164 }
00165 ISInfo::Status s=infoReceiver->subscribe(servername.c_str(), regexp.c_str(), callback);
00166 if (s!=ISInfo::Success) {
00167 ostringstream os;
00168 os <<"Could not subscribe to "<<servername<< " to retrieve " << regexp;
00169 IsException(s, os.str(), __FILE__, __LINE__).sendToMrs();
00170 } else {
00171 cout << "Subscribed to " << servername << " to retrieve " << regexp << endl;
00172 }
00173 }
00174
00175 void Archiver::go(unsigned nWorker) throw(IsException) {
00176 subscribe(SctNames::getEventDataName(), "SctData::RawScanResult.*", archive_callback);
00177 subscribe(SctNames::getFittedDataName(), "SctData::FitScanResult.*", archive_callback);
00178 subscribe(SctNames::getTestDataName(), "SctData::.*TestResult.*", archive_callback);
00179 subscribe(SctNames::getControlDataName(), "TestData.*", archive_callback);
00180 subscribe(SctNames::getControlDataName(), "SequenceData.*", archive_callback);
00181 workergroup->go(nWorker);
00182 }
00183
00184 void Archiver::incrimentNArchived() throw(){
00185 mutex::scoped_lock scoped_lock(counterMutex);
00186 ++nArchived;
00187 }
00188
00189 void Archiver::incrimentNRetrieved() throw(){
00190 mutex::scoped_lock scoped_lock(counterMutex);
00191 ++nRetrieved;
00192 }
00193
00194 void Archiver::incrimentNValidated() throw(){
00195 mutex::scoped_lock scoped_lock(counterMutex);
00196 ++nValidated;
00197 }
00198
00199 void Archiver::addISTime(double time) throw(){
00200 mutex::scoped_lock scoped_lock(counterMutex);
00201 isTimeTaken += time;
00202 }
00203
00204 void Archiver::addFileTime(double time) throw(){
00205 mutex::scoped_lock scoped_lock(counterMutex);
00206 fileTimeTaken += time;
00207 }
00208
00209 const char* Archiver::getStatus() const throw(){
00210 ostringstream os;
00211 os << "pid = " << getpid() << endl;
00212 os << "Workers=" << workergroup->nWorkers();
00213 os << ", (Busy=" << workergroup->busy() << ")"
00214 << ". Queue = " << workergroup->queueSize() << endl
00215 << "# Archived = " << nArchived
00216 << " ; # Retrieved = " << nRetrieved
00217 << " ; # Validated = " << nValidated << endl
00218 << " Approx average time / command / thread = " << getAverageTime(fileTimeTaken)
00219 << " (Archive) + " << getAverageTime(isTimeTaken) << " (IS/ISProxy)" << endl;
00220 os << "Compression level = " << getIOManagerArchive().getCompressionLevel() << endl;
00221 os << "IOManagerArchive Information: \n"
00222 << getIOManagerArchive().status();
00223 return os.str().c_str();
00224 }
00225
00226 double Archiver::getAverageTime(double time) const throw(){
00227 long ncommands = nArchived + nRetrieved + nValidated;
00228 return (ncommands>0) ? time/ncommands : 0.;
00229 }
00230
00231 char* Archiver::status(ArchivingServiceIStatus* status){
00232 status->returnCode = ArchivingServiceIReply_Success;
00233 char *statusMessage = new char[300];
00234 strcpy(statusMessage, getStatus());
00235 return statusMessage;
00236 }
00237
00238 void Archiver::archiveISName(ArchivingServiceIStatus *status, char* ioNameIS){
00239 try {
00240 status->returnCode = ArchivingServiceIReply_Success;
00241 shared_ptr<IONameIS> name( new IONameIS(ioNameIS));
00242 shared_ptr<GetCommand> get (new IsGetCommand(name));
00243 shared_ptr<PutCommand> put (new ArchivePutCommand());
00244
00245 if (isControl(ioNameIS)){
00246 shared_ptr<Sct::Archive::IOParamsArchive> params;
00247 params = shared_ptr<Sct::Archive::IOParamsArchive>(new IOParamsArchive());
00248 params->setOverWrite();
00249 put->setParams(params);
00250 }
00251
00252 shared_ptr<TransferCommand> command( new TransferCommand(get, put));
00253 workergroup->push( command );
00254 }catch(Throwable& e){
00255 e.sendToMrs();
00256 }
00257 }
00258
00259 void Archiver::retrieveISName (ArchivingServiceIStatus *_status, char* ioNameIS){
00260 try{
00261 IONameIS nameis(ioNameIS);
00262 IONameArchiveFile archfile( nameis.getUniqueID(), nameis.getClassName());
00264 char* blah = const_cast<char*> (archfile.getIOName().c_str());
00265 retrieveArchName (_status, blah);
00266 }catch(Throwable& e){
00267 e.sendToMrs();
00268 }
00269 }
00270
00271 void Archiver::retrieve(ArchivingServiceIStatus *_status, char* runNumber, char* scanNumber, char* className, char* specifier){
00272 try{
00273 std::ostringstream oss;
00274 oss << SctNames::getPersistentDir() << "/" << className
00275 << "." << runNumber << "." << scanNumber
00276 << "." << specifier << ".gz";
00277 IONameArchiveFile arch(oss.str());
00278 char* blah = const_cast<char*>(arch.getIOName().c_str());
00279 retrieveArchName(_status, blah);
00280 }catch(Throwable& e){
00281 e.sendToMrs();
00282 }
00283 }
00284
00285 void Archiver::retrieveArchName (ArchivingServiceIStatus *status, char* archivingName){
00286 try{
00287 status->returnCode = ArchivingServiceIReply_Success;
00288 shared_ptr<IONameArchiveFile> name(new IONameArchiveFile(archivingName));
00289 shared_ptr<GetCommand> get (new ArchiveGetCommand(name));
00290 shared_ptr<IsPutCommand> put (new IsPutCommand());
00291 put->setServer(getRetrieveIsServer());
00292 shared_ptr<TransferCommand> command( new TransferCommand(get, put));
00293 workergroup->push( command );
00294 }catch(Throwable& e){
00295 e.sendToMrs();
00296 }
00297 }
00298
00299
00300 ilu_ShortInteger Archiver::busy(ArchivingServiceIStatus* status){
00301 status->returnCode = ArchivingServiceIReply_Success;
00302 return workergroup->busy();
00303 }
00304
00305 ilu_ShortInteger Archiver::queueLength(ArchivingServiceIStatus* status){
00306 status->returnCode = ArchivingServiceIReply_Success;
00307 return workergroup->queueSize();
00308 }
00309
00310 ilu_ShortInteger Archiver::getCompressionLevel (ArchivingServiceIStatus *status) {
00311 status->returnCode = ArchivingServiceIReply_Success;
00312 return getIOManagerArchive().getCompressionLevel();
00313 }
00314
00315 void Archiver::setCompressionLevel ( ArchivingServiceIStatus *status, ilu_ShortInteger level) {
00316 status->returnCode = ArchivingServiceIReply_Success;
00317 getIOManagerArchive().setCompressionLevel(level);
00318 }
00319 }