Class: Skynet::MessageQueueAdapter::TupleSpace
Constant Summary
collapse
- USE_FALLBACK_TASKS =
true
- @@ts =
nil
- @@curhostidx =
0
Instance Attribute Summary collapse
Class Method Summary
collapse
Instance Method Summary
collapse
-
#clear_outstanding_tasks ⇒ Object
-
#get_worker_version ⇒ Object
-
#initialize(options = {}) ⇒ TupleSpace
constructor
A new instance of TupleSpace.
-
#list_results ⇒ Object
-
#list_tasks(iteration = nil, queue_id = 0) ⇒ Object
-
#set_worker_version(ver = nil) ⇒ Object
-
#stats ⇒ Object
-
#take_next_task(curver, timeout = nil, payload_type = nil, queue_id = 0) ⇒ Object
-
#take_result(job_id, timeout = nil) ⇒ Object
-
#version_active?(curver = nil, queue_id = 0) ⇒ Boolean
-
#write_error(message, error = '', timeout = nil) ⇒ Object
-
#write_message(message, timeout = nil) ⇒ Object
-
#write_result(message, result = [], timeout = nil) ⇒ Object
#args_pp, #debug, #debug_header, #error, #fatal, included, #info, #log, #printlog, #stderr, #stdout, #warn
#read_all_worker_statuses, #take_worker_status, #write_worker_status
Constructor Details
#initialize(options = {}) ⇒ TupleSpace
Returns a new instance of TupleSpace.
41
42
43
44
|
# File 'lib/skynet/message_queue_adapters/tuple_space.rb', line 41
def initialize(options={})
@start_options = options
@ts = self.class.get_tuple_space(options)
end
|
Instance Attribute Details
#start_options ⇒ Object
Returns the value of attribute start_options.
39
40
41
|
# File 'lib/skynet/message_queue_adapters/tuple_space.rb', line 39
def start_options
@start_options
end
|
Class Method Details
.adapter ⇒ Object
35
36
37
|
# File 'lib/skynet/message_queue_adapters/tuple_space.rb', line 35
def self.adapter
:tuplespace
end
|
.debug_class_desc ⇒ Object
31
32
33
|
# File 'lib/skynet/message_queue_adapters/tuple_space.rb', line 31
def self.debug_class_desc
"TUPLESPACE"
end
|
Instance Method Details
#clear_outstanding_tasks ⇒ Object
#get_worker_version ⇒ Object
#list_tasks(iteration = nil, queue_id = 0) ⇒ Object
#set_worker_version(ver = nil) ⇒ Object
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
|
# File 'lib/skynet/message_queue_adapters/tuple_space.rb', line 108
def set_worker_version(ver=nil)
begin
messages = read_all(Skynet::WorkerVersionMessage.template).collect {|ret| Skynet::WorkerVersionMessage.new(ret)}
curver = 0
messages.each do |message|
curver = message.version
debug "CURRENT WORKER VERSION #{curver}"
curvmessage = Skynet::WorkerVersionMessage.new(take(message.template,0.00001))
if curvmessage
curver = curvmessage.version
else
curver=0
end
end
rescue Skynet::RequestExpiredError => e
curver = 0
end
newver = ver ? ver : curver + 1
debug "WRITING CURRENT WORKER REV #{newver}"
write(Skynet::WorkerVersionMessage.new(:version=>newver))
newver
end
|
#stats ⇒ Object
153
154
155
156
157
158
159
160
|
# File 'lib/skynet/message_queue_adapters/tuple_space.rb', line 153
def stats
t1 = Time.now
tasks = list_tasks
results = list_results
t2 = Time.now - t1
p_tasks = tasks.partition {|task| task[9] == 0}
{:taken_tasks => p_tasks[1].size, :untaken_tasks => p_tasks[0].size, :results => list_results.size, :time => t2.to_f}
end
|
#take_next_task(curver, timeout = nil, payload_type = nil, queue_id = 0) ⇒ Object
46
47
48
49
50
|
# File 'lib/skynet/message_queue_adapters/tuple_space.rb', line 46
def take_next_task(curver,timeout=nil,payload_type=nil,queue_id=0)
message = Skynet::Message.new(take(Skynet::Message.next_task_template(curver,payload_type, queue_id),timeout))
write_fallback_task(message)
message
end
|
#take_result(job_id, timeout = nil) ⇒ Object
#version_active?(curver = nil, queue_id = 0) ⇒ Boolean
83
84
85
86
87
88
89
90
91
92
|
# File 'lib/skynet/message_queue_adapters/tuple_space.rb', line 83
def version_active?(curver=nil, queue_id= 0)
return true unless curver
begin
message_row = read(Skynet::Message.next_task_template(curver, nil, queue_id),0.00001)
true
rescue Skynet::RequestExpiredError
return true if curver.to_i == get_worker_version.to_i
false
end
end
|
#write_error(message, error = '', timeout = nil) ⇒ Object
69
70
71
72
73
|
# File 'lib/skynet/message_queue_adapters/tuple_space.rb', line 69
def write_error(message,error='',timeout=nil)
timeout ||= message.expiry
write(message.error_message(error),timeout)
take_fallback_message(message)
end
|
#write_message(message, timeout = nil) ⇒ Object
52
53
54
55
|
# File 'lib/skynet/message_queue_adapters/tuple_space.rb', line 52
def write_message(message,timeout=nil)
timeout ||= message.expiry
write(message,timeout)
end
|
#write_result(message, result = [], timeout = nil) ⇒ Object
57
58
59
60
61
62
63
|
# File 'lib/skynet/message_queue_adapters/tuple_space.rb', line 57
def write_result(message,result=[],timeout=nil)
result_message = message.result_message(result).to_a
timeout ||= result_message.expiry
write(result_message,timeout)
take_fallback_message(message)
result_message
end
|