Class: Warren::Fox
- Inherits:
- Object
- Object
- Warren::Fox
- Extended by:
- Forwardable, Helpers::StateMachine
- Defined in:
- lib/warren/fox.rb
Overview
A fox is a RabbitMQ consumer. It handles subscription to the queue and passes messages on to the registered Subscriber.
Constant Summary
- FOX =
A little cute fox emoji to easily flag output from the consumers
'🦊'
- MAX_RECONNECT_DELAY =
Maximum wait time between database retries: 5 minutes
60 * 5
Instance Attribute Summary
- #consumer_tag ⇒ Object (readonly)
  Returns the value of attribute consumer_tag.
- #delayed ⇒ Object (readonly)
  Returns the value of attribute delayed.
- #state ⇒ Object (readonly)
  Returns the value of attribute state.
- #subscription ⇒ Object (readonly)
  Returns the value of attribute subscription.
Instance Method Summary
- #attempt_recovery ⇒ Object
  If the fox is paused and a recovery attempt is due, prompts the framework adaptor to attempt to recover.
- #initialize(name:, subscription:, adaptor:, subscribed_class:, delayed:) ⇒ Fox (constructor)
  Creates a fox, a RabbitMQ consumer.
- #pause! ⇒ Void
  Temporarily unsubscribes the consumer, and schedules an attempted recovery.
- #run! ⇒ Void
  Starts up the fox, automatically registering the configured queues and bindings before subscribing to the queue.
- #stop! ⇒ Void
  Stops the consumer and unsubscribes from the queue.
Methods included from Helpers::StateMachine
Constructor Details
#initialize(name:, subscription:, adaptor:, subscribed_class:, delayed:) ⇒ Fox
Creates a fox, a RabbitMQ consumer. Subscribes to the queues defined in `subscription` and passes messages on to the subscriber.
    # File 'lib/warren/fox.rb', line 36
    def initialize(name:, subscription:, adaptor:, subscribed_class:, delayed:)
      @consumer_tag = "#{adaptor.env}_#{name}_#{Process.pid}"
      @subscription = subscription
      @delayed = delayed
      @logger = Warren::LogTagger.new(logger: adaptor.logger, tag: "#{FOX} #{@consumer_tag}")
      @adaptor = adaptor
      @subscribed_class = subscribed_class
      @state = :initialized
    end
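The consumer tag built in the constructor combines the adaptor's environment, the consumer name and the process ID. The following is a minimal sketch of that interpolation in isolation. `FakeAdaptor` and `consumer_tag_for` are hypothetical names introduced here for illustration and are not part of Warren; the environment and name values are made up.

```ruby
# Hypothetical stand-in for the framework adaptor; only #env is needed here.
FakeAdaptor = Struct.new(:env)

# Mirrors the string interpolation used in Fox#initialize.
# The pid parameter is an assumption added so the result is predictable.
def consumer_tag_for(adaptor:, name:, pid: Process.pid)
  "#{adaptor.env}_#{name}_#{pid}"
end

adaptor = FakeAdaptor.new('production')
puts consumer_tag_for(adaptor: adaptor, name: 'my_consumer', pid: 1234)
```

Because the process ID is part of the tag, two processes running the same named consumer in the same environment should end up with distinct tags.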
Instance Attribute Details
#consumer_tag ⇒ Object (readonly)
Returns the value of attribute consumer_tag.
    # File 'lib/warren/fox.rb', line 23
    def consumer_tag
      @consumer_tag
    end
#delayed ⇒ Object (readonly)
Returns the value of attribute delayed.
    # File 'lib/warren/fox.rb', line 23
    def delayed
      @delayed
    end
#state ⇒ Object (readonly)
Returns the value of attribute state.
    # File 'lib/warren/fox.rb', line 23
    def state
      @state
    end
#subscription ⇒ Object (readonly)
Returns the value of attribute subscription.
    # File 'lib/warren/fox.rb', line 23
    def subscription
      @subscription
    end
Instance Method Details
#attempt_recovery ⇒ Object
If the fox is paused and a recovery attempt is due, prompts the framework adaptor to attempt to recover (for example, by reconnecting to the database). If recovery succeeds, the fox resubscribes to the queue; otherwise a further recovery attempt is scheduled. Successive recovery attempts are spaced progressively further apart, up to the MAX_RECONNECT_DELAY of 5 minutes.
    # File 'lib/warren/fox.rb', line 100
    def attempt_recovery
      return unless paused? && recovery_due?

      warn { "Attempting recovery: #{@recovery_attempts}" }
      if recovered?
        running!
        subscribe!
      else
        @recovery_attempts += 1
        @recover_at = Time.now + delay_for_attempt
      end
    end
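The body of `delay_for_attempt` is not shown on this page, so the exact spacing between attempts is unknown. As a rough illustration of "gradually further apart, up to the maximum", here is a sketch that assumes a doubling schedule capped at MAX_RECONNECT_DELAY. The doubling rule and the `base` parameter are assumptions, not Warren's documented behaviour.

```ruby
MAX_RECONNECT_DELAY = 60 * 5 # seconds, same value as Warren::Fox::MAX_RECONNECT_DELAY

# Assumed schedule: base**attempts seconds, never more than the cap.
# The real Warren::Fox#delay_for_attempt may compute this differently.
def delay_for_attempt(recovery_attempts, base: 2)
  [base**recovery_attempts, MAX_RECONNECT_DELAY].min
end

(0..10).each do |attempt|
  puts "attempt #{attempt}: wait #{delay_for_attempt(attempt)}s"
end
```

Under this assumed schedule the wait grows quickly at first and then stays flat at five minutes, so a long outage does not push retries ever further apart.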
#pause! ⇒ Void
Temporarily unsubscribes the consumer, and schedules an attempted recovery. Recovery is triggered by the #attempt_recovery method, which is called periodically by Client.
    # File 'lib/warren/fox.rb', line 85
    def pause!
      return unless running?

      unsubscribe!
      @recovery_attempts = 0
      @recover_at = Time.now
      paused!
    end
#run! ⇒ Void
Starts up the fox, automatically registering the configured queues and bindings before subscribing to the queue.
    # File 'lib/warren/fox.rb', line 55
    def run!
      starting!
      subscription.activate! # Set up the queues
      delayed.activate!
      running! # Transition to running state
      subscribe! # Subscribe to the queue

      info { 'Started consumer' }
    end
#stop! ⇒ Void
Stops the consumer and unsubscribes from the queue. Blocks until fully unsubscribed.
    # File 'lib/warren/fox.rb', line 70
    def stop!
      info { 'Stopping consumer' }
      stopping!
      unsubscribe!
      info { 'Stopped consumer' }
      stopped!
    end
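Taken together, the method bodies on this page imply a lifecycle of initialized, starting, running, paused, stopping and stopped. The toy class below is a sketch of those transitions only. `ToyFox` is a hypothetical name, it does not use Helpers::StateMachine or talk to RabbitMQ, and the `recovered:` argument stands in for the adaptor's recovery check.

```ruby
# Toy model of the state transitions seen in run!, pause!, attempt_recovery and stop!.
# Subscribing and unsubscribing are only recorded in a log, not performed.
class ToyFox
  attr_reader :state, :log

  def initialize
    @state = :initialized
    @log = []
  end

  def run!
    @state = :starting
    @state = :running
    @log << :subscribe
  end

  def pause!
    return unless @state == :running

    @log << :unsubscribe
    @state = :paused
  end

  # recovered: is a stand-in for the adaptor reporting a successful recovery.
  def attempt_recovery(recovered:)
    return unless @state == :paused && recovered

    @state = :running
    @log << :subscribe
  end

  def stop!
    @state = :stopping
    @log << :unsubscribe
    @state = :stopped
  end
end

fox = ToyFox.new
fox.run!
fox.pause!
fox.attempt_recovery(recovered: false) # stays paused
fox.attempt_recovery(recovered: true)  # resubscribes
fox.stop!
puts fox.state
puts fox.log.inspect
```

The sketch leaves out the scheduling side (`@recover_at` and `recovery_due?`), which in the real class decides when a paused fox is allowed to try again.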