Class: Dataflow::Adapters::SqlAdapter
- Inherits:
-
Object
- Object
- Dataflow::Adapters::SqlAdapter
- Defined in:
- lib/dataflow/adapters/sql_adapter.rb
Overview
Interface between a data node and a SQL database. We use Sequel to perform all the store/retrieve operations.
Direct Known Subclasses
Constant Summary collapse
- SYSTEM_ID =
:_id
Instance Attribute Summary collapse
-
#client ⇒ Object
readonly
Returns the value of attribute client.
-
#settings ⇒ Object
readonly
Returns the value of attribute settings.
Class Method Summary collapse
-
.add_extensions(settings, db) ⇒ Object
load Sequel extensions based on the type.
-
.client(settings) ⇒ Sequel::Database
Get (or create) a client that satisfies the given connection settings.
-
.disconnect_clients ⇒ Object
Force the clients to disconnect their connections.
-
.try_create_db(uri, db_name) ⇒ Boolean
Used internally to try to create the DB automatically.
Instance Method Summary collapse
-
#all(where: {}, fields: [], sort: {}, offset: 0, limit: 0, include_system_id: false) ⇒ Object
retrieve all elements from a data node.
- #all_paginated(where: {}, fields: [], cursor: nil) ⇒ Object
-
#count(where: {}) ⇒ Object
count the number of records.
-
#create_indexes(dataset: nil, type: :all) ⇒ Object
Create the indexes on this dataset.
-
#delete(where: {}) ⇒ Object
Delete records that match the options.
-
#drop_dataset(dataset) ⇒ Object
drops the given dataset.
-
#find(where: {}, fields: [], sort: {}, offset: 0) ⇒ Object
retrieve a single element from a data node.
-
#initialize(args) ⇒ SqlAdapter
constructor
A new instance of SqlAdapter.
-
#ordered_system_id_queries(batch_size:, where: {}) ⇒ Object
Create queries that permit processing the whole dataset in parallel without using offsets.
-
#recreate_dataset(dataset: nil) ⇒ Object
recreate the table/collection.
- #retrieve_dataset_indexes(dataset_name) ⇒ Object
-
#save(records:, replace_by: nil) ⇒ Object
Save the given records.
- #transform_to_query(opts) ⇒ Object
- #update_settings(args) ⇒ Object
Constructor Details
#initialize(args) ⇒ SqlAdapter
Returns a new instance of SqlAdapter.
65 66 67 68 |
# File 'lib/dataflow/adapters/sql_adapter.rb', line 65 def initialize(args) update_settings(args) @client = SqlAdapter.client(settings) end |
Instance Attribute Details
#client ⇒ Object (readonly)
Returns the value of attribute client.
63 64 65 |
# File 'lib/dataflow/adapters/sql_adapter.rb', line 63 def client @client end |
#settings ⇒ Object (readonly)
Returns the value of attribute settings.
62 63 64 |
# File 'lib/dataflow/adapters/sql_adapter.rb', line 62 def settings @settings end |
Class Method Details
.add_extensions(settings, db) ⇒ Object
load Sequel extensions based on the type
43 44 45 46 47 48 49 |
# File 'lib/dataflow/adapters/sql_adapter.rb', line 43

# Loads Sequel extensions based on the adapter type.
# Only PostgreSQL gets extra extensions (array support and
# pg_loose_count for fast approximate counts); every other
# adapter type is left untouched.
def add_extensions(settings, db)
  return unless settings.adapter_type == 'postgresql'

  db.extension(:pg_array)
  # db.extension(:pg_json)
  db.extension(:pg_loose_count)
end
.client(settings) ⇒ Sequel::Database
Get (or create) a client that satisfies the given connection settings.
12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 |
# File 'lib/dataflow/adapters/sql_adapter.rb', line 12

# Get (or create) a client that satisfies the given connection settings.
# Clients are memoized per full connection URI, so every adapter using
# the same db/settings shares one Sequel::Database.
# @param settings [Object] connection settings (uri, db name, adapter type).
# @return [Sequel::Database]
def client(settings)
  @clients ||= {}
  connection_uri = settings.connection_uri_or_default
  full_uri = "#{connection_uri}/#{settings.db_name}?encoding=utf8"
  return @clients[full_uri] if @clients[full_uri].present?

  # first, make sure the DB is created (if it is not an external db)
  # NOTE(review): an explicit connection_uri is taken to mean "external db
  # we must not create" — confirm that convention against the settings model.
  is_external_db = settings.connection_uri.present?
  try_create_db(connection_uri, settings.db_name) unless is_external_db

  # then, create the connection object and memoize it
  db = Sequel.connect(full_uri)
  add_extensions(settings, db)
  @clients[full_uri] = db
end
.disconnect_clients ⇒ Object
Force the clients to disconnect their connections. Use before forking.
53 54 55 56 57 |
# File 'lib/dataflow/adapters/sql_adapter.rb', line 53

# Force the clients to disconnect their connections.
# Use before forking so child processes do not inherit sockets.
# Resets the memoized client cache afterwards.
def disconnect_clients
  (@clients ||= {}).each_value(&:disconnect)
  @clients = {}
end
.try_create_db(uri, db_name) ⇒ Boolean
Used internally to try to create the DB automatically.
32 33 34 35 36 37 38 39 40 |
# File 'lib/dataflow/adapters/sql_adapter.rb', line 32

# Used internally to try to create the DB automatically.
# Best-effort: any database error (already exists, missing permission)
# is swallowed and reported through the return value.
# @param uri [String] server-level connection uri (no db name).
# @param db_name [String] name of the database to create.
# @return [Boolean] whether the creation succeeded.
def try_create_db(uri, db_name)
  Sequel.connect(uri) do |db|
    # NOTE(review): db_name is interpolated directly into the statement —
    # assumed to come from trusted settings, not user input. Verify upstream.
    db.run("CREATE DATABASE #{db_name}")
    true
  end
rescue Sequel::DatabaseError
  # Creation failed (most likely the DB already exists); report false.
  false
end
Instance Method Details
#all(where: {}, fields: [], sort: {}, offset: 0, limit: 0, include_system_id: false) ⇒ Object
retrieve all elements from a data node
81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 |
# File 'lib/dataflow/adapters/sql_adapter.rb', line 81 def all(where: {}, fields: [], sort: {}, offset: 0, limit: 0, include_system_id: false) res = client[settings.read_dataset_name.to_sym] # if there is no fields, automatically # select all the fields expect the system _id if fields.blank? fields = res.columns fields = fields.reject { |x| x == SYSTEM_ID } unless include_system_id end res = res.select(*fields.map(&:to_sym)) if fields.present? res = apply_query(res, where) (sort || {}).each do |k, v| sort_value = v == 1 ? k.to_sym : Sequel.desc(k.to_sym) res = res.order_append(sort_value) end res = res.offset(offset) if offset > 0 res = res.limit(limit) if limit > 0 if block_given? yield res else res.to_a end end |
#all_paginated(where: {}, fields: [], cursor: nil) ⇒ Object
109 110 111 112 |
# File 'lib/dataflow/adapters/sql_adapter.rb', line 109

# Paginated retrieval. Pagination is not implemented for SQL yet:
# every matching record is returned in a single page and the
# cursor is always empty.
def all_paginated(where: {}, fields: [], cursor: nil)
  # for now, retrieve all records at once
  records = all(where: where, fields: fields)
  { 'data' => records, 'next_cursor' => '' }
end
#count(where: {}) ⇒ Object
count the number of records
132 133 134 135 136 137 138 |
# File 'lib/dataflow/adapters/sql_adapter.rb', line 132

# count the number of records matching the query.
# Returns 0 when the table does not exist yet (database error).
def count(where: {})
  scope = client[settings.read_dataset_name.to_sym]
  apply_query(scope, where).count
rescue Sequel::DatabaseError
  0
end
#create_indexes(dataset: nil, type: :all) ⇒ Object
Create the indexes on this dataset. TODO: add support for a :drop_retry_on_error parameter.
197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 |
# File 'lib/dataflow/adapters/sql_adapter.rb', line 197 def create_indexes(dataset: nil, type: :all) dataset ||= settings.write_dataset_name dataset = dataset.to_sym indexes = (settings.indexes || []) case type when :unique_only indexes = indexes.select { |idx| idx['unique'] } when :non_unique_only indexes = indexes.reject { |idx| idx['unique'] } end indexes.each do |index| params = index_parameters(index) begin client.add_index(dataset, *params) rescue Sequel::DatabaseError => e # ignore index already exists next if e.wrapped_exception.is_a?(PG::DuplicateTable) # log columns not found but do not raise an error if e.wrapped_exception.is_a?(PG::UndefinedColumn) logger.error(custom_message: "add_index on #{dataset} failed.", error: e) next end # re-raise for everything else raise e end end end |
#delete(where: {}) ⇒ Object
this deletes on the read dataset
Delete records that match the options. i.e. changes are seen immediately in the case of double buffered datasets
173 174 175 176 177 |
# File 'lib/dataflow/adapters/sql_adapter.rb', line 173

# Delete records that match the options.
# This deletes on the read dataset, i.e. changes are seen
# immediately in the case of double buffered datasets.
def delete(where: {})
  scope = client[settings.read_dataset_name.to_sym]
  apply_query(scope, where).delete
end
#drop_dataset(dataset) ⇒ Object
drops the given dataset
187 188 189 |
# File 'lib/dataflow/adapters/sql_adapter.rb', line 187 def drop_dataset(dataset) client.drop_table?(dataset) end |
#find(where: {}, fields: [], sort: {}, offset: 0) ⇒ Object
retrieve a single element from a data node
76 77 78 |
# File 'lib/dataflow/adapters/sql_adapter.rb', line 76

# retrieve a single element from a data node.
# Delegates to #all with limit: 1 and returns the first match (or nil).
def find(where: {}, fields: [], sort: {}, offset: 0)
  matches = all(where: where, fields: fields, sort: sort, offset: offset, limit: 1)
  matches.first
end
#ordered_system_id_queries(batch_size:, where: {}) ⇒ Object
Create queries that permit processing the whole dataset in parallel without using offsets.
115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 |
# File 'lib/dataflow/adapters/sql_adapter.rb', line 115 def ordered_system_id_queries(batch_size:, where: {}) ids = all(fields: [SYSTEM_ID], where: where, sort: { SYSTEM_ID => 1 }).map { |x| x[SYSTEM_ID] } queries_count = (ids.size / batch_size.to_f).ceil Array.new(queries_count) do |i| from = ids[i * batch_size] to = ids[(i + 1) * batch_size] || ids[-1] is_last = i == queries_count - 1 where_query = { SYSTEM_ID => { '>=' => from } } operator = is_last ? '<=' : '<' where_query[SYSTEM_ID][operator] = to where_query end end |
#recreate_dataset(dataset: nil) ⇒ Object
recreate the table/collection
180 181 182 183 184 |
# File 'lib/dataflow/adapters/sql_adapter.rb', line 180 def recreate_dataset(dataset: nil) dataset ||= settings.write_dataset_name.to_sym drop_dataset(dataset) create_table(dataset, @schema, logger) end |
#retrieve_dataset_indexes(dataset_name) ⇒ Object
260 261 262 263 264 265 266 267 268 269 270 |
# File 'lib/dataflow/adapters/sql_adapter.rb', line 260

# Retrieves the indexes defined on the given table, normalized to the
# settings format: { 'key' => [col names], 'unique' => true (if unique) }.
# Returns [] when the table does not exist (database error).
# Fix: dropped a dead `.compact` — the map block always returns a Hash,
# so there is never a nil to remove.
def retrieve_dataset_indexes(dataset_name)
  psql_indexes = client.indexes(dataset_name)
  psql_indexes.values.map do |idx|
    index = { 'key' => idx[:columns].map(&:to_s) }
    index['unique'] = true if idx[:unique]
    index
  end
rescue Sequel::DatabaseError
  []
end
#save(records:, replace_by: nil) ⇒ Object
Save the given records
145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 |
# File 'lib/dataflow/adapters/sql_adapter.rb', line 145 def save(records:, replace_by: nil) dataset_name = settings.write_dataset_name.to_sym dataset = client[dataset_name] columns = dataset.columns.reject { |x| x == SYSTEM_ID } tabular_data = records.map do |record| columns.map { |col| record[col] } end if replace_by.present? index_keys = Array(replace_by).map { |c| c.to_sym}.uniq # On conflict update every field. On Postgresql we can refer # to the "conflicting" rows using the "excluded_" prefix: update_clause = columns.map { |k| [k, Sequel.qualify('excluded', k)] }.to_h dataset .insert_conflict(target: index_keys, update: update_clause) .import(columns, tabular_data) else # ignore insert conflicts dataset.insert_ignore.import(columns, tabular_data) end end |
#transform_to_query(opts) ⇒ Object
230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 |
# File 'lib/dataflow/adapters/sql_adapter.rb', line 230 def transform_to_query(opts) # map to a serie of AND clauses queries opts.flat_map do |k, v| if v.is_a? Hash v.map do |operator, value| case operator when '!=' if value.is_a? Array Sequel.lit("#{k} NOT IN ?", value) else Sequel.lit("#{k} <> ?", value) end when '<', '<=', '>', '>=' Sequel.lit("#{k} #{operator} ?", value) when '@>', '<@' Sequel.lit("#{k} #{operator} ?", Sequel.pg_array(Array(value))) when '~' Sequel.lit("#{k} #{regex_case_senstive_op} ?", value) when '~*' Sequel.lit("#{k} #{regex_case_insensitive_op} ?", value) end end else # e.g. simple match { 'id' => 1} or IN clauses { 'id' => [1,2] } # are supported with simples hashes [[{ k.to_sym => v }]] end end end |