Module: OneApm::Collector::CollectorService::HttpConnection
- Included in:
- OneApm::Collector::CollectorService
- Defined in:
- lib/one_apm/collector/collector/http_connection.rb
Constant Summary collapse
- OA_CONNECTION_ERRORS =
These include Errno connection errors, and all indicate that the underlying TCP connection may be in a bad state.
[Timeout::Error, EOFError, SystemCallError, SocketError].freeze
- OA_API_VERSION =
'1.0'.freeze
Instance Method Summary collapse
-
#cert_file_path ⇒ Object
The path to the certificate file used to verify the SSL connection if verify_peer is enabled.
-
#check_post_size(post_string) ⇒ Object
Raises an UnrecoverableServerException if the post_string is longer than the limit configured in the probe object.
- #close_shared_connection ⇒ Object
-
#compress_request(data) ⇒ Object
Compress request data.
- #create_and_start_http_connection ⇒ Object
-
#create_http_connection ⇒ Object
Return the Net::HTTP with proxy configuration given the OneApm::Support::Server object.
-
#decompress_response(response) ⇒ Object
Decompresses the response from the server, if it is gzip encoded, otherwise returns it verbatim.
- #establish_shared_connection ⇒ Object
- #http_connection ⇒ Object
- #invoke_remote(method, payload = [], options = {}, query_params = {}) ⇒ Object
-
#remote_method_uri(method, format, query_params = {}) ⇒ Object
The path on the server that we should post our data to.
-
#send_request(opts) ⇒ Object
Posts to the specified server.
- #session(&block) ⇒ Object
- #session_with_keepalive(&block) ⇒ Object
- #session_without_keepalive(&block) ⇒ Object
- #setup_connection_for_ssl(conn) ⇒ Object
- #setup_connection_timeouts(conn) ⇒ Object
- #ssl_cert_store ⇒ Object
- #start_connection(conn) ⇒ Object
Instance Method Details
#cert_file_path ⇒ Object
The path to the certificate file used to verify the SSL connection if verify_peer is enabled
215 216 217 218 219 220 221 222 |
# File 'lib/one_apm/collector/collector/http_connection.rb', line 215 def cert_file_path if path_override = OneApm::Manager.config[:ca_bundle_path] OneApm::Manager.logger.warn("Couldn't find CA bundle from configured ca_bundle_path: #{path_override}") unless File.exists? path_override path_override else File.(File.join(probe.oneapm_root, 'config', 'cert', 'cacert.pem')) end end |
#check_post_size(post_string) ⇒ Object
Raises an UnrecoverableServerException if the post_string is longer than the limit configured in the probe object
244 245 246 247 248 |
# File 'lib/one_apm/collector/collector/http_connection.rb', line 244 def check_post_size(post_string) return if post_string.size < OneApm::Manager.config[:post_size_limit] OneApm::Manager.logger.debug "Tried to send too much data: #{post_string.size} bytes" raise UnrecoverableServerException.new('413 Request Entity Too Large') end |
#close_shared_connection ⇒ Object
146 147 148 149 150 151 152 |
# File 'lib/one_apm/collector/collector/http_connection.rb', line 146 def close_shared_connection if @shared_tcp_connection OneApm::Manager.logger.debug("Closing shared TCP connection to #{@shared_tcp_connection.address}:#{@shared_tcp_connection.port}") @shared_tcp_connection.finish if @shared_tcp_connection.started? @shared_tcp_connection = nil end end |
#compress_request(data) ⇒ Object
Compress request data
225 226 227 228 229 230 |
# File 'lib/one_apm/collector/collector/http_connection.rb', line 225 def compress_request(data) encoding = 'deflate' data = OneApm::Support::Encoders::Compressed.encode(data) check_post_size(data) [data, encoding] end |
#create_and_start_http_connection ⇒ Object
163 164 165 166 167 |
# File 'lib/one_apm/collector/collector/http_connection.rb', line 163 def create_and_start_http_connection conn = create_http_connection start_connection(conn) conn end |
#create_http_connection ⇒ Object
Return the Net::HTTP with proxy configuration given the OneApm::Support::Server object.
176 177 178 179 180 181 182 183 184 185 186 187 |
# File 'lib/one_apm/collector/collector/http_connection.rb', line 176 def create_http_connection # Proxy returns regular HTTP if @proxy_host is nil (the default) http_class = Net::HTTP::Proxy(proxy_server.name, proxy_server.port, proxy_server.user, proxy_server.password) conn = http_class.new((@collector.ip || @collector.name), @collector.port) setup_connection_for_ssl(conn) if OneApm::Manager.config[:ssl] setup_connection_timeouts(conn) OneApm::Manager.logger.debug("Created net/http handle to #{conn.address}:#{conn.port}") conn end |
#decompress_response(response) ⇒ Object
Decompresses the response from the server, if it is gzip encoded, otherwise returns it verbatim
234 235 236 237 238 239 240 |
# File 'lib/one_apm/collector/collector/http_connection.rb', line 234 def decompress_response(response) if response['content-encoding'] == 'gzip' Zlib::GzipReader.new(StringIO.new(response.body)).read else response.body end end |
#establish_shared_connection ⇒ Object
139 140 141 142 143 144 |
# File 'lib/one_apm/collector/collector/http_connection.rb', line 139 def establish_shared_connection unless @shared_tcp_connection @shared_tcp_connection = create_and_start_http_connection end @shared_tcp_connection end |
#http_connection ⇒ Object
131 132 133 134 135 136 137 |
# File 'lib/one_apm/collector/collector/http_connection.rb', line 131 def http_connection if @in_session establish_shared_connection else create_http_connection end end |
#invoke_remote(method, payload = [], options = {}, query_params = {}) ⇒ Object
11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 |
# File 'lib/one_apm/collector/collector/http_connection.rb', line 11 def invoke_remote(method, payload = [], = {},query_params={}) start_ts = Time.now data = nil begin data = @marshaller.dump(payload, ) rescue StandardError, SystemStackError => e handle_serialization_error(method, e) end serialize_finish_ts = Time.now data, encoding = compress_request(data) size = data.size uri = remote_method_uri(method, @marshaller.format,query_params) full_uri = "#{@collector}#{uri}" @audit_logger.log_request(full_uri, payload, @marshaller) response = send_request(:data => data, :uri => uri, :encoding => encoding, :collector => @collector) response = @marshaller.load(decompress_response(response)) @audit_logger.log_response(response) response ensure record_timing_supportability_metrics(method, start_ts, serialize_finish_ts) if size record_size_supportability_metrics(method, size, [:item_count]) end end |
#remote_method_uri(method, format, query_params = {}) ⇒ Object
The path on the server that we should post our data to
44 45 46 47 48 49 50 51 52 |
# File 'lib/one_apm/collector/collector/http_connection.rb', line 44 def remote_method_uri(method, format, query_params={}) params = {'run_id' => @agent_id, 'marshal_format' => format}.merge(query_params) uri = "/tpm/agent.do?PROTOCOL_VERSION=#{OA_PROTOCOL_VERSION}&license_key=#{@license_key}&method=#{method}" uri << '&' + params.map do |k,v| next unless v "#{k}=#{v}" end.compact.join('&') uri end |
#send_request(opts) ⇒ Object
Posts to the specified server
Options:
- :uri => the path to request on the server (a misnomer of
course)
- :encoding => the encoding to pass to the server
- :collector => a URI object that responds to the 'name' method
and returns the name of the collector to
contact
- :data => the data to send as the body of the request
64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 |
# File 'lib/one_apm/collector/collector/http_connection.rb', line 64 def send_request(opts) request = Net::HTTP::Post.new(opts[:uri], 'CONTENT-ENCODING' => opts[:encoding], 'HOST' => opts[:collector].name, 'API-VERSION' => OA_API_VERSION) request['user-agent'] = user_agent request['_SKIP_CAT_'] = true request.body = opts[:data] #see http://jira.oneapm.me/browse/AI-3227 #old content_type is "multipart/form-data; boundary=-oneapm-#{request.object_id}-" request.content_type = 'application/json' response = nil attempts = 0 max_attempts = 2 begin attempts += 1 conn = http_connection OneApm::Manager.logger.debug "Sending request to #{opts[:collector]}#{opts[:uri]}" OneApm::TimerLib.timeout(@request_timeout) do response = conn.request(request) end rescue *OA_CONNECTION_ERRORS => e close_shared_connection if attempts < max_attempts OneApm::Manager.logger.debug("Retrying request to #{opts[:collector]}#{opts[:uri]} after #{e}") retry else raise ServerConnectionException, "Recoverable error talking to #{@collector} after #{attempts} attempts: #{e}" end end log_and_return_response response end |
#session(&block) ⇒ Object
98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 |
# File 'lib/one_apm/collector/collector/http_connection.rb', line 98 def session(&block) raise ArgumentError, "#{self.class}#shared_connection must be passed a block" unless block_given? begin t0 = Time.now @in_session = true if OneApm::Manager.config[:aggressive_keepalive] session_with_keepalive(&block) else session_without_keepalive(&block) end rescue *OA_CONNECTION_ERRORS => e elapsed = Time.now - t0 raise OneApm::ServerConnectionException, "Recoverable error connecting to #{@collector} after #{elapsed} seconds: #{e}" ensure @in_session = false end end |
#session_with_keepalive(&block) ⇒ Object
117 118 119 120 |
# File 'lib/one_apm/collector/collector/http_connection.rb', line 117 def session_with_keepalive(&block) establish_shared_connection block.call end |
#session_without_keepalive(&block) ⇒ Object
122 123 124 125 126 127 128 129 |
# File 'lib/one_apm/collector/collector/http_connection.rb', line 122 def session_without_keepalive(&block) begin establish_shared_connection block.call ensure close_shared_connection end end |
#setup_connection_for_ssl(conn) ⇒ Object
189 190 191 192 193 194 195 196 197 198 199 200 |
# File 'lib/one_apm/collector/collector/http_connection.rb', line 189 def setup_connection_for_ssl(conn) # Jruby 1.6.8 requires a gem for full ssl support and will throw # an error when use_ssl=(true) is called and jruby-openssl isn't # installed conn.use_ssl = true conn.verify_mode = OpenSSL::SSL::VERIFY_PEER conn.cert_store = ssl_cert_store rescue StandardError, LoadError msg = "Agent is configured to use SSL, but SSL is not available in the environment. " msg << "Either disable SSL in the agent configuration, or install SSL support." raise UnrecoverableAgentException.new(msg) end |
#setup_connection_timeouts(conn) ⇒ Object
154 155 156 157 158 159 160 161 |
# File 'lib/one_apm/collector/collector/http_connection.rb', line 154 def setup_connection_timeouts(conn) # We use Timeout explicitly instead of this conn.read_timeout = nil if conn.respond_to?(:keep_alive_timeout) && OneApm::Manager.config[:aggressive_keepalive] conn.keep_alive_timeout = OneApm::Manager.config[:keep_alive_timeout] end end |
#ssl_cert_store ⇒ Object
202 203 204 205 206 207 208 209 210 211 |
# File 'lib/one_apm/collector/collector/http_connection.rb', line 202 def ssl_cert_store path = cert_file_path if !@ssl_cert_store || path != @cached_cert_store_path OneApm::Manager.logger.debug("Creating SSL certificate store from file at #{path}") @ssl_cert_store = OpenSSL::X509::Store.new @ssl_cert_store.add_file(path) @cached_cert_store_path = path end @ssl_cert_store end |
#start_connection(conn) ⇒ Object
169 170 171 172 173 |
# File 'lib/one_apm/collector/collector/http_connection.rb', line 169 def start_connection(conn) OneApm::Manager.logger.debug("Opening TCP connection to #{conn.address}:#{conn.port}") OneApm::TimerLib.timeout(@request_timeout) { conn.start } conn end |