Class: Apollo::Cluster
- Inherits:
-
Object
- Object
- Apollo::Cluster
- Defined in:
- lib/apollo.rb
Instance Method Summary collapse
-
#address(host) ⇒ String
Gets the specified host’s proper addressing.
-
#check_queue_length(host, queue, opts = {}) ⇒ Integer
Connects to the rabbitmq admin port on the specified host and gets the number of messages waiting in the specified queue.
-
#create_rmq_listener(host, exchange, key) ⇒ Apollo::Rabbitmq::Listener
create_rmq_listener creates an exclusive queue with a randomized name bound to the specified exchange with the specified routing key.
-
#get_host(host) ⇒ Hash?
Gets a host.
-
#initialize(opts = {}) ⇒ Cluster
constructor
Creates a new cluster.
-
#run(on, command = '/bin/true', opts = {}) ⇒ String
Runs the specified command on the specified host.
-
#wait_for_queue_drain(host, queue, opts = {}) ⇒ void
Connects to the rabbitmq admin port on the specified host and waits until the specified queue has no messages.
Constructor Details
#initialize(opts = {}) ⇒ Cluster
Creates a new cluster
15 16 17 18 19 20 21 22 23 24 25 26 |
# File 'lib/apollo.rb', line 15
#
# Creates a new cluster from a YAML inventory file.
#
# The inventory must be a mapping with a non-null 'hosts' key; that value is
# handed to process_host_list and the result is stored in @hosts.
#
# @param opts [Hash] constructor options
# @option opts [String] :filename path of the inventory file; defaults to
#   "inventory.yml" inside the current working directory
# @raise [RuntimeError] 'host key not defined in inventory file' when the
#   inventory is not a mapping or has no 'hosts' key
# @raise [RuntimeError] 'host list empty' when the 'hosts' key is null
def initialize(opts = {})
  filename = opts.fetch(:filename, "#{Dir.pwd}/inventory.yml")
  inventory = YAML.load_file(filename)

  # Check the shape explicitly instead of rescuing NoMethodError: a missing
  # key makes Hash#fetch raise KeyError (which the old rescue never caught),
  # and a blanket rescue would also relabel unrelated NoMethodErrors raised
  # while the host list is being processed.
  unless inventory.is_a?(Hash) && inventory.key?('hosts')
    raise 'host key not defined in inventory file'
  end

  hosts = inventory['hosts']
  raise 'host list empty' if hosts.nil?

  @hosts = process_host_list hosts
end
Instance Method Details
#address(host) ⇒ String
Gets the specified host’s proper addressing
142 143 144 145 146 147 148 |
# File 'lib/apollo.rb', line 142
#
# Gets the specified host's proper addressing.
#
# @param host [Hash] host entry; read through the 'ip' and 'hostname' keys
# @return [Object] the value under 'ip' unless it is nil, otherwise the value
#   under 'hostname' (which may itself be nil)
def address(host)
  ip = host['ip']
  # Only nil counts as "no IP"; any other stored value is returned untouched.
  ip.nil? ? host['hostname'] : ip
end
#check_queue_length(host, queue, opts = {}) ⇒ Integer
Connects to the rabbitmq admin port on the specified host and gets the number of messages waiting in the specified queue
73 74 75 76 77 78 79 80 81 82 |
# File 'lib/apollo.rb', line 73
#
# Connects to the rabbitmq admin port on the specified host and gets the
# number of messages waiting in the specified queue.
#
# @param host [Object] inventory key of the host to query
# @param queue [String] name of the queue to inspect
# @param opts [Hash] query options
# @option opts [String] :vhost virtual host holding the queue; defaults to '/'
# @return [Object] the 'messages' field of the manager's queue description
# @raise [RuntimeError] when the host is not present in the inventory
def check_queue_length(host, queue, opts = {})
  config = @hosts[host]
  # Fail with a readable message rather than a NoMethodError on nil.
  raise "host #{host} not configured in the inventory" if config.nil?

  vhost = opts.fetch(:vhost, '/')
  # Credentials are embedded in the URL, so they are escaped first.
  username = CGI.escape config.fetch('rmq_admin_username', 'guest')
  password = CGI.escape config.fetch('rmq_admin_password', 'guest')
  port = config.fetch('rmq_port', 15672)

  manager = RabbitMQManager.new "http://#{username}:#{password}@#{address config}:#{port}"
  manager.queue(vhost, queue)['messages']
end
#create_rmq_listener(host, exchange, key) ⇒ Apollo::Rabbitmq::Listener
create_rmq_listener creates an exclusive queue with a randomized name bound to the specified exchange with the specified routing key
90 91 92 93 94 |
# File 'lib/apollo.rb', line 90
#
# Builds an Apollo::Rabbitmq::Listener for the given exchange and routing key,
# passing it the host's inventory entry with every key converted to a symbol.
#
# @param host [Object] inventory key of the host to connect to
# @param exchange [Object] exchange handed to the listener
# @param key [Object] routing key handed to the listener
# @return [Apollo::Rabbitmq::Listener] the newly constructed listener
# @raise [RuntimeError] when the host is not present in the inventory
def create_rmq_listener(host, exchange, key)
  config = @hosts[host]
  # Fail with a readable message rather than a NoMethodError on nil.
  raise "host #{host} not configured in the inventory" if config.nil?

  settings = config.each_with_object({}) do |(name, value), acc|
    acc[name.to_sym] = value
  end
  Apollo::Rabbitmq::Listener.new(exchange, key, settings)
end
#get_host(host) ⇒ Hash?
Gets a host
32 33 34 |
# File 'lib/apollo.rb', line 32
#
# Looks a host up in the table built by the constructor.
#
# @param host [Object] key to look up in @hosts
# @return [Object, nil] whatever @hosts holds under that key; nil for an
#   unknown key if @hosts is a plain Hash (its construction is not visible here)
def get_host(host)
  @hosts[host]
end
#run(on, command = '/bin/true', opts = {}) ⇒ String
Runs the specified command on the specified host
107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 |
# File 'lib/apollo.rb', line 107
#
# Runs the specified command on the specified host over SSH and returns
# everything the command wrote (regular and extended data, in arrival order).
#
# @param on [Object] inventory key of the host to run the command on
# @param command [String] command line to execute
# @param opts [Hash] options passed to Net::SSH.start, plus the extras below;
#   the hash is never modified
# @option opts [Boolean] :forward_agent defaults to true when not supplied
# @option opts [Boolean] :allow_unsuccessful when truthy, neither a rejected
#   exec request nor a non-zero exit status raises; consumed here and not
#   forwarded to Net::SSH
# @return [String] the collected command output
# @raise [RuntimeError] when the host is not in the inventory, or when the
#   command fails and :allow_unsuccessful is not set
def run(on, command = '/bin/true', opts = {})
  host = @hosts[on]
  raise "#{on} doesn't exist in the inventory" if host.nil?

  # Work on a copy so the caller's hash is left alone, and strip our private
  # flag: Net::SSH.start rejects option keys it does not know.
  ssh_opts = opts.dup
  allow_unsuccessful = ssh_opts.delete(:allow_unsuccessful) || false
  ssh_opts[:forward_agent] = true unless ssh_opts.key?(:forward_agent)

  output = ''.dup # unfrozen buffer, appended to in place
  rc = 0

  Net::SSH.start(address(host), host['user'], ssh_opts) do |ssh|
    channel = ssh.open_channel do |ch|
      ch.exec command do |_exec_channel, success|
        raise "#{command} didn't complete successfully" if !success && !allow_unsuccessful
      end

      ch.on_data { |_c, data| output << data }
      ch.on_extended_data { |_c, _type, data| output << data }
      ch.on_request('exit-status') { |_c, data| rc = data.read_long }
    end
    channel.wait

    raise "#{command} didn't complete successfully" unless rc == 0 || allow_unsuccessful
  end

  output
end
#wait_for_queue_drain(host, queue, opts = {}) ⇒ void
This method returns an undefined value.
Connects to the rabbitmq admin port on the specified host and waits until the specified queue has no messages
49 50 51 52 53 54 55 56 57 58 59 60 61 62 |
# File 'lib/apollo.rb', line 49
#
# Polls check_queue_length for the given host and queue until it reports 0.
#
# @param host [Object] inventory key of the host to poll
# @param queue [String] name of the queue to watch
# @param opts [Hash] polling options; the whole hash is also passed on to
#   check_queue_length
# @option opts [Numeric] :sleep_duration seconds to sleep between polls;
#   defaults to 1
# @option opts [Numeric, nil] :timeout seconds after which to give up; nil
#   (the default) waits indefinitely
# @return [void]
# @raise [RuntimeError] when the host is not in the inventory, or when the
#   queue is still non-empty after the timeout has elapsed
def wait_for_queue_drain(host, queue, opts = {})
  raise "host #{host} not configured in the inventory" if @hosts[host].nil?

  sleep_duration = opts.fetch(:sleep_duration, 1)
  timeout = opts.fetch(:timeout, nil)
  # Monotonic clock: elapsed time is unaffected by wall-clock adjustments.
  started = Process.clock_gettime(Process::CLOCK_MONOTONIC)

  until check_queue_length(host, queue, opts) == 0
    elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started
    raise 'wait_for_queue_drain exceeded timeout' if !timeout.nil? && elapsed > timeout

    sleep sleep_duration
  end
end