Class: Bud::PushArgAgg

Inherits:
PushGroup show all
Defined in:
lib/bud/executor/group.rb

Instance Attribute Summary

Attributes inherited from PushElement

#elem_name, #found_delta, #invalidated, #outputs, #pendings, #rescan, #wired_by

Attributes inherited from BudCollection

#accumulate_tick_deltas, #bud_instance, #cols, #invalidated, #is_source, #key_cols, #new_delta, #pending, #rescan, #scanner_cnt, #struct, #tabname, #wired_by

Instance Method Summary collapse

Methods inherited from PushGroup

#add_rescan_invalidate

Methods inherited from PushStatefulElement

#add_rescan_invalidate

Methods inherited from PushElement

#*, #<<, #add_rescan_invalidate, #all?, #any?, #argagg, #argmax, #argmin, #check_wiring, #each_with_index, #group, #include?, #inspected, #invalidate_tables, #join, #member?, #merge, #none?, #notin, #on_include?, #one?, #print_wiring, #pro, #push_out, #push_predicate, #reduce, #rescan_at_tick, #set_block, #sort, #stratum_end, #tick, #tick_deltas, #wire_to, #wirings

Methods inherited from BudCollection

#*, #<<, #<=, #[], #add_rescan_invalidate, #argagg, #argmax, #argmin, #bootstrap, #canonicalize_col, #close, #do_insert, #each, #each_delta, #each_raw, #each_tick_delta, #each_with_index, #empty?, #exists?, #flat_map, #flush_deltas, #group, #has_key?, #include?, #init_schema, #inspect, #inspected, #invalidate_at_tick, #keys, #length, #merge, #non_temporal_predecessors, #notin, #null_tuple, #pending_merge, #positive_predecessors, #prep_aggpairs, #pro, #qualified_tabname, #reduce, #register_coll_expr, #rename, #schema, #sort, #tick, #tick_deltas, #tick_metrics, #to_push_elem, #uniquify_tabname, #val_cols, #values

Methods included from Enumerable

#pro

Constructor Details

#initialize(elem_name, bud_instance, collection_name, keys_in, aggpairs_in, schema_in, &blk) ⇒ PushArgAgg

Returns a new instance of PushArgAgg.



92
93
94
95
96
97
98
# File 'lib/bud/executor/group.rb', line 92

# Builds the element after checking that exactly one aggregate pair was given.
#
# Every argument, together with the optional block, is handed to the
# superclass constructor unchanged. The only state added here is +@winners+,
# which starts out as an empty Hash.
#
# @raise [Bud::Error] if +aggpairs_in+ does not hold exactly one entry
def initialize(elem_name, bud_instance, collection_name, keys_in, aggpairs_in, schema_in, &blk)
  if aggpairs_in.length != 1
    raise Bud::Error, "multiple aggpairs #{aggpairs_in.map{|a| a.class.name}} in ArgAgg; only one allowed"
  end

  # Bare super forwards the full argument list and the block as received.
  super
  @winners = {}
end

Instance Method Details

#flush ⇒ Object



139
140
141
142
143
144
145
146
147
148
149
# File 'lib/bud/executor/group.rb', line 139

# Hands every tuple recorded in +@winners+ to +push_out+, one group at a
# time, following the key order of +@groups+.
#
# Nothing is emitted unless +@rescan+ is set; the flag is cleared before
# the tuples are pushed, so an immediately repeated call does nothing.
#
# @return [nil] when no rescan was pending
def flush
  return unless @rescan

  @rescan = false
  @groups.each_key do |group_key|
    @winners[group_key].each { |winner| push_out(winner) }
  end
end

#insert(item, source) ⇒ Object



106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
# File 'lib/bud/executor/group.rb', line 106

# Folds one incoming tuple into the aggregate state of its group and keeps
# the group's winner list in step with what the aggregate reports.
#
# The group key is built by reading each entry of +@keys+ out of +item+.
# A key not yet present in +@groups+ gets fresh state from the aggregate's
# +init+, and +item+ becomes its only winner. For a known key the
# aggregate's +trans+ returns the new state, a flag and optional extra
# tuples; the flag decides what happens to the winner list:
#
# * +:ignore+  - winners are left alone
# * +:replace+ - +item+ becomes the only winner
# * +:keep+    - +item+ is appended to the winners
# * +:delete+  - each extra tuple returned by +trans+ is removed
#
# @param item the tuple to fold in; it is read with +[]+ and +values_at+
# @param source not used by this method
# @raise [Bud::Error] if +trans+ returns any other flag
def insert(item, source)
  key = @keys.map { |col| item[col] }
  states = @groups[key]

  if states.nil?
    # First tuple for this group: seed the state and the winner list.
    @groups[key] = @aggpairs.map do |pair|
      @winners[key] = [item]
      agg = pair[0]
      args = item.values_at(*pair[1])
      agg.init(*args)
    end
  else
    @aggpairs.each_with_index do |pair, idx|
      agg = pair[0]
      args = item.values_at(*pair[1])
      new_state, flag, *extras = agg.trans(states[idx], *args)
      states[idx] = new_state

      case flag
      when :replace then @winners[key] = [item]
      when :keep    then @winners[key] << item
      when :delete  then extras.each { |tuple| @winners[key].delete(tuple) }
      when :ignore
        # winners stay as they are
      else
        raise Bud::Error, "strange result from argagg transition func: #{flag}"
      end
    end
  end
end

#invalidate_cache ⇒ Object



101
102
103
104
# File 'lib/bud/executor/group.rb', line 101

# Runs the inherited cache invalidation and then empties +@winners+.
#
# NOTE(review): the inherited method presumably resets +@groups+, which
# would keep the two hashes in step - confirm against PushGroup.
def invalidate_cache
  super
  # Emptied in place, so the same Hash object is reused afterwards.
  @winners.clear
end