Class: Dataflow::Nodes::SnapshotNode

Inherits:
DataNode
  • Object
show all
Includes:
Mixin::AddInternalTimestamp, Mixin::RenameDottedFields
Defined in:
lib/dataflow/nodes/snapshot_node.rb

Overview

TODO: extend the unique node?

Constant Summary

Constants included from SchemaMixin

SchemaMixin::SAMPLE_DATA_OUTPUT, SchemaMixin::SEPARATOR

Instance Method Summary collapse

Methods included from Mixin::AddInternalTimestamp

#add_internal_timestamp, included

Methods included from Mixin::RenameDottedFields

included, #rename_dotted_fields, #traverse_and_rename_dotted_fields, #traverse_and_rename_dotted_fields_in_array

Methods inherited from DataNode

#all, #all_paginated, #clear, #count, #create_non_unique_indexes, #create_unique_indexes, #db_indexes, #drop_dataset!, #dump_dataset, #explain_update, #export, #find, #handle_dataset_settings_changed, #import, #info, #ordered_system_id_queries, #read_dataset_name, #read_dataset_name=, #recreate_dataset, #required_by, #restore_dataset, #safely_clear_write_dataset, #swap_read_write_datasets!, #update_schema, #updated?, #use_symbols?, #write_dataset_name

Methods included from SchemaMixin

#infer_partial_schema, #infer_schema, #sample_data, #schema_inferrer

Methods included from Dataflow::Node

#all_dependencies, find, #metadata, #recompute, #required_by, #valid_for_computation?, #validate!

Instance Method Details

#add(records:) ⇒ Object

Raises:

  • (ArgumentError)


32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
# File 'lib/dataflow/nodes/snapshot_node.rb', line 32

def add(records:)
  raise ArgumentError, "records must be an array of documents. Received: '#{records.class}'." unless records.is_a?(Array)
  records = records.compact
  return if records.blank?

  # TODO: create a chain of behavior "before add"
  rename_dotted_fields(records: records)
  add_internal_timestamp(records: records)

  records.delete_if do |record|
    convert_update_at_key(record)
    is_record_redundant?(record: record)
  end.compact
  super(records: records)
end

#set_defaultsObject



16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# File 'lib/dataflow/nodes/snapshot_node.rb', line 16

def set_defaults
  super

  self.indexes ||= []
  # get rid of keys/string confusion
  self.indexes = JSON.parse(self.indexes.to_json)

  # add keys for the index, updated_at and unique keys
  self.indexes += [{ 'key' => index_key }] if index_key
  self.indexes += [{ 'key' => updated_at_key }] if updated_at_key
  self.indexes += [{ 'key' => [index_key, updated_at_key], 'unique' => true }] if index_key && updated_at_key
  self.indexes.uniq!

  self.updated_at ||= Time.now
end