Class: ASIR::Transport::Resque

Inherits:
ConnectionOriented
  • Object
show all
Includes:
PollThrottle
Defined in:
lib/asir/transport/resque.rb

Overview

!SLIDE Resque Transport

Constant Summary collapse

DEFAULT_QUEUE =
'asir'.freeze
@@redis_server_version =
nil

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(*args) ⇒ Resque

Returns a new instance of Resque.



14
15
16
17
18
19
# File 'lib/asir/transport/resque.rb', line 14

def initialize *args
  @port_default = 6379
  @scheme_default = 'redis'.freeze
  super
  self.one_way = true
end

Instance Attribute Details

#namespaceObject

Returns the value of attribute namespace.



12
13
14
# File 'lib/asir/transport/resque.rb', line 12

def namespace
  @namespace
end

#queueObject

Defaults to ‘asir’.



82
83
84
# File 'lib/asir/transport/resque.rb', line 82

def queue
  @queue
end

#queuesObject

Returns the value of attribute queues.



12
13
14
# File 'lib/asir/transport/resque.rb', line 12

def queues
  @queues
end

#throttleObject

Returns the value of attribute throttle.



12
13
14
# File 'lib/asir/transport/resque.rb', line 12

def throttle
  @throttle
end

Class Method Details

.perform(metadata, payload = nil) ⇒ Object

Class method entry point from Resque::Job.perform.



124
125
126
127
128
129
130
# File 'lib/asir/transport/resque.rb', line 124

def self.perform , payload = nil
  payload ||=  # old calling signature (just payload).
  # $stderr.puts "  #{self} process_job payload=#{payload.inspect}"
  t = ::Thread.current[:asir_transport_resque_instance]
  # Pass payload as in_stream; _receive_message will return it.
  t.serve_message! payload, nil
end

Instance Method Details

#_client_connect!Object

!SLIDE Resque client.



23
24
25
26
27
28
# File 'lib/asir/transport/resque.rb', line 23

def _client_connect!
  # $stderr.puts "  #{$$} #{self} _client_connect!"
  resque_connect!
rescue ::Exception => exc
  raise exc.class, "#{self.class} #{uri}: #{exc.message}", exc.backtrace
end

#_receive_message(state) ⇒ Object



132
133
134
# File 'lib/asir/transport/resque.rb', line 132

def _receive_message state
  state.message_payload = state.in_stream
end

#_receive_result(state) ⇒ Object



40
41
42
43
# File 'lib/asir/transport/resque.rb', line 40

def _receive_result state
  return nil if one_way || state.message.one_way
  super
end

#_send_message(state) ⇒ Object



50
51
52
53
54
55
56
57
58
59
# File 'lib/asir/transport/resque.rb', line 50

def _send_message state
  stream.with_stream! do | io |  # Force connect
    message = state.message
    queue = message[:resque_queue] || message[:queue] || self.queue
    $stderr.puts "  #{$$} #{self} _send_message #{message_payload.inspect} to queue=#{queue.inspect} as #{self.class} :process_job" if @verbose >= 2
    # Invokes Transport::Resque.perform(metadata, payload)
     = message[:resque_metadata] || message.description
    ::Resque.enqueue_to(queue, self.class, , state.message_payload)
  end
end

#_send_result(state) ⇒ Object



45
46
47
48
# File 'lib/asir/transport/resque.rb', line 45

def _send_result state
  return nil if one_way || state.message.one_way
  super
end

#_server!Object

!SLIDE Resque server (worker).



32
33
34
35
36
37
38
# File 'lib/asir/transport/resque.rb', line 32

def _server!
  # $stderr.puts "  #{$$} #{self} _server!"
  resque_connect!
  resque_worker
rescue ::Exception => exc
  raise exc.class, "#{self.class} #{uri}: #{exc.message}", exc.backtrace
end

#_server_accept_connection!(server) ⇒ Object



93
94
95
# File 'lib/asir/transport/resque.rb', line 93

def _server_accept_connection! server
  [ server, server ]
end

#_server_close_connection!(in_stream, out_stream) ⇒ Object

Nothing to be closed for Resque.



103
104
105
# File 'lib/asir/transport/resque.rb', line 103

def _server_close_connection! in_stream, out_stream
  # NOTHING
end

#_start_conduit!Object



217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
# File 'lib/asir/transport/resque.rb', line 217

def _start_conduit!
  @redis_dir ||= "/tmp"
  @redis_conf ||= "#{@redis_dir}/asir-redis-#{port}.conf"
  @redis_log ||= "#{@redis_dir}/asir-redis-#{port}.log"
  @redis_cmd = [ 'redis-server' ]
  case redis_server_version
  when /^2\.4/
    ::File.open(@redis_conf, "w+") do | out |
      out.puts "daemonize no"
      out.puts "port #{port}"
      out.puts "loglevel warning"
      out.puts "logfile #{@redis_log}"
    end
    @redis_cmd << @redis_conf
  else
    @redis_cmd <<
      '--port'     << port <<
      '--loglevel' << 'warning' <<
      '--logfile'  << @redis_log
  end
  @redis_cmd.map! { | x | x.to_s }
  # $stderr.puts "  redis_cmd = #{@redis_cmd * ' '}" if @verbose >= 1
  exec *@redis_cmd
end

#namespace_Object

Defaults to ‘asir’.



87
88
89
# File 'lib/asir/transport/resque.rb', line 87

def namespace_
  @namespace_ ||= namespace || DEFAULT_QUEUE
end

#queues_Object

Defaults to [ ‘asir’ ].



76
77
78
79
# File 'lib/asir/transport/resque.rb', line 76

def queues_
  @queues_ ||=
    queues.empty? ? [ DEFAULT_QUEUE ] : queues.freeze
end

#redis_server_versionObject



202
203
204
205
206
207
208
209
210
211
212
213
214
215
# File 'lib/asir/transport/resque.rb', line 202

def redis_server_version
  @@redis_server_version ||=
    begin
      case v = `redis-server --version`
      when /v=([.0-9]+)/ # 3.x
        v = $1
      when / version ([.0-9]+)/ # 2.x
        v = $1
      else
        v = 'UNKNOWN'
      end
      v
    end
end

#resque_connect!Object



148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
# File 'lib/asir/transport/resque.rb', line 148

def resque_connect!
  @redis_config = {
    :host => host,
    :port => port,
    :thread_safe => true,
  }
  @redis =
    ::Redis.new(@redis_config)
  if namespace_
    ::Resque.redis =
      @redis =
      ::Redis::Namespace.new(namespace_, :redis => @redis)
    ::Resque.redis.namespace = namespace_
  else
    ::Resque.redis = @redis
  end
  # $stderr.puts "  *** #{$$} #{self} resque_connect! #{@redis.inspect}"
  @redis
end

#resque_disconnect!Object



168
169
170
# File 'lib/asir/transport/resque.rb', line 168

def resque_disconnect!
  ::Resque.redis = nil
end

#resque_uriObject



138
139
140
141
142
143
144
145
146
# File 'lib/asir/transport/resque.rb', line 138

def resque_uri
  @resque_uri ||=
    (
    unless scheme == 'redis'
      raise ArgumentError, "Invalid resque URI: #{uri.inspect}"
    end
    _uri
    )
end

#resque_workerObject



172
173
174
# File 'lib/asir/transport/resque.rb', line 172

def resque_worker
  @resque_worker ||= ::Resque::Worker.new(*queues_)
end

#serve_stream_message!(in_stream, out_stream) ⇒ Object

ignored



107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
# File 'lib/asir/transport/resque.rb', line 107

def serve_stream_message! in_stream, out_stream # ignored
  save = ::Thread.current[:asir_transport_resque_instance]
  ::Thread.current[:asir_transport_resque_instance] = self
  poll_throttle throttle do
    $stderr.puts "  #{$$} #{self} serve_stream_message!: resque_worker = #{resque_worker} on queues #{resque_worker.queues.inspect}" if @verbose >= 3
    if job = resque_worker.reserve
      $stderr.puts "  #{$$} #{self} serve_stream_message! job=#{job.class}:#{job.inspect}" if @verbose >= 2
      resque_worker.process(job)
    end
    job
  end
  self
ensure
  ::Thread.current[:asir_transport_resque_instance] = save
end

#server_on_start!Object



176
177
178
179
180
181
182
183
184
185
# File 'lib/asir/transport/resque.rb', line 176

def server_on_start!
  # prune_dead_workers expects processes to have "resque " in the name.
  @save_progname ||= $0.dup
  $0 = "resque #{$0}"
  if worker = resque_worker
    worker.prune_dead_workers
    worker.register_worker
  end
  self
end

#server_on_stop!Object



187
188
189
190
191
192
193
194
195
196
197
# File 'lib/asir/transport/resque.rb', line 187

def server_on_stop!
  $0 = @save_progname if @save_progname
  if worker = @resque_worker
    worker.unregister_worker
  end
  self
rescue Redis::CannotConnectError
  # This error is not actionable since server
  # is stopping.
  nil
end

#stream_eof?(stream) ⇒ Boolean

Resque is message-oriented, process only one message per “connection”.

Returns:

  • (Boolean)


98
99
100
# File 'lib/asir/transport/resque.rb', line 98

def stream_eof? stream
  false
end