Module: Pegasus::ClassMethods

Extended by:
Forwardable
Included in:
Service
Defined in:
lib/pegasus/service/class_methods.rb

Instance Method Summary collapse

Dynamic Method Handling

This class handles dynamic methods through the method_missing method

#method_missing(*a) ⇒ Object



49
50
51
52
53
# File 'lib/pegasus/service/class_methods.rb', line 49

def method_missing(*a)
  super unless method_defined? "do_#{a.first}"
  raise "blocks are not allowed for queued methods" if block_given?
  queue_with_ticket(nil, *a)
end

Instance Method Details

#blpop(key, timeout = 0) ⇒ Object



14
15
16
17
# File 'lib/pegasus/service/class_methods.rb', line 14

def blpop key, timeout = 0
  true while (x = blocking_redis.blpop(key, 10)).blank?
  x
end

#discard_queue!Object



55
56
57
58
59
# File 'lib/pegasus/service/class_methods.rb', line 55

def discard_queue!
  backup_key = "#{wait_queue_key}_backup_#{Time.unix}"
  renamenx wait_queue_key, backup_key
  backup_key
end

#instanceObject



3
# File 'lib/pegasus/service/class_methods.rb', line 3

def instance; @instance ||= new; end

#next_ticket_noObject



26
27
28
29
30
31
32
33
34
# File 'lib/pegasus/service/class_methods.rb', line 26

def next_ticket_no
  if !@max or @current && @current >= @max
    @max = incrby tickets_key, 1000
    @min = @max - 1000
    @current = @min
  end
  @current += 1
  "#{ticket_prefix}#{@current}"
end

#playout(*svcs) ⇒ Object



8
9
10
11
12
# File 'lib/pegasus/service/class_methods.rb', line 8

def playout *svcs
  @svcs ||= []
  @svcs.concat svcs.map{ |s| s.instance }
  true while @svcs.any?{ |s| s.play }
end

#queue_with_ticket(ticket, *a) ⇒ Object



19
20
21
22
23
24
# File 'lib/pegasus/service/class_methods.rb', line 19

def queue_with_ticket(ticket, *a)
  a[0] = "do_#{a[0]}"
  logger.debug "queueing #{a.inspect}"
  rpush wait_queue_key, "#{Time.unix} #{ticket || '-'} #{serializer.dump(a)}"
  trouble?
end

#track(*a) ⇒ Object



43
44
45
46
47
# File 'lib/pegasus/service/class_methods.rb', line 43

def track *a
  ticket_no = next_ticket_no
  queue_with_ticket(ticket_no, *a)
  ticket_no
end

#trouble?Boolean

Returns:

  • (Boolean)


61
62
63
64
65
66
67
68
69
70
71
72
73
# File 'lib/pegasus/service/class_methods.rb', line 61

def trouble?
  return false unless next_task = lindex(wait_queue_key, 0)
  seconds_behind = Time.unix - next_task.split(' ', 2)[0].to_i
  return false unless seconds_behind > 5
  logger.warn "#{wait_queue_key} is running #{seconds_behind}s behind"
  return false unless seconds_behind > 20
  return false unless current_task = get(processing_key)
  t, etc = current_task.split(' ', 2)
  seconds = Time.unix - t.to_i
  return false unless seconds > 20
  logger.warn "has been #{etc ? "running task #{etc.inspect}" : "idle"} for #{seconds}s"
  return true
end

#wait(ticket_no) ⇒ Object



36
37
38
39
40
41
# File 'lib/pegasus/service/class_methods.rb', line 36

def wait ticket_no
  @instance.play if @instance
  logger.debug "waiting on ticket #{ticket_no}"
  blpop ticket_no, 0
  del ticket_no
end