Class: Skynet::MessageQueueAdapter::TupleSpace

Inherits:
Skynet::MessageQueueAdapter show all
Includes:
SkynetDebugger
Defined in:
lib/skynet/message_queue_adapters/tuple_space.rb

Constant Summary collapse

USE_FALLBACK_TASKS =
true
@@ts =
nil
@@curhostidx =
0

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from SkynetDebugger

#args_pp, #debug, #debug_header, #error, #fatal, included, #info, #log, #printlog, #stderr, #stdout, #warn

Methods inherited from Skynet::MessageQueueAdapter

#read_all_worker_statuses, #take_worker_status, #write_worker_status

Constructor Details

#initialize(options = {}) ⇒ TupleSpace

Returns a new instance of TupleSpace.



41
42
43
44
# File 'lib/skynet/message_queue_adapters/tuple_space.rb', line 41

# Creates an adapter instance.
#
# options - start-up settings (defaults to an empty Hash). The same object
#           is remembered as the instance's start options and passed to the
#           class-level get_tuple_space call, whose result is kept in @ts.
def initialize(options={})
  @start_options = options
  adapter_class = self.class
  @ts = adapter_class.get_tuple_space(@start_options)
end

Instance Attribute Details

#start_options ⇒ Object

Returns the value of attribute start_options.



39
40
41
# File 'lib/skynet/message_queue_adapters/tuple_space.rb', line 39

# Returns the options object this adapter was constructed with
# (the value stored in @start_options by the constructor).
def start_options
  @start_options
end

Class Method Details

.adapter ⇒ Object



35
36
37
# File 'lib/skynet/message_queue_adapters/tuple_space.rb', line 35

# Returns the symbol identifying this message queue adapter: :tuplespace.
def self.adapter
  :tuplespace
end

.debug_class_desc ⇒ Object



31
32
33
# File 'lib/skynet/message_queue_adapters/tuple_space.rb', line 31

# Returns the fixed label "TUPLESPACE" for this class.
# NOTE(review): presumably consumed by the SkynetDebugger logging helpers
# as the class tag in log lines -- confirm against SkynetDebugger.
def self.debug_class_desc
  "TUPLESPACE"
end

Instance Method Details

#clear_outstanding_tasks ⇒ Object



132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
# File 'lib/skynet/message_queue_adapters/tuple_space.rb', line 132

# Drains outstanding task and result tuples from the queue.
#
# Reads every tuple matching the outstanding-tasks template and issues one
# short-timeout take per tuple found, then does the same for the
# outstanding-results template. Finally both templates are read again and
# whatever is still present is returned (task tuples followed by result
# tuples), so an empty Array means everything was cleared.
#
# A DRb::DRbConnError or Errno::ECONNREFUSED raised by the first read is
# logged through +error+ and treated as "no tasks found" so the method keeps
# going; errors from the later reads/takes are not rescued here.
def clear_outstanding_tasks
  begin
    tasks = read_all(Skynet::Message.outstanding_tasks_template)
  rescue DRb::DRbConnError, Errno::ECONNREFUSED => e
    error "ERROR #{e.inspect}", caller
    # Without a fallback +tasks+ would still be nil here and the drain loop
    # below would fail with NoMethodError right after the error was logged.
    tasks = []
  end

  # One take per tuple seen; the tiny timeout keeps each take from blocking.
  tasks.size.times do
    take(Skynet::Message.outstanding_tasks_template,0.00001)
  end

  results = read_all(Skynet::Message.outstanding_results_template)
  results.size.times do
    take(Skynet::Message.outstanding_results_template,0.00001)
  end

  # Report what is left after draining.
  task_tuples = read_all(Skynet::Message.outstanding_tasks_template)
  result_tuples = read_all(Skynet::Message.outstanding_results_template)
  task_tuples + result_tuples
end

#get_worker_version ⇒ Object



94
95
96
97
98
99
100
101
102
103
104
105
106
# File 'lib/skynet/message_queue_adapters/tuple_space.rb', line 94

# Returns the worker version currently published on the queue.
#
# Reads (without removing) a tuple matching the WorkerVersionMessage
# template using a very short timeout, wraps it in a WorkerVersionMessage
# and returns its +version+. If the read raises
# Skynet::RequestExpiredError, 0 is returned instead.
def get_worker_version
  tuple = read(Skynet::WorkerVersionMessage.template,0.00001)
  # The object returned by .new is always truthy, so no nil check is needed.
  Skynet::WorkerVersionMessage.new(tuple).version
rescue Skynet::RequestExpiredError
  0
end

#list_results ⇒ Object



79
80
81
# File 'lib/skynet/message_queue_adapters/tuple_space.rb', line 79

# Returns every tuple currently matching the outstanding-results template,
# read without removing anything from the queue.
def list_results
  results_template = Skynet::Message.outstanding_results_template
  read_all(results_template)
end

#list_tasks(iteration = nil, queue_id = 0) ⇒ Object



75
76
77
# File 'lib/skynet/message_queue_adapters/tuple_space.rb', line 75

# Returns every tuple currently matching the outstanding-tasks template,
# read without removing anything from the queue.
#
# iteration - passed through to the template builder (default nil).
# queue_id  - passed through to the template builder (default 0).
def list_tasks(iteration=nil,queue_id=0)
  tasks_template = Skynet::Message.outstanding_tasks_template(iteration,queue_id)
  read_all(tasks_template)
end

#set_worker_version(ver = nil) ⇒ Object



108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
# File 'lib/skynet/message_queue_adapters/tuple_space.rb', line 108

# Replaces the published worker version and returns the new value.
#
# Every WorkerVersionMessage tuple currently readable is taken off the
# queue (short timeout per take); the version of the last one taken becomes
# the "current" version. If any read/take raises
# Skynet::RequestExpiredError the current version is treated as 0.
#
# ver - version to publish; when nil/false, current version + 1 is used.
#
# Writes a new WorkerVersionMessage carrying the chosen version.
def set_worker_version(ver=nil)
  begin
    messages = read_all(Skynet::WorkerVersionMessage.template).map { |tuple| Skynet::WorkerVersionMessage.new(tuple) }
    curver = 0
    messages.each do |message|
      debug "CURRENT WORKER VERSION #{message.version}"
      # .new never returns nil, so the taken message's version is used directly.
      curver = Skynet::WorkerVersionMessage.new(take(message.template,0.00001)).version
    end
  rescue Skynet::RequestExpiredError
    curver = 0
  end

  newver = ver || curver + 1
  debug "WRITING CURRENT WORKER REV #{newver}"
  write(Skynet::WorkerVersionMessage.new(:version=>newver))
  newver
end

#stats ⇒ Object



153
154
155
156
157
158
159
160
# File 'lib/skynet/message_queue_adapters/tuple_space.rb', line 153

# Returns a Hash summarising the queue contents:
#
#   :taken_tasks   - number of task tuples whose element at index 9 is not 0
#   :untaken_tasks - number of task tuples whose element at index 9 is 0
#   :results       - number of outstanding result tuples
#   :time          - seconds (Float) spent listing tasks and results
#
# Tasks and results are each listed exactly once, so the counts and the
# reported time describe the same pair of reads.
def stats
  t1 = Time.now
  tasks = list_tasks
  results = list_results
  t2 = Time.now - t1
  untaken, taken = tasks.partition {|task| task[9] == 0}
  {:taken_tasks => taken.size, :untaken_tasks => untaken.size, :results => results.size, :time => t2.to_f}
end

#take_next_task(curver, timeout = nil, payload_type = nil, queue_id = 0) ⇒ Object



46
47
48
49
50
# File 'lib/skynet/message_queue_adapters/tuple_space.rb', line 46

# Takes the next task off the queue and returns it as a Skynet::Message.
#
# curver       - worker version used to build the next-task template.
# timeout      - passed to take (default nil).
# payload_type - passed to the template builder (default nil).
# queue_id     - passed to the template builder (default 0).
#
# The taken message is handed to write_fallback_task before being returned.
def take_next_task(curver,timeout=nil,payload_type=nil,queue_id=0)
  template = Skynet::Message.next_task_template(curver,payload_type, queue_id)
  task = Skynet::Message.new(take(template,timeout))
  write_fallback_task(task)
  task
end

#take_result(job_id, timeout = nil) ⇒ Object



65
66
67
# File 'lib/skynet/message_queue_adapters/tuple_space.rb', line 65

# Takes the result tuple for +job_id+ off the queue and returns it wrapped
# in a Skynet::Message.
#
# job_id  - used to build the result template.
# timeout - passed to take (default nil).
def take_result(job_id,timeout=nil)
  template = Skynet::Message.result_template(job_id)
  tuple = take(template,timeout)
  Skynet::Message.new(tuple)
end

#version_active?(curver = nil, queue_id = 0) ⇒ Boolean

Returns:

  • (Boolean)


83
84
85
86
87
88
89
90
91
92
# File 'lib/skynet/message_queue_adapters/tuple_space.rb', line 83

# Tells whether worker version +curver+ should still be considered active.
#
# Returns true when +curver+ is nil/false. Otherwise tries a short-timeout
# read of a next-task tuple for that version and queue: if one is readable
# the version is active. If the read raises Skynet::RequestExpiredError,
# the version is active only when it equals the current worker version
# (both compared via to_i).
def version_active?(curver=nil, queue_id= 0)
  if curver
    begin
      read(Skynet::Message.next_task_template(curver, nil, queue_id),0.00001)
      true
    rescue Skynet::RequestExpiredError
      curver.to_i == get_worker_version.to_i
    end
  else
    true
  end
end

#write_error(message, error = '', timeout = nil) ⇒ Object



69
70
71
72
73
# File 'lib/skynet/message_queue_adapters/tuple_space.rb', line 69

# Writes the error form of +message+ to the queue, then takes the fallback
# message for it.
#
# message - the message the error belongs to.
# error   - passed to message.error_message (default '').
# timeout - expiry for the write; falls back to message.expiry when
#           nil/false.
#
# Returns whatever take_fallback_message returns.
def write_error(message,error='',timeout=nil)
  ttl = timeout || message.expiry
  write(message.error_message(error),ttl)
  take_fallback_message(message)
end

#write_message(message, timeout = nil) ⇒ Object



52
53
54
55
# File 'lib/skynet/message_queue_adapters/tuple_space.rb', line 52

# Writes +message+ to the queue.
#
# timeout - expiry for the write; falls back to message.expiry when
#           nil/false.
#
# Returns whatever write returns.
def write_message(message,timeout=nil)
  write(message, timeout || message.expiry)
end

#write_result(message, result = [], timeout = nil) ⇒ Object



57
58
59
60
61
62
63
# File 'lib/skynet/message_queue_adapters/tuple_space.rb', line 57

# Writes the result for +message+ to the queue, then takes the fallback
# message for it.
#
# message - the task message the result belongs to.
# result  - passed to message.result_message (default []).
# timeout - expiry for the write; falls back to the result message's own
#           expiry when nil/false.
#
# Returns the result message converted with to_a (the same value that was
# written).
def write_result(message,result=[],timeout=nil)
  result_message = message.result_message(result)
  # Read the expiry from the message object itself: after to_a the value is
  # presumably a plain Array, which has no +expiry+ method, so asking the
  # converted value fails whenever no timeout is supplied.
  timeout ||= result_message.expiry
  result_tuple = result_message.to_a
  write(result_tuple,timeout)
  take_fallback_message(message)
  result_tuple
end