Class: MessageStore::EventStore::Get

Inherits:
Object
  • Object
show all
Includes:
Log::Dependency
Defined in:
lib/message_store/event_store/get.rb,
lib/message_store/event_store/get/last.rb,
lib/message_store/event_store/get/result.rb,
lib/message_store/event_store/get/assertions.rb

Defined Under Namespace

Modules: Assertions, Defaults, Result
Classes: Last

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.build(batch_size: nil, long_poll_duration: nil, session: nil) ⇒ Object



17
18
19
20
21
22
23
24
25
26
27
# File 'lib/message_store/event_store/get.rb', line 17

# Constructs an instance and wires up its dependencies.
#
# The new instance is passed to Session.configure and then to
# ::EventStore::HTTP::ReadStream.configure. The latter receives the
# given session, or the instance's own session when none was given.
# Finally the instance's #configure is invoked.
#
# @param batch_size [Object, nil] forwarded positionally to the initializer
# @param long_poll_duration [Object, nil] forwarded positionally to the initializer
# @param session [Object, nil] optional session shared with the stream reader
# @return [Object] the configured instance
def self.build(batch_size: nil, long_poll_duration: nil, session: nil)
  new(batch_size, long_poll_duration).tap do |get|
    Session.configure(get, session: session)

    # Fall back to the session found on the instance when none was supplied
    reader_session = session || get.session
    ::EventStore::HTTP::ReadStream.configure(get, session: reader_session)

    get.configure
  end
end

.call(stream_name, position: nil, **build_arguments) ⇒ Object



29
30
31
32
# File 'lib/message_store/event_store/get.rb', line 29

# Builds an instance from the given build arguments and immediately
# invokes it for the stream.
#
# @param stream_name [Object] passed through to the instance's #call
# @param position [Object, nil] passed through to the instance's #call
# @param build_arguments [Hash] keyword arguments forwarded to .build
# @return [Object] whatever the instance's #call returns
def self.call(stream_name, position: nil, **build_arguments)
  build(**build_arguments).call(stream_name, position: position)
end

Instance Method Details

#batch_size ⇒ Object



10
11
12
# File 'lib/message_store/event_store/get.rb', line 10

# Returns the batch size, falling back to Defaults.batch_size when no
# truthy value is stored. The fallback is stored in @batch_size, so
# Defaults is not consulted again while that value stays truthy.
#
# @return [Object] the stored or default batch size
def batch_size
  return @batch_size if @batch_size

  @batch_size = Defaults.batch_size
end

#call(stream_name, position: nil) ⇒ Object



43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# File 'lib/message_store/event_store/get.rb', line 43

# Reads one batch of messages from the named stream via read_stream.
#
# @param stream_name [Object] stream to read from
# @param position [Object, nil] starting position; logged as "(start)" when nil
# @return [Object] the result of read_stream, or an empty Array when
#   ::EventStore::HTTP::ReadStream::StreamNotFoundError is raised
def call(stream_name, position: nil)
  logger.trace { "Reading stream (Stream Name: #{stream_name}, Position: #{position || '(start)'}, Batch Size: #{batch_size})" }

  messages =
    begin
      read_stream.call(stream_name, position: position, batch_size: batch_size)
    rescue ::EventStore::HTTP::ReadStream::StreamNotFoundError
      # A stream that cannot be found is reported as an empty batch
      []
    end

  logger.debug { "Done reading stream (Stream Name: #{stream_name}, Position: #{position || '(start)'}, Batch Size: #{batch_size}, Messages: #{messages.count})" }

  messages
end

#configure ⇒ Object



34
35
36
37
38
39
40
41
# File 'lib/message_store/event_store/get.rb', line 34

# Applies this reader's settings to read_stream: calls embed_body,
# assigns Result as the output schema, and enables long polling only
# when a long poll duration has been set.
#
# @return [Object, nil] nil when long_poll_duration is nil, otherwise
#   the result of read_stream.enable_long_poll
def configure
  read_stream.embed_body
  read_stream.output_schema = Result

  # Long polling is left untouched when no duration was provided
  return if long_poll_duration.nil?

  read_stream.enable_long_poll(long_poll_duration)
end