Method: Fluent::Plugin::Output#handle_stream_with_custom_format

Defined in:
lib/fluent/plugin/output.rb

#handle_stream_with_custom_format(tag, es, enqueue: false) ⇒ Object

metadata_and_data is a Hash of:

(standard format) metadata => event stream
(custom format)   metadata => array of formatted events

For the standard format, formatting should be done for the whole event stream, but
the "whole event stream" may be a split of "es" here when it is bigger than chunk_limit_size.
`@buffer.write` will do this splitting.

For a custom format, formatting is done here. Custom formatting always requires
iterating over the event stream, and for performance reasons that iteration should be done
just once, even if the total event stream size is bigger than chunk_limit_size.
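
The single-pass grouping described above can be illustrated with a minimal standalone sketch. It does not use Fluentd itself: `Meta`, `group_formatted`, the `timekey` parameter, and the sample events are hypothetical stand-ins chosen for illustration, and the real `metadata` method may build its keys differently depending on the buffer configuration.

```ruby
# Hypothetical stand-in for Fluentd's metadata object (assumption, not the real class).
# Struct instances compare and hash by their members, so they work as Hash keys.
Meta = Struct.new(:tag, :timekey)

# Walk the events once, format each record with the given block, and collect
# the formatted results under the metadata key they belong to.
def group_formatted(tag, events, timekey: 3600)
  grouped = {}
  events.each do |time, record|
    # Assumed keying rule: tag plus the start of the time window.
    meta = Meta.new(tag, time - (time % timekey))
    grouped[meta] ||= []
    formatted = yield(tag, time, record)
    grouped[meta] << formatted if formatted # nil results are skipped
  end
  grouped
end

# Sample events as [time, record] pairs; the last record cannot be formatted.
EVENTS = [[10, { "a" => 1 }], [20, { "a" => 2 }], [3700, { "a" => 3 }], [3800, nil]].freeze

GROUPED = group_formatted("app.log", EVENTS) do |tag, time, record|
  record && "#{tag}\t#{time}\t#{record["a"]}\n"
end
```

Each value in `GROUPED` is an array of already formatted strings, which corresponds to the "custom format" shape of the Hash described above. Because the records are formatted during the same pass that groups them, the event stream is never iterated a second time.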


# File 'lib/fluent/plugin/output.rb', line 1038

def handle_stream_with_custom_format(tag, es, enqueue: false)
  meta_and_data = {}
  records = 0
  es.each(unpacker: Fluent::MessagePackFactory.thread_local_msgpack_unpacker) do |time, record|
    meta = metadata(tag, time, record)
    meta_and_data[meta] ||= []
    res = format(tag, time, record)
    if res
      meta_and_data[meta] << res
      records += 1
    end
  end
  write_guard do
    @buffer.write(meta_and_data, enqueue: enqueue)
  end
  @emit_records_metrics.add(es.size)
  @emit_size_metrics.add(es.to_msgpack_stream.bytesize) if @enable_size_metrics
  true
end