Module: Dataflow

Defined in:
lib/dataflow-rb.rb,
lib/dataflow/node.rb,
lib/dataflow/logger.rb,
lib/dataflow/version.rb,
lib/dataflow/executor.rb,
lib/dataflow/event_mixin.rb,
lib/dataflow/schema_mixin.rb,
lib/dataflow/remote_worker.rb,
lib/dataflow/nodes/map_node.rb,
lib/dataflow/nodes/data_node.rb,
lib/dataflow/nodes/join_node.rb,
lib/dataflow/nodes/merge_node.rb,
lib/dataflow/properties_mixin.rb,
lib/dataflow/adapters/settings.rb,
lib/dataflow/nodes/upsert_node.rb,
lib/dataflow/nodes/compute_node.rb,
lib/dataflow/nodes/snapshot_node.rb,
lib/dataflow/adapters/csv_adapter.rb,
lib/dataflow/adapters/sql_adapter.rb,
lib/dataflow/nodes/sql_query_node.rb,
lib/dataflow/adapters/psql_adapter.rb,
lib/dataflow/adapters/mysql_adapter.rb,
lib/dataflow/nodes/select_keys_node.rb,
lib/dataflow/nodes/filter/where_node.rb,
lib/dataflow/nodes/export/to_csv_node.rb,
lib/dataflow/nodes/filter/newest_node.rb,
lib/dataflow/nodes/runtime_query_node.rb,
lib/dataflow/adapters/mongo_db_adapter.rb,
lib/dataflow/nodes/read_only_data_node.rb,
lib/dataflow/nodes/filter/drop_while_node.rb,
lib/dataflow/errors/remote_execution_error.rb,
lib/dataflow/nodes/mixin/rename_dotted_fields.rb,
lib/dataflow/nodes/transformation/to_time_node.rb,
lib/dataflow/errors/invalid_configuration_error.rb,
lib/dataflow/nodes/mixin/add_internal_timestamp.rb

Overview

Overrides #constantize in active_support/inflector/methods.rb to rescue from Dataflow::Nodes::… name errors. In such cases, a generic Dataflow::Nodes::DataNode is returned instead. This is used within Mongoid to instantiate the correct node types.
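For illustration, a minimal sketch of the patched behavior once the gem is loaded (the node class name below is hypothetical):

# Normally an unresolvable constant raises a NameError. With the patch
# applied, unknown Dataflow::Nodes::... names fall back to the generic
# DataNode class instead.
'Dataflow::Nodes::SomeRetiredNode'.constantize
# => Dataflow::Nodes::DataNode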

Defined Under Namespace

Modules: Adapters, ConstantizePatch, Errors, EventMixin, Node, Nodes, PropertiesMixin, SchemaMixin

Classes: Executor, Logger, RemoteWorker

Constant Summary

CsvPath = "#{Dir.pwd}/datanodes/csv"
VERSION = '0.16.0'

Class Method Summary

  • .clear_tmp_datasets ⇒ Object
  • .compute_node(id) ⇒ Object
  • .data_node(id) ⇒ Object
  • .export(nodes:, export_dir: './flows', include_data: false) ⇒ Object
  • .import(archive_path:) ⇒ Object

Class Method Details

.clear_tmp_datasets ⇒ Object

Helper that clears unused datasets. NOTE: although a best-effort attempt is made not to delete datasets that are currently being written to, this is not safe to use while flows are executing in parallel.



# File 'lib/dataflow-rb.rb', line 80

def self.clear_tmp_datasets
  Dataflow::Nodes::DataNode.all.each(&:safely_clear_write_dataset)
end
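Usage is a single call; per the note above, avoid running it while flows execute in parallel:

# Clears the unused write datasets of all DataNodes.
Dataflow.clear_tmp_datasets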

.compute_node(id) ⇒ Object

Helper that tries to find a compute node by id, then by name.



# File 'lib/dataflow-rb.rb', line 71

def self.compute_node(id)
  Dataflow::Nodes::ComputeNode.find(id)
rescue Mongoid::Errors::DocumentNotFound
  Dataflow::Nodes::ComputeNode.find_by(name: id)
end
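For example (the node name below is hypothetical):

# Accepts either a BSON id or a node name; the name is tried
# when the id lookup raises DocumentNotFound.
node = Dataflow.compute_node('daily_report')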

.data_node(id) ⇒ Object

Helper that tries to find a data node by id, then by name.



# File 'lib/dataflow-rb.rb', line 64

def self.data_node(id)
  Dataflow::Nodes::DataNode.find(id)
rescue Mongoid::Errors::DocumentNotFound
  Dataflow::Nodes::DataNode.find_by(name: id)
end
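Likewise for data nodes (the node name is hypothetical):

node = Dataflow.data_node('raw_events')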

.export(nodes:, export_dir: './flows', include_data: false) ⇒ Object

Exports nodes and their data. Use #import to re-import them elsewhere.

Raises:

  • (ArgumentError)


# File 'lib/dataflow-rb.rb', line 85

def self.export(nodes:, export_dir: './flows', include_data: false)
  raise ArgumentError, 'nodes must be an array of nodes' unless nodes.is_a?(Array)
  # make a tmp folder with the export dir
  archive_name = "flow_#{Time.now.strftime("%Y-%m-%d_%H-%M-%S")}"
  tmp_dir = "#{export_dir}/#{archive_name}"
  `mkdir -p #{tmp_dir}`

  # export all the dependencies
  all_nodes = nodes + nodes.flat_map(&:all_dependencies)
  # and all the compute node's datasets
  all_nodes += all_nodes.select { |x| x.is_a?(Dataflow::Nodes::ComputeNode) }
                        .map { |x| x.data_node }
  # get all the nodes' metadata in the yaml format
  metadata = all_nodes.compact.uniq.map(&:metadata).to_yaml
  File.write("#{tmp_dir}/metadata.yaml", metadata)

  # add the dataset's data if necessary
  if include_data
    all_nodes.select { |x| x.is_a?(Dataflow::Nodes::DataNode) }
             .each { |x| x.dump_dataset(base_folder: tmp_dir) }
  end

  # pack all the content in a tar archive
  archive_path = "#{archive_name}.tar"
  `(cd #{export_dir} && tar -cvf #{archive_path} #{archive_name})`

  # clear the tmp folder
  `rm -rf #{tmp_dir}`

  "#{export_dir}/#{archive_path}"
end
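A usage sketch (the node name and the resulting timestamp are illustrative):

# Export a compute node, its dependencies and their datasets' contents.
node = Dataflow.compute_node('daily_report')
archive = Dataflow.export(nodes: [node], include_data: true)
# => "./flows/flow_2017-01-01_00-00-00.tar" (timestamp depends on Time.now)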

.import(archive_path:) ⇒ Object

Imports nodes (and any dumped datasets) from a tar archive previously created by #export.

Raises:

  • (ArgumentError)


# File 'lib/dataflow-rb.rb', line 117

def self.import(archive_path:)
  raise ArgumentError, 'expecting a tar archive file' unless archive_path.end_with?('.tar')

  # extract the tar
  folder_name = archive_path.split('/')[-1].split('.')[0]
  `tar -xvf #{archive_path}`

  # load and restore the content in the metadata.yaml
  metadata = YAML.load_file("#{folder_name}/metadata.yaml")

  # restore the nodes
  metadata.each do |m|
    klass = m[:_type].constantize

    # try to delete previously existing node
    begin
      previous_node = klass.find(m[:_id])
      previous_node.delete
    rescue Mongoid::Errors::DocumentNotFound
    end

    # create the node
    klass.create(m)
  end

  # look for dataset dumps and restore them
  filepaths = Dir["./#{folder_name}/**/*.gz"] + Dir["./#{folder_name}/**/*.dump"]

  filepaths.each do |filepath|
    # filepath: "./folder/db_name/dataset.1.gz"
    db_name = filepath.split('/')[2]
    dataset = filepath.split('/')[3].split('.')[0]
    n = Dataflow::Nodes::DataNode.find_by(db_name: db_name, name: dataset)
    n.restore_dataset(filepath: filepath)
  end


  # clean up the extracted folder
  `rm -rf #{folder_name}`
end
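And the corresponding re-import on the target machine (the archive path is illustrative):

Dataflow.import(archive_path: './flows/flow_2017-01-01_00-00-00.tar')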