Class: Dataflow::Nodes::Export::ToCsvNode
- Inherits: ComputeNode
  - Object
  - ComputeNode
  - Dataflow::Nodes::Export::ToCsvNode
- Defined in: lib/dataflow/nodes/export/to_csv_node.rb
Overview
Export a dataset to CSV
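A minimal usage sketch follows (hedged: the node names are hypothetical, and it assumes the usual dataflow-rb pattern of creating nodes as Mongoid documents with a dependency data node, a JSON query string and optional keys, then triggering them through the inherited #compute):

  # Hypothetical example: export the active records of an existing data node to CSV.
  users_node = Dataflow::Nodes::DataNode.find_by(name: 'users')  # assumed to exist already
  csv_node = Dataflow::Nodes::Export::ToCsvNode.create(
    name: 'users_to_csv',              # hypothetical node name
    dependency_ids: [users_node.id],   # the dataset to export (the first dependency is used)
    query: '{ "active": true }'        # JSON string parsed by #compute_impl
  )
  csv_node.compute                     # runs #compute_impl and writes the CSV output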
Constant Summary
Constants included from SchemaMixin
SchemaMixin::SAMPLE_DATA_OUTPUT, SchemaMixin::SEPARATOR
Instance Method Summary
- #compute_impl ⇒ Object
- #transform_fields(db_backend, keys) ⇒ Object
  Transform the keys to the fields that need to be selected on the backend.
Methods inherited from ComputeNode
#all_dependencies, #compute, #data_node, data_node_opts, #dependencies, dependency_opts, ensure_data_node_exists, ensure_dependencies, #execute_local_batch_computation, #execute_local_computation, #execution_valid?, #explain_update, #force_computing_lock_release!, #locked_for_computing?, #make_batch_params, #needs_automatic_recomputing?, #recompute, #schema, #set_defaults, #updated?, #updated_at, #updated_at=, #valid_for_computation?
Methods included from SchemaMixin
#infer_partial_schema, #infer_schema, #sample_data, #schema_inferrer
Methods included from Dataflow::Node
#all_dependencies, find, #metadata, #recompute, #required_by, #valid_for_computation?, #validate!
Instance Method Details
#compute_impl ⇒ Object
# File 'lib/dataflow/nodes/export/to_csv_node.rb', line 15

def compute_impl
  node = dependencies.first
  where = JSON.parse(query)

  # fetch the schema
  sch = if keys.present?
          keys.map { |k| [k, { type: 'string' }] }.to_h
        else
          node.infer_partial_schema(where: where, extended: true)
        end

  # create the dataset
  csv_adapter = Adapters::CsvAdapter.new(data_node: node)
  csv_adapter.set_schema(sch)
  csv_adapter.recreate_dataset

  # export in parallel
  max_per_process = 1000
  max_per_process = limit_per_process if limit_per_process > 0

  data_count = [node.count(where: where), 1].max
  equal_split_per_process = (data_count / Parallel.processor_count.to_f).ceil
  count_per_process = [max_per_process, equal_split_per_process].min

  queries = node.ordered_system_id_queries(batch_size: count_per_process)
  system_id = node.send(:db_adapter).class::SYSTEM_ID

  parallel_each(queries.each_with_index) do |query, idx|
    # TODO: re-enable event on_export_progressed
    # progress = (idx / queries.count.to_f * 100).ceil
    # on_export_progressed(pct_complete: progress)

    fields = transform_fields(node.db_backend, sch.keys)
    batch = node.all(where: query.merge(where), fields: fields, sort: { system_id => 1 })
    csv_adapter.save(records: batch, part: idx.to_s.rjust(queries.count.to_s.length, "0"))
  end

  # needed by the csv exporter to finalize in a single file
  csv_adapter.on_save_finished
end
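To make the batching arithmetic concrete, here is a small illustrative calculation that mirrors the sizing logic above (the numbers are made up):

  # Illustrative batch sizing, assuming no limit_per_process override:
  max_per_process = 1_000                                              # default cap
  data_count      = 100_000                                            # records matching the query
  processors      = 8                                                  # Parallel.processor_count
  equal_split_per_process = (data_count / processors.to_f).ceil        # => 12_500
  count_per_process = [max_per_process, equal_split_per_process].min   # => 1_000
  # => about 100 ordered system-id batches, exported in parallel and merged
  #    into a single file by csv_adapter.on_save_finished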
#transform_fields(db_backend, keys) ⇒ Object
Transform the keys to the fields that need to be selected on the backend. This is a fix specifically for selecting nested values on MongoDB.
# File 'lib/dataflow/nodes/export/to_csv_node.rb', line 59

def transform_fields(db_backend, keys)
  return keys unless db_backend == :mongodb
  # replace the separator with a dot and make sure we don't select individual
  # array keys... it seems to break MongoDB
  keys.map { |k| k.gsub(Dataflow::SchemaMixin::SEPARATOR, '.') }
      .map { |k| k.gsub(/\.[0-9]+/, '') }.uniq
end
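An illustrative walkthrough of the key rewriting (a sketch only: it assumes SchemaMixin::SEPARATOR is the nested-key separator used by schema inference, and the keys below are hypothetical):

  sep  = Dataflow::SchemaMixin::SEPARATOR
  keys = ["user#{sep}address#{sep}city", "tags#{sep}0", "tags#{sep}1"]  # hypothetical inferred keys
  dotted = keys.map { |k| k.gsub(sep, '.') }                  # => ["user.address.city", "tags.0", "tags.1"]
  fields = dotted.map { |k| k.gsub(/\.[0-9]+/, '') }.uniq     # => ["user.address.city", "tags"]
  # array indices are dropped so MongoDB can project the whole field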