Module: Consumer::Postgres::PositionStore::StreamName

Defined in:
lib/consumer/postgres/position_store/stream_name.rb

Defined Under Namespace

Modules: Type

Constant Summary

Error =
Class.new(RuntimeError)

Class Method Summary

Class Method Details

.position_stream_name(stream_name, consumer_identifier: nil) ⇒ Object



7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# File 'lib/consumer/postgres/position_store/stream_name.rb', line 7

# Derives the name of the stream in which a consumer's position is stored.
#
# The entity name and type list are taken from +stream_name+; the position
# type (from Type.get) is added to the type list when it is not already
# there, and the result is composed via MessageStore::StreamName.stream_name.
#
# @param stream_name [String] stream name the consumer reads; must be a
#   category according to MessageStore::StreamName.category?
# @param consumer_identifier [Object, nil] when given, passed through as the
#   ID of the composed stream name; otherwise the ID is nil
# @return [Object] whatever MessageStore::StreamName.stream_name returns
# @raise [Error] if +stream_name+ is not a category
def self.position_stream_name(stream_name, consumer_identifier: nil)
  unless MessageStore::StreamName.category?(stream_name)
    raise Error, "Position store's stream name must be a category (Stream Name: #{stream_name})"
  end

  entity = MessageStore::StreamName.get_entity_name(stream_name)
  types = MessageStore::StreamName.get_types(stream_name)

  # Append in place so the position type appears at most once in the list.
  position_type = Type.get
  types << position_type unless types.include?(position_type)

  # A nil consumer identifier is forwarded as a nil stream ID.
  MessageStore::StreamName.stream_name(entity, consumer_identifier, types: types)
end