Class: ASIR::Transport::Resque
- Inherits:
-
ConnectionOriented
- Object
- ConnectionOriented
- ASIR::Transport::Resque
- Includes:
- PollThrottle
- Defined in:
- lib/asir/transport/resque.rb
Overview
!SLIDE Resque Transport
Constant Summary collapse
- DEFAULT_QUEUE =
'asir'.freeze
- @@redis_server_version =
nil
Instance Attribute Summary collapse
-
#namespace ⇒ Object
Returns the value of attribute namespace.
-
#queue ⇒ Object
Defaults to ‘asir’.
-
#queues ⇒ Object
Returns the value of attribute queues.
-
#throttle ⇒ Object
Returns the value of attribute throttle.
Class Method Summary collapse
-
.perform(metadata, payload = nil) ⇒ Object
Class method entry point from Resque::Job.perform.
Instance Method Summary collapse
-
#_client_connect! ⇒ Object
!SLIDE Resque client.
- #_receive_message(state) ⇒ Object
- #_receive_result(state) ⇒ Object
- #_send_message(state) ⇒ Object
- #_send_result(state) ⇒ Object
-
#_server! ⇒ Object
!SLIDE Resque server (worker).
- #_server_accept_connection!(server) ⇒ Object
-
#_server_close_connection!(in_stream, out_stream) ⇒ Object
Nothing to be closed for Resque.
- #_start_conduit! ⇒ Object
-
#initialize(*args) ⇒ Resque
constructor
A new instance of Resque.
-
#namespace_ ⇒ Object
Defaults to ‘asir’.
-
#queues_ ⇒ Object
Defaults to [ ‘asir’ ].
- #redis_server_version ⇒ Object
- #resque_connect! ⇒ Object
- #resque_disconnect! ⇒ Object
- #resque_uri ⇒ Object
- #resque_worker ⇒ Object
-
#serve_stream_message!(in_stream, out_stream) ⇒ Object
ignored.
- #server_on_start! ⇒ Object
- #server_on_stop! ⇒ Object
-
#stream_eof?(stream) ⇒ Boolean
Resque is message-oriented, process only one message per “connection”.
Constructor Details
#initialize(*args) ⇒ Resque
Returns a new instance of Resque.
14 15 16 17 18 19 |
# File 'lib/asir/transport/resque.rb', line 14 def initialize *args @port_default = 6379 @scheme_default = 'redis'.freeze super self.one_way = true end |
Instance Attribute Details
#namespace ⇒ Object
Returns the value of attribute namespace.
12 13 14 |
# File 'lib/asir/transport/resque.rb', line 12 def namespace @namespace end |
#queue ⇒ Object
Defaults to ‘asir’.
82 83 84 |
# File 'lib/asir/transport/resque.rb', line 82 def queue @queue end |
#queues ⇒ Object
Returns the value of attribute queues.
12 13 14 |
# File 'lib/asir/transport/resque.rb', line 12 def queues @queues end |
#throttle ⇒ Object
Returns the value of attribute throttle.
12 13 14 |
# File 'lib/asir/transport/resque.rb', line 12 def throttle @throttle end |
Class Method Details
.perform(metadata, payload = nil) ⇒ Object
Class method entry point from Resque::Job.perform.
124 125 126 127 128 129 130 |
# File 'lib/asir/transport/resque.rb', line 124 def self.perform , payload = nil payload ||= # old calling signature (just payload). # $stderr.puts " #{self} process_job payload=#{payload.inspect}" t = ::Thread.current[:asir_transport_resque_instance] # Pass payload as in_stream; _receive_message will return it. t. payload, nil end |
Instance Method Details
#_client_connect! ⇒ Object
!SLIDE Resque client.
23 24 25 26 27 28 |
# File 'lib/asir/transport/resque.rb', line 23 def _client_connect! # $stderr.puts " #{$$} #{self} _client_connect!" resque_connect! rescue ::Exception => exc raise exc.class, "#{self.class} #{uri}: #{exc.}", exc.backtrace end |
#_receive_message(state) ⇒ Object
132 133 134 |
# File 'lib/asir/transport/resque.rb', line 132 def state state. = state.in_stream end |
#_receive_result(state) ⇒ Object
40 41 42 43 |
# File 'lib/asir/transport/resque.rb', line 40 def _receive_result state return nil if one_way || state..one_way super end |
#_send_message(state) ⇒ Object
50 51 52 53 54 55 56 57 58 59 |
# File 'lib/asir/transport/resque.rb', line 50 def state stream.with_stream! do | io | # Force connect = state. queue = [:resque_queue] || [:queue] || self.queue $stderr.puts " #{$$} #{self} _send_message #{.inspect} to queue=#{queue.inspect} as #{self.class} :process_job" if @verbose >= 2 # Invokes Transport::Resque.perform(metadata, payload) = [:resque_metadata] || .description ::Resque.enqueue_to(queue, self.class, , state.) end end |
#_send_result(state) ⇒ Object
45 46 47 48 |
# File 'lib/asir/transport/resque.rb', line 45 def _send_result state return nil if one_way || state..one_way super end |
#_server! ⇒ Object
!SLIDE Resque server (worker).
32 33 34 35 36 37 38 |
# File 'lib/asir/transport/resque.rb', line 32 def _server! # $stderr.puts " #{$$} #{self} _server!" resque_connect! resque_worker rescue ::Exception => exc raise exc.class, "#{self.class} #{uri}: #{exc.}", exc.backtrace end |
#_server_accept_connection!(server) ⇒ Object
93 94 95 |
# File 'lib/asir/transport/resque.rb', line 93 def _server_accept_connection! server [ server, server ] end |
#_server_close_connection!(in_stream, out_stream) ⇒ Object
Nothing to be closed for Resque.
103 104 105 |
# File 'lib/asir/transport/resque.rb', line 103 def _server_close_connection! in_stream, out_stream # NOTHING end |
#_start_conduit! ⇒ Object
217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 |
# File 'lib/asir/transport/resque.rb', line 217 def _start_conduit! @redis_dir ||= "/tmp" @redis_conf ||= "#{@redis_dir}/asir-redis-#{port}.conf" @redis_log ||= "#{@redis_dir}/asir-redis-#{port}.log" @redis_cmd = [ 'redis-server' ] case redis_server_version when /^2\.4/ ::File.open(@redis_conf, "w+") do | out | out.puts "daemonize no" out.puts "port #{port}" out.puts "loglevel warning" out.puts "logfile #{@redis_log}" end @redis_cmd << @redis_conf else @redis_cmd << '--port' << port << '--loglevel' << 'warning' << '--logfile' << @redis_log end @redis_cmd.map! { | x | x.to_s } # $stderr.puts " redis_cmd = #{@redis_cmd * ' '}" if @verbose >= 1 exec *@redis_cmd end |
#namespace_ ⇒ Object
Defaults to ‘asir’.
87 88 89 |
# File 'lib/asir/transport/resque.rb', line 87 def namespace_ @namespace_ ||= namespace || DEFAULT_QUEUE end |
#queues_ ⇒ Object
Defaults to [ ‘asir’ ].
76 77 78 79 |
# File 'lib/asir/transport/resque.rb', line 76 def queues_ @queues_ ||= queues.empty? ? [ DEFAULT_QUEUE ] : queues.freeze end |
#redis_server_version ⇒ Object
202 203 204 205 206 207 208 209 210 211 212 213 214 215 |
# File 'lib/asir/transport/resque.rb', line 202 def redis_server_version @@redis_server_version ||= begin case v = `redis-server --version` when /v=([.0-9]+)/ # 3.x v = $1 when / version ([.0-9]+)/ # 2.x v = $1 else v = 'UNKNOWN' end v end end |
#resque_connect! ⇒ Object
148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 |
# File 'lib/asir/transport/resque.rb', line 148 def resque_connect! @redis_config = { :host => host, :port => port, :thread_safe => true, } @redis = ::Redis.new(@redis_config) if namespace_ ::Resque.redis = @redis = ::Redis::Namespace.new(namespace_, :redis => @redis) ::Resque.redis.namespace = namespace_ else ::Resque.redis = @redis end # $stderr.puts " *** #{$$} #{self} resque_connect! #{@redis.inspect}" @redis end |
#resque_disconnect! ⇒ Object
168 169 170 |
# File 'lib/asir/transport/resque.rb', line 168 def resque_disconnect! ::Resque.redis = nil end |
#resque_uri ⇒ Object
138 139 140 141 142 143 144 145 146 |
# File 'lib/asir/transport/resque.rb', line 138 def resque_uri @resque_uri ||= ( unless scheme == 'redis' raise ArgumentError, "Invalid resque URI: #{uri.inspect}" end _uri ) end |
#resque_worker ⇒ Object
172 173 174 |
# File 'lib/asir/transport/resque.rb', line 172 def resque_worker @resque_worker ||= ::Resque::Worker.new(*queues_) end |
#serve_stream_message!(in_stream, out_stream) ⇒ Object
ignored
107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 |
# File 'lib/asir/transport/resque.rb', line 107 def in_stream, out_stream # ignored save = ::Thread.current[:asir_transport_resque_instance] ::Thread.current[:asir_transport_resque_instance] = self poll_throttle throttle do $stderr.puts " #{$$} #{self} serve_stream_message!: resque_worker = #{resque_worker} on queues #{resque_worker.queues.inspect}" if @verbose >= 3 if job = resque_worker.reserve $stderr.puts " #{$$} #{self} serve_stream_message! job=#{job.class}:#{job.inspect}" if @verbose >= 2 resque_worker.process(job) end job end self ensure ::Thread.current[:asir_transport_resque_instance] = save end |
#server_on_start! ⇒ Object
176 177 178 179 180 181 182 183 184 185 |
# File 'lib/asir/transport/resque.rb', line 176 def server_on_start! # prune_dead_workers expects processes to have "resque " in the name. @save_progname ||= $0.dup $0 = "resque #{$0}" if worker = resque_worker worker.prune_dead_workers worker.register_worker end self end |
#server_on_stop! ⇒ Object
187 188 189 190 191 192 193 194 195 196 197 |
# File 'lib/asir/transport/resque.rb', line 187 def server_on_stop! $0 = @save_progname if @save_progname if worker = @resque_worker worker.unregister_worker end self rescue Redis::CannotConnectError # This error is not actionable since server # is stopping. nil end |
#stream_eof?(stream) ⇒ Boolean
Resque is message-oriented, process only one message per “connection”.
98 99 100 |
# File 'lib/asir/transport/resque.rb', line 98 def stream_eof? stream false end |