Module: LogStash::PluginMixins::RabbitMQConnection
- Defined in:
- lib/logstash/plugin_mixins/rabbitmq_connection.rb
Defined Under Namespace
Classes: HareInfo
Constant Summary collapse
- EXCHANGE_TYPES =
["fanout", "direct", "topic", "x-consistent-hash", "x-modulus-hash"]
Class Method Summary collapse
Instance Method Summary collapse
- #addresses_from_hosts_and_port(hosts, port) ⇒ Object
- #channel_open? ⇒ Boolean
- #close_connection ⇒ Object
- #conn_str ⇒ Object
- #connect! ⇒ Object
- #connected? ⇒ Boolean
- #connection_open? ⇒ Boolean
- #format_address(host, port) ⇒ Object
Adds port to the value of host if it is not already supplied.
- #format_ipv6(host, port) ⇒ Object
Formats an IPv6 address to include the port, if it is not already given, conforming to the format used for literal IPv6 addresses in URIs, e.g. [2406:da00:ff00::6b15:edbc]:80.
- #rabbitmq_settings ⇒ Object
- #setup_rabbitmq_connection_config ⇒ Object
Class Method Details
.included(base) ⇒ Object
16 17 18 19 |
# File 'lib/logstash/plugin_mixins/rabbitmq_connection.rb', line 16
#
# Module#included hook. Extends +base+ with this module's methods and then
# calls +setup_rabbitmq_connection_config+ on +base+.
#
# @param base [Module] the class or module that included this mixin
def self.included(base)
  base.extend(self)
  base.setup_rabbitmq_connection_config
end
Instance Method Details
#addresses_from_hosts_and_port(hosts, port) ⇒ Object
154 155 156 157 158 |
# File 'lib/logstash/plugin_mixins/rabbitmq_connection.rb', line 154
#
# Maps every entry of +hosts+ through #format_address with the given +port+.
#
# @param hosts [Array<String>] host entries, each with or without a port
# @param port [Object] port handed to #format_address for every host
# @return [Array] one #format_address result per entry of +hosts+
def addresses_from_hosts_and_port(hosts, port)
  # Expand host to include port, unless port is already present
  # (Allowing hosts with port was a previously undocumented feature)
  hosts.map { |host| format_address(host, port) }
end
#channel_open? ⇒ Boolean
193 194 195 |
# File 'lib/logstash/plugin_mixins/rabbitmq_connection.rb', line 193
#
# Reports whether a channel exists on @hare_info and answers +open?+ truthily.
#
# @return [Object] the channel's +open?+ result, or the first nil/false value
#   met along the way (@hare_info, then its channel)
def channel_open?
  channel = @hare_info && @hare_info.channel
  channel && channel.open?
end
#close_connection ⇒ Object
88 89 90 91 92 |
# File 'lib/logstash/plugin_mixins/rabbitmq_connection.rb', line 88
#
# Sets @rabbitmq_connection_stopping to true, then closes the channel and the
# connection held by @hare_info, each only if it currently reports as open.
def close_connection
  @rabbitmq_connection_stopping = true

  # The channel is closed before the connection, as in the original order.
  @hare_info.channel.close if channel_open?
  @hare_info.connection.close if connection_open?
end
#conn_str ⇒ Object
84 85 86 |
# File 'lib/logstash/plugin_mixins/rabbitmq_connection.rb', line 84
#
# Builds a human-readable AMQP URI from @user, @host, @port and @vhost.
#
# @host may be a single String or a list of hosts; a list is joined with
# commas instead of being interpolated as an Array literal (which rendered
# as `["a", "b"]` inside the URI).
#
# @return [String] e.g. "amqp://guest@host01,host02:5672/"
def conn_str
  hosts = Array(@host).join(",")
  "amqp://#{@user}@#{hosts}:#{@port}#{@vhost}"
end
#connect! ⇒ Object
161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 |
# File 'lib/logstash/plugin_mixins/rabbitmq_connection.rb', line 161
#
# Establishes the connection via #connect and stores the result in @hare_info,
# unless @hare_info is already set.
#
# On MarchHare::Exception or java.io.IOException the failure is logged through
# @logger.error, #sleep_for_retry is called, and the whole method is retried.
# There is no retry limit in this method.
def connect!
  @hare_info = connect() unless @hare_info # Don't duplicate the conn!
rescue MarchHare::Exception, java.io.IOException => e
  message = if e.message.empty? && e.is_a?(java.io.IOException)
    # IOException with an empty message is probably an instance of
    # these problems:
    # https://github.com/logstash-plugins/logstash-output-rabbitmq/issues/52
    # https://github.com/rabbitmq/rabbitmq-java-client/issues/100
    #
    # Best guess is to help the user understand that there is probably
    # some kind of configuration problem causing the error, but we
    # can't really offer any more detailed hints.
    "An unknown error occurred. RabbitMQ gave no hints as to the cause. Maybe this is a configuration error (invalid vhost, for example). I recommend checking the RabbitMQ server logs for clues about this failure."
  else
    e.message
  end

  # The backtrace is only attached when debug logging is enabled.
  if @logger.debug?
    @logger.error("RabbitMQ connection error, will retry.",
                  :error_message => message,
                  :exception => e.class.name,
                  :backtrace => e.backtrace)
  else
    @logger.error("RabbitMQ connection error, will retry.",
                  :error_message => message,
                  :exception => e.class.name)
  end

  sleep_for_retry
  retry
end
#connected? ⇒ Boolean
201 202 203 204 |
# File 'lib/logstash/plugin_mixins/rabbitmq_connection.rb', line 201
#
# Asks the connection held by @hare_info whether it is connected.
#
# @return [Object, nil] the connection's +connected?+ result, or nil when
#   @hare_info or its connection is missing
def connected?
  return nil unless @hare_info && @hare_info.connection

  @hare_info.connection.connected?
end
#connection_open? ⇒ Boolean
197 198 199 |
# File 'lib/logstash/plugin_mixins/rabbitmq_connection.rb', line 197
#
# Reports whether a connection exists on @hare_info and answers +open?+ truthily.
#
# @return [Object] the connection's +open?+ result, or the first nil/false
#   value met along the way (@hare_info, then its connection)
def connection_open?
  connection = @hare_info && @hare_info.connection
  connection && connection.open?
end
#format_address(host, port) ⇒ Object
Adds port to the value of host if it is not already supplied
127 128 129 130 131 132 133 134 135 136 |
# File 'lib/logstash/plugin_mixins/rabbitmq_connection.rb', line 127
#
# Adds port to the value of host if it is not already supplied.
# The number of colons in +host+ decides the form:
#   * none     - plain host, +port+ is appended
#   * one      - treated as "host:port" and returned unchanged
#   * several  - treated as an IPv6 literal and delegated to #format_ipv6
#
# @param host [String] host name or address, optionally with a port
# @param port [Object] port to append when +host+ carries none
# @return [String] the address including a port
def format_address(host, port)
  case host.count(':')
  when 0 then "#{host}:#{port}"
  when 1 then host
  else format_ipv6(host, port)
  end
end
#format_ipv6(host, port) ⇒ Object
Formats an IPv6 address to include the port, if it is not already given, conforming to the format used for literal IPv6 addresses in URIs, e.g. [2406:da00:ff00::6b15:edbc]:80
141 142 143 144 145 146 147 148 149 150 151 152 |
# File 'lib/logstash/plugin_mixins/rabbitmq_connection.rb', line 141
#
# Formats an IPv6 address to include the port, if it is not already given,
# conforming to the format used for literal IPv6 addresses in URIs,
# e.g. [2406:da00:ff00::6b15:edbc]:80
#
# @param host [String] IPv6 literal, bare or bracketed, with or without a port;
#   expected to contain at least one ':'
# @param port [Object] port to append when +host+ carries none
# @return [String] bracketed address followed by ":port"
def format_ipv6(host, port)
  last_bracket = host.rindex(']')

  # Bare literal: wrap it in brackets and append the port.
  return "[#{host}]:#{port}" unless last_bracket

  # A colon after the closing bracket means a port is already attached.
  last_bracket < host.rindex(':') ? host : "#{host}:#{port}"
end
#rabbitmq_settings ⇒ Object
94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 |
# File 'lib/logstash/plugin_mixins/rabbitmq_connection.rb', line 94
#
# Builds the connection settings hash from the plugin's instance variables
# and memoizes it in @rabbitmq_settings; later calls return the cached hash.
#
# @return [Hash] settings keyed by :vhost, :addresses, :user,
#   :automatic_recovery, :pass, :timeout, :heartbeat and, when @ssl is set,
#   :tls, :tls_certificate_path and :tls_certificate_password
# @raise [LogStash::ConfigurationError] when @ssl is set and exactly one of
#   the certificate path / certificate password is present
def rabbitmq_settings
  return @rabbitmq_settings if @rabbitmq_settings

  s = {
    :vhost => @vhost,
    :addresses => addresses_from_hosts_and_port(@host, @port),
    :user => @user,
    :automatic_recovery => @automatic_recovery,
    :pass => @password ? @password.value : "guest",
  }

  # Unset timeout / heartbeat are passed on as 0.
  s[:timeout] = @connection_timeout || 0
  s[:heartbeat] = @heartbeat || 0

  if @ssl
    s[:tls] = @ssl_version

    cert_path = @ssl_certificate_path
    cert_pass = @ssl_certificate_password.value if @ssl_certificate_password

    # XOR: reject the case where only one of the two is supplied.
    if !!cert_path ^ !!cert_pass
      raise LogStash::ConfigurationError, "RabbitMQ requires both ssl_certificate_path AND ssl_certificate_password to be set!"
    end

    s[:tls_certificate_path] = cert_path
    s[:tls_certificate_password] = cert_pass
  end

  @rabbitmq_settings = s
end
#setup_rabbitmq_connection_config ⇒ Object
21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 |
# File 'lib/logstash/plugin_mixins/rabbitmq_connection.rb', line 21
#
# Declares the RabbitMQ connection settings by calling +config+ once per
# option on the receiver.
def setup_rabbitmq_connection_config
  # RabbitMQ server address(es)
  # host can either be a single host, or a list of hosts
  # i.e.
  #   host => "localhost"
  # or
  #   host => ["host01", "host02"]
  #
  # If multiple hosts are provided, on the initial connection and on any
  # subsequent recovery attempts one of the hosts is chosen at random and
  # connected to.
  # Note that only one host connection is active at a time.
  config :host, :validate => :string, :required => true , :list => true

  # RabbitMQ port to connect on
  config :port, :validate => :number, :default => 5672

  # RabbitMQ username
  config :user, :validate => :string, :default => "guest"

  # RabbitMQ password
  config :password, :validate => :password, :default => "guest"

  # The vhost (virtual host) to use. If you don't know what this
  # is, leave the default. With the exception of the default
  # vhost ("/"), names of vhosts should not begin with a forward
  # slash.
  config :vhost, :validate => :string, :default => "/"

  # Enable or disable SSL.
  # Note that by default remote certificate verification is off.
  # Specify ssl_certificate_path and ssl_certificate_password if you need
  # certificate verification
  config :ssl, :validate => :boolean

  # Version of the SSL protocol to use.
  config :ssl_version, :validate => :string, :default => "TLSv1.2"

  # Path to an SSL certificate in PKCS12 (.p12) format used for verifying the remote host
  config :ssl_certificate_path, :validate => :path

  # Password for the encrypted PKCS12 (.p12) certificate file specified in ssl_certificate_path
  config :ssl_certificate_password, :validate => :password

  # Set this to automatically recover from a broken connection. You almost certainly don't want to override this!!!
  config :automatic_recovery, :validate => :boolean, :default => true

  # Time in seconds to wait before retrying a connection
  config :connect_retry_interval, :validate => :number, :default => 1

  # The default connection timeout in milliseconds.
  # If not specified the timeout is infinite.
  config :connection_timeout, :validate => :number

  # Heartbeat delay in seconds. If unspecified no heartbeats will be sent
  config :heartbeat, :validate => :number

  # Passive queue creation? Useful for checking queue existence without modifying server state
  config :passive, :validate => :boolean, :default => false

  # Extra queue arguments as an array.
  # To make a RabbitMQ queue mirrored, use: `{"x-ha-policy" => "all"}`
  # NOTE(review): validated as :array, yet the default and the example above
  # are Hashes - confirm the intended type.
  config :arguments, :validate => :array, :default => {}
end