Class: MessageStore::Read::Iterator
- Inherits:
  Object
  - Object
    - MessageStore::Read::Iterator
- Includes:
- Dependency, Initializer, Log::Dependency, Virtual
- Defined in:
- lib/message_store/read/iterator.rb
Direct Known Subclasses
Defined Under Namespace
Classes: Substitute
Instance Attribute Summary collapse
- #batch ⇒ Object
  Returns the value of attribute batch.
- #batch_index ⇒ Object
- #starting_position ⇒ Object
Class Method Summary collapse
- .build(position = nil) ⇒ Object
- .configure(receiver, position = nil, attr_name: nil) ⇒ Object
Instance Method Summary collapse
- #advance_batch_index ⇒ Object
- #batch_depleted? ⇒ Boolean
- #batch_initialized? ⇒ Boolean
- #batch_size ⇒ Object
- #get_batch ⇒ Object
- #last_position ⇒ Object
- #next ⇒ Object
- #next_batch_starting_position ⇒ Object
- #reset(batch) ⇒ Object
- #resupply ⇒ Object
- #stream_depleted? ⇒ Boolean
Instance Attribute Details
#batch ⇒ Object
Returns the value of attribute batch.
11 12 13 |
# File 'lib/message_store/read/iterator.rb', line 11
#
# Reader for the batch the iterator is currently working through.
#
# @return [Object, nil] whatever is stored in @batch; nil while no batch
#   has been assigned
def batch
  @batch
end
#batch_index ⇒ Object
18 19 20 |
# File 'lib/message_store/read/iterator.rb', line 18
#
# Index of the next item to read from the current batch.
#
# @return [Object] the stored index; initialized to 0 (and memoized) the
#   first time it is read while unset
def batch_index
  return @batch_index if @batch_index

  @batch_index = 0
end
#starting_position ⇒ Object
13 14 15 |
# File 'lib/message_store/read/iterator.rb', line 13
#
# Position used as the start of the first batch.
#
# @return [Object] the stored starting position; falls back to 0 (and
#   memoizes it) when none was set or nil was assigned
def starting_position
  return @starting_position if @starting_position

  @starting_position = 0
end
Class Method Details
.build(position = nil) ⇒ Object
27 28 29 30 31 32 |
# File 'lib/message_store/read/iterator.rb', line 27
#
# Constructs an iterator and assigns its starting position.
#
# @param position [Object, nil] value written to the new instance's
#   starting_position (nil is passed through as-is)
# @return [Object] the newly constructed instance
def self.build(position=nil)
  instance = new
  instance.starting_position = position

  Log.get(self).debug { "Built Iterator (Starting Position: #{position.inspect})" }

  instance
end
.configure(receiver, position = nil, attr_name: nil) ⇒ Object
34 35 36 37 38 |
# File 'lib/message_store/read/iterator.rb', line 34
#
# Builds an iterator and assigns it to an attribute of the receiver.
#
# @param receiver [Object] object that gets the iterator; must respond
#   publicly to the "<attr_name>=" writer
# @param position [Object, nil] forwarded to .build
# @param attr_name [Symbol, String, nil] attribute to assign; :iterator
#   when omitted
# @return [Object] whatever the receiver's writer method returns
def self.configure(receiver, position=nil, attr_name: nil)
  iterator = build(position)
  writer = "#{attr_name || :iterator}="

  receiver.public_send(writer, iterator)
end
Instance Method Details
#advance_batch_index ⇒ Object
110 111 112 113 114 |
# File 'lib/message_store/read/iterator.rb', line 110
#
# Moves the batch index forward by one, logging the value before and after.
#
# @return [Object] the result of the final logger.debug call
def advance_batch_index
  logger.trace { "Advancing batch index (Batch Index: #{batch_index})" }

  self.batch_index = batch_index + 1

  logger.debug { "Advanced batch index (Batch Index: #{batch_index})" }
end
#batch_depleted? ⇒ Boolean
120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 |
# File 'lib/message_store/read/iterator.rb', line 120
#
# Tells whether there is nothing left to read in the current batch.
#
# The batch counts as depleted when it has not been initialized, when it
# is empty, or when the batch index equals the batch length. Each outcome
# is logged at debug level.
#
# @return [Boolean] true when depleted, false otherwise
def batch_depleted?
  if !batch_initialized?
    logger.debug { "Batch is depleted (Batch is not initialized)" }
    true
  elsif batch.empty?
    logger.debug { "Batch is depleted (Batch is empty)" }
    true
  elsif batch_index == batch.length
    logger.debug { "Batch is depleted (Batch Index: #{batch_index}, Batch Length: #{batch.length})" }
    true
  else
    logger.debug { "Batch is not depleted (Batch Index: #{batch_index}, Batch Length: #{batch.length})" }
    false
  end
end
#batch_initialized? ⇒ Boolean
116 117 118 |
# File 'lib/message_store/read/iterator.rb', line 116
#
# Tells whether a batch has been assigned yet.
#
# @return [Boolean] false while batch is nil, true otherwise (an empty
#   batch still counts as initialized)
def batch_initialized?
  !batch.nil?
end
#batch_size ⇒ Object
23 24 25 |
# File 'lib/message_store/read/iterator.rb', line 23
#
# Batch size as reported by the `get` collaborator (defined outside this
# listing).
#
# @return [Object] the value of get.batch_size
def batch_size
  source = get
  source.batch_size
end
#get_batch ⇒ Object
70 71 72 73 74 75 76 77 78 79 80 |
# File 'lib/message_store/read/iterator.rb', line 70
#
# Fetches the next batch by invoking `get` with the next batch's starting
# position.
#
# @return [Object] the batch returned by get.(position); it must respond
#   to #length, which is used in the debug log entry
def get_batch
  position = next_batch_starting_position

  # Block form so the message is only built when the logger asks for it,
  # matching every other log call in this class.
  logger.trace { "Getting batch (Position: #{position.inspect})" }

  batch = get.(position)

  logger.debug { "Finished getting batch (Count: #{batch.length}, Position: #{position.inspect})" }

  batch
end
#last_position ⇒ Object
95 96 97 |
# File 'lib/message_store/read/iterator.rb', line 95
#
# Asks the `get` collaborator (defined outside this listing) for the last
# position of the current batch.
#
# @return [Object] the value of get.last_position(batch)
def last_position
  current_batch = batch
  get.last_position(current_batch)
end
#next ⇒ Object
40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 |
# File 'lib/message_store/read/iterator.rb', line 40
#
# Returns the next item of the stream and moves the batch index past it.
#
# When the current batch is depleted it is resupplied first. If the
# resupplied batch has no entry at the current index, the lookup yields nil.
#
# NOTE(review): the rendered listing had lost the `message_data`
# identifier (the assignment target and the receiver of pretty_inspect),
# and presumably the trailing return expression as well; they are restored
# here - confirm against lib/message_store/read/iterator.rb.
#
# @return [Object, nil] the item at the current batch index
def next
  logger.trace { "Getting next message data (Batch Length: #{(batch&.length).inspect}, Batch Index: #{batch_index})" }

  resupply if batch_depleted?

  message_data = batch[batch_index]

  logger.debug(tags: [:data, :message_data]) { "Next message data: #{message_data.pretty_inspect}" }
  logger.debug { "Done getting next message data (Batch Length: #{(batch&.length).inspect}, Batch Index: #{batch_index})" }

  advance_batch_index

  # Hand the item back to the caller rather than the logger's return value.
  message_data
end
#next_batch_starting_position ⇒ Object
82 83 84 85 86 87 88 89 90 91 92 93 |
# File 'lib/message_store/read/iterator.rb', line 82
#
# Works out the position at which the next batch should start.
#
# Before any batch has been assigned this is the starting position;
# afterwards it is one past the last position of the current batch.
#
# @return [Object] starting_position, or last_position + 1
def next_batch_starting_position
  if batch_initialized?
    last_read = last_position
    following = last_read + 1

    logger.debug { "End of batch (Next starting position: #{following}, Previous Position: #{last_read})" }

    following
  else
    logger.debug { "Batch is not initialized (Next batch starting position: #{starting_position.inspect})" }

    starting_position
  end
end
#reset(batch) ⇒ Object
99 100 101 102 103 104 105 106 107 108 |
# File 'lib/message_store/read/iterator.rb', line 99
#
# Replaces the current batch and rewinds the batch index to 0.
#
# @param batch [Object] the new batch; it shadows the #batch reader inside
#   this method, so the log entries describe the value being assigned
# @return [Object] the result of the final logger.debug call
def reset(batch)
  logger.trace { "Resetting batch" }

  self.batch = batch
  self.batch_index = 0

  data_tags = [:data, :batch]
  logger.debug(tags: data_tags) { "Batch set to: \n#{batch.pretty_inspect}" }
  logger.debug(tags: data_tags) { "Batch position set to: #{batch_index.inspect}" }

  logger.debug { "Done resetting batch" }
end
#resupply ⇒ Object
57 58 59 60 61 62 63 64 65 66 67 68 |
# File 'lib/message_store/read/iterator.rb', line 57
#
# Replaces the current batch with the next one.
#
# When the stream is already depleted no retrieval is attempted and the
# iterator is reset with an empty array instead.
#
# @return [Object] the result of the final logger.debug call
def resupply
  # Here `batch` is still the reader, i.e. the batch being replaced.
  logger.trace { "Resupplying batch (Current Batch Length: #{(batch&.length).inspect})" }

  next_batch = stream_depleted? ? [] : get_batch

  reset(next_batch)

  logger.debug { "Batch resupplied (Next Batch Length: #{(next_batch&.length).inspect})" }
end
#stream_depleted? ⇒ Boolean
140 141 142 143 144 145 146 147 148 149 150 151 152 153 |
# File 'lib/message_store/read/iterator.rb', line 140
#
# Tells whether the stream has no further batches to offer.
#
# Before any batch has been assigned the stream is never considered
# depleted. Afterwards it is depleted when the current batch is shorter
# than the batch size.
#
# @return [Boolean] true when depleted, false otherwise
def stream_depleted?
  if !batch_initialized?
    logger.debug { "Stream is not depleted (Batch Length: (batch is nil), Batch Size: #{batch_size})" }
    false
  elsif batch.length < batch_size
    logger.debug { "Stream is depleted (Batch Length: #{batch.length}, Batch Size: #{batch_size})" }
    true
  else
    logger.debug { "Stream is not depleted (Batch Length: #{batch.length}, Batch Size: #{batch_size})" }
    false
  end
end