Class: Fluent::Plugin::HTTPOutput
- Defined in:
- lib/fluent/plugin/out_http.rb
Defined Under Namespace
Classes: ConnectionCache, RetryableResponse
Constant Summary
Constants inherited from Output
Output::BUFFER_STATS_KEYS, Output::CHUNKING_FIELD_WARN_NUM, Output::CHUNK_ID_PLACEHOLDER_PATTERN, Output::CHUNK_KEY_PATTERN, Output::CHUNK_KEY_PLACEHOLDER_PATTERN, Output::CHUNK_TAG_PLACEHOLDER_PATTERN, Output::FORMAT_COMPRESSED_MSGPACK_STREAM, Output::FORMAT_COMPRESSED_MSGPACK_STREAM_TIME_INT, Output::FORMAT_MSGPACK_STREAM, Output::FORMAT_MSGPACK_STREAM_TIME_INT, Output::TIMESTAMP_CHECK_BASE_TIME, Output::TIME_KEY_PLACEHOLDER_THRESHOLDS, Output::UNRECOVERABLE_ERRORS
Constants included from Configurable
Configurable::CONFIG_TYPE_REGISTRY
Instance Attribute Summary
Attributes inherited from Output
#as_secondary, #buffer, #chunk_key_accessors, #chunk_key_tag, #chunk_key_time, #chunk_keys, #delayed_commit, #delayed_commit_timeout, #dequeued_chunks, #dequeued_chunks_mutex, #output_enqueue_thread_waiting, #retry, #retry_for_error_chunk, #secondary, #timekey_zone
Attributes included from Fluent::PluginLoggerMixin
Attributes inherited from Base
Instance Method Summary collapse
- #close ⇒ Object
- #configure(conf) ⇒ Object
- #connection_cache_id_for_thread ⇒ Object
- #connection_cache_id_for_thread=(id) ⇒ Object
- #connection_cache_id_thread_key ⇒ Object
- #format(tag, time, record) ⇒ Object
- #format_json_array(tag, time, record) ⇒ Object
- #formatted_to_msgpack_binary? ⇒ Boolean
-
#initialize ⇒ HTTPOutput
constructor
A new instance of HTTPOutput.
- #multi_workers_ready? ⇒ Boolean
- #write(chunk) ⇒ Object
Methods inherited from Output
#acts_as_secondary, #actual_flush_thread_count, #after_shutdown, #after_start, #backup_chunk, #before_shutdown, #calculate_timekey, #check_slow_flush, #chunk_for_test, #commit_write, #emit_buffered, #emit_count, #emit_events, #emit_records, #emit_size, #emit_sync, #enqueue_thread_run, #enqueue_thread_wait, #execute_chunking, #extract_placeholders, #flush_thread_run, #flush_thread_wakeup, #force_flush, #formatted_to_msgpack_binary, #generate_format_proc, #get_placeholders_keys, #get_placeholders_tag, #get_placeholders_time, #handle_limit_reached, #handle_stream_simple, #handle_stream_with_custom_format, #handle_stream_with_standard_format, #implement?, #interrupt_flushes, #keep_buffer_config_compat, #log_retry_error, #metadata, #next_flush_time, #num_errors, #placeholder_validate!, #placeholder_validators, #prefer_buffered_processing, #prefer_delayed_commit, #process, #retry_state, #rollback_count, #rollback_write, #shutdown, #start, #statistics, #stop, #submit_flush_all, #submit_flush_once, #support_in_v12_style?, #synchronize_in_threads, #synchronize_path, #synchronize_path_in_workers, #terminate, #try_flush, #try_rollback_all, #try_rollback_write, #try_write, #update_retry_state, #write_count, #write_guard
Methods included from UniqueId::Mixin
#dump_unique_id_hex, #generate_unique_id
Methods included from Fluent::PluginHelper::Mixin
Methods included from Fluent::PluginLoggerMixin
Methods included from Fluent::PluginId
#plugin_id, #plugin_id_configured?, #plugin_id_for_test?, #plugin_root_dir, #stop
Methods inherited from Base
#acquire_worker_lock, #after_shutdown, #after_shutdown?, #after_start, #after_started?, #before_shutdown, #before_shutdown?, #called_in_test?, #closed?, #configured?, #context_router, #context_router=, #fluentd_worker_id, #get_lock_path, #has_router?, #inspect, #plugin_root_dir, #reloadable_plugin?, #shutdown, #shutdown?, #start, #started?, #stop, #stopped?, #string_safe_encoding, #terminate, #terminated?
Methods included from SystemConfig::Mixin
#system_config, #system_config_override
Methods included from Configurable
#config, #configure_proxy_generate, #configured_section_create, included, lookup_type, register_type
Constructor Details
#initialize ⇒ HTTPOutput
Returns a new instance of HTTPOutput.
119 120 121 122 123 124 125 126 127 128 129 |
# File 'lib/fluent/plugin/out_http.rb', line 119 def initialize super @uri = nil @proxy_uri = nil @formatter = nil @connection_cache = [] @connection_cache_id_mutex = Mutex.new @connection_cache_next_id = 0 end |
Instance Method Details
#close ⇒ Object
131 132 133 134 135 |
# File 'lib/fluent/plugin/out_http.rb', line 131 def close super @connection_cache.each {|entry| entry.conn.finish if entry.conn&.started? } end |
#configure(conf) ⇒ Object
137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 |
# File 'lib/fluent/plugin/out_http.rb', line 137 def configure(conf) super @connection_cache = Array.new(actual_flush_thread_count, ConnectionCache.new("", nil)) if @reuse_connections if @retryable_response_codes.nil? log.warn('Status code 503 is going to be removed from default `retryable_response_codes` from fluentd v2. Please add it by yourself if you wish') @retryable_response_codes = [503] end @http_opt = setup_http_option @proxy_uri = URI.parse(@proxy) if @proxy @formatter = formatter_create @content_type = setup_content_type unless @content_type if @json_array if @formatter_configs.first[:@type] != "json" raise Fluent::ConfigError, "json_array option could be used with json formatter only" end define_singleton_method(:format, method(:format_json_array)) end if @auth and @auth.method == :aws_sigv4 begin require 'aws-sigv4' require 'aws-sdk-core' rescue LoadError raise Fluent::ConfigError, "The aws-sdk-core and aws-sigv4 gems are required for aws_sigv4 auth. Run: gem install aws-sdk-core -v '~> 3.191'" end raise Fluent::ConfigError, "aws_service is required for aws_sigv4 auth" unless @auth.aws_service != nil raise Fluent::ConfigError, "aws_region is required for aws_sigv4 auth" unless @auth.aws_region != nil if @auth.aws_role_arn == nil aws_credentials = Aws::CredentialProviderChain.new.resolve else aws_credentials = Aws::AssumeRoleCredentials.new( client: Aws::STS::Client.new( region: @auth.aws_region ), role_arn: @auth.aws_role_arn, role_session_name: "fluentd" ) end @aws_signer = Aws::Sigv4::Signer.new( service: @auth.aws_service, region: @auth.aws_region, credentials_provider: aws_credentials ) end end |
#connection_cache_id_for_thread ⇒ Object
111 112 113 |
# File 'lib/fluent/plugin/out_http.rb', line 111 def connection_cache_id_for_thread Thread.current[connection_cache_id_thread_key] end |
#connection_cache_id_for_thread=(id) ⇒ Object
115 116 117 |
# File 'lib/fluent/plugin/out_http.rb', line 115 def connection_cache_id_for_thread=(id) Thread.current[connection_cache_id_thread_key] = id end |
#connection_cache_id_thread_key ⇒ Object
107 108 109 |
# File 'lib/fluent/plugin/out_http.rb', line 107 def connection_cache_id_thread_key "#{plugin_id}_connection_cache_id" end |
#format(tag, time, record) ⇒ Object
198 199 200 |
# File 'lib/fluent/plugin/out_http.rb', line 198 def format(tag, time, record) @formatter.format(tag, time, record) end |
#format_json_array(tag, time, record) ⇒ Object
202 203 204 |
# File 'lib/fluent/plugin/out_http.rb', line 202 def format_json_array(tag, time, record) @formatter.format(tag, time, record) << "," end |
#formatted_to_msgpack_binary? ⇒ Boolean
194 195 196 |
# File 'lib/fluent/plugin/out_http.rb', line 194 def formatted_to_msgpack_binary? @formatter_configs.first[:@type] == 'msgpack' end |
#multi_workers_ready? ⇒ Boolean
190 191 192 |
# File 'lib/fluent/plugin/out_http.rb', line 190 def multi_workers_ready? true end |
#write(chunk) ⇒ Object
206 207 208 209 210 211 212 213 |
# File 'lib/fluent/plugin/out_http.rb', line 206 def write(chunk) uri = parse_endpoint(chunk) req = create_request(chunk, uri) log.debug { "#{@http_method.capitalize} data to #{uri.to_s} with chunk(#{dump_unique_id_hex(chunk.unique_id)})" } send_request(uri, req) end |