Class: EventSource::Iterator

Inherits:
Object
  • Object
show all
Includes:
Log::Dependency
Defined in:
lib/event_source/iterator.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#batchObject

Returns the value of attribute batch.



6
7
8
# File 'lib/event_source/iterator.rb', line 6

def batch
  @batch
end

#batch_indexObject



8
9
10
# File 'lib/event_source/iterator.rb', line 8

def batch_index
  @batch_index ||= 0
end

#starting_positionObject

Returns the value of attribute starting_position.



5
6
7
# File 'lib/event_source/iterator.rb', line 5

def starting_position
  @starting_position
end

Class Method Details

.build(get, stream_name, position: nil) ⇒ Object



15
16
17
18
19
20
# File 'lib/event_source/iterator.rb', line 15

def self.build(get, stream_name, position: nil)
  new(get, stream_name).tap do |instance|
    instance.starting_position = position
    Log.get(self).debug { "Built Iterator (Stream Name: #{stream_name}, Starting Position: #{position.inspect})" }
  end
end

.configure(receiver, get, stream_name, attr_name: nil, position: nil) ⇒ Object



22
23
24
25
26
# File 'lib/event_source/iterator.rb', line 22

def self.configure(receiver, get, stream_name, attr_name: nil, position: nil)
  attr_name ||= :iterator
  instance = build(get, stream_name, position: position)
  receiver.public_send "#{attr_name}=", instance
end

Instance Method Details

#advance_batch_indexObject



43
44
45
46
47
# File 'lib/event_source/iterator.rb', line 43

def advance_batch_index
  logger.trace { "Advancing batch index (Batch Index: #{batch_index})" }
  self.batch_index += 1
  logger.debug { "Advanced batch index (Batch Index: #{batch_index})" }
end

#batch_depleted?Boolean

Returns:

  • (Boolean)


49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
# File 'lib/event_source/iterator.rb', line 49

def batch_depleted?
  if batch.nil?
    logger.debug { "Batch is depleted (Batch is nil)" }
    return true
  end

  if batch.empty?
    logger.debug { "Batch is depleted (Batch is empty)" }
    return true
  end

  if batch_index == batch.length
    logger.debug { "Batch is depleted (Batch Index: #{batch_index}, Batch Length: #{batch.length})" }
    return true
  end

  false
end

#get_batchObject



77
78
79
80
81
82
83
84
85
86
87
88
89
90
# File 'lib/event_source/iterator.rb', line 77

def get_batch
  position = next_batch_starting_position

  logger.trace "Getting batch (Position: #{position.inspect})"

  batch = []
  if position.nil? || position >= 0
    batch = get.(stream_name, position: position)
  end

  logger.debug { "Finished getting batch (Count: #{batch.length}, Position: #{position.inspect})" }

  batch
end

#last_positionObject



105
106
107
108
109
110
111
# File 'lib/event_source/iterator.rb', line 105

def last_position
  unless EventSource::StreamName.category?(stream_name)
    batch.last.position
  else
    batch.last.global_position
  end
end

#nextObject



28
29
30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/event_source/iterator.rb', line 28

def next
  logger.trace { "Getting next event data (Batch Length: #{(batch &.length).inspect}, Batch Index: #{batch_index})" }

  resupply if batch_depleted?

  event_data = batch[batch_index]

  logger.debug(tags: [:data, :event_data]) { "Next event data: #{event_data.pretty_inspect}" }
  logger.debug { "Done getting next event data (Batch Length: #{(batch &.length).inspect}, Batch Index: #{batch_index})" }

  advance_batch_index

  event_data
end

#next_batch_starting_positionObject



92
93
94
95
96
97
98
99
100
101
102
103
# File 'lib/event_source/iterator.rb', line 92

def next_batch_starting_position
  if batch.nil?
    logger.debug { "Batch is nil (Next batch starting position: #{starting_position.inspect})" }
    return starting_position
  end

  previous_position = last_position
  next_position = previous_position + 1
  logger.debug { "End of batch (Next starting position: #{next_position}, Previous Position: #{previous_position})" }

  next_position
end

#reset(batch) ⇒ Object



113
114
115
116
117
118
119
120
121
122
# File 'lib/event_source/iterator.rb', line 113

def reset(batch)
  logger.trace { "Resetting batch" }

  self.batch = batch
  self.batch_index = 0

  logger.debug(tags: [:data, :batch]) { "Batch set to: \n#{batch.pretty_inspect}" }
  logger.debug(tags: [:data, :batch]) { "Batch position set to: #{batch_index.inspect}" }
  logger.debug { "Done resetting batch" }
end

#resupplyObject



68
69
70
71
72
73
74
75
# File 'lib/event_source/iterator.rb', line 68

def resupply
  logger.trace { "Resupplying batch (Current Batch Length: #{(batch &.length).inspect})" }

  batch = get_batch
  reset(batch)

  logger.debug { "Batch resupplied (Next Batch Length: #{(batch &.length).inspect})" }
end