Class: Bud::PushNotIn
Overview
Consider “u <= s.notin(t, s.a => t.b)”. notin is a non-monotonic operator: u depends positively on s but negatively on t. Stratification ensures that t is fully computed in a lower stratum, so we can expect multiple iterators on s’s side only. If t’s scanner pushes its elements down first, every insert of s merely needs to be cross-checked against the cached elements of t, and pushed down to the next element if the s tuple is not in t.

However, if s’s scanner fires first, we have to wait until the first flush, at which point we are sure to have seen all the t-side tuples in this tick.
Instance Attribute Summary
Attributes inherited from PushElement
#elem_name, #found_delta, #invalidated, #outputs, #pendings, #rescan, #wired_by
#accumulate_tick_deltas, #bud_instance, #cols, #invalidated, #is_source, #key_cols, #new_delta, #pending, #rescan, #scanner_cnt, #struct, #tabname, #wired_by
Instance Method Summary
collapse
#add_rescan_invalidate
Methods inherited from PushElement
#*, #<<, #add_rescan_invalidate, #all?, #any?, #argagg, #argmax, #argmin, #check_wiring, #each_with_index, #group, #include?, #inspected, #invalidate_tables, #join, #member?, #merge, #none?, #notin, #on_include?, #one?, #print_wiring, #pro, #push_out, #push_predicate, #reduce, #set_block, #sort, #tick, #tick_deltas, #wire_to, #wirings
#*, #<<, #<=, #[], #add_rescan_invalidate, #argagg, #argmax, #argmin, #bootstrap, #canonicalize_col, #close, #each, #each_delta, #each_raw, #each_tick_delta, #each_with_index, #empty?, #exists?, #flat_map, #flush_deltas, #group, #has_key?, #include?, #init_schema, #inspect, #inspected, #invalidate_at_tick, #keys, #length, #merge, #non_temporal_predecessors, #notin, #null_tuple, #pending_merge, #positive_predecessors, #prep_aggpairs, #pro, #qualified_tabname, #reduce, #register_coll_expr, #rename, #schema, #sort, #tick, #tick_deltas, #tick_metrics, #to_push_elem, #uniquify_tabname, #val_cols, #values
Methods included from Enumerable
#pro
Constructor Details
#initialize(rellist, bud_instance, preds, &blk) ⇒ PushNotIn
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
|
# File 'lib/bud/executor/join.rb', line 509
# Builds a notin operator whose output is the LHS tuples with no matching RHS tuple.
#
# @param rellist two-element list destructured as [lhs, rhs]
# @param bud_instance passed straight through to the superclass constructor
# @param preds collection of hash-like {lhs_col => rhs_col} equality maps;
#   when non-empty they become the hash-join key columns (see #setup_preds)
# @param blk optional (lhs_tuple, rhs_tuple) predicate; a truthy result
#   means the RHS tuple excludes the LHS tuple
def initialize(rellist, bud_instance, preds, &blk)
  @lhs, @rhs = rellist
  @lhs_keycols = nil
  @rhs_keycols = nil
  super("#{@lhs.qualified_tabname}_notin_#{@rhs.qualified_tabname}".to_sym,
        bud_instance, nil, @lhs.schema)
  setup_preds(preds) unless preds.empty?
  # Becomes true once #flush runs; reset by #stratum_end.
  @rhs_rcvd = false
  # Slot 0 caches LHS tuples, slot 1 caches RHS tuples, each keyed by #get_key.
  @hash_tables = [{}, {}]
  # Without key columns or a caller-supplied block, compare whole tuples.
  blk ||= lambda { |lhs, rhs| lhs.to_a == rhs.to_a } if @lhs_keycols.nil?
  set_block(&blk)
end
|
Instance Method Details
#do_insert(item, offset) ⇒ Object
578
579
580
581
582
583
584
585
|
# File 'lib/bud/executor/join.rb', line 578
# Caches +item+ in the hash table for side +offset+ (0 = LHS, 1 = RHS).
# Once the RHS has been flushed, each new LHS tuple is checked against the
# cached RHS tuples with the same key right away.
def do_insert(item, offset)
  key = get_key(item, offset)
  bucket = (@hash_tables[offset][key] ||= Set.new)
  bucket.add(item)
  return unless offset == 0 && @rhs_rcvd

  process_match(item, @hash_tables[1][key])
end
|
#find_col(colspec, rel) ⇒ Object
543
544
545
546
547
548
549
550
551
552
553
554
555
556
|
# File 'lib/bud/executor/join.rb', line 543
# Resolves a column spec against +rel+ and returns element [1] of its
# column descriptor.
#
# @param colspec [Symbol, Array] a column accessor name on +rel+, or an
#   already-resolved column descriptor array
# @param rel the relation that must respond to a Symbol +colspec+
# @raise [Bud::Error] if a Symbol spec is not an attribute of +rel+, or if
#   +colspec+ is neither a Symbol nor an Array
def find_col(colspec, rel)
  case colspec
  when Array
    colspec[1]
  when Symbol
    unless rel.respond_to?(colspec)
      raise Bud::Error, "attribute :#{colspec} not found in #{rel.qualified_tabname}"
    end
    rel.send(colspec)[1]
  else
    raise Bud::Error, "symbol or column spec expected. Got #{colspec}"
  end
end
|
#flush ⇒ Object
587
588
589
590
591
592
593
594
595
596
597
598
599
|
# File 'lib/bud/executor/join.rb', line 587
# On the first call after construction or #stratum_end, marks the RHS as
# received and checks every cached LHS tuple against the RHS tuples that
# share its key. Later calls are no-ops until #stratum_end clears the flag.
def flush
  return if @rhs_rcvd

  @rhs_rcvd = true
  rhs_table = @hash_tables[1]
  @hash_tables[0].each_pair do |key, lhs_items|
    matching_rhs = rhs_table[key]
    lhs_items.each { |lhs_item| process_match(lhs_item, matching_rhs) }
  end
end
|
#get_key(item, offset) ⇒ Object
558
559
560
561
|
# File 'lib/bud/executor/join.rb', line 558
# Extracts the hash-join key of +item+ for side +offset+ (0 = LHS, otherwise
# RHS). Returns [] when no key columns are configured, so every tuple on that
# side lands in a single bucket.
def get_key(item, offset)
  keycols = offset == 0 ? @lhs_keycols : @rhs_keycols
  return [] if keycols.nil?

  item.values_at(*keycols)
end
|
#insert(item, source) ⇒ Object
568
569
570
571
572
573
574
575
576
|
# File 'lib/bud/executor/join.rb', line 568
# Routes a pushed tuple to the side(s) of the notin it belongs to.
#
# When the same collection feeds both sides (e.g. s.notin(s)), the tuple is
# cached on the LHS and the RHS. Otherwise it goes to the LHS if +source+ is
# the LHS relation, and to the RHS in every other case.
#
# @param item the pushed tuple
# @param source the relation that pushed +item+
def insert(item, source)
  if source == @lhs && source == @rhs
    do_insert(item, 0)
    do_insert(item, 1)
  else
    offset = source == @lhs ? 0 : 1
    do_insert(item, offset)
  end
end
|
#invalidate_cache ⇒ Object
616
617
618
619
620
621
622
623
624
625
626
627
|
# File 'lib/bud/executor/join.rb', line 616
# Clears the cached hash table for each side (LHS at slot 0, RHS at slot 1)
# whose relation reports +rescan+.
#
# @raise [Bud::Error] if called while @rhs_rcvd is set, i.e. after #flush and
#   before #stratum_end has cleared it
def invalidate_cache
  if @rhs_rcvd
    # Previously a bare `raise Bud::Error`; name the operator and the cause.
    raise Bud::Error, "#{tabname}: invalidate_cache called after flush and before stratum_end"
  end
  [@lhs, @rhs].each_with_index do |rel, offset|
    next unless rel.rescan

    puts "#{tabname} rel:#{rel.qualified_tabname} invalidated" if $BUD_DEBUG
    @hash_tables[offset] = {}
  end
end
|
#process_match(lhs_item, rhs_values) ⇒ Object
601
602
603
604
605
606
607
608
609
610
611
612
613
614
|
# File 'lib/bud/executor/join.rb', line 601
# Pushes +lhs_item+ downstream unless some tuple in +rhs_values+ excludes it.
#
# With no RHS tuples for the key, nothing excludes it. With no block, any
# RHS tuple under the same key excludes it. Otherwise the block decides
# per RHS tuple.
def process_match(lhs_item, rhs_values)
  excluded = case
             when rhs_values.nil? then false
             when @blk.nil? then true
             else rhs_values.any? { |rhs_item| @blk.call(lhs_item, rhs_item) }
             end
  push_out(lhs_item, false) unless excluded
end
|
#rescan_at_tick ⇒ Object
564
565
566
|
# File 'lib/bud/executor/join.rb', line 564
# Unconditionally reports that this element is rescanned at tick boundaries.
def rescan_at_tick; true; end
|
#setup_preds(preds) ⇒ Object
528
529
530
531
532
533
534
535
536
537
538
539
540
541
|
# File 'lib/bud/executor/join.rb', line 528
# Translates equality predicates into parallel key-column lists.
#
# Each element of +preds+ is iterated with +each_pair+ as
# {lhs_col_spec => rhs_col_spec}. Every pair appends the resolved LHS column
# to @lhs_keycols and the resolved RHS column to @rhs_keycols, keeping the
# two lists aligned position by position.
def setup_preds(preds)
  lhs_cols = []
  rhs_cols = []
  preds.each do |pred|
    pred.each_pair do |lhs_spec, rhs_spec|
      lhs_cols << find_col(lhs_spec, @lhs)
      rhs_cols << find_col(rhs_spec, @rhs)
    end
  end
  @lhs_keycols, @rhs_keycols = lhs_cols, rhs_cols
end
|
#stratum_end ⇒ Object
629
630
631
|
# File 'lib/bud/executor/join.rb', line 629
# Clears the RHS-received flag so the next #flush re-evaluates cached LHS tuples.
def stratum_end; @rhs_rcvd = false; end
|