Module: LogStash::PluginMixins::RabbitMQConnection

Defined in:
lib/logstash/plugin_mixins/rabbitmq_connection.rb

Defined Under Namespace

Classes: HareInfo

Constant Summary collapse

EXCHANGE_TYPES =
["fanout", "direct", "topic", "x-consistent-hash", "x-modulus-hash"]

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.included(base) ⇒ Object



16
17
18
19
# File 'lib/logstash/plugin_mixins/rabbitmq_connection.rb', line 16

def self.included(base)
  base.extend(self)
  base.setup_rabbitmq_connection_config
end

Instance Method Details

#addresses_from_hosts_and_port(hosts, port) ⇒ Object



154
155
156
157
158
# File 'lib/logstash/plugin_mixins/rabbitmq_connection.rb', line 154

def addresses_from_hosts_and_port(hosts, port)
  # Expand host to include port, unless port is already present
  # (Allowing hosts with port was a previously undocumented feature)
  hosts.map{|host|format_address(host, port)}
end

#channel_open?Boolean

Returns:

  • (Boolean)


193
194
195
# File 'lib/logstash/plugin_mixins/rabbitmq_connection.rb', line 193

def channel_open?
  @hare_info && @hare_info.channel && @hare_info.channel.open?
end

#close_connectionObject



88
89
90
91
92
# File 'lib/logstash/plugin_mixins/rabbitmq_connection.rb', line 88

def close_connection
  @rabbitmq_connection_stopping = true
  @hare_info.channel.close if channel_open?
  @hare_info.connection.close if connection_open?
end

#conn_strObject



84
85
86
# File 'lib/logstash/plugin_mixins/rabbitmq_connection.rb', line 84

def conn_str
  "amqp://#{@user}@#{@host}:#{@port}#{@vhost}"
end

#connect!Object



161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
# File 'lib/logstash/plugin_mixins/rabbitmq_connection.rb', line 161

def connect!
  @hare_info = connect() unless @hare_info # Don't duplicate the conn!
rescue MarchHare::Exception, java.io.IOException => e
  error_message = if e.message.empty? && e.is_a?(java.io.IOException)
    # IOException with an empty message is probably an instance of
    # these problems:
    # https://github.com/logstash-plugins/logstash-output-rabbitmq/issues/52
    # https://github.com/rabbitmq/rabbitmq-java-client/issues/100
    #
    # Best guess is to help the user understand that there is probably
    # some kind of configuration problem causing the error, but we
    # can't really offer any more detailed hints :     "An unknown error occurred. RabbitMQ gave no hints as to the cause. Maybe this is a configuration error (invalid vhost, for example). I recommend checking the RabbitMQ server logs for clues about this failure."
  else
    e.message
  end

  if @logger.debug?
    @logger.error("RabbitMQ connection error, will retry.",
                  :error_message => error_message,
                  :exception => e.class.name,
                  :backtrace => e.backtrace)
  else
    @logger.error("RabbitMQ connection error, will retry.",
                  :error_message => error_message,
                  :exception => e.class.name)
  end

  sleep_for_retry
  retry
end

#connected?Boolean

Returns:

  • (Boolean)


201
202
203
204
# File 'lib/logstash/plugin_mixins/rabbitmq_connection.rb', line 201

def connected?
  return nil unless @hare_info && @hare_info.connection
  @hare_info.connection.connected?
end

#connection_open?Boolean

Returns:

  • (Boolean)


197
198
199
# File 'lib/logstash/plugin_mixins/rabbitmq_connection.rb', line 197

def connection_open?
  @hare_info && @hare_info.connection && @hare_info.connection.open?
end

#format_address(host, port) ⇒ Object

Adds port to the value of host if it is not already supplied



127
128
129
130
131
132
133
134
135
136
# File 'lib/logstash/plugin_mixins/rabbitmq_connection.rb', line 127

def format_address(host, port)
  case host.count(':')
  when 0
    "#{host}:#{port}"
  when 1
    host
  else
    format_ipv6(host, port)
  end
end

#format_ipv6(host, port) ⇒ Object

Formats an IPv6 to include the port, if it is not already given, conforming to the format used for literal IPv6 addresses in URIs, eg [2406:da00:ff00::6b15:edbc]:80



141
142
143
144
145
146
147
148
149
150
151
152
# File 'lib/logstash/plugin_mixins/rabbitmq_connection.rb', line 141

def format_ipv6(host, port)
  last_bracket = host.rindex(']')
  if last_bracket
    if last_bracket < host.rindex(':')
      host
    else
      "#{host}:#{port}"
    end
  else
    "[#{host}]:#{port}"
  end
end

#rabbitmq_settingsObject



94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
# File 'lib/logstash/plugin_mixins/rabbitmq_connection.rb', line 94

def rabbitmq_settings
  return @rabbitmq_settings if @rabbitmq_settings

  s = {
    :vhost => @vhost,
    :addresses => addresses_from_hosts_and_port(@host, @port),
    :user  => @user,
    :automatic_recovery => @automatic_recovery,
    :pass => @password ? @password.value : "guest",
  }

  s[:timeout] = @connection_timeout || 0
  s[:heartbeat] = @heartbeat || 0

  if @ssl
    s[:tls] = @ssl_version

    cert_path = @ssl_certificate_path
    cert_pass = @ssl_certificate_password.value if @ssl_certificate_password

    if !!cert_path ^ !!cert_pass
      raise LogStash::ConfigurationError, "RabbitMQ requires both ssl_certificate_path AND ssl_certificate_password to be set!"
    end

    s[:tls_certificate_path] = cert_path
    s[:tls_certificate_password] = cert_pass
  end

  @rabbitmq_settings = s
end

#setup_rabbitmq_connection_configObject



21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
# File 'lib/logstash/plugin_mixins/rabbitmq_connection.rb', line 21

def setup_rabbitmq_connection_config
  # RabbitMQ server address(es)
  # host can either be a single host, or a list of hosts
  # i.e.
  #   host => "localhost"
  # or
  #   host => ["host01", "host02]
  #
  # if multiple hosts are provided on the initial connection and any subsequent
  # recovery attempts of the hosts is chosen at random and connected to.
  # Note that only one host connection is active at a time.
  config :host, :validate => :string, :required => true , :list => true

  # RabbitMQ port to connect on
  config :port, :validate => :number, :default => 5672

  # RabbitMQ username
  config :user, :validate => :string, :default => "guest"

  # RabbitMQ password
  config :password, :validate => :password, :default => "guest"

  # The vhost (virtual host) to use. If you don't know what this
  # is, leave the default. With the exception of the default
  # vhost ("/"), names of vhosts should not begin with a forward
  # slash.
  config :vhost, :validate => :string, :default => "/"

  # Enable or disable SSL.
  # Note that by default remote certificate verification is off.
  # Specify ssl_certificate_path and ssl_certificate_password if you need
  # certificate verification
  config :ssl, :validate => :boolean

  # Version of the SSL protocol to use.
  config :ssl_version, :validate => :string, :default => "TLSv1.2"

  # Path to an SSL certificate in PKCS12 (.p12) format used for verifying the remote host
  config :ssl_certificate_path, :validate => :path

  # Password for the encrypted PKCS12 (.p12) certificate file specified in ssl_certificate_path
  config :ssl_certificate_password, :validate => :password

  # Set this to automatically recover from a broken connection. You almost certainly don't want to override this!!!
  config :automatic_recovery, :validate => :boolean, :default => true

  # Time in seconds to wait before retrying a connection
  config :connect_retry_interval, :validate => :number, :default => 1

  # The default connection timeout in milliseconds. If not specified the timeout is infinite.
  config :connection_timeout, :validate => :number

  # Heartbeat delay in seconds. If unspecified no heartbeats will be sent
  config :heartbeat, :validate => :number

  # Passive queue creation? Useful for checking queue existance without modifying server state
  config :passive, :validate => :boolean, :default => false

  # Extra queue arguments as an array.
  # To make a RabbitMQ queue mirrored, use: `{"x-ha-policy" => "all"}`
  config :arguments, :validate => :array, :default => {}
end