Class: Dataflow::Nodes::Filter::DropWhileNode
- Inherits:
-
ComputeNode
- Object
- ComputeNode
- Dataflow::Nodes::Filter::DropWhileNode
- Defined in:
- lib/dataflow/nodes/filter/drop_while_node.rb
Overview
Makes a sequency based on a key (e.g. id), and order it (e.g. by time), and then applies the same logic as ruby’s drop_while. See: ruby-doc.org/core-2.4.0/Array.html#method-i-drop_while
Constant Summary collapse
- VALID_OPS =
%w(eq ne le lt ge gt).freeze
- VALID_MODES =
%w(both left right).freeze
Constants included from SchemaMixin
SchemaMixin::SAMPLE_DATA_OUTPUT, SchemaMixin::SEPARATOR
Instance Method Summary collapse
Methods inherited from ComputeNode
#all_dependencies, #compute, #data_node, data_node_opts, #dependencies, dependency_opts, ensure_data_node_exists, ensure_dependencies, #execute_local_batch_computation, #execute_local_computation, #execution_valid?, #explain_update, #force_computing_lock_release!, #locked_for_computing?, #make_batch_params, #needs_automatic_recomputing?, #recompute, #schema, #set_defaults, #updated?, #updated_at, #updated_at=, #valid_for_computation?
Methods included from SchemaMixin
#infer_partial_schema, #infer_schema, #sample_data, #schema_inferrer
Methods included from Dataflow::Node
#all_dependencies, find, #metadata, #recompute, #required_by, #valid_for_computation?, #validate!
Instance Method Details
#compute_impl ⇒ Object
27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 |
# File 'lib/dataflow/nodes/filter/drop_while_node.rb', line 27 def compute_impl base_node = dependencies.first records_count = base_node.count return if records_count == 0 ids = base_node.all(fields: [id_key]) do |results| results.distinct(id_key) end count_per_process = (ids.count / Parallel.processor_count.to_f).ceil limit = limit_per_process.to_i count_per_process = [limit, count_per_process].min if limit > 0 parallel_each(ids.each_slice(count_per_process)) do |ids_slice| # ids.each_slice(count_per_process) do |ids_slice| process_ids(node: base_node, ids: ids_slice) end end |