Class: Skynet::MessageQueue

Inherits:
Object
  • Object
show all
Extended by:
Forwardable
Includes:
SkynetDebugger
Defined in:
lib/skynet/skynet_message_queue.rb

Overview

This class is the interface to the Skynet Message Queue.

Class Method Summary collapse

Instance Method Summary collapse

Methods included from SkynetDebugger

#args_pp, #debug, #debug_header, #error, #fatal, included, #info, #log, #printlog, #stderr, #stdout, #warn

Constructor Details

#initialize(message_queue_proxy_class = Skynet::CONFIG[:MESSAGE_QUEUE_ADAPTER]) ⇒ MessageQueue

Returns a new instance of MessageQueue.



45
46
47
# File 'lib/skynet/skynet_message_queue.rb', line 45

# Builds the queue facade and immediately instantiates the underlying adapter.
#
# message_queue_proxy_class - adapter to use; defaults to
#   Skynet::CONFIG[:MESSAGE_QUEUE_ADAPTER]. Either a class-name string
#   (resolved with #constantize, as the class-level helpers do) or an
#   already-resolved class. This argument used to be accepted but ignored.
def initialize(message_queue_proxy_class=Skynet::CONFIG[:MESSAGE_QUEUE_ADAPTER])
  # Remember the requested adapter so the memoized (||=) instance reader
  # #message_queue_proxy_class returns it; a nil leaves the ||= fallback
  # to the class-level default in place.
  @message_queue_proxy_class =
    if message_queue_proxy_class.respond_to?(:constantize)
      message_queue_proxy_class.constantize
    else
      message_queue_proxy_class
    end
  # Calling #mq here creates the adapter instance at construction time.
  mq
end

Class Method Details

.adapter ⇒ Object



23
24
25
# File 'lib/skynet/skynet_message_queue.rb', line 23

# Returns whatever the configured adapter class reports from its own
# class-level `adapter` method. The value of adapter_class is turned into a
# constant via #constantize first (presumably a class-name string).
def self.adapter
  proxy_class = adapter_class.constantize
  proxy_class.adapter
end

.adapter_class ⇒ Object



27
28
29
# File 'lib/skynet/skynet_message_queue.rb', line 27

# Returns the raw :MESSAGE_QUEUE_ADAPTER entry from Skynet::CONFIG, without
# resolving it to a constant.
def self.adapter_class
  settings = Skynet::CONFIG
  settings[:MESSAGE_QUEUE_ADAPTER]
end

.message_queue_proxy_class ⇒ Object



49
50
51
# File 'lib/skynet/skynet_message_queue.rb', line 49

# Resolves the configured adapter (see adapter_class) to a constant using
# #constantize and returns it.
def self.message_queue_proxy_class
  configured_name = adapter_class
  configured_name.constantize
end

.start_or_connect(adapter_class = self.adapter_class) ⇒ Object



31
32
33
34
35
36
37
38
39
40
41
42
43
# File 'lib/skynet/skynet_message_queue.rb', line 31

# Returns a new MessageQueue, trying once to recover from a connection failure.
#
# If the first `new` raises Skynet::ConnectionError and the configured adapter
# is :tuplespace, a child process is forked to run
# "skynet_tuplespace_server start", the caller sleeps 5 seconds, and `new` is
# attempted a second time. For any other adapter the second attempt happens
# immediately. An error raised by the second attempt propagates to the caller.
#
# adapter_class - NOTE(review): accepted for interface compatibility but not
#   referenced by the body; the adapter check uses self.adapter.
def self.start_or_connect(adapter_class = self.adapter_class)
  new
rescue Skynet::ConnectionError
  if adapter == :tuplespace
    server_pid = fork do
      exec("skynet_tuplespace_server start")
    end
    # Reap the child in the background so it does not linger as a zombie
    # once the launcher command exits.
    Process.detach(server_pid)
    # Fixed wait before reconnecting (presumably to let the server come up).
    sleep 5
  end
  new
end

Instance Method Details

#ansi_clear ⇒ Object



104
105
106
# File 'lib/skynet/skynet_message_queue.rb', line 104

# Writes the ANSI escape sequences ESC[2J (erase display) and ESC[H (cursor
# home) to standard output, followed by the newline that puts appends.
def ansi_clear
  clear_and_home = "\033[2J\033[H"
  $stdout.puts(clear_and_home)
end

#get_worker_version ⇒ Object

Retrieves the current worker version



63
64
65
# File 'lib/skynet/skynet_message_queue.rb', line 63

# Asks the underlying adapter (see #mq) for the current worker version and
# returns its answer unchanged.
def get_worker_version
  queue = mq
  queue.get_worker_version
end

#increment_worker_version ⇒ Object

Increments the current worker version (causing workers to restart)



73
74
75
76
77
# File 'lib/skynet/skynet_message_queue.rb', line 73

# Reads the current worker version, stores that value plus one through
# #set_worker_version, and returns the new value.
#
# NOTE(review): the read and the write are two separate calls, so nothing in
# this method guards against concurrent incrementers.
def increment_worker_version
  (get_worker_version + 1).tap { |bumped| set_worker_version(bumped) }
end

#list ⇒ Object



100
101
102
# File 'lib/skynet/skynet_message_queue.rb', line 100

# Returns the result of list_tasks combined (via +) with the result of
# list_results, tasks first.
def list
  tasks   = list_tasks
  results = list_results
  tasks + results
end

#message_fields ⇒ Object



92
93
94
# File 'lib/skynet/skynet_message_queue.rb', line 92

# Returns Skynet::Message.fields unchanged.
def message_fields
  message_class = Skynet::Message
  message_class.fields
end

#message_queue_proxy_class ⇒ Object



53
54
55
# File 'lib/skynet/skynet_message_queue.rb', line 53

# Returns the adapter class for this instance, looking it up from the
# class-level message_queue_proxy_class on first use and caching it in
# @message_queue_proxy_class afterwards.
def message_queue_proxy_class
  return @message_queue_proxy_class if @message_queue_proxy_class

  @message_queue_proxy_class = self.class.message_queue_proxy_class
end

#mq ⇒ Object



79
80
81
82
83
84
85
# File 'lib/skynet/skynet_message_queue.rb', line 79

# Returns the adapter instance behind this queue, creating it on first use
# and caching it in @mq.
#
# The adapter class comes from #message_queue_proxy_class and is constructed
# with three options read from Skynet::CONFIG: :use_ringserver
# (TS_USE_RINGSERVER), :ringserver_hosts (TS_SERVER_HOSTS) and :drburi
# (TS_DRBURI).
def mq
  return @mq if @mq

  proxy_class = message_queue_proxy_class
  @mq = proxy_class.new(
    :use_ringserver   => Skynet::CONFIG[:TS_USE_RINGSERVER],
    :ringserver_hosts => Skynet::CONFIG[:TS_SERVER_HOSTS],
    :drburi           => Skynet::CONFIG[:TS_DRBURI]
  )
end


96
97
98
# File 'lib/skynet/skynet_message_queue.rb', line 96

# Builds and returns a one-line summary string; despite the name, nothing is
# printed here.
#
# The counts are the sizes of list_tasks(1), list_tasks(0) and list_results,
# labelled TAKEN TASKS, UNTAKEN_TASKS and RESULTS respectively.
def print_stats
  taken   = list_tasks(1).size
  untaken = list_tasks(0).size
  results = list_results.size
  "TAKEN TASKS: #{taken}, UNTAKEN_TASKS: #{untaken} RESULTS: #{results}"
end

#set_worker_version(version) ⇒ Object

Sets the current worker version (causing workers to restart)



68
69
70
# File 'lib/skynet/skynet_message_queue.rb', line 68

# Passes +version+ to the underlying adapter's set_worker_version (see #mq)
# and returns whatever the adapter returns.
def set_worker_version(version)
  queue = mq
  queue.set_worker_version(version)
end

#version_active?(version, queue_id = 0) ⇒ Boolean

Is this version still active in the queue?

Returns:

  • (Boolean)


58
59
60
# File 'lib/skynet/skynet_message_queue.rb', line 58

# Asks the underlying adapter (see #mq) whether +version+ is still active,
# forwarding +queue_id+ (0 unless given) and returning the adapter's answer.
def version_active?(version, queue_id = 0)
  queue = mq
  queue.version_active?(version, queue_id)
end