00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00013
00014
00015
00016
00017 #include <is/info.h>
00018 #include <is/infoany.h>
00019
00020 #include <iostream>
00021 #include <owl/time.h>
00022 #include <ipc/core.h>
00023 #include <ipc/partition.h>
00024 #include <cmdl/cmdargs.h>
00025 #include <signal.h>
00026 #include <list>
00027 #include <unistd.h>
00028
00029 #include "cdi/cdiHandle.h"
00030 #include "cdi/cdiDataHandle.h"
00031 #include "cdi/cdiCOOLDataHandle.h"
00032 #include <exception>
00033
// Numeric identifiers accepted by the -B/--backend command line option.
// They are kept as preprocessor macros so that any conditional compilation
// elsewhere in the project keeps working.
#define LISBON 0   // Lisbon CondDB backend
#define COOL   1   // COOL backend
00036
00037
00038
00039 int verbose;
00040
00041 ISInfoReceiver * ISReceiver;
00042
00043 struct CallbackParams
00044 {
00045 std::string partition_name;
00046
00047 cdiHandle* data_handle;
00048 ISInfoReceiver * receiver;
00049 std::vector<std::string> is_names;
00050 cdiAux mout;
00051 std::string conf_obj;
00052 };
00053
00054 void subscribe_to_IS( CallbackParams & params );
00055 void unsubscribe_from_IS( CallbackParams & params );
00056
00057 void signal_exit( int )
00058 {
00059 if ( ISReceiver )
00060 ISReceiver->stop();
00061 }
00062
00063 void is_callback_function( ISCallbackInfo * isc )
00064 {
00065
00066 CallbackParams * params = static_cast<CallbackParams*>(isc->parameter());
00067 params->mout.info( std::string("Callback::" + (std::string)isc->name()) );
00068
00069
00070
00071
00072
00073
00074
00075 if( std::string(isc->name()) == params->conf_obj)
00076 {
00077 params->mout.debug(std::string( "Processing configuration callback"));
00078 unsubscribe_from_IS( *params );
00079 params->data_handle->getConfig( params->is_names );
00080 subscribe_to_IS( *params );
00081 }
00082 else
00083 {
00084 params->mout.debug( std::string( "Processing normal callback (not configuration)"));
00085 }
00086 params->data_handle->process_data( std::string( isc->name()), isc );
00087 }
00088
00089 void subscribe_to_IS( CallbackParams & params )
00090 {
00091 params.mout.verbose( std::string("::cdi_daq::unsubscribe_to_IS -> Entering..."));
00092
00093 ISInfo::Status status;
00094
00095 for( size_t i = 0; i < params.is_names.size(); i++ )
00096 {
00097 if ( (status = params.receiver->subscribe( params.is_names[i].c_str(),
00098 is_callback_function,
00099 ¶ms ) ) != ISInfo::Success ) {
00100 if ( status == ISInfo::AlreadyExist)
00101 params.mout.error( std::string( "Subscribe for IS info " + params.is_names[i] + " FAILED: AlreadyExist!") );
00102 else if ( status == ISInfo::CommFailure)
00103 params.mout.error( std::string( "Subscribe for IS info " + params.is_names[i] + " FAILED: CommFailure !") );
00104 else
00105 params.mout.error( std::string( "Subscribe for IS info " + params.is_names[i] + " FAILED: Unknown !") );
00106 }
00107 else
00108 {
00109 params.mout.info( std::string( "Subscribe IS server " + params.is_names[i] + " SUCCESS.") );
00110 }
00111 }
00112
00113 params.mout.info( std::string("Ready to data aquisition!") );
00114 }
00115
00116
00117 void unsubscribe_from_IS( CallbackParams & params )
00118 {
00119 ISInfo::Status status;
00120
00121 for( size_t i = 0; i < params.is_names.size(); i++ )
00122 {
00123 if ( (status = params.receiver->unsubscribe( params.is_names[i].c_str() ) ) != ISInfo::Success )
00124 params.mout.error( std::string( "Unsubscribe from IS info " + params.is_names[i] + " FAILED:" ) );
00125 else
00126 params.mout.info ( std::string( "Unsubscribe from IS info " + params.is_names[i] + " SUCCESS.") );
00127 }
00128 }
00129
00130
00131 int main(int argc, char ** argv){
00132
00133 IPCCore::init(argc, argv);
00134
00135 CmdArgStr partition_name('p', "partition", "partition-name", "partition to work in.", CmdArg::isREQ);
00136 CmdArgInt backend('B', "backend", "cdi-backend", "Choose the backend to be used (0:Lisbon_CONDDB, 1:COOL).", CmdArg::isREQ);
00137 CmdArgInt verb_level('v', "verbose", "verbosity-level", "0:Silent, 1:Info, 2:Debug, 3:Verbose (defaults to 0).");
00138 CmdArgStr conf_obj('O',"config-list","configuration-object","Name of the ISobject containing the list of objects to be stored (defaults to \"RunParams.Conditions\")");
00139
00140 CmdLine cmd(*argv, &partition_name, &backend ,&verb_level, &conf_obj,NULL);
00141 CmdArgvIter arg_iter(--argc, ++argv);
00142
00143 cmd.description("This program implements the functionality to store data published in subscribed\n"
00144 "IS Servers into the ATLAS Conditions Database.\n"
00145 "\t It provides the functionality to interface the TDAQ system to the Conditions Database.\n"
00146 "The user must choose which backend myust be used (COOL or Lisbon CondDB)");
00147
00148
00149 partition_name = 0;
00150 verb_level = 0;
00151 conf_obj = "RunParams.Conditions";
00152
00153 cmd.parse(arg_iter);
00154
00155 std::cout << OWLTime() << " :: cdi_daq is initializing with " << argc << " command line parameters" << std::endl;
00156
00157
00158
00159
00160
00161 cdiHandle *cdi_handle;
00162
00163 if ( backend == COOL ) {
00164 cdi_handle = new cdiCOOLDataHandle(verb_level,(std::string)partition_name,(std::string)conf_obj);
00165 } else
00166 if ( backend == LISBON ) {
00167 cdi_handle = new cdiDataHandle(verb_level,(std::string)partition_name);
00168 }
00169 else {
00170 std::cerr << OWLTime() << " :: cdi_daq failed because the backend [" << backend
00171 << "] is not valid!" << std::endl;
00172 exit(1);
00173 }
00174
00175
00176 signal(SIGINT, signal_exit);
00177 signal(SIGTERM, signal_exit);
00178
00179 IPCPartition partition(partition_name);
00180
00181 ISInfoReceiver receiver(partition);
00182
00183 ISReceiver = &receiver;
00184
00185 std::vector<std::string> is_names;
00186
00187
00188
00189
00190 cdi_handle->getConfig( is_names );
00191
00192 if (verb_level >= 3)
00193 std::cout << OWLTime() << "::cdi_daq -> Creating CallbackParams..." << std::endl;
00194
00195 CallbackParams params;
00196
00197 if (verb_level >= 3)
00198 std::cout << OWLTime() << "::cdi_daq -> Defining partition as : [" << partition_name << "]..." << std::endl;
00199 params.partition_name = partition_name;
00200
00201 if (verb_level >= 3)
00202 std::cout << OWLTime() << "::cdi_daq -> Configuration object to be used : [" << conf_obj << "]..." << std::endl;
00203 params.conf_obj = conf_obj;
00204
00205
00206 if (verb_level >= 3)
00207 std::cout << OWLTime() << "::cdi_daq -> Uploading the cdiHandle..." << std::endl;
00208 params.data_handle = cdi_handle;
00209 params.receiver = &receiver;
00210 params.is_names = is_names;
00211 params.mout = cdiAux( verb_level );
00212 params.conf_obj = conf_obj;
00213
00214 if (verb_level >= 3)
00215 std::cout << OWLTime() << "::cdi_daq -> Subscribing to IS..." << std::endl;
00216 subscribe_to_IS( params );
00217
00218 std::cout << OWLTime() << " :: cdi_daq has been started in the \""
00219 << partition.name() << "\" partition" << std::endl;
00220
00221
00222 receiver.run();
00223
00224
00225
00226
00227
00228
00229
00230
00231
00232
00233
00234
00235 ISReceiver = 0;
00236
00237 unsubscribe_from_IS( params );
00238
00239 return 0;
00240 }