Class: Warren::Fox

Inherits:
Object
  • Object
show all
Extended by:
Forwardable, Helpers::StateMachine
Defined in:
lib/warren/fox.rb

Overview

A fox is a rabbitMQ consumer. It handles subscription to the queue and passing message on to the registered Subscriber

Constant Summary collapse

FOX =

A little cute fox emoji to easily flag output from the consumers

'🦊'
MAX_RECONNECT_DELAY =

Maximum wait time between database retries: 5 minutes

60 * 5

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Helpers::StateMachine

states

Constructor Details

#initialize(name:, subscription:, adaptor:, subscribed_class:, delayed:) ⇒ Fox

Creates a fox, a RabbitMQ consumer. Subscribes to the queues defined in ‘subscription` and passes messages on to the subscriber

Parameters:

  • name (String)

    The name of the consumer

  • subscription (Warren::Subscription)

    Describes the queue to subscribe to

  • adaptor (#recovered?, #handle, #env)

    An adaptor to handle framework specifics

  • subscribed_class (Warren::Subscriber::Base)

    The class to process received messages

  • delayed (Warren::DelayExchange)

    The details handling delayed message broadcast



36
37
38
39
40
41
42
43
44
# File 'lib/warren/fox.rb', line 36

def initialize(name:, subscription:, adaptor:, subscribed_class:, delayed:)
  @consumer_tag = "#{adaptor.env}_#{name}_#{Process.pid}"
  @subscription = subscription
  @delayed = delayed
  @logger = Warren::LogTagger.new(logger: adaptor.logger, tag: "#{FOX} #{@consumer_tag}")
  @adaptor = adaptor
  @subscribed_class = subscribed_class
  @state = :initialized
end

Instance Attribute Details

#consumer_tagObject (readonly)

Returns the value of attribute consumer_tag.



23
24
25
# File 'lib/warren/fox.rb', line 23

def consumer_tag
  @consumer_tag
end

#delayedObject (readonly)

Returns the value of attribute delayed.



23
24
25
# File 'lib/warren/fox.rb', line 23

def delayed
  @delayed
end

#stateObject (readonly)

Returns the value of attribute state.



23
24
25
# File 'lib/warren/fox.rb', line 23

def state
  @state
end

#subscriptionObject (readonly)

Returns the value of attribute subscription.



23
24
25
# File 'lib/warren/fox.rb', line 23

def subscription
  @subscription
end

Instance Method Details

#attempt_recoveryObject

If the fox is paused, and a recovery attempt is scheduled, will prompt the framework adaptor to attempt to recover. (Such as reconnecting to the database). If this operation is successful will resubscribe to the queue, otherwise a further recovery attempt will be scheduled. Successive recovery attempts will be gradually further apart, up to the MAX_RECONNECT_DELAY of 5 minutes.



100
101
102
103
104
105
106
107
108
109
110
111
# File 'lib/warren/fox.rb', line 100

def attempt_recovery
  return unless paused? && recovery_due?

  warn { "Attempting recovery: #{@recovery_attempts}" }
  if recovered?
    running!
    subscribe!
  else
    @recovery_attempts += 1
    @recover_at = Time.now + delay_for_attempt
  end
end

#pause!Void

Temporarily unsubscribes the consumer, and schedules an attempted recovery. Recovery is triggered by the #attempt_recovery method which gets called periodically by Client

Returns:

  • (Void)


85
86
87
88
89
90
91
92
# File 'lib/warren/fox.rb', line 85

def pause!
  return unless running?

  unsubscribe!
  @recovery_attempts = 0
  @recover_at = Time.now
  paused!
end

#run!Void

Starts up the fox, automatically registering the configured queues and bindings before subscribing to the queue.

Returns:

  • (Void)


55
56
57
58
59
60
61
62
63
# File 'lib/warren/fox.rb', line 55

def run!
  starting!
  subscription.activate! # Set up the queues
  delayed.activate!
  running!            # Transition to running state
  subscribe!          # Subscribe to the queue

  info { 'Started consumer' }
end

#stop!Void

Stop the consumer and unsubscribes from the queue. Blocks until fully unsubscribed.

Returns:

  • (Void)


70
71
72
73
74
75
76
# File 'lib/warren/fox.rb', line 70

def stop!
  info { 'Stopping consumer' }
  stopping!
  unsubscribe!
  info { 'Stopped consumer' }
  stopped!
end