Class: Bud::PushReduce

Inherits:
PushStatefulElement show all
Defined in:
lib/bud/executor/elements.rb

Instance Attribute Summary

Attributes inherited from PushElement

#elem_name, #found_delta, #invalidated, #outputs, #pendings, #rescan, #wired_by

Attributes inherited from BudCollection

#accumulate_tick_deltas, #bud_instance, #cols, #invalidated, #is_source, #key_cols, #new_delta, #pending, #rescan, #scanner_cnt, #struct, #tabname, #wired_by

Instance Method Summary collapse

Methods inherited from PushStatefulElement

#add_rescan_invalidate

Methods inherited from PushElement

#*, #<<, #add_rescan_invalidate, #all?, #any?, #argagg, #argmax, #argmin, #check_wiring, #each_with_index, #group, #include?, #inspected, #invalidate_tables, #join, #member?, #merge, #none?, #notin, #on_include?, #one?, #print_wiring, #pro, #push_out, #push_predicate, #reduce, #rescan_at_tick, #set_block, #sort, #stratum_end, #tick, #tick_deltas, #wire_to, #wirings

Methods inherited from BudCollection

#*, #<<, #<=, #[], #add_rescan_invalidate, #argagg, #argmax, #argmin, #bootstrap, #canonicalize_col, #close, #do_insert, #each, #each_delta, #each_raw, #each_tick_delta, #each_with_index, #empty?, #exists?, #flat_map, #flush_deltas, #group, #has_key?, #include?, #init_schema, #inspect, #inspected, #invalidate_at_tick, #keys, #length, #merge, #non_temporal_predecessors, #notin, #null_tuple, #pending_merge, #positive_predecessors, #prep_aggpairs, #pro, #qualified_tabname, #reduce, #register_coll_expr, #rename, #schema, #sort, #tick, #tick_deltas, #tick_metrics, #to_push_elem, #uniquify_tabname, #val_cols, #values

Methods included from Enumerable

#pro

Constructor Details

#initialize(elem_name, bud_instance, collection_name, schema_in, initial, &blk) ⇒ PushReduce

Returns a new instance of PushReduce.



506
507
508
509
510
511
512
# File 'lib/bud/executor/elements.rb', line 506

def initialize(elem_name, bud_instance, collection_name,
               schema_in, initial, &blk)
  @initial = initial
  @blk = blk
  reset_memo
  super(elem_name, bud_instance, collection_name, schema)
end

Instance Method Details

#flushObject



528
529
530
531
532
533
534
535
# File 'lib/bud/executor/elements.rb', line 528

def flush
  unless @memo.kind_of? Enumerable
    raise Bud::TypeError, "output of reduce must be Enumerable: #{@memo.inspect}"
  end
  @memo.each do |t|
    push_out(t, false)
  end
end

#insert(i, source = nil) ⇒ Object



514
515
516
# File 'lib/bud/executor/elements.rb', line 514

def insert(i, source=nil)
  @memo = @blk.call(@memo, i)
end

#invalidate_cacheObject



518
519
520
521
# File 'lib/bud/executor/elements.rb', line 518

def invalidate_cache
  puts "#{self.class}/#{self.tabname} invalidated" if $BUD_DEBUG
  reset_memo
end

#reset_memoObject



523
524
525
# File 'lib/bud/executor/elements.rb', line 523

def reset_memo
  @memo = Marshal.load(Marshal.dump(@initial))
end