Module: OneApm::Collector::CollectorService::HttpConnection
- Included in:
- OneApm::Collector::CollectorService
- Defined in:
- lib/one_apm/collector/collector/http_connection.rb
Constant Summary collapse
- OA_CONNECTION_ERRORS =
These include Errno connection errors, and all indicate that the underlying TCP connection may be in a bad state.
[Timeout::Error, EOFError, SystemCallError, SocketError].freeze
Instance Method Summary collapse
-
#cert_file_path ⇒ Object
The path to the certificate file used to verify the SSL connection if verify_peer is enabled.
-
#check_post_size(post_string) ⇒ Object
Raises an UnrecoverableServerException if the post_string is not smaller than the :post_size_limit configured in OneApm::Manager.config.
- #close_shared_connection ⇒ Object
-
#compress_request(data) ⇒ Object
Compress request data.
- #create_and_start_http_connection ⇒ Object
-
#create_http_connection ⇒ Object
Returns a Net::HTTP connection to the collector, built through the configured proxy server settings (proxy_server), with SSL and timeouts set up but not yet started.
-
#decompress_response(response) ⇒ Object
Decompresses the response from the server, if it is gzip encoded, otherwise returns it verbatim.
- #establish_shared_connection ⇒ Object
- #http_connection ⇒ Object
- #invoke_remote(method, payload = [], options = {}) ⇒ Object
-
#remote_method_uri(method, format) ⇒ Object
The path on the server that we should post our data to.
-
#send_request(opts) ⇒ Object
Posts to the specified server.
- #session(&block) ⇒ Object
- #session_with_keepalive(&block) ⇒ Object
- #session_without_keepalive(&block) ⇒ Object
- #setup_connection_for_ssl(conn) ⇒ Object
- #setup_connection_timeouts(conn) ⇒ Object
- #ssl_cert_store ⇒ Object
- #start_connection(conn) ⇒ Object
Instance Method Details
#cert_file_path ⇒ Object
The path to the certificate file used to verify the SSL connection if verify_peer is enabled
207 208 209 210 211 212 213 214 |
# File 'lib/one_apm/collector/collector/http_connection.rb', line 207

# The path to the certificate file used to verify the SSL connection when
# verify_peer is enabled.
#
# A configured :ca_bundle_path takes precedence and is returned even when the
# file is missing (a warning is logged so the misconfiguration is visible).
# Without an override, the cacert.pem under the probe's oneapm_root is used.
#
# @return [String] path to the CA bundle
def cert_file_path
  path_override = OneApm::Manager.config[:ca_bundle_path]
  unless path_override
    return File.expand_path(File.join(probe.oneapm_root, 'config', 'cert', 'cacert.pem'))
  end

  # File.exist? instead of the File.exists? alias, which Ruby 3.2 removed.
  unless File.exist?(path_override)
    OneApm::Manager.logger.warn("Couldn't find CA bundle from configured ca_bundle_path: #{path_override}")
  end
  path_override
end
#check_post_size(post_string) ⇒ Object
Raises an UnrecoverableServerException if the post_string is not smaller than the :post_size_limit configured in OneApm::Manager.config.
236 237 238 239 240 |
# File 'lib/one_apm/collector/collector/http_connection.rb', line 236

# Guards against posting more data than the configured limit allows.
#
# The payload is measured in bytes (String#bytesize), which is what goes over
# the wire and what the log message reports; String#size counts characters and
# under-reports multibyte strings.
#
# @param post_string [String] the request body about to be sent
# @return [nil] when the body is smaller than :post_size_limit
# @raise [UnrecoverableServerException] when the body is not smaller than
#   OneApm::Manager.config[:post_size_limit]
def check_post_size(post_string)
  post_bytes = post_string.bytesize
  return if post_bytes < OneApm::Manager.config[:post_size_limit]

  OneApm::Manager.logger.debug "Tried to send too much data: #{post_bytes} bytes"
  raise UnrecoverableServerException.new('413 Request Entity Too Large')
end
#close_shared_connection ⇒ Object
138 139 140 141 142 143 144 |
# File 'lib/one_apm/collector/collector/http_connection.rb', line 138

# Finishes the shared connection if it has been started and forgets it, so
# that a later establish_shared_connection opens a fresh one. Does nothing
# when no shared connection is held.
def close_shared_connection
  conn = @shared_tcp_connection
  return unless conn

  OneApm::Manager.logger.debug("Closing shared TCP connection to #{conn.address}:#{conn.port}")
  conn.finish if conn.started?
  @shared_tcp_connection = nil
end
#compress_request(data) ⇒ Object
Compress request data
217 218 219 220 221 222 |
# File 'lib/one_apm/collector/collector/http_connection.rb', line 217

# Compresses request data and verifies the result against the post size limit.
#
# @param data the serialized payload
# @return [Array] the encoded data followed by the encoding name ('deflate')
def compress_request(data)
  compressed = OneApm::Support::Encoders::Compressed.encode(data)
  check_post_size(compressed)
  [compressed, 'deflate']
end
#create_and_start_http_connection ⇒ Object
155 156 157 158 159 |
# File 'lib/one_apm/collector/collector/http_connection.rb', line 155

# Builds a connection with create_http_connection, starts it and returns it.
def create_and_start_http_connection
  create_http_connection.tap { |conn| start_connection(conn) }
end
#create_http_connection ⇒ Object
Returns a Net::HTTP connection to the collector, built through the configured proxy server settings (proxy_server), with SSL and timeouts set up but not yet started.
168 169 170 171 172 173 174 175 176 177 178 179 |
# File 'lib/one_apm/collector/collector/http_connection.rb', line 168

# Builds (without starting) a Net::HTTP handle for the collector, using the
# proxy_server settings. The collector's ip is preferred over its name. SSL is
# configured when the :ssl setting is on, and timeouts are always applied.
#
# @return [Net::HTTP] the configured, not yet started connection
def create_http_connection
  # Proxy returns regular HTTP if @proxy_host is nil (the default)
  proxied_http = Net::HTTP::Proxy(
    proxy_server.name,
    proxy_server.port,
    proxy_server.user,
    proxy_server.password
  )

  host = @collector.ip || @collector.name
  connection = proxied_http.new(host, @collector.port)

  setup_connection_for_ssl(connection) if OneApm::Manager.config[:ssl]
  setup_connection_timeouts(connection)

  OneApm::Manager.logger.debug("Created net/http handle to #{connection.address}:#{connection.port}")
  connection
end
#decompress_response(response) ⇒ Object
Decompresses the response from the server, if it is gzip encoded, otherwise returns it verbatim
226 227 228 229 230 231 232 |
# File 'lib/one_apm/collector/collector/http_connection.rb', line 226

# Decompresses the response from the server if it is gzip encoded, otherwise
# returns the body verbatim.
#
# @param response an object answering #[] for header lookup and #body
# @return [String] the (decompressed) response body
def decompress_response(response)
  return response.body unless response['content-encoding'] == 'gzip'

  # The block form closes the reader as soon as the body has been read,
  # instead of leaving the open gzip stream for the garbage collector.
  Zlib::GzipReader.wrap(StringIO.new(response.body)) { |gz| gz.read }
end
#establish_shared_connection ⇒ Object
131 132 133 134 135 136 |
# File 'lib/one_apm/collector/collector/http_connection.rb', line 131

# Returns the shared connection, creating and starting it on first use.
def establish_shared_connection
  @shared_tcp_connection ||= create_and_start_http_connection
end
#http_connection ⇒ Object
123 124 125 126 127 128 129 |
# File 'lib/one_apm/collector/collector/http_connection.rb', line 123

# Picks the connection for the next request: the shared connection while a
# session is active (@in_session), otherwise a freshly created one.
def http_connection
  @in_session ? establish_shared_connection : create_http_connection
end
#invoke_remote(method, payload = [], options = {}) ⇒ Object
10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 |
# File 'lib/one_apm/collector/collector/http_connection.rb', line 10

# Serializes the payload, compresses it, posts it to the collector and
# returns the unmarshalled response.
#
# Timing metrics are recorded whether or not the call succeeds. Size metrics
# are recorded only once the compressed size is known.
#
# @param method the remote method name, used in the request URI and metrics
# @param payload the data handed to the marshaller (defaults to [])
# @param options [Hash] passed to the marshaller's dump; :item_count is
#   forwarded to the size supportability metrics
# @return whatever the marshaller loads from the (decompressed) response
def invoke_remote(method, payload = [], options = {})
  start_ts = Time.now

  data = nil
  begin
    data = @marshaller.dump(payload, options)
  rescue StandardError, SystemStackError => e
    handle_serialization_error(method, e)
  end
  serialize_finish_ts = Time.now

  data, encoding = compress_request(data)
  size = data.size

  uri = remote_method_uri(method, @marshaller.format)
  full_uri = "#{@collector}#{uri}"

  @audit_logger.log_request(full_uri, payload, @marshaller)
  response = send_request(:data => data,
                          :uri => uri,
                          :encoding => encoding,
                          :collector => @collector)
  @marshaller.load(decompress_response(response))
ensure
  record_timing_supportability_metrics(method, start_ts, serialize_finish_ts)
  # size stays nil when the call failed before compression finished
  if size
    record_size_supportability_metrics(method, size, options[:item_count])
  end
end
#remote_method_uri(method, format) ⇒ Object
The path on the server that we should post our data to
41 42 43 44 45 46 47 48 49 |
# File 'lib/one_apm/collector/collector/http_connection.rb', line 41

# The path on the server that we should post our data to.
#
# The protocol version, license key and method are always present. run_id
# (from @agent_id) and marshal_format are appended only when they have a
# value, so the URI never ends in a dangling '&'.
#
# @param method the remote method name
# @param format the marshal format name, or nil to omit it
# @return [String] the request path including the query string
def remote_method_uri(method, format)
  uri = "/tpm/agent.do?PROTOCOL_VERSION=#{OA_PROTOCOL_VERSION}&license_key=#{@license_key}&method=#{method}"

  optional_params = { 'run_id' => @agent_id, 'marshal_format' => format }
  optional_params.each do |key, value|
    uri << "&#{key}=#{value}" if value
  end
  uri
end
#send_request(opts) ⇒ Object
Posts to the specified server
Options:
- :uri => the path to request on the server (a misnomer of
course)
- :encoding => the encoding to pass to the server
- :collector => a URI object that responds to the 'name' method
and returns the name of the collector to
contact
- :data => the data to send as the body of the request
61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 |
# File 'lib/one_apm/collector/collector/http_connection.rb', line 61

# Posts to the specified server.
#
# Options:
#  - :uri => the path to request on the server
#  - :encoding => the encoding to pass to the server
#  - :collector => an object that responds to 'name' with the collector host
#  - :data => the data to send as the body of the request
#
# A failure matching OA_CONNECTION_ERRORS closes the shared connection and the
# request is retried, for at most two attempts in total. After that a
# ServerConnectionException is raised.
def send_request(opts)
  post = Net::HTTP::Post.new(opts[:uri],
                             'CONTENT-ENCODING' => opts[:encoding],
                             'HOST' => opts[:collector].name)
  post['user-agent'] = user_agent
  post.body = opts[:data]
  post.content_type = "multipart/form-data; boundary=-oneapm-#{post.object_id}-"

  response = nil
  attempts = 0
  attempt_limit = 2

  begin
    attempts += 1
    conn = http_connection
    OneApm::Manager.logger.debug "Sending request to #{opts[:collector]}#{opts[:uri]}"
    OneApm::TimerLib.timeout(@request_timeout) { response = conn.request(post) }
  rescue *OA_CONNECTION_ERRORS => e
    # The connection may be in a bad state; drop it so the retry starts clean.
    close_shared_connection
    unless attempts < attempt_limit
      raise ServerConnectionException, "Recoverable error talking to #{@collector} after #{attempts} attempts: #{e}"
    end

    OneApm::Manager.logger.debug("Retrying request to #{opts[:collector]}#{opts[:uri]} after #{e}")
    retry
  end

  log_and_return_response response
end
#session(&block) ⇒ Object
90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 |
# File 'lib/one_apm/collector/collector/http_connection.rb', line 90

# Runs the block inside a session, during which requests share one
# connection (http_connection checks @in_session).
#
# With :aggressive_keepalive configured the shared connection is left open
# after the block; otherwise it is closed when the block finishes.
#
# @yield the work to perform while the session is active
# @return the value of the block
# @raise [ArgumentError] when no block is given
# @raise [OneApm::ServerConnectionException] when one of
#   OA_CONNECTION_ERRORS escapes the block
def session(&block)
  # The message names this method; it previously referred to a nonexistent
  # #shared_connection.
  raise ArgumentError, "#{self.class}#session must be passed a block" unless block_given?

  begin
    t0 = Time.now
    @in_session = true
    if OneApm::Manager.config[:aggressive_keepalive]
      session_with_keepalive(&block)
    else
      session_without_keepalive(&block)
    end
  rescue *OA_CONNECTION_ERRORS => e
    elapsed = Time.now - t0
    raise OneApm::ServerConnectionException, "Recoverable error connecting to #{@collector} after #{elapsed} seconds: #{e}"
  ensure
    @in_session = false
  end
end
#session_with_keepalive(&block) ⇒ Object
109 110 111 112 |
# File 'lib/one_apm/collector/collector/http_connection.rb', line 109

# Makes sure the shared connection exists, then runs the block. The
# connection is left open afterwards.
#
# @return the value of the block
def session_with_keepalive(&block)
  establish_shared_connection

  block.call
end
#session_without_keepalive(&block) ⇒ Object
114 115 116 117 118 119 120 121 |
# File 'lib/one_apm/collector/collector/http_connection.rb', line 114

# Runs the block over the shared connection and always closes that connection
# afterwards, whether the block returns or raises.
#
# @return the value of the block
def session_without_keepalive(&block)
  establish_shared_connection
  block.call
ensure
  close_shared_connection
end
#setup_connection_for_ssl(conn) ⇒ Object
181 182 183 184 185 186 187 188 189 190 191 192 |
# File 'lib/one_apm/collector/collector/http_connection.rb', line 181

# Turns on SSL with peer verification for the connection, using the
# certificate store from ssl_cert_store.
#
# @param conn the connection to configure
# @raise [UnrecoverableAgentException] when any StandardError or LoadError is
#   raised while configuring SSL
def setup_connection_for_ssl(conn)
  # Jruby 1.6.8 requires a gem for full ssl support and will throw
  # an error when use_ssl=(true) is called and jruby-openssl isn't
  # installed
  conn.use_ssl = true
  conn.verify_mode = OpenSSL::SSL::VERIFY_PEER
  conn.cert_store = ssl_cert_store
rescue StandardError, LoadError
  raise UnrecoverableAgentException.new(
    "Agent is configured to use SSL, but SSL is not available in the environment. " \
    "Either disable SSL in the agent configuration, or install SSL support."
  )
end
#setup_connection_timeouts(conn) ⇒ Object
146 147 148 149 150 151 152 153 |
# File 'lib/one_apm/collector/collector/http_connection.rb', line 146

# Clears the connection's read timeout and, when aggressive keepalive is
# configured and the connection supports it, applies the configured
# :keep_alive_timeout.
#
# @param conn the connection to configure
def setup_connection_timeouts(conn)
  # We use Timeout explicitly instead of this
  conn.read_timeout = nil

  return unless conn.respond_to?(:keep_alive_timeout)
  return unless OneApm::Manager.config[:aggressive_keepalive]

  conn.keep_alive_timeout = OneApm::Manager.config[:keep_alive_timeout]
end
#ssl_cert_store ⇒ Object
194 195 196 197 198 199 200 201 202 203 |
# File 'lib/one_apm/collector/collector/http_connection.rb', line 194

# Returns the certificate store built from cert_file_path. The store is
# cached and rebuilt only when there is none yet or the path has changed
# since it was built.
#
# @return [OpenSSL::X509::Store]
def ssl_cert_store
  path = cert_file_path
  return @ssl_cert_store if @ssl_cert_store && path == @cached_cert_store_path

  OneApm::Manager.logger.debug("Creating SSL certificate store from file at #{path}")
  @ssl_cert_store = OpenSSL::X509::Store.new
  @ssl_cert_store.add_file(path)
  # Recorded only after add_file succeeds, so a failed load is retried.
  @cached_cert_store_path = path
  @ssl_cert_store
end
#start_connection(conn) ⇒ Object
161 162 163 164 165 |
# File 'lib/one_apm/collector/collector/http_connection.rb', line 161

# Starts the given connection under OneApm::TimerLib.timeout, bounded by
# @request_timeout.
#
# @param conn the connection to start
# @return the same connection
def start_connection(conn)
  OneApm::Manager.logger.debug("Opening TCP connection to #{conn.address}:#{conn.port}")
  OneApm::TimerLib.timeout(@request_timeout) do
    conn.start
  end
  conn
end