Class: Bud::PushNotIn

Inherits:
PushStatefulElement
Defined in:
lib/bud/executor/join.rb

Overview

Consider “u <= s.notin(t, s.a => t.b)”. notin is a non-monotonic operator: u depends positively on s but negatively on t. Stratification ensures that t is fully computed in a lower stratum, so we can expect multiple iterators on s’s side only. If t’s scanner pushes its elements down first, every insert of s merely needs to be cross-checked against the cached elements of t, and pushed down to the next element if s is not in t. However, if s’s scanner fires first, we have to wait until the first flush, at which point we are sure to have seen all the t-side tuples in this tick.

Instance Attribute Summary

Attributes inherited from PushElement

#elem_name, #found_delta, #invalidated, #outputs, #pendings, #rescan, #wired_by

Attributes inherited from BudCollection

#accumulate_tick_deltas, #bud_instance, #cols, #invalidated, #is_source, #key_cols, #new_delta, #pending, #rescan, #scanner_cnt, #struct, #tabname, #wired_by

Instance Method Summary

Methods inherited from PushStatefulElement

#add_rescan_invalidate

Methods inherited from PushElement

#*, #<<, #add_rescan_invalidate, #all?, #any?, #argagg, #argmax, #argmin, #check_wiring, #each_with_index, #group, #include?, #inspected, #invalidate_tables, #join, #member?, #merge, #none?, #notin, #on_include?, #one?, #print_wiring, #pro, #push_out, #push_predicate, #reduce, #set_block, #sort, #tick, #tick_deltas, #wire_to, #wirings

Methods inherited from BudCollection

#*, #<<, #<=, #[], #add_rescan_invalidate, #argagg, #argmax, #argmin, #bootstrap, #canonicalize_col, #close, #each, #each_delta, #each_raw, #each_tick_delta, #each_with_index, #empty?, #exists?, #flat_map, #flush_deltas, #group, #has_key?, #include?, #init_schema, #inspect, #inspected, #invalidate_at_tick, #keys, #length, #merge, #non_temporal_predecessors, #notin, #null_tuple, #pending_merge, #positive_predecessors, #prep_aggpairs, #pro, #qualified_tabname, #reduce, #register_coll_expr, #rename, #schema, #sort, #tick, #tick_deltas, #tick_metrics, #to_push_elem, #uniquify_tabname, #val_cols, #values

Methods included from Enumerable

#pro

Constructor Details

#initialize(rellist, bud_instance, preds, &blk) ⇒ PushNotIn

:nodoc: all



509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
# File 'lib/bud/executor/join.rb', line 509

# Builds the notin (anti-join) element for rellist = [lhs, rhs].
#
# rellist      - two-element list: the positive side (lhs) and the negated
#                side (rhs).
# bud_instance - forwarded unchanged to the superclass constructor.
# preds        - list of hashes pairing lhs columns with rhs columns; when it
#                is empty no key columns are configured.
# blk          - optional predicate, later invoked as blk(lhs_item, rhs_item).
def initialize(rellist, bud_instance, preds, &blk) # :nodoc: all
  @lhs, @rhs = rellist
  @lhs_keycols = @rhs_keycols = nil
  element_name = "#{@lhs.qualified_tabname}_notin_#{@rhs.qualified_tabname}".to_sym
  super(element_name, bud_instance, nil, @lhs.schema)
  setup_preds(preds) unless preds.empty?
  @rhs_rcvd = false
  # Index 0 buckets lhs tuples, index 1 buckets rhs tuples, keyed by join key.
  @hash_tables = [{}, {}]
  # No predicates and no caller block: fall back to a whole-tuple,
  # field-by-field comparison. to_a is used instead of zip to avoid building
  # an array per field pair.
  if blk.nil? && @lhs_keycols.nil?
    blk = lambda { |l_tup, r_tup| l_tup.to_a == r_tup.to_a }
  end
  set_block(&blk)
end

Instance Method Details

#do_insert(item, offset) ⇒ Object



578
579
580
581
582
583
584
585
# File 'lib/bud/executor/join.rb', line 578

# Adds item to the bucket for its join key on the given side
# (0 = lhs, 1 = rhs). Once flush has run (@rhs_rcvd is set), an lhs item is
# immediately checked against the rhs bucket sharing its key.
def do_insert(item, offset)
  join_key = get_key(item, offset)
  bucket = (@hash_tables[offset][join_key] ||= Set.new)
  bucket.add(item)
  return unless offset == 0 && @rhs_rcvd

  process_match(item, @hash_tables[1][join_key])
end

#find_col(colspec, rel) ⇒ Object



543
544
545
546
547
548
549
550
551
552
553
554
555
556
# File 'lib/bud/executor/join.rb', line 543

# Resolves a column reference against rel and returns the column index.
#
# colspec - either a Symbol naming an accessor on rel, or an Array column
#           descriptor shaped like [tabname, colnum, colname, seqno].
# rel     - the collection the column is looked up in.
#
# Returns element 1 (colnum) of the descriptor.
# Raises Bud::Error if rel does not respond to the Symbol, or if colspec is
# neither a Symbol nor an Array.
def find_col(colspec, rel)
  descriptor =
    case colspec
    when Array
      colspec
    when Symbol
      unless rel.respond_to?(colspec)
        raise Bud::Error, "attribute :#{colspec} not found in #{rel.qualified_tabname}"
      end
      rel.send(colspec)
    else
      raise Bud::Error, "symbol or column spec expected. Got #{colspec}"
    end
  descriptor[1]
end

#flush ⇒ Object



587
588
589
590
591
592
593
594
595
596
597
598
599
# File 'lib/bud/executor/join.rb', line 587

# On the first call since the flag was last cleared, marks the rhs as
# received and runs every buffered lhs tuple against the rhs bucket with the
# same key. Later calls do nothing until stratum_end clears @rhs_rcvd.
def flush
  # Per the stratification argument, the rhs cannot grow further this tick
  # once flush is reached, so buffered lhs tuples can be decided now.
  return if @rhs_rcvd

  @rhs_rcvd = true
  rhs_buckets = @hash_tables[1]
  @hash_tables[0].each_pair do |join_key, lhs_items|
    matching_rhs = rhs_buckets[join_key]
    lhs_items.each { |lhs_item| process_match(lhs_item, matching_rhs) }
  end
end

#get_key(item, offset) ⇒ Object



558
559
560
561
# File 'lib/bud/executor/join.rb', line 558

# Returns the join key of item for the given side (0 = lhs, otherwise rhs):
# the values at that side's key columns, or [] when no key columns exist.
def get_key(item, offset)
  cols = (offset == 0) ? @lhs_keycols : @rhs_keycols
  # Without join predicates every tuple falls into the single empty-key bucket.
  return [] if cols.nil?

  item.values_at(*cols)
end

#insert(item, source) ⇒ Object



568
569
570
571
572
573
574
575
576
# File 'lib/bud/executor/join.rb', line 568

# Routes item to the lhs (offset 0) or rhs (offset 1) table according to
# which relation produced it. When lhs and rhs are the same relation, the
# item is inserted into both sides.
def insert(item, source)
  self_join = (source == @lhs && source == @rhs)
  if self_join
    do_insert(item, 0)
    do_insert(item, 1)
  else
    do_insert(item, source == @lhs ? 0 : 1)
  end
end

#invalidate_cache ⇒ Object

Raises:

(Bud::Error)



616
617
618
619
620
621
622
623
624
625
626
627
# File 'lib/bud/executor/join.rb', line 616

# Empties the cached hash table for each side whose relation reports
# rescan, so that side starts empty again.
#
# Raises Bud::Error if @rhs_rcvd is still set, i.e. stratum_end has not
# cleared it since the last flush.
def invalidate_cache
  # Sanity check: stratum_end should already have reset this flag. The
  # message names the element so that a failure can actually be diagnosed.
  if @rhs_rcvd
    raise Bud::Error, "#{tabname}: invalidate_cache called with @rhs_rcvd " \
                      "still set (stratum_end not called?)"
  end

  if @lhs.rescan
    puts "#{tabname} rel:#{@lhs.qualified_tabname} invalidated" if $BUD_DEBUG
    @hash_tables[0] = {}
  end
  if @rhs.rescan
    puts "#{tabname} rel:#{@rhs.qualified_tabname} invalidated" if $BUD_DEBUG
    @hash_tables[1] = {}
  end
end

#process_match(lhs_item, rhs_values) ⇒ Object



601
602
603
604
605
606
607
608
609
610
611
612
613
614
# File 'lib/bud/executor/join.rb', line 601

# Anti-join decision for one lhs tuple: it is pushed out (via
# push_out(lhs_item, false)) only when no rhs tuple matches it.
#
# rhs_values - rhs tuples sharing lhs_item's join key, or nil if none.
#
# With a block, a "match" is any rhs tuple for which the block returns
# truthy. Without a block, any rhs tuple in the bucket counts as a match.
def process_match(lhs_item, rhs_values)
  excluded =
    if rhs_values.nil?
      false
    elsif @blk.nil?
      true
    else
      rhs_values.any? { |rhs_item| @blk.call(lhs_item, rhs_item) }
    end
  push_out(lhs_item, false) unless excluded
end

#rescan_at_tick ⇒ Object



564
565
566
# File 'lib/bud/executor/join.rb', line 564

# Unconditionally returns true. NOTE(review): presumably this tells the
# caller to rescan this element at every tick; confirm against the
# PushElement/BudCollection callers.
def rescan_at_tick
  true
end

#setup_preds(preds) ⇒ Object



528
529
530
531
532
533
534
535
536
537
538
539
540
541
# File 'lib/bud/executor/join.rb', line 528

# Translates join predicates into parallel lists of lhs and rhs column
# indices, stored in @lhs_keycols / @rhs_keycols.
#
# preds - list of hashes; each {lhs_col => rhs_col} entry contributes one
#         key column per side, resolved through find_col. Unlike
#         PushSHJoin, both operands are plain collections, so each pair is
#         assumed to list the lhs attribute first.
def setup_preds(preds)
  lhs_cols = []
  rhs_cols = []
  preds.each do |pair_hash|
    pair_hash.each_pair do |lhs_spec, rhs_spec|
      lhs_cols << find_col(lhs_spec, @lhs)
      rhs_cols << find_col(rhs_spec, @rhs)
    end
  end
  @lhs_keycols, @rhs_keycols = lhs_cols, rhs_cols
end

#stratum_end ⇒ Object



629
630
631
# File 'lib/bud/executor/join.rb', line 629

# Clears the "rhs received" flag set by flush. Until flush runs again,
# do_insert only buffers lhs tuples instead of matching them right away.
def stratum_end
  @rhs_rcvd = false
end