Module: Pegasus::ClassMethods
- Extended by:
- Forwardable
- Included in:
- Service
- Defined in:
- lib/pegasus/service/class_methods.rb
Instance Method Summary
collapse
Dynamic Method Handling
This class handles dynamic methods through the method_missing method
#method_missing(*a) ⇒ Object
49
50
51
52
53
|
# File 'lib/pegasus/service/class_methods.rb', line 49
def method_missing(*a)
super unless method_defined? "do_#{a.first}"
raise "blocks are not allowed for queued methods" if block_given?
queue_with_ticket(nil, *a)
end
|
Instance Method Details
#blpop(key, timeout = 0) ⇒ Object
14
15
16
17
|
# File 'lib/pegasus/service/class_methods.rb', line 14
def blpop key, timeout = 0
true while (x = blocking_redis.blpop(key, 10)).blank?
x
end
|
#discard_queue! ⇒ Object
55
56
57
58
59
|
# File 'lib/pegasus/service/class_methods.rb', line 55
def discard_queue!
backup_key = "#{wait_queue_key}_backup_#{Time.unix}"
renamenx wait_queue_key, backup_key
backup_key
end
|
#instance ⇒ Object
3
|
# File 'lib/pegasus/service/class_methods.rb', line 3
def instance; @instance ||= new; end
|
#next_ticket_no ⇒ Object
26
27
28
29
30
31
32
33
34
|
# File 'lib/pegasus/service/class_methods.rb', line 26
def next_ticket_no
if !@max or @current && @current >= @max
@max = incrby tickets_key, 1000
@min = @max - 1000
@current = @min
end
@current += 1
"#{ticket_prefix}#{@current}"
end
|
#playout(*svcs) ⇒ Object
8
9
10
11
12
|
# File 'lib/pegasus/service/class_methods.rb', line 8
def playout *svcs
@svcs ||= []
@svcs.concat svcs.map{ |s| s.instance }
true while @svcs.any?{ |s| s.play }
end
|
#queue_with_ticket(ticket, *a) ⇒ Object
19
20
21
22
23
24
|
# File 'lib/pegasus/service/class_methods.rb', line 19
def queue_with_ticket(ticket, *a)
a[0] = "do_#{a[0]}"
logger.debug "queueing #{a.inspect}"
rpush wait_queue_key, "#{Time.unix} #{ticket || '-'} #{serializer.dump(a)}"
trouble?
end
|
#track(*a) ⇒ Object
43
44
45
46
47
|
# File 'lib/pegasus/service/class_methods.rb', line 43
def track *a
ticket_no = next_ticket_no
queue_with_ticket(ticket_no, *a)
ticket_no
end
|
#trouble? ⇒ Boolean
61
62
63
64
65
66
67
68
69
70
71
72
73
|
# File 'lib/pegasus/service/class_methods.rb', line 61
def trouble?
return false unless next_task = lindex(wait_queue_key, 0)
seconds_behind = Time.unix - next_task.split(' ', 2)[0].to_i
return false unless seconds_behind > 5
logger.warn "#{wait_queue_key} is running #{seconds_behind}s behind"
return false unless seconds_behind > 20
return false unless current_task = get(processing_key)
t, etc = current_task.split(' ', 2)
seconds = Time.unix - t.to_i
return false unless seconds > 20
logger.warn "has been #{etc ? "running task #{etc.inspect}" : "idle"} for #{seconds}s"
return true
end
|
#wait(ticket_no) ⇒ Object
36
37
38
39
40
41
|
# File 'lib/pegasus/service/class_methods.rb', line 36
def wait ticket_no
@instance.play if @instance
logger.debug "waiting on ticket #{ticket_no}"
blpop ticket_no, 0
del ticket_no
end
|