Method: Fluent::Plugin::Output#commit_write

Defined in:
lib/fluent/plugin/output.rb

#commit_write(chunk_id, delayed: @delayed_commit, secondary: false) ⇒ Object



1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
# File 'lib/fluent/plugin/output.rb', line 1102

# Finalizes a write for the chunk identified by +chunk_id+.
#
# In order, this method:
# 1. emits a trace-level log entry (built lazily inside +log.on_trace+),
# 2. when +delayed+ is truthy, removes every entry of +@dequeued_chunks+
#    whose +chunk_id+ equals the given one, under +@dequeued_chunks_mutex+,
# 3. asks +@buffer+ to purge the chunk,
# 4. under +@retry_mutex+, if +@retry+ is set, logs a warning that the
#    retry succeeded and resets +@retry+ to +nil+.
#
# @param chunk_id the identifier of the chunk being committed; compared with
#   +==+ against the +chunk_id+ of each dequeued-chunk entry
# @param delayed whether to also drop the chunk from +@dequeued_chunks+;
#   defaults to the current value of +@delayed_commit+
# @param secondary selects the "by secondary" wording of the retry-success
#   warning
def commit_write(chunk_id, delayed: @delayed_commit, secondary: false)
  log.on_trace do
    log.trace("committing write operation to a chunk", chunk: dump_unique_id_hex(chunk_id), delayed: delayed)
  end

  if delayed
    # Forget the dequeued-chunk bookkeeping entries for this chunk.
    @dequeued_chunks_mutex.synchronize do
      @dequeued_chunks.delete_if { |entry| entry.chunk_id == chunk_id }
    end
  end

  @buffer.purge_chunk(chunk_id)

  @retry_mutex.synchronize do
    # Nothing more to do unless a retry is in progress.
    next unless @retry

    # A chunk was flushed successfully while retrying: report it and
    # clear the retry state.
    message = secondary ? "retry succeeded by secondary." : "retry succeeded."
    log.warn(message, chunk_id: dump_unique_id_hex(chunk_id))
    @retry = nil
  end
end