Class: EventSource::Iterator
- Inherits:
-
Object
- Object
- EventSource::Iterator
- Includes:
- Log::Dependency
- Defined in:
- lib/event_source/iterator.rb
Instance Attribute Summary collapse
-
#batch ⇒ Object
Returns the value of attribute batch.
- #batch_index ⇒ Object
-
#starting_position ⇒ Object
Returns the value of attribute starting_position.
Class Method Summary collapse
- .build(get, stream_name, position: nil) ⇒ Object
- .configure(receiver, get, stream_name, attr_name: nil, position: nil) ⇒ Object
Instance Method Summary collapse
- #advance_batch_index ⇒ Object
- #batch_depleted? ⇒ Boolean
- #get_batch ⇒ Object
- #last_position ⇒ Object
- #next ⇒ Object
- #next_batch_starting_position ⇒ Object
- #reset(batch) ⇒ Object
- #resupply ⇒ Object
Instance Attribute Details
#batch ⇒ Object
Returns the value of attribute batch.
6 7 8 |
# File 'lib/event_source/iterator.rb', line 6 def batch @batch end |
#batch_index ⇒ Object
8 9 10 |
# File 'lib/event_source/iterator.rb', line 8 def batch_index @batch_index ||= 0 end |
#starting_position ⇒ Object
Returns the value of attribute starting_position.
5 6 7 |
# File 'lib/event_source/iterator.rb', line 5 def starting_position @starting_position end |
Class Method Details
.build(get, stream_name, position: nil) ⇒ Object
15 16 17 18 19 20 |
# File 'lib/event_source/iterator.rb', line 15 def self.build(get, stream_name, position: nil) new(get, stream_name).tap do |instance| instance.starting_position = position Log.get(self).debug { "Built Iterator (Stream Name: #{stream_name}, Starting Position: #{position.inspect})" } end end |
.configure(receiver, get, stream_name, attr_name: nil, position: nil) ⇒ Object
22 23 24 25 26 |
# File 'lib/event_source/iterator.rb', line 22 def self.configure(receiver, get, stream_name, attr_name: nil, position: nil) attr_name ||= :iterator instance = build(get, stream_name, position: position) receiver.public_send "#{attr_name}=", instance end |
Instance Method Details
#advance_batch_index ⇒ Object
43 44 45 46 47 |
# File 'lib/event_source/iterator.rb', line 43 def advance_batch_index logger.trace { "Advancing batch index (Batch Index: #{batch_index})" } self.batch_index += 1 logger.debug { "Advanced batch index (Batch Index: #{batch_index})" } end |
#batch_depleted? ⇒ Boolean
49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 |
# File 'lib/event_source/iterator.rb', line 49 def batch_depleted? if batch.nil? logger.debug { "Batch is depleted (Batch is nil)" } return true end if batch.empty? logger.debug { "Batch is depleted (Batch is empty)" } return true end if batch_index == batch.length logger.debug { "Batch is depleted (Batch Index: #{batch_index}, Batch Length: #{batch.length})" } return true end false end |
#get_batch ⇒ Object
77 78 79 80 81 82 83 84 85 86 87 88 89 90 |
# File 'lib/event_source/iterator.rb', line 77 def get_batch position = next_batch_starting_position logger.trace "Getting batch (Position: #{position.inspect})" batch = [] if position.nil? || position >= 0 batch = get.(stream_name, position: position) end logger.debug { "Finished getting batch (Count: #{batch.length}, Position: #{position.inspect})" } batch end |
#last_position ⇒ Object
105 106 107 108 109 110 111 |
# File 'lib/event_source/iterator.rb', line 105 def last_position unless EventSource::StreamName.category?(stream_name) batch.last.position else batch.last.global_position end end |
#next ⇒ Object
28 29 30 31 32 33 34 35 36 37 38 39 40 41 |
# File 'lib/event_source/iterator.rb', line 28 def next logger.trace { "Getting next event data (Batch Length: #{(batch &.length).inspect}, Batch Index: #{batch_index})" } resupply if batch_depleted? event_data = batch[batch_index] logger.debug(tags: [:data, :event_data]) { "Next event data: #{event_data.pretty_inspect}" } logger.debug { "Done getting next event data (Batch Length: #{(batch &.length).inspect}, Batch Index: #{batch_index})" } advance_batch_index event_data end |
#next_batch_starting_position ⇒ Object
92 93 94 95 96 97 98 99 100 101 102 103 |
# File 'lib/event_source/iterator.rb', line 92 def next_batch_starting_position if batch.nil? logger.debug { "Batch is nil (Next batch starting position: #{starting_position.inspect})" } return starting_position end previous_position = last_position next_position = previous_position + 1 logger.debug { "End of batch (Next starting position: #{next_position}, Previous Position: #{previous_position})" } next_position end |
#reset(batch) ⇒ Object
113 114 115 116 117 118 119 120 121 122 |
# File 'lib/event_source/iterator.rb', line 113 def reset(batch) logger.trace { "Resetting batch" } self.batch = batch self.batch_index = 0 logger.debug(tags: [:data, :batch]) { "Batch set to: \n#{batch.pretty_inspect}" } logger.debug(tags: [:data, :batch]) { "Batch position set to: #{batch_index.inspect}" } logger.debug { "Done resetting batch" } end |
#resupply ⇒ Object
68 69 70 71 72 73 74 75 |
# File 'lib/event_source/iterator.rb', line 68 def resupply logger.trace { "Resupplying batch (Current Batch Length: #{(batch &.length).inspect})" } batch = get_batch reset(batch) logger.debug { "Batch resupplied (Next Batch Length: #{(batch &.length).inspect})" } end |