Class: Dataflow::Nodes::DataNode

Inherits:
Object
  • Object
show all
Includes:
EventMixin, Dataflow::Node, PropertiesMixin, SchemaMixin, Mongoid::Document
Defined in:
lib/dataflow/nodes/data_node.rb

Overview

Data nodes are used to build a data computing/transformation graph. At each step we can save the results to a (temp) table.

Nodes::DataNode represents one of the data nodes. It is meant to be treated as an interface and should not be used directly.

Direct Known Subclasses

ReadOnlyDataNode, SnapshotNode, UpsertNode

Constant Summary

Constants included from SchemaMixin

SchemaMixin::SAMPLE_DATA_OUTPUT, SchemaMixin::SEPARATOR

Instance Method Summary collapse

Methods included from SchemaMixin

#infer_partial_schema, #infer_schema, #sample_data, #schema_inferrer

Methods included from Dataflow::Node

#all_dependencies, find, #metadata, #recompute, #valid_for_computation?, #validate!

Instance Method Details

#add(records:) ⇒ Object

Adds the given records to the dataset and updates the updated_at time.

Parameters:

  • records (Array)

    an array of the records to be added.

Raises:

  • (ArgumentError)


187
188
189
190
191
192
193
194
# File 'lib/dataflow/nodes/data_node.rb', line 187

def add(records:)
  raise ArgumentError, "records must be an array of documents. Received: '#{records.class}'." unless records.is_a?(Array)
  records = records.compact
  return if records.blank?
  db_adapter.save(records: records)
  self.updated_at = Time.now
  save!
end

#all(where: {}, fields: [], sort: {}, limit: 0, offset: 0, include_system_id: false) {|db_client| ... } ⇒ Object

Returns all the records from a dataset that match the options.

Parameters:

  • where (Hash) (defaults to: {})

    the condition to apply for retrieving the element. e.g.: { ‘id’ => 1 } will fetch a record with the id 1. An empty option hash will retrieve any record.

  • fields (Array) (defaults to: [])

    Array of strings representing which fields to include. e.g.: [‘id’, ‘updated_at’] will only return these two fields.

  • sort (Hash) (defaults to: {})

    represents the sorting of the returned dataset. e.g. { ‘id’ => 1, ‘updated_at’ => -1 } will sort by id ASC and by updated_at DESC.

  • limit (Integer) (defaults to: 0)

    limits the amount of records returned.

  • offset (Integer) (defaults to: 0)

    starting offset of the records returned. Use with limit to implement pagination.

Yields:

  • (db_client)

    When a block is passed, yields the db client on which .each can be called to stream the results rather than load everything in memory. Other methods can also be called depending on the backend, the downside being back-end portability (use at your own risk).



147
148
149
# File 'lib/dataflow/nodes/data_node.rb', line 147

def all(where: {}, fields: [], sort: {}, limit: 0, offset: 0, include_system_id: false, &block)
  db_adapter.all(where: where, fields: fields, sort: sort, limit: limit, offset: offset, include_system_id: include_system_id, &block)
end

#all_paginated(where: {}, fields: [], cursor: nil) ⇒ Hash

Supports paginating efficiently through the dataset.

Parameters:

  • where (Hash) (defaults to: {})

    the condition to apply for retrieving the element. e.g.: { ‘id’ => 1 } will fetch a record with the id 1. An empty option hash will retrieve any record. IMPORTANT: do not use the system id in the query. It will be overwritten.

  • fields (Array) (defaults to: [])

    Array of strings representing which fields to include. e.g.: [‘id’, ‘updated_at’] will only return these two fields.

  • limit (Integer)

    limits the amount of records returned.

  • cursor (String) (defaults to: nil)

    indicates from which page should the results be returned.

Returns:

  • (Hash)

    with 2 fields:

    • data [Array] that contains the fetched records

    • next_cursor [String] a string to pass into the sub-sequent

      calls to fetch the next page of the data
      


164
165
166
# File 'lib/dataflow/nodes/data_node.rb', line 164

def all_paginated(where: {}, fields: [], cursor: nil)
  db_adapter.all_paginated(where: where, fields: fields, cursor: cursor)
end

#clear(where: {}) ⇒ Object

Clear the data that matches the options.



197
198
199
# File 'lib/dataflow/nodes/data_node.rb', line 197

def clear(where: {})
  db_adapter.delete(where: where)
end

#count(where: {}) ⇒ Integer

Counts how many records matches the condition or all if no condition is given.

Returns:

  • (Integer)

    the record count.



181
182
183
# File 'lib/dataflow/nodes/data_node.rb', line 181

def count(where: {})
  db_adapter.count(where: where)
end

#create_non_unique_indexes(dataset_type: :read) ⇒ Object

Applies non-unique indexes on the dataset. For performance reasons, these indexes are best applied after adding data (especially on large import operations).



229
230
231
232
# File 'lib/dataflow/nodes/data_node.rb', line 229

def create_non_unique_indexes(dataset_type: :read)
  dataset = send("#{dataset_type}_dataset_name")
  db_adapter.create_indexes(dataset: dataset, type: :non_unique_only)
end

#create_unique_indexes(dataset_type: :read) ⇒ Object

Applies unique indexes on the dataset. As this will be enforcing constraints, it is best applied before adding any data.

Parameters:

  • dataset_type (Symbol) (defaults to: :read)

    select which dataset to recreate. Can :read or :write.



221
222
223
224
# File 'lib/dataflow/nodes/data_node.rb', line 221

def create_unique_indexes(dataset_type: :read)
  dataset = send("#{dataset_type}_dataset_name")
  db_adapter.create_indexes(dataset: dataset, type: :unique_only)
end

#db_indexes(write_dataset: false) ⇒ Object



307
308
309
310
# File 'lib/dataflow/nodes/data_node.rb', line 307

def db_indexes(write_dataset: false)
  dataset = write_dataset ? write_dataset_name : read_dataset_name
  db_adapter.retrieve_dataset_indexes(dataset)
end

#drop_dataset!Object



343
344
345
346
347
# File 'lib/dataflow/nodes/data_node.rb', line 343

def drop_dataset!
  db_adapter.drop_dataset(write_dataset_name)
  return unless use_double_buffering
  db_adapter.drop_dataset(read_dataset_name)
end

#dump_dataset(base_folder: './dump') ⇒ String

Dump a backup of this dataset to a file.

Returns:

  • (String)

    the filepath to the dump file. The filename is formatted as <node_name>.<read_dataset_idx>.<ext>



352
353
354
355
356
357
# File 'lib/dataflow/nodes/data_node.rb', line 352

def dump_dataset(base_folder: './dump')
  read_idx = 0
  read_idx = read_dataset_idx if use_double_buffering

  db_adapter.dump(base_folder: base_folder, read_dataset_idx: read_idx)
end

#explain_update(depth: 0, verbose: false) ⇒ Object



320
321
322
# File 'lib/dataflow/nodes/data_node.rb', line 320

def explain_update(depth: 0, verbose: false)
  logger.log("#{'>' * (depth + 1)} #{name} [Dataset] | UPDATED = #{updated_at}")
end

#export(connection_opts: { db_backend: :csv }, keys: [], where: {}) ⇒ Object



278
279
280
281
282
283
284
285
286
287
# File 'lib/dataflow/nodes/data_node.rb', line 278

def export(connection_opts: { db_backend: :csv }, keys: [], where: {})
  on_export_started(connection_opts: connection_opts, keys: keys)
  # instanciate and export without saving anything
  Export::ToCsvNode.new(
    dependency_ids: [self],
    query: where.to_json,
    keys: keys
  ).compute_impl
  on_export_finished
end

#find(where: {}) ⇒ Hash

Finds and return from the dataset, based on the given options.

Parameters:

  • where (Hash) (defaults to: {})

    the condition to apply for retrieving the element. e.g.: { ‘id’ => 1 } will fetch a record with the id 1. An empty option hash will retrieve any record.

Returns:

  • (Hash)

    returns a single record from the dataset.



127
128
129
# File 'lib/dataflow/nodes/data_node.rb', line 127

def find(where: {})
  db_adapter.find(where: where)
end

#handle_dataset_settings_changedObject

When the dataset properties changed notify the adapter to handle the new settings.



107
108
109
110
111
112
113
114
115
116
117
118
119
120
# File 'lib/dataflow/nodes/data_node.rb', line 107

def handle_dataset_settings_changed
  db_adapter.update_settings(data_node: self)

  # if we're using double buffering, just wait for the next buffer
  # to be created to apply the changes.
  return if use_double_buffering

  # recreate the dataset if there is no data
  if db_adapter.count.zero?
    db_adapter.recreate_dataset(dataset: read_dataset_name)
  end

  db_adapter.create_indexes(dataset: read_dataset_name)
end

#import(connection_opts: {}, keys: nil) ⇒ Object



272
273
274
275
276
# File 'lib/dataflow/nodes/data_node.rb', line 272

def import(connection_opts: {}, keys: nil)
  importer = db_adapter(connection_opts)
  records = importer.all
  add(records: records)
end

#info(write_dataset: false) ⇒ Object

retrieves some informations about this node and its usage



290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
# File 'lib/dataflow/nodes/data_node.rb', line 290

def info(write_dataset: false)
  dataset = write_dataset ? write_dataset_name : read_dataset_name
  usage = db_adapter.usage(dataset: dataset)
  {
    name: name,
    type: self.class.to_s,
    dataset: dataset,
    db_backend: db_backend,
    updated_at: updated_at,
    record_count: count,
    indexes: indexes,
    db_indexes: db_indexes(write_dataset: write_dataset),
    mem_usage: usage[:memory],
    storage_usage: usage[:storage]
  }
end

#ordered_system_id_queries(batch_size:, where: {}) ⇒ Object

Return a list of order (ASC) system IDs. These can be used to process the dataset in parallel by querying on a sub-section: queries = node.ordered_system_id_queries Parallel.each(queries) do |query|

process(node.all(where: query))

end

Parameters:

  • batch_size (Integer)

    how many IDs to select per query.



175
176
177
# File 'lib/dataflow/nodes/data_node.rb', line 175

def ordered_system_id_queries(batch_size:, where: {})
  db_adapter.ordered_system_id_queries(batch_size: batch_size, where: {})
end

#read_dataset_nameObject



234
235
236
237
238
239
240
241
242
# File 'lib/dataflow/nodes/data_node.rb', line 234

def read_dataset_name
  return @temporary_read_dataset if @temporary_read_dataset

  if use_double_buffering
    "#{name}_#{double_buffer_prefix}#{read_dataset_idx}"
  else
    name
  end
end

#read_dataset_name=(dataset) ⇒ Object

Use to select from which dataset you want to read. A possible use case is to read from an old dataset name.

Parameters:

  • dataset (String)

    the dataset name from where to read from. It must be a valid dataset name for the current settings.



256
257
258
259
260
261
# File 'lib/dataflow/nodes/data_node.rb', line 256

def read_dataset_name=(dataset)
  return unless valid_dataset_names.include?(dataset)
  @temporary_read_dataset = dataset
  db_adapter.update_settings(data_node: self)
  dataset
end

#recreate_dataset(dataset_type: :read) ⇒ Object

Recreates a dataset.

Parameters:

  • dataset_type (Symbol) (defaults to: :read)

    select which dataset to recreate. Can :read or :write.



210
211
212
213
214
# File 'lib/dataflow/nodes/data_node.rb', line 210

def recreate_dataset(dataset_type: :read)
  # fetch the proper dataset name
  dataset = send("#{dataset_type}_dataset_name")
  db_adapter.recreate_dataset(dataset: dataset)
end

#required_byObject



324
325
326
327
328
# File 'lib/dataflow/nodes/data_node.rb', line 324

def required_by
  super + Dataflow::Nodes::ComputeNode.where(data_node_id: _id).map do |node|
    { node: node, type: 'dataset' }
  end
end

#restore_dataset(filepath:) ⇒ Object

Restore a dump of this dataset

Parameters:

  • files (String)

    the filepath to the dump file. The filename has to be formatted as <node_name>.<read_dataset_idx>.<ext>



362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
# File 'lib/dataflow/nodes/data_node.rb', line 362

def restore_dataset(filepath:)
  filename = filepath.split('/')[-1]
  read_idx = if filename.count('.') < 2
               # for compatibility reasons: previously we were not
               # exporting the read idx in the filename
               0
             else
               filename.split('.')[1].to_i
             end

  raise "Called #restore_dataset with incompatible datasets settings: #{filepath} contains a single buffer dataset but node '#{name}' is expecting a double buffered one." if read_idx == 0 && use_double_buffering
  raise "Called #restore_dataset with incompatible datasets settings: #{filepath} contains a double buffer dataset but node '#{name}' is expecting a single buffered one." if read_idx > 0 && !use_double_buffering

  if use_double_buffering
    dataset_name = dataset_name_for_buffer(read_idx)
  else
    dataset_name = name
  end

  db_adapter.restore(filepath: filepath, dataset_name: dataset_name)
  self.read_dataset_idx = read_idx
  save

  db_adapter.update_settings(data_node: self)

  true
end

#safely_clear_write_datasetObject

this is not safe if there is some parallel processing going on



331
332
333
334
335
336
337
338
339
340
341
# File 'lib/dataflow/nodes/data_node.rb', line 331

def safely_clear_write_dataset
  # we can only clear the write dataset if we're using double buffering
  return unless use_double_buffering
  # check if there is any node that is currently computing to this dataset
  used_by = required_by.select { |x| x[:type] == 'dataset' && x[:node].locked_for_computing? }
  return if used_by.present?

  logger.log("Dropping #{db_name}.#{write_dataset_name} on #{db_backend}.")
  # TODO: lock the node?
  db_adapter.drop_dataset(write_dataset_name)
end

#set_defaultsObject

Sets the default parameters before creating the object.



82
83
84
85
86
87
88
89
90
91
# File 'lib/dataflow/nodes/data_node.rb', line 82

def set_defaults
  self.schema = schema || {}

  # Use the schema as the inferred schema if none is provided.
  # This useful when there is no need to infer schemas (e.g. in SQL)
  self.inferred_schema ||= schema

  # This is needed for the flow to compute properly
  self.updated_at = Time.now
end

#swap_read_write_datasets!Object



263
264
265
266
267
268
269
270
# File 'lib/dataflow/nodes/data_node.rb', line 263

def swap_read_write_datasets!
  raise Dataflow::Errors::InvalidConfigurationError, '#swap_read_write_dataset_names! called on "#{self.name}" but "use_double_buffering" is not activated.' unless use_double_buffering
  tmp = read_dataset_idx
  self.read_dataset_idx = write_dataset_idx
  self.write_dataset_idx = tmp
  db_adapter.update_settings(data_node: self)
  save!
end

#update_schema(sch) ⇒ Object

Update this node’s schema.



202
203
204
205
# File 'lib/dataflow/nodes/data_node.rb', line 202

def update_schema(sch)
  self.schema = sch
  db_adapter.update_settings(data_node: self)
end

#updated?Boolean

Returns:

  • (Boolean)


316
317
318
# File 'lib/dataflow/nodes/data_node.rb', line 316

def updated?
  true
end

#use_symbols?Boolean

Returns:

  • (Boolean)


312
313
314
# File 'lib/dataflow/nodes/data_node.rb', line 312

def use_symbols?
  (db_backend.to_s =~ /sql/).present?
end

#write_dataset_nameObject



244
245
246
247
248
249
250
# File 'lib/dataflow/nodes/data_node.rb', line 244

def write_dataset_name
  if use_double_buffering
    "#{name}_#{double_buffer_prefix}#{write_dataset_idx}"
  else
    name
  end
end