Class: Dataflow::Nodes::Filter::DropWhileNode

Inherits: ComputeNode < Object
Defined in:
lib/dataflow/nodes/filter/drop_while_node.rb

Overview

Groups records into sequences by a key (e.g. id), orders each sequence (e.g. by time), and then applies the same logic as Ruby's Array#drop_while. See: https://ruby-doc.org/core-2.4.0/Array.html#method-i-drop_while

Constant Summary

VALID_OPS =
%w(eq ne le lt ge gt).freeze
VALID_MODES =
%w(both left right).freeze

Constants included from SchemaMixin

SchemaMixin::SAMPLE_DATA_OUTPUT, SchemaMixin::SEPARATOR

Instance Method Summary

Methods inherited from ComputeNode

#all_dependencies, #compute, #data_node, data_node_opts, #dependencies, dependency_opts, ensure_data_node_exists, ensure_dependencies, #execute_local_batch_computation, #execute_local_computation, #execution_valid?, #explain_update, #force_computing_lock_release!, #locked_for_computing?, #make_batch_params, #needs_automatic_recomputing?, #recompute, #schema, #set_defaults, #updated?, #updated_at, #updated_at=, #valid_for_computation?

Methods included from SchemaMixin

#infer_partial_schema, #infer_schema, #sample_data, #schema_inferrer

Methods included from Dataflow::Node

#all_dependencies, find, #metadata, #recompute, #required_by, #valid_for_computation?, #validate!

Instance Method Details

#compute_impl ⇒ Object



(source lines 27–43)
# File 'lib/dataflow/nodes/filter/drop_while_node.rb', line 27

# Splits the distinct values of +id_key+ found in the first dependency into
# slices and hands each slice, together with that dependency, to
# +process_ids+, dispatching the slices through +parallel_each+.
#
# Slice size is the number of distinct ids divided by
# Parallel.processor_count (rounded up). When +limit_per_process+ converts
# to a positive integer, the slice size is capped at that value.
#
# Returns nil without doing any work when the dependency has no records, or
# when its records yield no distinct +id_key+ values. Otherwise it returns
# whatever +parallel_each+ returns.
def compute_impl
  base_node = dependencies.first
  return if base_node.count.zero?

  ids = base_node.all(fields: [id_key]) do |results|
    results.distinct(id_key)
  end

  # Records can exist without any id_key value. The slice size would then be
  # 0, and each_slice(0) raises ArgumentError, so bail out instead.
  ids_count = ids.count
  return if ids_count.zero?

  count_per_process = (ids_count / Parallel.processor_count.to_f).ceil
  limit = limit_per_process.to_i
  count_per_process = [limit, count_per_process].min if limit.positive?

  parallel_each(ids.each_slice(count_per_process)) do |ids_slice|
    process_ids(node: base_node, ids: ids_slice)
  end
end