Trading System API  3.0
Library for Simulating and Deploying Trading and Investment Strategies
tsa_pg_database.h
1 
2 /* ===================================================================
3 *
4 * T R A D I N G S Y S T E M A P I ™
5 * Copyright © 1999 - 2014 by Peter Ritter ( TradingSystemAPI.com )
6 * A L L R I G H T S R E S E R V E D
7 *
8 * Consult your license regarding permissions and restrictions.
9 * You may obtain a copy of the License at:
10 * http://www.TradingSystemAPI.com/licenses/LICENSE-TSAPI-3.0.html
11 *
12 * ====================================================================
13 */
14 
15 #ifndef TSA_POSTGRES_CONN__INCLUDED
16 #define TSA_POSTGRES_CONN__INCLUDED
17 
18 #include <string>
19 #include <queue>
20 #include <vector>
21 #include <thread>
22 
23 #include "tsa.h"
24 
25 #include "libpq-fe.h"
26 #include "pqxx/pqxx"
27 
28 
29 namespace tsa {
30 
33 
35  namespace postgres {
36 
38  struct pg_type {
39  static const pqxx::oid int_8byte = 20; //INT8OID
40  static const pqxx::oid int_4byte = 23; //INT4OID
41  static const pqxx::oid float_8byte = 701; //FLOAT8OID
42  static const pqxx::oid float_4byte = 700; //FLOAT4OID
43  static const pqxx::oid boolean = 16; //BOOLOID
44  static const pqxx::oid datetime = 1114; //TIMESTAMPOID
45  static const pqxx::oid date = 1082; //DATEOID
46  static const pqxx::oid varchar = 1043; //VARCHAROID
47 
48  static pqxx::oid to_equivalent_OID(tsa::type_t);
49  };
50 
51  /*
52  ** ====================================================
53  ** >>> class postgres::database <<<
54  ** ====================================================
55  */
56 
61  class dll_export database : public tsa::data_base {
62  tsa_declare_testable;
63  private:
64  struct record_to_write {
65  std::string table_name;
66  std::vector<std::string> field_names;
67  std::vector<tsa::variant> values;
68  };
69  std::queue<record_to_write> m_records_to_write;
70  public:
71  bool m_is_running = false;
72  bool m_stop_now = false;
73  bool m_verbose = false;
74  void thread_fn__write_records_in_queue();
75  std::thread m_write_rec_thread;
76  bool m_thread_fn__once_only = false;
77  protected:
78  mutable std::mutex m_mutex;
79  pqxx::connection* m_db_connection_ptr=nullptr;
80  mutable std::mutex m_PG_mutex;
81  PGconn* m_PGconn_ptr = nullptr;
82  std::string m_conn_string;
83  void connect_PGconn(const std::string&);
84  public:
85  database(void);
86  ~database(void);
87  void connect(const std::string&);
88  protected:
89  void verify_connected(void)const;
90  private:
91  std::string m_connection_string;
92  public:
93  void connection_string(const std::string& cs)
94  { m_connection_string = cs; }
95  bool is_running(void)const;
96  bool is_connected(void) const;
97  virtual void start_threads(void);
98  virtual void stop_threads(void);
99  public: //Writing records
100  void insert_record__async(const std::string& table_name,
101  const std::vector<tsa::variant>& values);
102  void insert_record__async(const std::string& table_name,
103  const std::vector<std::string>& field_names,
104  const std::vector<tsa::variant>& values);
105  // Writes record to database. Any exceptions must be caught at a higher level.
106  void insert_record(
107  const std::string& table_name,
108  const std::vector<tsa::variant>& values);
109  void insert_record(
110  const std::string& table_name,
111  const std::vector<std::string>& field_names,
112  const std::vector<tsa::variant>& values);
113 
114  // Writes record to database. Any errors are written to cerr
115  void insert_record__no_exception(
116  const std::string& table_name,
117  const std::vector<tsa::variant>& values);
118  // Writes a timeseries record (with timestamp)
119  // Timestamp must be in first field
120  void append_series_record(
121  const std::string& table_name,
122  const std::vector<tsa::variant>& values);
123 
124  void update_record(
125  const std::string& table_name,
126  const std::string& where_clause,
127  //const std::string& primary_key_value,
128  const std::vector<std::string>& fields_to_update,
129  const std::vector<tsa::variant>& new_values);
130  public:
131  //table_info_t table_info(const std::string& table_name);
132  public:
133  pqxx::result execute__no_lock(const std::string& cmd) const;
134  pqxx::result execute(const std::string& cmd) const;
135  tsa::variant execute_scalar(const std::string& cmd) const;
136  public:
137  bool drop_table(const std::string& schema, const std::string& table_name);
138  bool drop_table(const std::string& table_name);
139  virtual bool table_record_count_is_known(void)const override;
140  int64_t table_record_count(const std::string& table_name);
141  virtual bool table_exists(const std::string& table_name)const override; //tsa::data_base
142  bool table_exists(const std::string& schema_name,
143  const std::string& table_name, bool lock_mutex = true)const;
144  bool create_table(const std::string& table_name, const std::string& fields_info);
145  private:
146  bool table_exists__no_lock(const std::string& table_name);
147  public:
148 
149  void create_series_table(
150  const std::string& table_name, const tsa::column_defs& table_def); //TS
151 
152  virtual void series_append(const std::string& table_name, const tsa::mem_table& data);
153  void series_insert(const std::string& table_name, const std::string& data_buff);
154 
155  virtual std::string type(void)const override;
156  virtual tsa::column_defs series_table_column_defs(const std::string& table_name)override;
157 
158  virtual void series_load(const std::string& table_name, tsa::mem_table& data,
159  const date_time& from_timestamp, size_t num_records)override;
160  //virtual void series_append(const std::string& table_name, tsa::mem_table& data)override;
161 
162  virtual date_time series_table_first_timestamp(const std::string& table_name)override;
163  virtual date_time series_table_last_timestamp(const std::string& table_name)override;
164 
165  virtual bool table_has_data(const std::string& table_name)override;
166  virtual date_time series_table_closest_timestamp_LOE(const date_time& ts,
167  const std::string& table_name)override;
168  virtual void truncate_series_table(const std::string& table_name,
169  const date_time& new_max_timestamp = date_time::min)override;
170  virtual bool drop_series_table(const std::string& table_name)override;
171  public:
172  void series_load(const std::string& table_name, tsa::mem_table& data);
173  void clear_table(const std::string& table_name);
174  public:
175  virtual out_stream_adaptor* make_output_stream_adaptor(void) override;
176  virtual in_stream_adaptor* database::make_input_stream_adaptor(const std::string& stream)override;
177  };
178 
179  /*
180  ** =========================================================
181  ** > > > class postgres_writer_out_stream_adaptor < < <
182  ** =========================================================
183  */
184 
185  //Internal
186  class postgres_writer_out_stream_adaptor
187  : public tsa::out_stream_adaptor, public tsa::object {
188  tsa_declare_testable;
189  private:
190  tsa::mem_table m_mt;
191  postgres::database* m_db_ptr = nullptr;
192  std::string m_table_name;
193  std::string m_buff;
194  size_t m_flush_target_rec_count = (size_t)5e4;
195  size_t m_rec_count = 0;
196  size_t m_flush_size = (size_t)1e6;
197  tsa::date_time m_most_recent_timestamp;
198  public:
199  virtual ~postgres_writer_out_stream_adaptor(void) { ; }
200  postgres_writer_out_stream_adaptor(postgres::database*);
201  virtual void osa__append(const date_time& time_stamp,
202  std::vector<variant>& record) override;
203  virtual void osa__prepare_stream(const std::string& target_name,
204  const column_defs& field_list/*, rec_stream_open_flag open_flag*/) override;
205  virtual void cmp__finalize(void) override;
206  void reset_buffer(size_t num_records);
207  };
208 
209  }//postgres
210 
212 
213 }//tsa
214 
215 
216 #endif
217 
218 
Namespace for the &#39;Trading System API&#39; library.
Definition: original1.TSA3Core.cpp:20
Class representing a Postgresql database. Allows ts-api to read and write series to and from a Postgr...
Definition: tsa_pg_database.h:61
Base class for all other database classes such as:
Definition: TSADBBase.h:47
Defines the columnar structure of a table. Each column has a name, data type and field size...
Definition: TSADataDef.h:88
Parent class for &#39;in-stream adaptors&#39;. in_stream object rely on adaptors for access to underlying dat...
Definition: TSAStreams.h:52
Abstract base class for delegates of class out_stream.
Definition: TSAStreams.h:553
types
Definition: tsa_pg_database.h:38
static const date_time min
Constant representing the smallest allowed date_time.
Definition: TSATime.h:449
variant objects can represent values of different types.
Definition: TSAVariant.h:140
_value_types_type
Data type enumeration used throughout the library. Intended to be used via type_t.
Definition: TSATypeDef.h:166
Parent class for many library classes.
Definition: TSATypeDef.h:462
A date of the Gregorian calendar.
Definition: TSATime.h:119
Class representing a database record.
Definition: TSADBRecord.h:52
Class representing a gregorian-date and time-of-day combination. The time component has microsecond r...
Definition: TSATime.h:428
Class mem_table represents a memory based table. mem_table objects can be used in strategies both for...
Definition: TSAMemTable.h:48