Module: Mqjob::Worker::ClassMethods

Defined in:
lib/mqjob/worker.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#topic ⇒ Object (readonly)

Returns the value of attribute topic.



89
90
91
# File 'lib/mqjob/worker.rb', line 89

# Reader for the topic bound to this worker class.
#
# @return [Object, nil] the current value of @topic; nil when it has not been assigned yet
def topic
  @topic
end

#topic_opts ⇒ Object (readonly)

Returns the value of attribute topic_opts.



88
89
90
# File 'lib/mqjob/worker.rb', line 88

# Reader for the topic options recorded for this worker class.
#
# @return [Object, nil] the current value of @topic_opts; nil when it has not been assigned yet
def topic_opts
  @topic_opts
end

Instance Method Details

#enqueue(msg, opts = {}) ⇒ Object

opts

in: publish the message in X seconds
at: publish the message at a specific time
init_subscription: Boolean


113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
# File 'lib/mqjob/worker.rb', line 113

# Publishes +msg+ to this worker's topic, or runs it inline when
# +opts[:perform_now]+ is truthy.
#
# Only topics whose +topic_type+ is +:normal+ are accepted; for any other type
# an error is logged and nothing is published or performed.
#
# In the inline path the message is round-tripped through JSON before being
# handed to the worker's +process_work+, and any StandardError raised there is
# logged rather than propagated.
#
# @param msg [Object] message payload
# @param opts [Hash] per-call options, merged over +topic_opts+ and forwarded
#   to the client's +publish+ (e.g. +:in+, +:at+, +:init_subscription+);
#   +:perform_now+ selects the inline path
# @return [Boolean] false when the topic type is not +:normal+, true otherwise
#   (including when the inline run logged an error)
def enqueue(msg, opts={})
  unless topic_opts[:topic_type] == :normal
    ::Mqjob.logger.error(__method__) do
      "message enqueue only support topic_type set to normal, but got 「#{topic_opts[:topic_type]}」! After action skipped!"
    end
    return false
  end

  if opts[:perform_now]
    begin
      instance = new({})
      if instance.respond_to?(:perform)
        # Round-trip through JSON so the worker receives the payload in its serialized-then-parsed form.
        payload = JSON.parse(JSON.dump(msg))
        ::Mqjob.logger.info('perform message now') { payload.inspect }
        instance.send(:process_work, nil, OpenStruct.new(payload: payload))
      else
        ::Mqjob.logger.error('perform_now required 「perform」 method, 「perform_full_msg」not supported!')
      end
    rescue => error
      ::Mqjob.logger.error("#{self.name} perform_now") { error }
    end
  else
    # The client is looked up once and memoized on the receiver.
    @mq ||= Plugin.client(topic_opts[:client])
    @mq.publish(topic, msg, topic_opts.merge(opts))
  end

  true
end

#from_topic(name, opts = {}) ⇒ Object

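Options (example values): client: MQ, plugin: :pulsar, prefetch: 1, subscription_mode: SUBSCRIPTION_MODES (different modes require different configuration parameters; exclusive mode requires specifying subscription_name), logger: MyLogger, topic_type: [:normal, :regex] (default: :normal).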



98
99
100
101
102
103
104
105
106
# File 'lib/mqjob/worker.rb', line 98

# Binds this worker class to a topic and records its normalized options.
#
# The options are stored in @topic_opts with two defaults applied:
# +:topic_type+ is converted to a Symbol and falls back to +:normal+, and
# +:subscription_name+ falls back to the class name with its +::+ separators
# removed and +Consumer+ appended (e.g. +Foo::Bar+ becomes +FooBarConsumer+).
#
# @param name [Object, #call] the topic, or a callable that returns it; a
#   callable is invoked immediately and its result stored
# @param opts [Hash] topic options; copied before normalization, so the
#   caller's hash is never modified and may be frozen or shared between workers
# @return [Object] the effective subscription name
def from_topic(name, opts={})
  @topic = name.respond_to?(:call) ? name.call : name

  # Shallow copy: writing the defaults below into the caller's own hash would
  # raise on a frozen hash and would leak one worker's generated
  # subscription_name into every other worker configured from the same hash.
  @topic_opts = opts.dup

  @topic_opts[:topic_type] = @topic_opts[:topic_type]&.to_sym || :normal

  # `self.name` is the class name; the bare `name` here is the topic parameter.
  @topic_opts[:subscription_name] ||= "#{self.name.split('::').join}Consumer"
end