cdi_daq.cxx

00001 
00002 //    cdi_daq.cxx
00003 //
00004 //    Test application for TDAQ conditions data collection
00005 //
00006 //    Luis Pedro,  November 2003
00007 //
00008 //    description:
00009 //          Implements an interface between the TDAQ IS service
00010 //          and the ATLAS Conditions Database
00011 //      
00013 
00014 // changed after the release of the new IS API
00015 //#include <is/isinfo.h>
00016 //#include <is/isinfoany.h>
00017 #include <is/info.h>
00018 #include <is/infoany.h>
00019 
00020 #include <iostream>
00021 #include <owl/time.h>
00022 #include <ipc/core.h>
00023 #include <ipc/partition.h>
00024 #include <cmdl/cmdargs.h>
00025 #include <signal.h>
00026 #include <list>
00027 #include <unistd.h>
00028 
00029 #include "cdi/cdiHandle.h"
00030 #include "cdi/cdiDataHandle.h"
00031 #include "cdi/cdiCOOLDataHandle.h"
00032 #include <exception>
00033 
00034 #define LISBON 0
00035 #define COOL   1
00036 
00037 //enum apis {LISBON=0,COOL=1};
00038 
00039 int verbose;
00040 
00041 ISInfoReceiver *    ISReceiver;
00042 
00043 struct CallbackParams
00044 {
00045   std::string           partition_name;
00046   //  cdiDataHandle *       data_handle;
00047   cdiHandle*                    data_handle;
00048   ISInfoReceiver *      receiver;
00049   std::vector<std::string>  is_names;
00050   cdiAux            mout;
00051   std::string                   conf_obj; 
00052 };
00053 
00054 void subscribe_to_IS( CallbackParams & params );
00055 void unsubscribe_from_IS( CallbackParams & params );
00056 
00057 void signal_exit( int )
00058 {
00059    if ( ISReceiver )
00060      ISReceiver->stop();        
00061 }
00062 
00063 void is_callback_function( ISCallbackInfo * isc )
00064 {
00065 
00066   CallbackParams * params = static_cast<CallbackParams*>(isc->parameter());
00067   params->mout.info( std::string("Callback::" + (std::string)isc->name()) );
00068   
00069   // NBARROS: This might lead to problems
00070   // currently the value is hardcoded as  "RunParams.Conditions";
00071   
00072   // here we have the issue that each time that someone changes the Conditions
00073   // object all data is stored all over again
00074   
00075   if( std::string(isc->name()) == params->conf_obj) // std::string("RunParams.Conditions")) //cdiDataHandle::configuration_name )
00076     {
00077       params->mout.debug(std::string( "Processing configuration callback"));
00078       unsubscribe_from_IS( *params );
00079       params->data_handle->getConfig( params->is_names );
00080       subscribe_to_IS( *params );  
00081     } 
00082   else
00083     {
00084       params->mout.debug( std::string( "Processing normal callback (not configuration)"));
00085     }
00086   params->data_handle->process_data( std::string( isc->name()), isc );
00087 }
00088 
00089 void subscribe_to_IS( CallbackParams & params )
00090 {
00091   params.mout.verbose( std::string("::cdi_daq::unsubscribe_to_IS -> Entering..."));
00092   
00093   ISInfo::Status status;
00094     
00095   for( size_t i = 0; i < params.is_names.size(); i++ )
00096   {
00097     if ( (status = params.receiver->subscribe(  params.is_names[i].c_str(), 
00098                         is_callback_function, 
00099                         &params ) ) != ISInfo::Success ) {
00100     if ( status == ISInfo::AlreadyExist)
00101       params.mout.error( std::string( "Subscribe for IS info " + params.is_names[i] + " FAILED: AlreadyExist!") );
00102       else if ( status == ISInfo::CommFailure)
00103     params.mout.error( std::string( "Subscribe for IS info " + params.is_names[i] + " FAILED: CommFailure !") );
00104       else
00105     params.mout.error( std::string( "Subscribe for IS info " + params.is_names[i] + " FAILED: Unknown !") );
00106     }
00107     else
00108     {
00109       params.mout.info( std::string( "Subscribe IS server " + params.is_names[i] + " SUCCESS.") );
00110     }
00111   }
00112 
00113   params.mout.info( std::string("Ready to data aquisition!") );
00114 }
00115 
00116 
00117 void unsubscribe_from_IS( CallbackParams & params )
00118 {  
00119   ISInfo::Status status;
00120   
00121   for( size_t i = 0; i < params.is_names.size(); i++ )
00122   {
00123     if ( (status = params.receiver->unsubscribe( params.is_names[i].c_str() ) ) != ISInfo::Success )
00124       params.mout.error( std::string( "Unsubscribe from IS info " + params.is_names[i] + " FAILED:" ) );
00125     else
00126       params.mout.info ( std::string( "Unsubscribe from IS info " + params.is_names[i] + " SUCCESS.") );
00127   }
00128 }
00129 
00130 
00131 int main(int argc, char ** argv){
00132   
00133   IPCCore::init(argc, argv);
00134   
00135   CmdArgStr partition_name('p', "partition", "partition-name", "partition to work in.", CmdArg::isREQ);
00136   CmdArgInt backend('B', "backend", "cdi-backend", "Choose the backend to be used (0:Lisbon_CONDDB, 1:COOL).", CmdArg::isREQ);
00137   CmdArgInt verb_level('v', "verbose", "verbosity-level", "0:Silent, 1:Info, 2:Debug, 3:Verbose (defaults to 0).");
00138   CmdArgStr    conf_obj('O',"config-list","configuration-object","Name of the ISobject containing the list of objects to be stored (defaults to \"RunParams.Conditions\")");
00139   
00140   CmdLine  cmd(*argv, &partition_name, &backend ,&verb_level, &conf_obj,NULL);
00141   CmdArgvIter  arg_iter(--argc, ++argv);
00142   
00143   cmd.description("This program implements the functionality to store data published in subscribed\n"
00144           "IS Servers into the ATLAS Conditions Database.\n"
00145           "\t It provides the functionality to interface the TDAQ system to the Conditions Database.\n"
00146           "The user must choose which backend myust be used (COOL or Lisbon CondDB)");
00147   
00148   
00149   partition_name = 0;
00150   verb_level = 0;
00151   conf_obj = "RunParams.Conditions";
00152 
00153   cmd.parse(arg_iter);
00154   
00155   std::cout << OWLTime() << " :: cdi_daq is initializing with " << argc << " command line parameters" << std::endl;
00156   
00157   
00158   //Initialization
00159   //  cdiDataHandle cdi_handle(verb_level, (std::string)partition_name);
00160    
00161   cdiHandle *cdi_handle;
00162   
00163   if ( backend == COOL ) {
00164     cdi_handle = new cdiCOOLDataHandle(verb_level,(std::string)partition_name,(std::string)conf_obj);
00165   } else
00166     if ( backend == LISBON ) {
00167       cdi_handle = new cdiDataHandle(verb_level,(std::string)partition_name);
00168     }
00169     else {
00170       std::cerr << OWLTime() << " :: cdi_daq failed because the backend [" << backend 
00171         << "] is not valid!" << std::endl;
00172       exit(1);
00173     }
00174 
00175   // initialize signal handlers
00176   signal(SIGINT,  signal_exit);
00177   signal(SIGTERM, signal_exit);
00178  
00179   IPCPartition partition(partition_name);  
00180   
00181   ISInfoReceiver receiver(partition);
00182  
00183   ISReceiver = &receiver;
00184 
00185   std::vector<std::string> is_names;
00186   
00187   //  cdi_handle.getConfig( is_names );
00188 
00189   // this fills the ISobjects subscribed using the Conditions object and the mandatory elements
00190   cdi_handle->getConfig( is_names );
00191   
00192   if (verb_level >= 3)
00193     std::cout << OWLTime() << "::cdi_daq -> Creating CallbackParams..." << std::endl;
00194   
00195   CallbackParams params;
00196   
00197   if (verb_level >= 3)
00198     std::cout << OWLTime() << "::cdi_daq -> Defining partition as : [" << partition_name << "]..." << std::endl;
00199   params.partition_name = partition_name;
00200   
00201   if (verb_level >= 3)
00202     std::cout << OWLTime() << "::cdi_daq -> Configuration object to be used : [" << conf_obj << "]..." << std::endl;
00203   params.conf_obj = conf_obj;
00204   
00205 
00206   if (verb_level >= 3)
00207     std::cout << OWLTime() << "::cdi_daq -> Uploading the cdiHandle..." << std::endl;
00208   params.data_handle = cdi_handle;//&cdi_handle;
00209   params.receiver = &receiver;
00210   params.is_names = is_names;
00211   params.mout = cdiAux( verb_level );
00212   params.conf_obj = conf_obj;
00213 
00214   if (verb_level >= 3)
00215     std::cout << OWLTime() << "::cdi_daq -> Subscribing to IS..." << std::endl;
00216   subscribe_to_IS( params );
00217   
00218   std::cout << OWLTime() << " :: cdi_daq has been started in the \"" 
00219         << partition.name() << "\" partition" << std::endl;
00220 
00221 
00222   receiver.run();
00223   
00224   /*
00225     try {
00226     receiver.run();
00227     }
00228     catch (std::exception &e)
00229     {
00230     std::cerr << "Standard exception caught [" << e.message() << "]" << std::endl;
00231     std::cerr << OWLTime() << " :: cdi_daq is going to quit!" << std::endl;
00232     
00233     }
00234   */
00235   ISReceiver = 0;
00236   
00237   unsubscribe_from_IS( params );
00238   
00239   return 0;
00240 }

Generated on Mon Feb 6 14:01:17 2006 for SCT DAQ/DCS Software - C++ by  doxygen 1.4.6