Class: Skynet::MessageQueue
- Inherits:
-
Object
- Object
- Skynet::MessageQueue
show all
- Extended by:
- Forwardable
- Includes:
- SkynetDebugger
- Defined in:
- lib/skynet/skynet_message_queue.rb
Overview
This class is the interface to the Skynet Message Queue.
Class Method Summary
collapse
Instance Method Summary
collapse
#args_pp, #debug, #debug_header, #error, #fatal, included, #info, #log, #printlog, #stderr, #stdout, #warn
Constructor Details
#initialize(message_queue_proxy_class = Skynet::CONFIG[:MESSAGE_QUEUE_ADAPTER]) ⇒ MessageQueue
Returns a new instance of MessageQueue.
45
46
47
|
# File 'lib/skynet/skynet_message_queue.rb', line 45
# Creates the queue interface and immediately builds the underlying adapter
# proxy by calling #mq, so any error raised while constructing the proxy
# surfaces here rather than on first use.
#
# @param message_queue_proxy_class [Object] defaults to the configured
#   adapter; NOTE(review): this method does not read the argument.
# @return [MessageQueue]
def initialize(message_queue_proxy_class = Skynet::CONFIG[:MESSAGE_QUEUE_ADAPTER])
  mq # called for its side effect: memoizes the adapter proxy in @mq
end
|
Class Method Details
.adapter ⇒ Object
23
24
25
|
# File 'lib/skynet/skynet_message_queue.rb', line 23
# Resolves the configured adapter class and asks it for its adapter identifier.
#
# @return [Object] whatever the adapter class's +adapter+ method returns
def self.adapter
  proxy_class = adapter_class.constantize
  proxy_class.adapter
end
|
.adapter_class ⇒ Object
27
28
29
|
# File 'lib/skynet/skynet_message_queue.rb', line 27
# Reads the adapter setting from the Skynet configuration.
#
# @return [Object] the value stored under :MESSAGE_QUEUE_ADAPTER
def self.adapter_class
  config = Skynet::CONFIG
  config[:MESSAGE_QUEUE_ADAPTER]
end
|
.message_queue_proxy_class ⇒ Object
49
50
51
|
# File 'lib/skynet/skynet_message_queue.rb', line 49
# Turns the configured adapter name into the constant it refers to.
#
# @return [Object] the result of calling +constantize+ on the adapter setting
def self.message_queue_proxy_class
  class_name = adapter_class
  class_name.constantize
end
|
.start_or_connect(adapter_class = self.adapter_class) ⇒ Object
31
32
33
34
35
36
37
38
39
40
41
42
43
|
# File 'lib/skynet/skynet_message_queue.rb', line 31
# Connects to the message queue. If the first attempt raises
# Skynet::ConnectionError and the configured adapter is :tuplespace, a
# tuplespace server is launched in a child process before retrying once.
# For any other adapter the retry happens immediately.
#
# @param adapter_class [Object] defaults to the configured adapter;
#   NOTE(review): this method does not read the argument.
# @return [MessageQueue] a newly constructed instance
# @raise [Skynet::ConnectionError] if the second attempt fails as well
def self.start_or_connect(adapter_class = self.adapter_class)
  begin
    new
  rescue Skynet::ConnectionError
    if adapter == :tuplespace
      server_pid = fork { exec("skynet_tuplespace_server start") }
      # Reap the child once it exits so it does not linger as a zombie.
      Process.detach(server_pid)
      # Presumably gives the server time to come up before reconnecting.
      sleep 5
    end
    new
  end
end
|
Instance Method Details
#ansi_clear ⇒ Object
104
105
106
|
# File 'lib/skynet/skynet_message_queue.rb', line 104
# Writes the ANSI "clear screen" and "cursor home" escape sequences to
# standard output.
#
# @return [nil]
def ansi_clear
  $stdout.puts("\033[2J\033[H")
end
|
#get_worker_version ⇒ Object
Retrieves the current worker version
63
64
65
|
# File 'lib/skynet/skynet_message_queue.rb', line 63
# Asks the adapter proxy for the current worker version.
#
# @return [Object] the value reported by the adapter proxy
def get_worker_version
  queue = mq
  queue.get_worker_version
end
|
#increment_worker_version ⇒ Object
Increments the current worker version (causing workers to restart)
73
74
75
76
77
|
# File 'lib/skynet/skynet_message_queue.rb', line 73
# Reads the current worker version, stores that value plus one, and returns
# the new value. The read and the write are two separate calls.
#
# @return [Object] the incremented version
def increment_worker_version
  (get_worker_version + 1).tap { |bumped| set_worker_version(bumped) }
end
|
#list ⇒ Object
100
101
102
|
# File 'lib/skynet/skynet_message_queue.rb', line 100
# Combines the task listing with the result listing, tasks first.
#
# @return [Object] +list_tasks + list_results+
def list
  tasks = list_tasks
  results = list_results
  tasks + results
end
|
#message_fields ⇒ Object
92
93
94
|
# File 'lib/skynet/skynet_message_queue.rb', line 92
# Exposes the field list declared by Skynet::Message.
#
# @return [Object] whatever +Skynet::Message.fields+ returns
def message_fields
  message_class = Skynet::Message
  message_class.fields
end
|
#message_queue_proxy_class ⇒ Object
53
54
55
|
# File 'lib/skynet/skynet_message_queue.rb', line 53
# Returns the adapter proxy class for this instance, looking it up on the
# class the first time and caching it in @message_queue_proxy_class.
#
# @return [Object] the cached proxy class
def message_queue_proxy_class
  return @message_queue_proxy_class if @message_queue_proxy_class

  @message_queue_proxy_class = self.class.message_queue_proxy_class
end
|
#mq ⇒ Object
79
80
81
82
83
84
85
|
# File 'lib/skynet/skynet_message_queue.rb', line 79
# Returns the adapter proxy instance, constructing it on first use from the
# tuplespace-related configuration values and caching it in @mq.
#
# @return [Object] an instance of the adapter proxy class
def mq
  return @mq if @mq

  @mq = message_queue_proxy_class.new(
    use_ringserver: Skynet::CONFIG[:TS_USE_RINGSERVER],
    ringserver_hosts: Skynet::CONFIG[:TS_SERVER_HOSTS],
    drburi: Skynet::CONFIG[:TS_DRBURI]
  )
end
|
#print_stats ⇒ Object
96
97
98
|
# File 'lib/skynet/skynet_message_queue.rb', line 96
# Builds a one-line summary of queue contents. Despite the name, nothing is
# printed; the summary string is returned.
#
# @return [String] counts of list_tasks(1), list_tasks(0) and list_results
def print_stats
  taken = list_tasks(1).size
  untaken = list_tasks(0).size
  results = list_results.size
  "TAKEN TASKS: #{taken}, UNTAKEN_TASKS: #{untaken} RESULTS: #{results}"
end
|
#set_worker_version(version) ⇒ Object
Sets the current worker version (causing workers to restart)
68
69
70
|
# File 'lib/skynet/skynet_message_queue.rb', line 68
# Hands the given worker version to the adapter proxy.
#
# @param version [Object] the version to store
# @return [Object] whatever the adapter proxy returns
def set_worker_version(version)
  queue = mq
  queue.set_worker_version(version)
end
|
#version_active?(version, queue_id = 0) ⇒ Boolean
Is this version still active in the queue?
58
59
60
|
# File 'lib/skynet/skynet_message_queue.rb', line 58
# Asks the adapter proxy whether the given version is still active.
#
# @param version [Object] the worker version to check
# @param queue_id [Object] queue to check; defaults to 0
# @return [Object] the adapter proxy's answer
def version_active?(version, queue_id = 0)
  queue = mq
  queue.version_active?(version, queue_id)
end
|