Module: Dataflow
- Defined in:
lib/dataflow-rb.rb,
lib/dataflow/node.rb,
lib/dataflow/logger.rb,
lib/dataflow/version.rb,
lib/dataflow/executor.rb,
lib/dataflow/event_mixin.rb,
lib/dataflow/schema_mixin.rb,
lib/dataflow/remote_worker.rb,
lib/dataflow/nodes/map_node.rb,
lib/dataflow/nodes/data_node.rb,
lib/dataflow/nodes/join_node.rb,
lib/dataflow/nodes/merge_node.rb,
lib/dataflow/properties_mixin.rb,
lib/dataflow/adapters/settings.rb,
lib/dataflow/nodes/upsert_node.rb,
lib/dataflow/nodes/compute_node.rb,
lib/dataflow/nodes/snapshot_node.rb,
lib/dataflow/adapters/csv_adapter.rb,
lib/dataflow/adapters/sql_adapter.rb,
lib/dataflow/nodes/sql_query_node.rb,
lib/dataflow/adapters/psql_adapter.rb,
lib/dataflow/adapters/mysql_adapter.rb,
lib/dataflow/nodes/select_keys_node.rb,
lib/dataflow/nodes/filter/where_node.rb,
lib/dataflow/nodes/export/to_csv_node.rb,
lib/dataflow/nodes/filter/newest_node.rb,
lib/dataflow/nodes/runtime_query_node.rb,
lib/dataflow/adapters/mongo_db_adapter.rb,
lib/dataflow/nodes/read_only_data_node.rb,
lib/dataflow/nodes/filter/drop_while_node.rb,
lib/dataflow/errors/remote_execution_error.rb,
lib/dataflow/nodes/mixin/rename_dotted_fields.rb,
lib/dataflow/nodes/transformation/to_time_node.rb,
lib/dataflow/errors/invalid_configuration_error.rb,
lib/dataflow/nodes/mixin/add_internal_timestamp.rb
Overview
Overrides #constantize in active_support/inflector/methods.rb to rescue from Dataflow::Nodes::… name errors. In such cases, a generic Dataflow::Nodes::DataNode is returned instead. This is used within mongoid to instantiate the correct node types.
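As a rough sketch only (not the gem's actual implementation; the hook-up shown in the last comment is an assumption), such a patch could wrap constantize and fall back to DataNode when a node class is missing:

module Dataflow
  # Hypothetical sketch of the ConstantizePatch described above.
  module ConstantizePatch
    def constantize(camel_cased_word)
      super
    rescue NameError
      # Only swallow errors for missing Dataflow node classes; re-raise the rest.
      raise unless camel_cased_word.to_s.start_with?('Dataflow::Nodes::')
      Dataflow::Nodes::DataNode
    end
  end
end

# Assumed wiring -- the real patch may hook into ActiveSupport differently:
# ActiveSupport::Inflector.singleton_class.prepend(Dataflow::ConstantizePatch)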
Defined Under Namespace
Modules: Adapters, ConstantizePatch, Errors, EventMixin, Node, Nodes, PropertiesMixin, SchemaMixin
Classes: Executor, Logger, RemoteWorker
Constant Summary
- CsvPath = "#{Dir.pwd}/datanodes/csv"
- VERSION = '0.16.0'
Class Method Summary
- .clear_tmp_datasets ⇒ Object
  Helper that clears unused datasets. NOTE: although a best attempt is made not to delete datasets that are currently being written to, this is not safe to use while executing in parallel.
- .compute_node(id) ⇒ Object
  Helper that tries to find a compute node by id and then by name.
- .data_node(id) ⇒ Object
  Helper that tries to find a data node by id and then by name.
- .export(nodes:, export_dir: './flows', include_data: false) ⇒ Object
  Exports nodes and their data.
- .import(archive_path:) ⇒ Object
  Imports nodes and data from an archive previously created with .export.
Class Method Details
.clear_tmp_datasets ⇒ Object
Helper that clears unused datasets. NOTE: although a best attempt is made not to delete datasets that are currently being written to, this is not safe to use while executing in parallel.
# File 'lib/dataflow-rb.rb', line 80

def self.clear_tmp_datasets
  Dataflow::Nodes::DataNode.all.each(&:safely_clear_write_dataset)
end
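A usage sketch (not from the gem's docs): this would typically be run from a maintenance task once no computations are in progress.

# Clears every node's temporary write dataset; avoid while computing in parallel.
Dataflow.clear_tmp_datasets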
.compute_node(id) ⇒ Object
Helper that tries to find a compute node by id and then by name.
# File 'lib/dataflow-rb.rb', line 71

def self.compute_node(id)
  Dataflow::Nodes::ComputeNode.find(id)
rescue Mongoid::Errors::DocumentNotFound
  Dataflow::Nodes::ComputeNode.find_by(name: id)
end
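A usage sketch (the node name below is made up): the argument is first treated as an id, and the lookup falls back to a name search when no document matches.

node = Dataflow.compute_node('monthly_rollup')  # found via the name fallback
same = Dataflow.compute_node(node.id)           # found directly by id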
.data_node(id) ⇒ Object
Helper that tries to find a data node by id and then by name.
# File 'lib/dataflow-rb.rb', line 64

def self.data_node(id)
  Dataflow::Nodes::DataNode.find(id)
rescue Mongoid::Errors::DocumentNotFound
  Dataflow::Nodes::DataNode.find_by(name: id)
end
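The same lookup pattern applies to data nodes (again, the node name is hypothetical):

raw_events = Dataflow.data_node('raw_events')  # by id first, then by name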
.export(nodes:, export_dir: './flows', include_data: false) ⇒ Object
Exports nodes and their data. Use #import to re-import them elsewhere.
# File 'lib/dataflow-rb.rb', line 85

def self.export(nodes:, export_dir: './flows', include_data: false)
  raise ArgumentError, 'nodes must be an array of nodes' unless nodes.is_a?(Array)

  # make a tmp folder with the export dir
  archive_name = "flow_#{Time.now.strftime("%Y-%m-%d_%H-%M-%S")}"
  tmp_dir = "#{export_dir}/#{archive_name}"
  `mkdir -p #{tmp_dir}`

  # export all the dependencies
  all_nodes = nodes + nodes.flat_map(&:all_dependencies)

  # and all the compute node's datasets
  all_nodes += all_nodes.select { |x| x.is_a?(Dataflow::Nodes::ComputeNode) }
                        .map { |x| x.data_node }

  # get all the nodes' metadata in the yaml format
  metadata = all_nodes.compact.uniq.map(&:metadata).to_yaml
  File.write("#{tmp_dir}/metadata.yaml", metadata)

  # add the dataset's data if necessary
  if include_data
    all_nodes.select { |x| x.is_a?(Dataflow::Nodes::DataNode) }
             .each { |x| x.dump_dataset(base_folder: tmp_dir) }
  end

  # pack all the content in a tar archive
  archive_path = "#{archive_name}.tar"
  `(cd #{export_dir} && tar -cvf #{archive_path} #{archive_name})`

  # clear the tmp folder
  `rm -rf #{tmp_dir}`

  "#{export_dir}/#{archive_path}"
end
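A usage sketch under assumed node names: exporting a node also pulls in its dependencies and, if include_data is true, the underlying datasets, then returns the path of the resulting tar archive.

node = Dataflow.compute_node('monthly_rollup')
archive = Dataflow.export(nodes: [node], export_dir: './flows', include_data: true)
# => e.g. "./flows/flow_2017-03-02_10-15-30.tar" (timestamped name)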
.import(archive_path:) ⇒ Object
# File 'lib/dataflow-rb.rb', line 117

def self.import(archive_path:)
  raise ArgumentError, 'expecting a tar archive file' unless archive_path.end_with?('.tar')

  # extract the tar
  folder_name = archive_path.split('/')[-1].split('.')[0]
  `tar -xvf #{archive_path}`

  # load and restore the content in the metadata.yaml
  metadata = YAML.load_file("#{folder_name}/metadata.yaml")

  # restore the nodes
  metadata.each do |m|
    klass = m[:_type].constantize

    # try to delete previously existing node
    begin
      previous_node = klass.find(m[:_id])
      previous_node.delete
    rescue Mongoid::Errors::DocumentNotFound
    end

    # create the node
    klass.create(m)
  end

  # look for dataset dumps and restore them
  filepaths = Dir["./#{folder_name}/**/*.gz"] + Dir["./#{folder_name}/**/*.dump"]

  filepaths.each do |filepath|
    # filepath: "./folder/db_name/dataset.1.gz"
    db_name = filepath.split('/')[2]
    dataset = filepath.split('/')[3].split('.')[0]
    n = Dataflow::Nodes::DataNode.find_by(db_name: db_name, name: dataset)
    n.restore_dataset(filepath: filepath)
  end

  # clean up the extracted folder
  `rm -rf #{folder_name}`
end
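To restore the archive elsewhere (a sketch; the path is a placeholder), pass it back to .import, which recreates the nodes and reloads any dumped datasets:

Dataflow.import(archive_path: './flows/flow_<timestamp>.tar')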