Class: Dataflow::Adapters::MongoDbAdapter

Inherits: Object

Defined in:
lib/dataflow/adapters/mongo_db_adapter.rb

Overview

Interface between a data node and MongoDB. We use MongoDB to perform all the store/retrieve operations.

Constant Summary

SYSTEM_ID = '_id'

Instance Attribute Summary

Class Method Summary

Instance Method Summary

Constructor Details

#initialize(args) ⇒ MongoDbAdapter

Returns a new instance of MongoDbAdapter.



# File 'lib/dataflow/adapters/mongo_db_adapter.rb', line 36

def initialize(args)
  update_settings(args)
  @client = MongoDbAdapter.client(settings)
  @admin_client = MongoDbAdapter.admin_client(settings)
end
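
For illustration, a minimal instantiation sketch. The settings keys shown (db_name, dataset_name) and the require of 'dataflow-rb' are assumptions based on how this adapter reads its settings, not confirmed by this file:

require 'dataflow-rb'

# Hypothetical settings; the adapter builds a Settings object from this hash.
adapter = Dataflow::Adapters::MongoDbAdapter.new(
  db_name: 'app_db',      # assumed database name
  dataset_name: 'users'   # assumed collection name
)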

Instance Attribute Details

#client ⇒ Object (readonly)

Returns the value of attribute client.



# File 'lib/dataflow/adapters/mongo_db_adapter.rb', line 34

def client
  @client
end

#settings ⇒ Object (readonly)

Returns the value of attribute settings.



# File 'lib/dataflow/adapters/mongo_db_adapter.rb', line 33

def settings
  @settings
end

Class Method Details

.admin_client(settings) ⇒ Object



# File 'lib/dataflow/adapters/mongo_db_adapter.rb', line 19

def admin_client(settings)
  return @admin_client if @admin_client
  @admin_client = client(settings, db_name: 'admin')
end

.client(settings, db_name: nil) ⇒ Object



# File 'lib/dataflow/adapters/mongo_db_adapter.rb', line 10

def client(settings, db_name: nil)
  @clients ||= {}

  settings.adapter_type = 'mongodb'
  connection_uri = settings.connection_uri_or_default
  db_name ||= settings.db_name
  @clients["#{connection_uri}.#{db_name}"] ||= Mongo::Client.new([connection_uri], database: db_name)
end
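
Note that clients are memoized per connection URI and database name, so two adapters pointing at the same database share a single Mongo::Client. A sketch (the Settings constructor arguments are assumptions):

settings_a = Dataflow::Adapters::Settings.new(db_name: 'app_db')
settings_b = Dataflow::Adapters::Settings.new(db_name: 'app_db')

c1 = Dataflow::Adapters::MongoDbAdapter.client(settings_a)
c2 = Dataflow::Adapters::MongoDbAdapter.client(settings_b)
c1.equal?(c2) # => true: both resolve to the same "connection_uri.db_name" cache key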

.disconnect_clients ⇒ Object

Force the clients to disconnect their connections. Use before forking.



# File 'lib/dataflow/adapters/mongo_db_adapter.rb', line 26

def disconnect_clients
  @clients ||= {}
  @clients.values.each(&:close)
  @clients = {}
end
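
A typical pre-fork sequence, so the child process opens its own connection instead of inheriting a shared socket (a sketch; the worker body and settings keys are illustrative):

Dataflow::Adapters::MongoDbAdapter.disconnect_clients

pid = fork do
  # the child lazily re-creates a fresh client on first use
  adapter = Dataflow::Adapters::MongoDbAdapter.new(db_name: 'app_db', dataset_name: 'users')
  adapter.count
end
Process.wait(pid)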

Instance Method Details

#all(where: {}, fields: [], sort: {}, offset: 0, limit: 0, include_system_id: false) ⇒ Object

Retrieve all elements from a data node.



# File 'lib/dataflow/adapters/mongo_db_adapter.rb', line 52

def all(where: {}, fields: [], sort: {}, offset: 0, limit: 0, include_system_id: false)
  projection = fields.map { |field| [field, 1] }

  unless include_system_id || fields.map(&:to_s).include?(SYSTEM_ID)
    # by default, do not select the _id field
    projection << [SYSTEM_ID, 0].freeze
  end

  opts = transform_to_query(where)
  res = client[read_dataset_name].find(opts)
  res = res.projection(projection.to_h)

  res = res.sort(sort)   if sort
  res = res.skip(offset) if offset > 0
  res = res.limit(limit) if limit > 0

  if block_given?
    yield res
  else
    res.to_a
  end
end
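
For example, fetching a sorted page of matching records (the collection and field names are hypothetical):

adapter.all(
  where:  { 'status' => 'active', 'age' => { '>=' => 18 } },
  fields: %w(name age),
  sort:   { 'age' => -1 },
  limit:  20
)
# => e.g. [{ 'name' => 'Ada', 'age' => 36 }, ...]; '_id' is omitted by default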

#all_paginated(where: {}, fields: [], cursor: nil) ⇒ Hash

Helper that supports paginating through the whole dataset at a fixed performance cost. Unlike offset/skip, which has to read through all the skipped content (high CPU usage), this uses the internal MongoDB cursor to fetch batches of results.

Returns:

  • (Hash)

    with 2 fields: 'data' and 'next_cursor' to pass to the next call



# File 'lib/dataflow/adapters/mongo_db_adapter.rb', line 80

def all_paginated(where: {}, fields: [], cursor: nil)
  cursor = cursor.to_i
  data = []

  # If there is no cursor, we make the initial query
  # get the first batch of data and get the cursor id.
  if cursor.zero?
    all(where: where, fields: fields) do |res|
      results = res.initial_query
      data = results.documents
      cursor = res.cursor.id
    end
  end

  # The first query's result batch is a small 101 set of results
  # so we want to get one more batch of data.
  # However, there might be queries whose results are very small
  # and the resulting cursor is 0. In such case there is no more
  # data to be fetched.
  unless cursor.zero?
    # send a getMore command on the cursor id
    command = { getMore: cursor, collection: read_dataset_name }
    result = client.database.command(command).documents[0]
    cursor = result['cursor']['id']
    data += result['cursor']['nextBatch']
  end

  # We want to return the cursor as a string.
  # If there is no cursor (zero) then make it empty
  cursor = '' if cursor.zero?

  { 'data' => data, 'next_cursor' => cursor.to_s }
rescue Mongo::Error::OperationFailure
  { 'data' => data, 'next_cursor' => '' }
end
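
A typical consumer loops until 'next_cursor' comes back empty (a sketch; the batch handler is hypothetical):

cursor = nil
loop do
  page = adapter.all_paginated(where: { 'status' => 'active' }, cursor: cursor)
  handle_batch(page['data']) # hypothetical processing step
  cursor = page['next_cursor']
  break if cursor.empty?
end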

#count(where: {}) ⇒ Object

Count the number of records matching the query.



# File 'lib/dataflow/adapters/mongo_db_adapter.rb', line 134

def count(where: {})
  client[read_dataset_name].count(transform_to_query(where))
end

#create_indexes(dataset: nil, type: :all, drop_retry_on_error: true) ⇒ Object

Create the indexes on this dataset.

Parameters:

  • dataset (String) (defaults to: nil)

    Specify on which dataset the operation will be performed. Default: the adapter's settings' dataset.

  • type (Symbol) (defaults to: :all)

    Select which index type to create. Can be :all (default), :unique_only or :non_unique_only.



# File 'lib/dataflow/adapters/mongo_db_adapter.rb', line 191

def create_indexes(dataset: nil, type: :all, drop_retry_on_error: true)
  dataset ||= write_dataset_name
  return unless settings.indexes.present?

  indexes = (settings.indexes || [])

  case type
  when :unique_only
    indexes = indexes.select { |idx| idx['unique'] }
  when :non_unique_only
    indexes = indexes.reject { |idx| idx['unique'] }
  end

  indexes = indexes.map { |x| format_index(x) }
  client[dataset].indexes.create_many(indexes)
rescue Mongo::Error::OperationFailure => e
  raise e unless drop_retry_on_error
  client[dataset].indexes.drop_all
  # retry once with the same arguments after dropping the indexes
  create_indexes(dataset: dataset, type: type, drop_retry_on_error: false)
end
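
The shape of settings.indexes can be inferred from the 'unique' checks above. A hedged sketch of a plausible configuration (the exact format consumed by format_index is an assumption):

adapter = Dataflow::Adapters::MongoDbAdapter.new(
  db_name: 'app_db',
  dataset_name: 'users',
  indexes: [
    { 'key' => 'updated_at' },                                 # non-unique, single field
    { 'key' => ['user_id', 'external_id'], 'unique' => true }  # unique, compound
  ]
)
adapter.create_indexes(type: :unique_only) # creates only the compound unique index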

#delete(where: {}) ⇒ Object

Delete records that match the options.

Parameters:

  • where (defaults to: {})

    query to apply on the delete operation.



# File 'lib/dataflow/adapters/mongo_db_adapter.rb', line 169

def delete(where: {})
  client[read_dataset_name].delete_many(transform_to_query(where))
end

#drop_dataset(dataset) ⇒ Object



# File 'lib/dataflow/adapters/mongo_db_adapter.rb', line 181

def drop_dataset(dataset)
  collection = client[dataset]
  collection.drop
end

#dump(base_folder:, read_dataset_idx:) ⇒ Object



# File 'lib/dataflow/adapters/mongo_db_adapter.rb', line 226

def dump(base_folder:, read_dataset_idx:)
  archive_path = "#{base_folder}/#{@settings.db_name}/#{@settings.dataset_name}.#{read_dataset_idx}.gz"
  options = "--archive=#{archive_path} --db=#{@settings.db_name} --collection=#{read_dataset_name} "
  options += "--host=#{@settings.db_host} " if @settings.db_host.present?
  options += "--port=#{@settings.db_port} " if @settings.db_port.present?
  options += "--username=#{@settings.db_user} " if @settings.db_user.present?
  options += "--password=#{@settings.db_password} " if @settings.db_password.present?

  `mkdir -p #{base_folder}/#{@settings.db_name}`
  `mongodump #{options} --gzip`
  archive_path
end

#find(where: {}, fields: [], sort: {}, offset: 0) ⇒ Object

Retrieve a single element from a data node.



# File 'lib/dataflow/adapters/mongo_db_adapter.rb', line 47

def find(where: {}, fields: [], sort: {}, offset: 0)
  all(where: where, fields: fields, sort: sort, offset: offset, limit: 1).first
end

#ordered_system_id_queries(batch_size:, where: {}) ⇒ Object

Create queries that permit processing the whole dataset in parallel without using offsets.



# File 'lib/dataflow/adapters/mongo_db_adapter.rb', line 117

def ordered_system_id_queries(batch_size:, where: {})
  ids = all(fields: [SYSTEM_ID], where: where, sort: { SYSTEM_ID => 1 }).map { |x| x[SYSTEM_ID].to_s }
  queries_count = (ids.size / batch_size.to_f).ceil
  Array.new(queries_count) do |i|
    from = ids[i * batch_size]
    to = ids[(i + 1) * batch_size] || ids[-1]
    is_last = i == queries_count - 1

    where_query = { SYSTEM_ID => { '>=' => from } }
    operator = is_last ? '<=' : '<'
    where_query[SYSTEM_ID][operator] = to

    where_query
  end
end
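
Because each returned query covers a disjoint, contiguous _id range, the batches can be processed concurrently without overlapping work. A sketch using threads (the per-record handler is hypothetical):

queries = adapter.ordered_system_id_queries(batch_size: 10_000)
queries.map do |where_query|
  Thread.new do
    adapter.all(where: where_query).each { |record| handle(record) } # hypothetical handler
  end
end.each(&:join)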

#recreate_dataset(dataset: nil) ⇒ Object

Recreate the table/collection.



# File 'lib/dataflow/adapters/mongo_db_adapter.rb', line 174

def recreate_dataset(dataset: nil)
  dataset ||= write_dataset_name
  drop_dataset(dataset)
  collection = client[dataset]
  collection.create
end

#restore(filepath:, dataset_name:) ⇒ Object



# File 'lib/dataflow/adapters/mongo_db_adapter.rb', line 239

def restore(filepath:, dataset_name:)
  options = "--archive=#{filepath} --db=#{@settings.db_name} --collection=#{dataset_name} "
  options += "--host=#{@settings.db_host} " if @settings.db_host.present?
  options += "--port=#{@settings.db_port} " if @settings.db_port.present?
  options += "--username=#{@settings.db_user} " if @settings.db_user.present?
  options += "--password=#{@settings.db_password} " if @settings.db_password.present?

  `mongorestore #{options} --drop --gzip`
end
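
Together with #dump, this enables a simple backup/restore round trip. Both methods shell out to the mongodump/mongorestore CLI tools, which must be on the PATH (the paths and names below are illustrative):

archive = adapter.dump(base_folder: '/backups', read_dataset_idx: 0)
# => e.g. "/backups/app_db/users.0.gz"
adapter.restore(filepath: archive, dataset_name: 'users')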

#retrieve_dataset_indexes(collection) ⇒ Object



# File 'lib/dataflow/adapters/mongo_db_adapter.rb', line 289

def retrieve_dataset_indexes(collection)
  mongo_indexes = client[collection].indexes
  mongo_indexes.map do |idx|
    # skip the default index
    next if idx['key'].keys == ['_id']

    index = { 'key' => idx['key'].keys }
    index['unique'] = true if idx['unique']
    index
  end.compact
rescue Mongo::Error::OperationFailure
  []
end

#save(records:, replace_by: nil) ⇒ Object

Save the given records.

Parameters:

  • replace_by (Array) (defaults to: nil)

    if replace_by is provided, it will try to replace records matching on the given key(s), or insert if none is found.



# File 'lib/dataflow/adapters/mongo_db_adapter.rb', line 142

def save(records:, replace_by: nil)
  if replace_by.present?
    replace_keys = Array(replace_by)
    bulk_ops = records.map do |record|
      filter = replace_keys.map { |x| [x, record[x]] }.to_h
      {
        replace_one: {
          filter: filter,
          replacement: record,
          upsert: true
        }
      }
    end
    client[write_dataset_name].bulk_write(bulk_ops, ordered: false)
  else
    client[write_dataset_name].insert_many(records, ordered: false)
  end
rescue Mongo::Error::BulkWriteError => e
  dup_key_error = e.result['writeErrors'].all? { |x| x['code'] == 11_000 }
  # don't raise if it is errors about duplicated keys
  unless dup_key_error
    raise e
  end
end
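
For example, upserting on a hypothetical 'external_id' key:

records = [
  { 'external_id' => 1, 'name' => 'Ada' },
  { 'external_id' => 2, 'name' => 'Grace' }
]
# replaces any existing record with the same external_id, inserts otherwise
adapter.save(records: records, replace_by: 'external_id')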

#transform_to_query(opts) ⇒ Object



# File 'lib/dataflow/adapters/mongo_db_adapter.rb', line 249

def transform_to_query(opts)
  sanitized_opts = {}
  opts.each do |k, v|
    if v.is_a? Array
      # e.g. { 'id' => [1,2] } transform to mongodb IN clauses
      sanitized_opts[k] = { '$in' => v.map { |value| try_cast_value(k, value) } }
    elsif v.is_a? Hash
      sanitized_opts[k] = {}
      v.each do |operator, value|
        case operator.to_s
        when '!='
          # transform '!=' into '$nin' for arrays, '$ne' otherwise
          if value.is_a? Array
            # { '$nin' => [...] }
            sanitized_opts[k]['$nin'] = value.map { |x| try_cast_value(k, x) }
          else
            # { '$ne' => value }
            sanitized_opts[k]['$ne'] = try_cast_value(k, value)
          end
        when '<'
          sanitized_opts[k]['$lt'] = try_cast_value(k, value)
        when '<='
          sanitized_opts[k]['$lte'] = try_cast_value(k, value)
        when '>'
          sanitized_opts[k]['$gt'] = try_cast_value(k, value)
        when '>='
          sanitized_opts[k]['$gte'] = try_cast_value(k, value)
        when '~*' # match regex /regex/i (case insensitive)
          sanitized_opts[k]['$regex'] = /#{value}/i
        when '~'  # match regex /regex/  (case sensitive)
          sanitized_opts[k]['$regex'] = /#{value}/
        end
      end
    else
      sanitized_opts[k] = try_cast_value(k, v)
    end
  end
  sanitized_opts
end
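
For instance, the supported operators map onto MongoDB query operators as follows. Values also pass through try_cast_value, which may cast them according to the node's schema; they are shown uncast here:

adapter.transform_to_query(
  'age'   => { '>=' => 18, '<' => 65 },
  'name'  => { '~*' => '^a' },
  'state' => ['active', 'pending'],
  'role'  => { '!=' => 'admin' }
)
# => { 'age'   => { '$gte' => 18, '$lt' => 65 },
#      'name'  => { '$regex' => /^a/i },
#      'state' => { '$in' => ['active', 'pending'] },
#      'role'  => { '$ne' => 'admin' } }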

#update_settings(args) ⇒ Object



# File 'lib/dataflow/adapters/mongo_db_adapter.rb', line 42

def update_settings(args)
  @settings = Dataflow::Adapters::Settings.new(args)
end

#usage(dataset:) ⇒ Object



# File 'lib/dataflow/adapters/mongo_db_adapter.rb', line 212

def usage(dataset:)
  command = { collstats: dataset }
  result = client.database.command(command).documents[0]
  {
    memory: result['size'],
    storage: result['storageSize'],
  }
rescue Mongo::Error::OperationFailure, Mongo::Error::InvalidCollectionName
  {
    memory: 0,
    storage: 0,
  }
end