Module: Mqjob::Worker::ClassMethods
- Defined in:
- lib/mqjob/worker.rb
Instance Attribute Summary collapse
-
#topic ⇒ Object
readonly
Returns the value of attribute topic.
-
#topic_opts ⇒ Object
readonly
Returns the value of attribute topic_opts.
Instance Method Summary collapse
-
#enqueue(msg, opts = {}) ⇒ Object
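Options (opts): `in` — publish the message in X seconds; `at` — publish the message at a specific time; `init_subscription` (Boolean) — 是否先初始化一个订阅 (whether to initialize a subscription first); `perform_now` (Boolean) — 立即执行,通常用于测试环境减少流程 (execute immediately; typically used in test environments to shorten the flow).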
-
#from_topic(name, opts = {}) ⇒ Object
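Options (opts): client: MQ, plugin: :pulsar, prefetch: 1, subscription_mode: SUBSCRIPTION_MODES (不同类型需要不同配置参数,互斥模式下需要指定订阅名 subscription_name — different modes need different configuration parameters; exclusive mode requires a subscription name, `subscription_name`), logger: MyLogger, topic_type: one of [:normal, :regex], default :normal.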
Instance Attribute Details
#topic ⇒ Object (readonly)
Returns the value of attribute topic.
89 90 91 |
# File 'lib/mqjob/worker.rb', line 89
# Reader for the @topic instance variable (assigned by from_topic).
#
# @return [Object, nil] the stored topic, or nil when none has been assigned
def topic
  @topic
end
#topic_opts ⇒ Object (readonly)
Returns the value of attribute topic_opts.
88 89 90 |
# File 'lib/mqjob/worker.rb', line 88
# Reader for the @topic_opts instance variable (assigned by from_topic).
#
# @return [Object, nil] the stored options, or nil when none have been assigned
def topic_opts
  @topic_opts
end
Instance Method Details
#enqueue(msg, opts = {}) ⇒ Object
opts
in publish in X seconds
at publish at specific time
init_subscription Boolean
113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 |
# File 'lib/mqjob/worker.rb', line 113
# Publishes a message to this worker's topic, or runs it inline when
# opts[:perform_now] is truthy.
#
# Only topics whose :topic_type is :normal can be enqueued; any other type is
# logged as an error and nothing is published or performed.
#
# @param msg [Object] message payload; on the perform_now path it is
#   round-tripped through JSON before being handed to the worker
# @param opts [Hash] per-call options, merged over topic_opts when publishing;
#   :perform_now selects inline execution instead of publishing
# @return [Boolean] false when the topic type is not :normal, true otherwise
#   (including when the inline run raised or the worker lacks #perform —
#   both are only logged)
def enqueue(msg, opts = {})
  unless topic_opts[:topic_type] == :normal
    ::Mqjob.logger.error(__method__) do
      "message enqueue only support topic_type set to normal, but got 「#{topic_opts[:topic_type]}」! After action skipped!"
    end
    return false
  end

  if opts[:perform_now]
    begin
      job = new({})
      if job.respond_to?(:perform)
        # Serialise and parse again so the worker receives the payload in the
        # shape JSON.parse produces.
        payload = JSON.parse(JSON.dump(msg))
        ::Mqjob.logger.info('perform message now') { payload.inspect }
        job.send(:process_work, nil, OpenStruct.new(payload: payload))
      else
        ::Mqjob.logger.error('perform_now required 「perform」 method, 「perform_full_msg」not supported!')
      end
    rescue => error
      # Failures of the inline run are logged, never raised to the caller.
      ::Mqjob.logger.error("#{self.name} perform_now") { error }
    end
  else
    # The client is built once per receiver and reused for later publishes.
    @mq ||= Plugin.client(topic_opts[:client])
    @mq.publish(topic, msg, topic_opts.merge(opts))
  end

  true
end
#from_topic(name, opts = {}) ⇒ Object
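Options (opts): client: MQ, plugin: :pulsar, prefetch: 1, subscription_mode: SUBSCRIPTION_MODES (不同类型需要不同配置参数,互斥模式下需要指定订阅名 subscription_name — different modes need different configuration parameters; exclusive mode requires a subscription name, `subscription_name`), logger: MyLogger, topic_type: one of [:normal, :regex], default :normal.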
98 99 100 101 102 103 104 105 106 |
# File 'lib/mqjob/worker.rb', line 98
# Binds the receiver to a topic and records the options used for it.
#
# The options are stored in a shallow copy, so the hash passed by the caller
# is never modified and may be frozen or shared between several workers.
#
# Option keys mentioned by the accompanying notes: client, plugin, prefetch,
# subscription_mode (different modes need different settings; exclusive mode
# requires :subscription_name), logger and topic_type. This method itself only
# normalises :topic_type and :subscription_name.
#
# @param name [#call, Object] the topic, or a callable that is invoked once to
#   produce it
# @param opts [Hash] topic options; :topic_type is converted with to_sym and
#   defaults to :normal; :subscription_name defaults to the receiver's name
#   with the '::' separators removed and 'Consumer' appended
# @return [Object] the effective :subscription_name
def from_topic(name, opts = {})
  @topic = name.respond_to?(:call) ? name.call : name

  # Copy before normalising: writing defaults into the caller's hash would
  # leak them into other uses of it and raise FrozenError on a frozen hash.
  @topic_opts = opts.dup

  topic_type = @topic_opts[:topic_type]&.to_sym
  @topic_opts[:topic_type] = topic_type || :normal

  # `self.name` is required here: the bare `name` is the topic parameter.
  @topic_opts[:subscription_name] ||= (self.name.split('::') << 'Consumer').join
end