Module: Consumer::Postgres

Included in:
Controls::Consumer::Example, Controls::Consumer::Identifier
Defined in:
lib/consumer/postgres/postgres.rb,
lib/consumer/postgres/identifier.rb,
lib/consumer/postgres/controls/id.rb,
lib/consumer/postgres/position_store.rb,
lib/consumer/postgres/controls/handler.rb,
lib/consumer/postgres/controls/message.rb,
lib/consumer/postgres/controls/category.rb,
lib/consumer/postgres/controls/consumer.rb,
lib/consumer/postgres/controls/position.rb,
lib/consumer/postgres/controls/identifier.rb,
lib/consumer/postgres/controls/stream_name.rb,
lib/consumer/postgres/position_store/recorded.rb,
lib/consumer/postgres/position_store/stream_name.rb,
lib/consumer/postgres/controls/position/stream/write.rb,
lib/consumer/postgres/controls/position/store/recorded.rb

Defined Under Namespace

Modules: Controls, Identifier Classes: PositionStore

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.included(cls) ⇒ Object



3
4
5
6
7
8
9
10
11
12
13
# File 'lib/consumer/postgres/postgres.rb', line 3

def self.included(cls)
  cls.class_exec do
    include ::Consumer

    attr_accessor :batch_size
    attr_accessor :correlation
    attr_accessor :group_member
    attr_accessor :group_size
    attr_accessor :condition
  end
end

Instance Method Details

#configure(session_settings: nil, batch_size: nil, correlation: nil, group_member: nil, group_size: nil, condition: nil, settings: nil) ⇒ Object

Remove deprecated settings argument that has been replaced with session_settings when no longer in use (Nathan Ladd, Mon Nov 30 2020) Under what conditions will the settings argument no longer be in use? (Scott Bellware, Thu Apr 29 2021)



44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
# File 'lib/consumer/postgres/postgres.rb', line 44

def configure(session_settings: nil, batch_size: nil, correlation: nil, group_member: nil, group_size: nil, condition: nil, settings: nil)
  self.batch_size = batch_size
  self.correlation = correlation
  self.group_member = group_member
  self.group_size = group_size
  self.condition = condition

  session_settings ||= settings

  if not session_settings.nil?
    if not session_settings.is_a?(::Settings)
      session_settings = ::Settings.build(session_settings)
    end
  end

  MessageStore::Postgres::Session.configure(self, settings: session_settings)
  session = self.session

  MessageStore::Postgres::Get::Category.configure(
    self,
    category,
    batch_size: batch_size,
    correlation: correlation,
    consumer_group_member: group_member,
    consumer_group_size: group_size,
    condition: condition,
    session: session
  )

  PositionStore.configure(
    self,
    category,
    consumer_identifier: identifier,
    session: session
  )
end

#log_startup_infoObject



28
29
30
31
32
33
34
# File 'lib/consumer/postgres/postgres.rb', line 28

def log_startup_info
  logger.info(tags: [:consumer, :start]) { "Correlation: #{correlation.inspect} (Consumer: #{self.class.name})" }
  logger.info(tags: [:consumer, :start]) { "Batch Size: #{get.batch_size.inspect} (Consumer: #{self.class.name})" }
  logger.info(tags: [:consumer, :start]) { "Group Member: #{group_member.inspect} (Consumer: #{self.class.name})" }
  logger.info(tags: [:consumer, :start]) { "Group Size: #{group_size.inspect} (Consumer: #{self.class.name})" }
  logger.info(tags: [:consumer, :start]) { "Condition: #{condition.inspect} (Consumer: #{self.class.name})" }
end


15
16
17
18
19
20
21
22
23
24
25
26
# File 'lib/consumer/postgres/postgres.rb', line 15

def print_startup_info
  STDOUT.puts "      Correlation: #{correlation || '(none)'}"

  unless group_member.nil? && group_size.nil?
    STDOUT.puts "      Group Member: #{group_member.inspect}"
    STDOUT.puts "      Group Size: #{group_size.inspect}"
  end

  unless condition.nil?
    STDOUT.puts "      Condition: #{condition.inspect || '(none)'}"
  end
end

#startingObject



36
37
38
39
40
# File 'lib/consumer/postgres/postgres.rb', line 36

def starting
  if identifier.nil? && !group_member.nil? && !group_size.nil?
    raise Identifier::Error, 'Identifier must not be omitted when the consumer is a member of a group'
  end
end