00001 #include "IOManagerDB.h"
00002 #include "MySqlException.h"
00003 #include "ZlibStringCompressor.h"
00004 #include "Sct/Serializable.h"
00005 #include "Sct/SctNames.h"
00006 #include "Sct/XmlStyleStream.h"
00007 #include "Sct/StdExceptionWrapper.h"
00008 #include "Sct/Archive/IONameArchiveFile.h"
00009
00010 #include "SctData/UniqueID.h"
00011 #include "SctData/ScanResult.h"
00012 #include "SctData/TestResult.h"
00013
00014 #include <sqlplus.hh>
00015 #include <custom.hh>
00016 #include <sstream>
00017 #include <boost/date_time/posix_time/posix_time.hpp>
00018
00019 using std::string;
00020 using std::ostringstream;
00021 using boost::posix_time::to_simple_string;
00022 using boost::shared_ptr;
00023 using namespace Sct;
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034 namespace SctArchiving {
00035
00036 IOManagerDB::IOManagerDB() {
00037 m_compressor = shared_ptr<ZlibStringCompressor>(new ZlibStringCompressor());
00038 m_connection = shared_ptr<Connection>(new Connection("test_db") );
00039 }
00040
00041 IOManagerDB::~IOManagerDB() {
00042 }
00043
00044 IOManagerDB* IOManagerDB::s_man=0;
00045
00046 IOManagerDB& IOManagerDB::instance() {
00047 if (!s_man) s_man=new IOManagerDB();
00048 return *s_man;
00049 }
00050
00051 std::string IOManagerDB::status() const {
00052 std::ostringstream oss;
00053 oss << "IOManagerDB";
00054 if (m_compressor.get()){
00055 oss << "String compressor buffer size : "
00056 << m_compressor->getBufferSize() << std::endl;
00057 } else {
00058 oss << "String compressor not initialized" << endl;
00059 }
00060 return oss.str();
00061 }
00062
00063 Query IOManagerDB::getQuery() const{
00064 return m_connection->query();
00065 }
00066
00067
00068 const SctData::ResultHeader& IOManagerDB::getHeader(const Serializable& ob){
00069 const SctData::ResultHeader* header=0;
00070 if (ob.getClassName().find("TestResult")!=string::npos) {
00071 header = & dynamic_cast<const SctData::TestResult&>(ob).getHeader();
00072 }
00073 if (ob.getClassName().find("ScanResult")!=string::npos) {
00074 header = & dynamic_cast<const SctData::ScanResult&>(ob).getHeader();
00075 }
00076 if (header) {
00077 return *header;
00078 } else {
00079 ostringstream oss;
00080 oss << "Dont know how to find a header for object with class name : "
00081 << ob.getClassName() << " which has UniqueID : " << ob.getUniqueID();
00082 throw IoException(oss.str(), __FILE__, __LINE__);
00083 }
00084 }
00085
00086 void IOManagerDB::prepareInsertion(const Serializable& ob, MysqlQuery& query) const{
00087 const SctData::ResultHeader& header = getHeader(ob);
00088 SctData::UniqueID id(header.getUniqueID());
00089
00090 ostringstream oss;
00091 XmlStyleOStream out_ad(oss);
00092 writeImpl(out_ad, ob, false);
00093
00094 query << string("INSERT INTO ") << getTable(ob) << string(" VALUES ('")
00095 << ob.getClassName() << string("','")
00096 << id.getRunNumber() << string("','")
00097 << id.getScanNumber() << string("','")
00098 << id.getModule() << string("','")
00099 << to_simple_string(header.getStartTime()) << string("','")
00100 << to_simple_string(header.getEndTime()) << string("','")
00101 << m_compressor->compress(oss.str(), getCompressionLevel()) << string("')");
00102 }
00103
00104 string IOManagerDB::getTable(const Serializable& ob){
00105 return "data";
00106 }
00107
00108 void IOManagerDB::write(const Serializable& ob, const IOParams* par) const {
00109 try{
00110 boost::recursive_mutex::scoped_lock lock (getMutex());
00111 setReadMode(false);
00112
00113 MysqlQuery query = getQuery();
00114 prepareInsertion(ob, query);
00115 query.execute();
00116
00117
00118 }catch(BadQuery& er){
00119 ostringstream oss;
00120 oss << "Bad Query error number : " << er.error;
00121 throw MySqlException(oss.str(), __FILE__, __LINE__);
00122 }catch (BadConversion& er) {
00123 ostringstream oss;
00124 oss << "Tried to convert \"" << er.data << "\" to a \""
00125 << er.type_name << "\"." << endl;
00126 throw MySqlException(oss.str(), __FILE__, __LINE__);
00127 }catch (Sct::Throwable& e){
00128 throw;
00129 }catch(std::exception& e){
00130 StdExceptionWrapper sew(e);
00131 throw IoException(sew, __FILE__, __LINE__);
00132 }
00133 }
00134
00135 string getQueryMatching(const string& name){
00136 Archive::IONameArchiveFile ioname(name);
00137 SctData::UniqueID id(ioname.getUniqueID());
00138 ostringstream oss;
00139 oss << " WHERE className = " << ioname.getClassName()
00140 << " AND runNumber = " << id.getRunNumber()
00141 << " AND scanNumber = " << id.getScanNumber()
00142 << " AND moduleName = " << id.getModule();
00143 return oss.str();
00144 }
00145
00146 boost::shared_ptr<Serializable> IOManagerDB::read(const string& name, const IOParams* par) const {
00147 try{
00148 boost::recursive_mutex::scoped_lock lock (getMutex());
00149 setReadMode(true);
00150 getReadVersionMap().clear();
00151 MysqlQuery query = getQuery();
00152 query << string("SELECT dataBlob FROM DATA ") << getQueryMatching(name);
00153 Row row = query.use().fetch_row();
00154
00155 const string compressed (row[0]);
00156 std::istringstream iss(m_compressor->inflate(compressed));
00157 XmlStyleIStream in_ad(iss);
00158
00159 return boost::dynamic_pointer_cast<Serializable>(readImpl(in_ad));
00160
00161
00162 }catch(BadQuery& er){
00163 ostringstream oss;
00164 oss << "Bad Query error number : " << er.error;
00165 throw MySqlException(oss.str(), __FILE__, __LINE__);
00166 }catch (BadConversion& er) {
00167 ostringstream oss;
00168 oss << "Tried to convert \"" << er.data << "\" to a \""
00169 << er.type_name << "\"." << endl;
00170 throw MySqlException(oss.str(), __FILE__, __LINE__);
00171 }catch (Sct::Throwable& e) {
00172 throw;
00173 }catch(std::exception& e){
00174 StdExceptionWrapper sew(e);
00175 throw IoException(sew, __FILE__, __LINE__);
00176 }
00177 }
00178 }