Module: Mqjob::Worker
- Defined in: lib/mqjob/worker.rb
Defined Under Namespace
Modules: ClassMethods
Constant Summary
- SUBSCRIPTION_MODES = [:exclusive, :failover, :shared].freeze
Instance Method Summary
- #ack! ⇒ Object
- #do_work(cmd, msg) ⇒ Object
- #initialize(opts) ⇒ Object
- #perform(msg) ⇒ Object
- #reject! ⇒ Object
- #requeue! ⇒ Object
- #run ⇒ Object
- #stop ⇒ Object
Instance Method Details
#ack! ⇒ Object
13 |
# File 'lib/mqjob/worker.rb', line 13
# Result marker for a successfully handled message.
#
# NOTE(review): presumably returned from #perform so the MQ client
# acknowledges the message — confirm against the client plugin.
#
# @return [Symbol] always :ack
def ack!
  :ack
end
#do_work(cmd, msg) ⇒ Object
17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 |
# File 'lib/mqjob/worker.rb', line 17
# Schedules the processing of one message on the worker pool.
#
# The block handed to @pool.post calls process_work(cmd, msg). When
# ::Mqjob.hooks provides a wrap_perform callable, the call is made inside
# that wrapper; otherwise it is made directly. Any StandardError raised
# while processing is logged (message first, then the exception object)
# and is not re-raised.
#
# @param cmd [Object] forwarded unchanged to process_work
# @param msg [Object] forwarded unchanged to process_work
# @return [Object] whatever @pool.post returns
def do_work(cmd, msg)
  @pool.post do
    begin
      # hooks may be nil, hence the safe navigation.
      wrap_perform = ::Mqjob.hooks&.wrap_perform

      ::Mqjob.logger.debug(__method__) { 'Begin process' }
      if wrap_perform.nil?
        process_work(cmd, msg)
      else
        wrap_perform.call do
          process_work(cmd, msg)
        end
      end
      ::Mqjob.logger.debug(__method__) { 'Finish process' }
    rescue => exp
      ::Mqjob.logger.error(__method__) { "message process error: #{exp.message}! cmd: #{cmd}, msg: #{msg}" }
      ::Mqjob.logger.error(__method__) { exp }
    end
  end
end
#initialize(opts) ⇒ Object
5 6 7 8 9 10 11 |
# File 'lib/mqjob/worker.rb', line 5
# Sets up a worker instance.
#
# Stores the pool from opts, copies the topic and topic options declared
# on the including class, and obtains the MQ client via Plugin.client
# using the :client entry of the topic options.
#
# @param opts [Hash] only the :pool key is read here
def initialize(opts)
  @pool = opts[:pool]
  @topic = self.class.topic
  @topic_opts = self.class.topic_opts
  @mq = Plugin.client(@topic_opts[:client])
end
#perform(msg) ⇒ Object
48 |
# File 'lib/mqjob/worker.rb', line 48
# Default no-op message handler.
#
# NOTE(review): presumably meant to be overridden by classes that include
# this module — confirm against ClassMethods / callers.
#
# @param msg [Object] ignored by this default implementation
# @return [nil]
def perform(msg)
end
#reject! ⇒ Object
14 |
# File 'lib/mqjob/worker.rb', line 14
# Result marker for a message that should be rejected.
#
# NOTE(review): presumably returned from #perform and interpreted by the
# MQ client — confirm against the client plugin.
#
# @return [Symbol] always :reject
def reject!
  :reject
end
#requeue! ⇒ Object
15 |
# File 'lib/mqjob/worker.rb', line 15
# Result marker for a message that should be requeued.
#
# NOTE(review): presumably returned from #perform and interpreted by the
# MQ client — confirm against the client plugin.
#
# @return [Symbol] always :requeue
def requeue!
  :requeue
end
#run ⇒ Object
40 41 42 |
# File 'lib/mqjob/worker.rb', line 40
# Starts listening on the worker's topic.
#
# Delegates to the MQ client's listen, passing the topic, this worker
# itself, and the topic options.
#
# @return [Object] whatever @mq.listen returns
def run
  @mq.listen(@topic, self, @topic_opts)
end
#stop ⇒ Object
44 45 46 |
# File 'lib/mqjob/worker.rb', line 44
# Stops listening by delegating to the MQ client's close_listen.
#
# @return [Object] whatever @mq.close_listen returns
def stop
  @mq.close_listen
end