Class: MessageStore::Read::Iterator

Inherits:
Object
  • Object
show all
Includes:
Dependency, Initializer, Log::Dependency, Virtual
Defined in:
lib/message_store/read/iterator.rb

Direct Known Subclasses

Substitute

Defined Under Namespace

Classes: Substitute

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#batch ⇒ Object

Returns the value of attribute batch.



11
12
13
# File 'lib/message_store/read/iterator.rb', line 11

# Reader for the batch currently held by the iterator.
#
# Returns the value stored in @batch; nil when nothing has been assigned yet.
def batch
  instance_variable_get(:@batch)
end

#batch_index ⇒ Object



18
19
20
# File 'lib/message_store/read/iterator.rb', line 18

# Index of the next entry to read from the current batch.
#
# Lazily initialized: the first read stores 0 in @batch_index when no value
# has been set.
def batch_index
  @batch_index = 0 unless @batch_index
  @batch_index
end

#starting_position ⇒ Object



13
14
15
# File 'lib/message_store/read/iterator.rb', line 13

# Position from which the first batch is requested.
#
# Falls back to 0 (and stores it) when no position has been assigned, which
# includes the case where nil was assigned explicitly.
def starting_position
  return @starting_position if @starting_position

  @starting_position = 0
end

Class Method Details

.build(position = nil) ⇒ Object



27
28
29
30
31
32
# File 'lib/message_store/read/iterator.rb', line 27

# Constructs an iterator and assigns its starting position.
#
# position - value handed to the new instance's starting_position writer
#            (nil by default).
#
# Returns the newly built instance.
def self.build(position=nil)
  instance = new
  instance.starting_position = position

  Log.get(self).debug { "Built Iterator (Starting Position: #{position.inspect})" }

  instance
end

.configure(receiver, position = nil, attr_name: nil) ⇒ Object



34
35
36
37
38
# File 'lib/message_store/read/iterator.rb', line 34

# Builds an iterator and assigns it to an attribute of the receiver.
#
# receiver  - object that gets the iterator through its public writer.
# position  - forwarded to .build (nil by default).
# attr_name - name of the attribute to assign; :iterator when not given.
#
# Returns whatever the receiver's writer method returns.
def self.configure(receiver, position=nil, attr_name: nil)
  iterator = build(position)
  writer = "#{attr_name || :iterator}="

  receiver.public_send(writer, iterator)
end

Instance Method Details

#advance_batch_index ⇒ Object



110
111
112
113
114
# File 'lib/message_store/read/iterator.rb', line 110

# Moves the batch index forward by one, logging the index before and after.
def advance_batch_index
  logger.trace { "Advancing batch index (Batch Index: #{batch_index})" }

  following_index = batch_index + 1
  self.batch_index = following_index

  logger.debug { "Advanced batch index (Batch Index: #{batch_index})" }
end

#batch_depleted? ⇒ Boolean

Returns:

  • (Boolean)


120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
# File 'lib/message_store/read/iterator.rb', line 120

# Reports whether the current batch has nothing left to read.
#
# The batch counts as depleted when it has not been initialized, when it is
# empty, or when the batch index equals the batch length. Each outcome is
# logged at debug level.
#
# Returns true or false.
def batch_depleted?
  if !batch_initialized?
    logger.debug { "Batch is depleted (Batch is not initialized)" }
    true
  elsif batch.empty?
    logger.debug { "Batch is depleted (Batch is empty)" }
    true
  elsif batch_index == batch.length
    logger.debug { "Batch is depleted (Batch Index: #{batch_index}, Batch Length: #{batch.length})" }
    true
  else
    logger.debug { "Batch is not depleted (Batch Index: #{batch_index}, Batch Length: #{batch.length})" }
    false
  end
end

#batch_initialized? ⇒ Boolean

Returns:

  • (Boolean)


116
117
118
# File 'lib/message_store/read/iterator.rb', line 116

# True once a batch has been assigned, i.e. #batch is anything other than nil.
# An empty batch still counts as initialized.
def batch_initialized?
  !batch.nil?
end

#batch_size ⇒ Object



23
24
25
# File 'lib/message_store/read/iterator.rb', line 23

# Batch size as reported by the `get` dependency.
def batch_size
  source = get
  source.batch_size
end

#get_batch ⇒ Object



70
71
72
73
74
75
76
77
78
79
80
# File 'lib/message_store/read/iterator.rb', line 70

# Fetches the next batch from the `get` dependency.
#
# The read position is taken from #next_batch_starting_position and passed
# to `get` via `get.(position)`.
#
# Returns whatever `get` returns for that position.
def get_batch
  position = next_batch_starting_position

  # Block form, like the other log calls in this file, so the message is
  # only built if the logger invokes the block.
  logger.trace { "Getting batch (Position: #{position.inspect})" }

  # Named `fetched` so the local does not shadow the #batch reader.
  fetched = get.(position)

  logger.debug { "Finished getting batch (Count: #{fetched.length}, Position: #{position.inspect})" }

  fetched
end

#last_position ⇒ Object



95
96
97
# File 'lib/message_store/read/iterator.rb', line 95

# Asks the `get` dependency for the last position of the current batch.
def last_position
  source = get
  source.last_position(batch)
end

#next ⇒ Object



40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
# File 'lib/message_store/read/iterator.rb', line 40

# Returns the entry at the current batch index and then advances the index.
#
# When the batch is depleted, #resupply is called first so the read is made
# against the resupplied batch. The value is whatever `batch[batch_index]`
# yields after that (presumably nil once the stream has no more entries —
# confirm against callers).
def next
  logger.trace { "Getting next message data (Batch Length: #{(batch &.length).inspect}, Batch Index: #{batch_index})" }

  resupply if batch_depleted?

  batch[batch_index].tap do |current|
    logger.debug(tags: [:data, :message_data]) { "Next message data: #{current.pretty_inspect}" }
    logger.debug { "Done getting next message data (Batch Length: #{(batch &.length).inspect}, Batch Index: #{batch_index})" }

    # Advance only after logging so the messages above report the index that was read.
    advance_batch_index
  end
end

#next_batch_starting_position ⇒ Object



82
83
84
85
86
87
88
89
90
91
92
93
# File 'lib/message_store/read/iterator.rb', line 82

# Determines the position from which the next batch should be read.
#
# Before any batch has been assigned this is #starting_position; afterwards
# it is #last_position plus one.
def next_batch_starting_position
  if batch_initialized?
    last_read = last_position
    following = last_read + 1

    logger.debug { "End of batch (Next starting position: #{following}, Previous Position: #{last_read})" }

    following
  else
    logger.debug { "Batch is not initialized (Next batch starting position: #{starting_position.inspect})" }

    starting_position
  end
end

#reset(batch) ⇒ Object



99
100
101
102
103
104
105
106
107
108
# File 'lib/message_store/read/iterator.rb', line 99

# Replaces the current batch and rewinds the batch index to 0.
#
# batch - the new batch to iterate over.
def reset(batch)
  logger.trace { "Resetting batch" }

  # `batch` on the right-hand side is the argument, not the reader.
  self.batch, self.batch_index = batch, 0

  data_tags = [:data, :batch]
  logger.debug(tags: data_tags) { "Batch set to: \n#{batch.pretty_inspect}" }
  logger.debug(tags: data_tags) { "Batch position set to: #{batch_index.inspect}" }
  logger.debug { "Done resetting batch" }
end

#resupply ⇒ Object



57
58
59
60
61
62
63
64
65
66
67
68
# File 'lib/message_store/read/iterator.rb', line 57

# Loads the next batch into the iterator.
#
# When the stream is depleted the iterator is reset with an empty batch;
# otherwise a new batch is obtained from #get_batch. Either way the result
# is handed to #reset.
def resupply
  logger.trace { "Resupplying batch (Current Batch Length: #{(batch &.length).inspect})" }

  next_batch = stream_depleted? ? [] : get_batch

  reset(next_batch)

  logger.debug { "Batch resupplied (Next Batch Length: #{(next_batch &.length).inspect})" }
end

#stream_depleted? ⇒ Boolean

Returns:

  • (Boolean)


140
141
142
143
144
145
146
147
148
149
150
151
152
153
# File 'lib/message_store/read/iterator.rb', line 140

# Reports whether the stream has no further batches to offer.
#
# The stream is considered depleted when the current batch holds fewer
# entries than #batch_size. With no batch assigned yet it is not depleted.
# Each outcome is logged at debug level.
#
# Returns true or false.
def stream_depleted?
  if !batch_initialized?
    logger.debug { "Stream is not depleted (Batch Length: (batch is nil), Batch Size: #{batch_size})" }
    false
  elsif batch.length < batch_size
    logger.debug { "Stream is depleted (Batch Length: #{batch.length}, Batch Size: #{batch_size})" }
    true
  else
    logger.debug { "Stream is not depleted (Batch Length: #{batch.length}, Batch Size: #{batch_size})" }
    false
  end
end