Class: CI::Queue::Redis::Base::HeartbeatProcess

Inherits:
Object
  • Object
show all
Defined in:
lib/ci/queue/redis/base.rb

Instance Method Summary collapse

Constructor Details

#initialize(redis_url, zset_key, processed_key, owners_key, worker_queue_key) ⇒ HeartbeatProcess

Returns a new instance of HeartbeatProcess.



244
245
246
247
248
249
250
# File 'lib/ci/queue/redis/base.rb', line 244

# Records the connection URL and Redis key names on the instance. Nothing is
# started here; boot! later passes these values, in this order, as
# command-line arguments to the monitor script.
#
# @param redis_url [Object] stored in @redis_url
# @param zset_key [Object] stored in @zset_key
# @param processed_key [Object] stored in @processed_key
# @param owners_key [Object] stored in @owners_key
# @param worker_queue_key [Object] stored in @worker_queue_key
def initialize(redis_url, zset_key, processed_key, owners_key, worker_queue_key)
  @redis_url = redis_url
  @zset_key, @processed_key, @owners_key = zset_key, processed_key, owners_key
  @worker_queue_key = worker_queue_key
end

Instance Method Details

#boot! ⇒ Object



252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
# File 'lib/ci/queue/redis/base.rb', line 252

# Spawns monitor.rb (located next to this file) as a child Ruby process and
# waits up to 10 seconds for the child's stdout to become readable.
#
# The child's stdin is the read end of a pipe whose write end is kept in
# @pipe; its stdout is the write end of a second pipe that the parent uses
# only for this readiness check. The child's pid is stored in @pid.
#
# If the child does not become readable in time it is killed and reaped
# before the error is raised, so this method never blocks on a stuck child
# and leaves no open pipe or zombie behind.
#
# @return [IO] the binmode write end of the pipe connected to the child's stdin
# @raise [RuntimeError] if the child's stdout is not readable within 10 seconds
# @raise [Errno::ESRCH] if the child no longer exists after the readiness read
def boot!
  child_read, @pipe = IO.pipe
  ready_pipe, child_write = IO.pipe
  @pipe.binmode
  @pid = Process.spawn(
    RbConfig.ruby,
    ::File.join(__dir__, "monitor.rb"),
    @redis_url,
    @zset_key,
    @processed_key,
    @owners_key,
    @worker_queue_key,
    in: child_read,
    out: child_write,
  )
  # The child holds its own copies of these ends; the parent drops its copies
  # so that EOF is observable on each pipe once the other side closes.
  child_read.close
  child_write.close

  begin
    if ready_pipe.wait_readable(10)
      ready_pipe.gets
      # Signal 0 delivers nothing; it only verifies that the pid still exists.
      Process.kill(0, @pid)
    else
      # The child is running but never became ready. A bare Process.wait here
      # would block until the child exits by itself, so terminate it first.
      @pipe.close
      begin
        Process.kill("KILL", @pid)
      rescue Errno::ESRCH
        # The child disappeared in the meantime; nothing left to signal.
      end
      begin
        Process.wait(@pid)
      rescue Errno::ECHILD
        # Already reaped elsewhere.
      end
      raise "Monitor child wasn't ready after 10 seconds"
    end
  ensure
    # Close the readiness pipe on every path, including the failure ones.
    ready_pipe.close
  end
  @pipe
end

#shutdown! ⇒ Object



283
284
285
286
287
288
289
290
291
# File 'lib/ci/queue/redis/base.rb', line 283

# Closes the write end of the pipe held in @pipe and then blocks until the
# process identified by @pid has been reaped.
#
# @return [Process::Status, nil] the child's exit status, or nil when there
#   is no such child left to wait for (Errno::ECHILD)
def shutdown!
  @pipe.close
  begin
    # waitpid2 yields [pid, status]; only the status is of interest.
    Process.waitpid2(@pid).last
  rescue Errno::ECHILD
    nil
  end
end

#tick!(id) ⇒ Object



293
294
295
# File 'lib/ci/queue/redis/base.rb', line 293

# Forwards a :tick! message carrying the given id through send_message.
#
# @param id [Object] value passed along as the `id:` keyword
# @return [Object] whatever send_message returns
def tick!(id)
  payload = { id: id }
  send_message(:tick!, **payload)
end