Class: Dataflow::Nodes::SnapshotNode
- Defined in:
- lib/dataflow/nodes/snapshot_node.rb
Overview
TODO: extend the unique node?
Constant Summary
Constants included from SchemaMixin
SchemaMixin::SAMPLE_DATA_OUTPUT, SchemaMixin::SEPARATOR
Instance Method Summary collapse
Methods included from Mixin::AddInternalTimestamp
#add_internal_timestamp, included
Methods included from Mixin::RenameDottedFields
included, #rename_dotted_fields, #traverse_and_rename_dotted_fields, #traverse_and_rename_dotted_fields_in_array
Methods inherited from DataNode
#all, #all_paginated, #clear, #count, #create_non_unique_indexes, #create_unique_indexes, #db_indexes, #drop_dataset!, #dump_dataset, #explain_update, #export, #find, #handle_dataset_settings_changed, #import, #info, #ordered_system_id_queries, #read_dataset_name, #read_dataset_name=, #recreate_dataset, #required_by, #restore_dataset, #safely_clear_write_dataset, #swap_read_write_datasets!, #update_schema, #updated?, #use_symbols?, #write_dataset_name
Methods included from SchemaMixin
#infer_partial_schema, #infer_schema, #sample_data, #schema_inferrer
Methods included from Dataflow::Node
#all_dependencies, find, #metadata, #recompute, #required_by, #valid_for_computation?, #validate!
Instance Method Details
#add(records:) ⇒ Object
32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 |
# File 'lib/dataflow/nodes/snapshot_node.rb', line 32 def add(records:) raise ArgumentError, "records must be an array of documents. Received: '#{records.class}'." unless records.is_a?(Array) records = records.compact return if records.blank? # TODO: create a chain of behavior "before add" rename_dotted_fields(records: records) (records: records) records.delete_if do |record| convert_update_at_key(record) is_record_redundant?(record: record) end.compact super(records: records) end |
#set_defaults ⇒ Object
16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
# File 'lib/dataflow/nodes/snapshot_node.rb', line 16 def set_defaults super self.indexes ||= [] # get rid of keys/string confusion self.indexes = JSON.parse(self.indexes.to_json) # add keys for the index, updated_at and unique keys self.indexes += [{ 'key' => index_key }] if index_key self.indexes += [{ 'key' => updated_at_key }] if updated_at_key self.indexes += [{ 'key' => [index_key, updated_at_key], 'unique' => true }] if index_key && updated_at_key self.indexes.uniq! self.updated_at ||= Time.now end |