Class: Apollo::Cluster

Inherits:
Object
Defined in:
lib/apollo.rb

Constructor Details

#initialize(opts = {}) ⇒ Cluster

Creates a new cluster from the hosts listed in an inventory file.

Parameters:

  • opts (Hash) (defaults to: {})

Options Hash (opts):

  • :filename (String)

    The complete path to the inventory file (defaults to "#{Dir.pwd}/inventory.yml")


# File 'lib/apollo.rb', line 15

def initialize(opts = {})
  filename = opts.fetch(:filename, "#{Dir.pwd}/inventory.yml")
  inventory = YAML.load_file(filename)

  begin
    hosts = inventory.fetch('hosts')
    raise 'host list empty' if hosts.nil?
    @hosts = process_host_list hosts
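  # NoMethodError: the file was empty, so inventory is not a Hash.
  # KeyError: the file has no top-level 'hosts' key.
  rescue NoMethodError, KeyError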
    raise 'host key not defined in inventory file'
  end
end
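The lookup in the constructor can be sketched on its own to show which failure each inventory shape produces. This is a minimal sketch, not the library's code: `hosts_from` and `SAMPLE_INVENTORY` are hypothetical names, and the layout of the `hosts` entry (host name mapped to attributes) is an assumption, since `process_host_list` is not shown here.

```ruby
require 'yaml'

# Hypothetical inventory. Only the top-level 'hosts' key and the per-host
# keys ('ip', 'user', 'rmq_port') appear in the source; the name-to-attributes
# layout is an assumption.
SAMPLE_INVENTORY = <<~INVENTORY
  hosts:
    rmq1:
      ip: 10.0.0.5
      user: deploy
      rmq_port: 15672
INVENTORY

# Hypothetical helper mirroring the checks in #initialize, with one
# distinct error message per failure mode.
def hosts_from(yaml_text)
  inventory = YAML.load(yaml_text)
  # An empty document does not parse to a Hash.
  raise 'inventory file is empty' unless inventory.is_a?(Hash)

  # Hash#fetch runs the block when the key is missing.
  hosts = inventory.fetch('hosts') { raise 'host key not defined in inventory file' }
  raise 'host list empty' if hosts.nil?
  hosts
end

hosts = hosts_from(SAMPLE_INVENTORY)
puts hosts['rmq1']['ip']
```

Passing a block to `fetch` keeps the missing-key case separate from the empty-file case, so neither relies on rescuing a broad exception class.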

Instance Method Details

#address(host) ⇒ String

Returns the address to use when connecting to the specified host: its IP address if one is set, otherwise its hostname.

Parameters:

  • host (Hash)

    The host that we want to address

Returns:

  • (String)

    The address to connect to the host on


# File 'lib/apollo.rb', line 142

def address(host)
  # Prefer the explicit IP; fall back to the hostname when no IP is set.
  host['ip'].nil? ? host['hostname'] : host['ip']
end

#check_queue_length(host, queue, opts = {}) ⇒ Integer

Connects to the RabbitMQ admin port on the specified host and returns the number of messages waiting in the specified queue.

Parameters:

  • host (Symbol)

    The inventory name of the host running the RabbitMQ server

  • queue (String)

    The queue to check

  • opts (Hash) (defaults to: {})

Options Hash (opts):

  • :vhost (String)

    The vhost that the queue is in (defaults to '/')

Returns:

  • (Integer)

    The number of messages waiting in the specified queue


# File 'lib/apollo.rb', line 73

def check_queue_length(host, queue, opts={})
  host = @hosts[host]
  vhost = opts.fetch(:vhost, '/')
  username = CGI.escape host.fetch('rmq_admin_username', 'guest')
  password = CGI.escape host.fetch('rmq_admin_password', 'guest')
  port = host.fetch('rmq_port', 15672)

  manager = RabbitMQManager.new "http://#{username}:#{password}@#{address host}:#{port}"
  manager.queue(vhost, queue)['messages']
end
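The connection URL built above can be sketched in isolation to show why the credentials pass through `CGI.escape`: characters such as `@` or `/` in a password would otherwise break the URL. This is a sketch only; `management_url` is a hypothetical helper, the host hashes are made-up examples, and the IP-or-hostname fallback is inlined in place of `#address`.

```ruby
require 'cgi'

# Hypothetical helper reproducing the URL construction in #check_queue_length.
# The defaults ('guest'/'guest', port 15672) are the ones used in the source.
def management_url(host)
  username = CGI.escape(host.fetch('rmq_admin_username', 'guest'))
  password = CGI.escape(host.fetch('rmq_admin_password', 'guest'))
  port = host.fetch('rmq_port', 15672)
  address = host['ip'].nil? ? host['hostname'] : host['ip']

  "http://#{username}:#{password}@#{address}:#{port}"
end

# A host entry with only a hostname falls back to every default.
puts management_url('hostname' => 'rmq1.example.com')

# Reserved characters in the password are percent-encoded.
puts management_url(
  'ip' => '10.0.0.5',
  'rmq_admin_username' => 'admin',
  'rmq_admin_password' => 'p@ss/word',
  'rmq_port' => 15673
)
```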

#create_rmq_listener(host, exchange, key) ⇒ Apollo::Rabbitmq::Listener

Creates an exclusive queue with a randomized name, bound to the specified exchange with the specified routing key.

Parameters:

  • host (Symbol)

    The host that the queue is on

  • exchange (String)

    The exchange to bind the queue to

  • key (String)

    The routing key to use to bind the queue to the specified exchange

Returns:

  • (Apollo::Rabbitmq::Listener)


# File 'lib/apollo.rb', line 90

def create_rmq_listener(host, exchange, key)
  # Copy the host entry with its keys converted to symbols.
  sym_hash = {}
  @hosts[host].each { |k, v| sym_hash[k.to_sym] = v }
  Apollo::Rabbitmq::Listener.new(exchange, key, sym_hash)
end

#get_host(host) ⇒ Hash?

Gets a host

Parameters:

  • host (Symbol)

    The host to get

Returns:

  • (Hash, nil)

    Either the host or nil if the host doesn’t exist


# File 'lib/apollo.rb', line 32

def get_host(host)
  @hosts[host]
end

#run(on, command = '/bin/true', opts = {}) ⇒ String

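Runs the specified command on the specified host over SSH.

Parameters:

  • on

    The inventory name of the host to run the command on

  • command (String) (defaults to: '/bin/true')

    The command to run

  • opts (Hash) (defaults to: {})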

Options Hash (opts):

  • :forward_agent (bool) — default: true

    Whether to forward the current user’s SSH agent

  • :allow_unsuccessful (bool) — default: false

    Whether a non-zero exit code is tolerated instead of raising an exception

Returns:

  • (String)

    The combined standard output and standard error of the command

# File 'lib/apollo.rb', line 107

def run(on, command = '/bin/true', opts = {})
  host = @hosts[on]
  raise "#{on} doesn't exist in the inventory" if host.nil?

  # Copy the options so the caller's hash is untouched, and remove
  # :allow_unsuccessful, which is not a Net::SSH option.
  ssh_opts = opts.dup
  allow_unsuccessful = ssh_opts.delete(:allow_unsuccessful) || false
  ssh_opts[:forward_agent] = ssh_opts.fetch(:forward_agent, true)

  output = ""
  rc = 0
  Net::SSH.start(address(host), host['user'], ssh_opts) do |ssh|
    chan = ssh.open_channel do |ch|
      ch.exec command do |_ch, success|
        raise "#{command} didn't complete successfully" unless success || allow_unsuccessful
      end

      # Collect stdout and stderr into a single string.
      ch.on_data do |_c, data|
        output += data
      end

      ch.on_extended_data do |_c, _type, data|
        output += data
      end

      ch.on_request "exit-status" do |_ch, data|
        rc = data.read_long
      end
    end

    chan.wait
    # Honour :allow_unsuccessful for non-zero exit codes as documented.
    raise "#{command} didn't complete successfully" unless rc == 0 || allow_unsuccessful
    return output
  end
end

#wait_for_queue_drain(host, queue, opts = {}) ⇒ void

This method returns an undefined value.

Connects to the RabbitMQ admin port on the specified host and waits until the specified queue has no messages.

Parameters:

  • host (Symbol)

    The host that the queue is on

  • queue (String)

    The name of the queue to wait on

  • opts (Hash) (defaults to: {})

Options Hash (opts):

  • :sleep_duration (Float)

    The number of seconds to wait between checks of the queue (defaults to 1)

  • :timeout (Float, nil)

    The number of seconds to wait before raising an exception (defaults to nil, meaning wait indefinitely)

  • :vhost (String)

    The vhost that the queue is in (defaults to '/')

Raises:

  • (RuntimeError)

    when the host is not in the inventory or the timeout has been exceeded


# File 'lib/apollo.rb', line 49

def wait_for_queue_drain(host, queue, opts = {})
  raise "host #{host} not configured in the inventory" if @hosts[host].nil?

  sleep_duration = opts.fetch(:sleep_duration, 1)
  timeout = opts.fetch(:timeout, nil)

  start = Time.now
  while check_queue_length(host, queue, opts) != 0
    if not timeout.nil? and (Time.now.to_f - start.to_f) > timeout
      raise 'wait_for_queue_drain exceeded timeout'
    end
    sleep sleep_duration
  end
end
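The polling loop can be tried without a RabbitMQ server by replacing the queue check with a block. The following is a minimal sketch under that assumption: `wait_until_drained` is a hypothetical stand-alone version of the loop, and the block plays the role of `check_queue_length`.

```ruby
# Hypothetical stand-alone version of the loop in #wait_for_queue_drain.
# The block must return the current queue length each time it is called.
def wait_until_drained(sleep_duration: 1, timeout: nil)
  start = Time.now
  until yield.zero?
    # The timeout is only checked after a non-empty poll, as in the source.
    if timeout && (Time.now - start) > timeout
      raise 'wait_for_queue_drain exceeded timeout'
    end
    sleep sleep_duration
  end
end

# Simulated queue that drains over three polls.
lengths = [3, 1, 0]
polls = 0
wait_until_drained(sleep_duration: 0.01) do
  polls += 1
  lengths.shift
end
puts "drained after #{polls} polls"
```

Because the timeout is compared only between polls, the call can overrun the requested timeout by up to one sleep interval plus the time of one queue check.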