Class: RubyEventStore::ComposedBroker

Inherits:
Object
  • Object
show all
Defined in:
lib/ruby_event_store/composed_broker.rb

Instance Method Summary collapse

Constructor Details

#initialize(*brokers, multiple_brokers: false) ⇒ ComposedBroker

Returns a new instance of ComposedBroker.



5
6
7
8
# File 'lib/ruby_event_store/composed_broker.rb', line 5

def initialize(*brokers, multiple_brokers: false)
  @brokers = brokers
  @multiple_brokers = multiple_brokers
end

Instance Method Details

#add_global_subscription(subscriber) ⇒ Object



27
28
29
30
31
# File 'lib/ruby_event_store/composed_broker.rb', line 27

def add_global_subscription(subscriber)
  brokers = verified_brokers(nil)
  raise SubscriptionsNotSupported, "No broker found for global subscription." if brokers.empty?
  brokers.each { |broker| broker.add_global_subscription(subscriber) }
end

#add_subscription(subscriber, topics) ⇒ Object



19
20
21
22
23
24
25
# File 'lib/ruby_event_store/composed_broker.rb', line 19

def add_subscription(subscriber, topics)
  topics.each do |topic|
    brokers = verified_brokers(topic)
    raise SubscriptionsNotSupported, "No broker found for topic '#{topic}'." if brokers.empty?
    brokers.each { |broker| broker.add_subscription(subscriber, topic) }
  end
end

#add_thread_global_subscription(subscriber) ⇒ Object



41
42
43
44
45
# File 'lib/ruby_event_store/composed_broker.rb', line 41

def add_thread_global_subscription(subscriber)
  brokers = verified_brokers(nil)
  raise SubscriptionsNotSupported, "No broker found for global subscription." if brokers.empty?
  brokers.each { |broker| broker.add_thread_global_subscription(subscriber) }
end

#add_thread_subscription(subscriber, topics) ⇒ Object



33
34
35
36
37
38
39
# File 'lib/ruby_event_store/composed_broker.rb', line 33

def add_thread_subscription(subscriber, topics)
  topics.each do |topic|
    brokers = verified_brokers(topic)
    raise SubscriptionsNotSupported, "No broker found for topic '#{topic}'." if brokers.empty?
    brokers.each { |broker| broker.add_thread_subscription(subscriber, topic) }
  end
end

#all_subscriptions_for(topic) ⇒ Object



47
48
49
# File 'lib/ruby_event_store/composed_broker.rb', line 47

def all_subscriptions_for(topic)
  @brokers.flat_map { |broker| broker.all_subscriptions_for(topic) }
end

#call(event, record, topic) ⇒ Object



10
11
12
13
14
15
16
17
# File 'lib/ruby_event_store/composed_broker.rb', line 10

def call(event, record, topic)
  brokers = verified_brokers(topic)
  if brokers.empty?
    warn "No broker found for topic '#{topic}'. Event #{event.event_id} will not be processed."
  else
    brokers.each { |broker| broker.call(event, record, topic) }
  end
end

#verify(topic) ⇒ Object



51
52
53
# File 'lib/ruby_event_store/composed_broker.rb', line 51

def verify(topic)
  !verified_brokers(topic).empty?
end