Module: Mqjob::Worker

Defined in:
lib/mqjob/worker.rb

Defined Under Namespace

Modules: ClassMethods

Constant Summary collapse

# Symbols naming the recognised subscription modes. The array is frozen so it
# cannot be mutated at runtime. (Where the list is consulted is not visible in
# this section.)
SUBSCRIPTION_MODES = %i[exclusive failover shared].freeze

Instance Method Summary collapse

Instance Method Details

#ack! ⇒ Object



13
# File 'lib/mqjob/worker.rb', line 13

# Result marker for a handled message.
#
# @return [Symbol] always +:ack+
def ack!
  :ack
end

#do_work(cmd, msg) ⇒ Object



17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# File 'lib/mqjob/worker.rb', line 17

# Posts the processing of one message to the worker pool.
#
# Inside the pool job, +process_work(cmd, msg)+ is invoked, wrapped by the
# +wrap_perform+ hook from +::Mqjob.hooks+ when one is configured. Any
# StandardError raised while processing is logged and swallowed, so it never
# propagates out of the pool job.
#
# @param cmd [Object] forwarded unchanged to +process_work+
# @param msg [Object] forwarded unchanged to +process_work+
# @return [Object] whatever +@pool.post+ returns
def do_work(cmd, msg)
  @pool.post do
    begin
      # Hooks are optional; +around+ stays nil when none are configured.
      around = ::Mqjob.hooks&.wrap_perform

      ::Mqjob.logger.debug(__method__) { 'Begin process' }

      around.nil? ? process_work(cmd, msg) : around.call { process_work(cmd, msg) }

      ::Mqjob.logger.debug(__method__) { 'Finish process' }
    rescue => error
      # Two entries on purpose: a readable summary, then the exception itself.
      ::Mqjob.logger.error(__method__) { "message process error: #{error.message}! cmd: #{cmd}, msg: #{msg}" }
      ::Mqjob.logger.error(__method__) { error }
    end
  end
end

#initialize(opts) ⇒ Object



5
6
7
8
9
10
11
# File 'lib/mqjob/worker.rb', line 5

# Builds a worker from its options and the topic configuration declared on
# the including class.
#
# @param opts [Hash] options; only +:pool+ is read here. It is stored as
#   +@pool+ and must respond to +post+ (see #do_work).
def initialize(opts)
  worker_class = self.class

  @pool = opts[:pool]
  @topic = worker_class.topic
  @topic_opts = worker_class.topic_opts

  # The message-queue client is resolved from the topic options' :client entry.
  @mq = Plugin.client(@topic_opts[:client])
end

#perform(msg) ⇒ Object



48
# File 'lib/mqjob/worker.rb', line 48

# No-op default implementation; does nothing with +msg+.
# NOTE(review): presumably meant to be overridden by the including class — confirm.
#
# @param msg [Object] ignored by this default implementation
# @return [nil]
def perform(msg)
  nil
end

#reject! ⇒ Object



14
# File 'lib/mqjob/worker.rb', line 14

# Result marker for a rejected message.
#
# @return [Symbol] always +:reject+
def reject!
  :reject
end

#requeue! ⇒ Object



15
# File 'lib/mqjob/worker.rb', line 15

# Result marker for a message that should be requeued.
#
# @return [Symbol] always +:requeue+
def requeue!
  :requeue
end

#run ⇒ Object



40
41
42
# File 'lib/mqjob/worker.rb', line 40

# Starts listening on this worker's topic, passing the worker itself to the
# message-queue client along with the topic options.
#
# @return [Object] whatever the client's +listen+ returns
def run
  topic, options = @topic, @topic_opts
  @mq.listen(topic, self, options)
end

#stop ⇒ Object



44
45
46
# File 'lib/mqjob/worker.rb', line 44

# Asks the message-queue client to close its listener.
#
# @return [Object] whatever the client's +close_listen+ returns
def stop
  client = @mq
  client.close_listen
end