Class: Dataflow::Nodes::UpsertNode

Inherits:
DataNode
  • Object
show all
Includes:
Mixin::AddInternalTimestamp, Mixin::RenameDottedFields
Defined in:
lib/dataflow/nodes/upsert_node.rb

Constant Summary

Constants included from SchemaMixin

SchemaMixin::SAMPLE_DATA_OUTPUT, SchemaMixin::SEPARATOR

Instance Method Summary collapse

Methods included from Mixin::AddInternalTimestamp

#add_internal_timestamp, included

Methods included from Mixin::RenameDottedFields

included, #rename_dotted_fields, #traverse_and_rename_dotted_fields, #traverse_and_rename_dotted_fields_in_array

Methods inherited from DataNode

#all, #all_paginated, #clear, #count, #create_non_unique_indexes, #create_unique_indexes, #db_indexes, #drop_dataset!, #dump_dataset, #explain_update, #export, #find, #handle_dataset_settings_changed, #import, #info, #ordered_system_id_queries, #read_dataset_name, #read_dataset_name=, #recreate_dataset, #required_by, #restore_dataset, #safely_clear_write_dataset, #swap_read_write_datasets!, #update_schema, #updated?, #use_symbols?, #write_dataset_name

Methods included from SchemaMixin

#infer_partial_schema, #infer_schema, #sample_data, #schema_inferrer

Methods included from Dataflow::Node

#all_dependencies, find, #metadata, #recompute, #required_by, #valid_for_computation?, #validate!

Instance Method Details

#add(records:) ⇒ Object

Raises:

  • (ArgumentError)


44
45
46
47
48
49
50
51
52
53
54
55
56
# File 'lib/dataflow/nodes/upsert_node.rb', line 44

# Upserts a batch of documents into this node's dataset.
#
# nil entries are dropped from a copy of +records+; the caller's array is
# not modified by this method itself. When nothing is left after that,
# the method returns nil without calling the adapter or saving the node.
#
# @param records [Array] the documents to write.
# @return [Object] whatever +save!+ returns, or nil when there is nothing to add.
# @raise [ArgumentError] if +records+ is not an Array.
def add(records:)
  unless records.is_a?(Array)
    raise ArgumentError, "records must be an array of documents. Received: '#{records.class}'."
  end

  documents = records.compact
  return if documents.blank?

  # TODO: create a chain of behavior "before add"
  rename_dotted_fields(records: documents)
  add_internal_timestamp(records: documents)

  # index_key is handed to the adapter as the replace_by criterion.
  db_adapter.save(records: documents, replace_by: index_key)

  # Record when this node last received data, then persist the node.
  self.updated_at = Time.now
  save!
end

#set_defaults ⇒ Object



16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# File 'lib/dataflow/nodes/upsert_node.rb', line 16

# Fills in default values for the node's index configuration and timestamp.
#
# After calling +super+ this:
# * makes sure +indexes+ is an array whose hashes use string keys,
# * falls back to the key of the first unique index when +index_key+ is blank,
# * appends a unique index on +index_key+ (and, for an Array key, one plain
#   index per member), removing duplicate definitions,
# * sets +updated_at+ to the current time if it has no value yet.
#
# @return [Object] the value of the final +updated_at+ expression.
def set_defaults
  super

  self.indexes = [] unless indexes
  # A JSON round-trip turns symbol keys into strings, so index definitions
  # can be compared and looked up consistently below.
  self.indexes = JSON.parse(indexes.to_json)

  # With no explicit index_key, adopt the key of the first unique index
  # (nil when no unique index is declared).
  if index_key.blank?
    unique_index = indexes.find { |index| index['unique'] }
    self.index_key = unique_index ? unique_index['key'] : nil
  end

  if index_key.present?
    # The upsert key itself always gets a unique index.
    generated = [{ 'key' => index_key, 'unique' => true }]

    # For a compound key, also add a non-unique index for each member.
    generated.concat(index_key.map { |key| { 'key' => key } }) if index_key.is_a?(Array)

    self.indexes = indexes + generated
    indexes.uniq!
  end

  self.updated_at = Time.now unless updated_at
end