Method: Fluent::Plugin::Output#handle_stream_with_custom_format
Defined in: lib/fluent/plugin/output.rb
#handle_stream_with_custom_format(tag, es, enqueue: false) ⇒ Object
metadata_and_data is a Hash of:
(standard format) metadata => event stream
(custom format) metadata => array of formatted events
For standard format, formatting is applied to the whole event stream, but the
"whole event stream" may be a split portion of "es" here when es is bigger than chunk_limit_size.
`@buffer.write` performs this splitting.
For custom format, formatting is done here. Custom formatting always requires
iterating over the event stream, and for performance reasons that iteration should happen
only once, even if the total event stream size is bigger than chunk_limit_size.
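The single-pass grouping described above can be sketched in plain Ruby, without Fluentd loaded. This is an illustration under stated assumptions, not the library's implementation: `Meta`, `format_event`, `group_formatted`, and the time-bucket rule used to derive the metadata are all hypothetical stand-ins for the plugin's own metadata and `format` logic.

```ruby
# Hypothetical stand-in for chunk metadata. A Struct hashes by value,
# so equal metadata objects map to the same Hash key.
Meta = Struct.new(:tag, :timekey)

# Hypothetical custom formatter: returns a String, or nil to skip the event.
def format_event(tag, time, record)
  return nil if record.empty?
  "#{tag}\t#{time}\t#{record['message']}\n"
end

# Iterate over the events exactly once, formatting each event and
# collecting the results under its metadata key.
# The timekey bucketing below is an assumption made for this sketch.
def group_formatted(tag, events, timekey: 60)
  grouped = {}
  events.each do |time, record|
    meta = Meta.new(tag, time - (time % timekey))
    grouped[meta] ||= []
    formatted = format_event(tag, time, record)
    grouped[meta] << formatted if formatted
  end
  grouped
end

events = [
  [0,  { 'message' => 'a' }],
  [30, { 'message' => 'b' }],
  [61, {}],                    # skipped: the formatter returns nil
  [90, { 'message' => 'c' }]
]
grouped = group_formatted('app', events)
```

The resulting Hash has the "metadata => array of formatted events" shape, so any later splitting by size can work on already formatted strings without walking the event stream a second time.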
# File 'lib/fluent/plugin/output.rb', line 1038

    def handle_stream_with_custom_format(tag, es, enqueue: false)
      meta_and_data = {}
      records = 0
      es.each(unpacker: Fluent::MessagePackFactory.thread_local_msgpack_unpacker) do |time, record|
        meta = metadata(tag, time, record)
        meta_and_data[meta] ||= []
        res = format(tag, time, record)
        if res
          meta_and_data[meta] << res
          records += 1
        end
      end
      write_guard do
        @buffer.write(meta_and_data, enqueue: enqueue)
      end
      @emit_records_metrics.add(es.size)
      @emit_size_metrics.add(es.to_msgpack_stream.bytesize) if @enable_size_metrics
      true
    end