Class: LogStash::Outputs::SCACSV

Inherits:
File
  • Object
show all
Defined in:
lib/logstash/outputs/scacsv.rb

Overview

SCACSV - based upon original Logstash CSV output.

Write events to disk in CSV format. Write a PI header as the first line in the file. Name the file per PI convention, based upon the first and last timestamps encountered.

Instance Method Summary collapse

Instance Method Details

#receive(event) ⇒ Object



92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
# File 'lib/logstash/outputs/scacsv.rb', line 92

# Handles one event: either writes it as a CSV row to the file named by @path,
# or, for a window-marker event, closes out the current file.
#
# Events for which output?(event) is falsy are ignored. An event with a truthy
# 'SCAWindowMarker' field is never written; it only triggers
# closeAndRenameCurrentFile, and only when @recordCount is at least 1.
#
# For every other event the method:
# * writes a header row first when @recordCount is 0 (@header if set,
#   otherwise the field names in @fields),
# * writes the event's values for @fields as one CSV row,
# * flushes, updates @recordCount / @lastOutputTime / @startTime / @endTime,
# * calls closeAndRenameCurrentFile once @recordCount reaches @max_size
#   (a @max_size of 0 or less disables this check).
#
# @param event [Object] the event; accessed through event[...] and event.sprintf
# @return [Object, nil] whatever closeAndRenameCurrentFile returned if it was
#   invoked, otherwise nil; callers are not expected to rely on it
def receive(event)
  return unless output?(event)

  @logger.debug("in SCACSV receive")

  if event['SCAWindowMarker']
    # Just eat the marker - don't output it. Only close the file and move on
    # if at least one record has been written to it.
    closeAndRenameCurrentFile if @recordCount >= 1
  else
    @formattedPath = event.sprintf(@path)
    fd = open(@formattedPath)
    @logger.debug("SCACSVreceive - after opening fd=" + fd.to_s)

    # Header goes on the first line; a configured @header takes precedence
    # over the plain field names.
    fd.write((@header || @fields).to_csv(@csv_options)) if @recordCount == 0

    csv_values = @fields.map { |name| get_value(name, event) }
    fd.write(csv_values.to_csv(@csv_options))

    flush(fd)
    close_stale_files

    # Remember state.
    @recordCount += 1
    @lastOutputTime = Time.now

    # Records are assumed to arrive in time order, so the first record gives
    # the start time and every record advances the end time.
    timestamp = event[@time_field]
    @startTime = timestamp if @recordCount == 1
    @endTime = timestamp

    # Have enough records? Close it out. The instance variable is used on both
    # sides so the check does not depend on a max_size reader being defined.
    closeAndRenameCurrentFile if @max_size > 0 && @recordCount >= @max_size
  end
end

#register ⇒ Object



57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/logstash/outputs/scacsv.rb', line 57

# Sets up the plugin's initial state after the parent class has registered:
# re-keys the CSV options with symbols, resets the record bookkeeping, and
# starts the background thread that runs flushWatchdog.
def register
  super

  # Re-key the CSV options with symbols, keeping the values untouched.
  symbolized_options = {}
  @csv_options.each do |key, value|
    symbolized_options[key.to_sym] = value
  end
  @csv_options = symbolized_options

  # Bookkeeping for the current file; no records counted yet.
  @recordCount    = 0
  @lastOutputTime = 0

  # Start and end times that will be used when renaming the files.
  @startTime = "missingStartTime"
  @endTime   = "missingEndTime"

  @flushInterval = @flush_interval.to_i

  # NOTE(review): the watchdog receives the raw @flush_interval, not the
  # integer @flushInterval computed above - confirm which one is intended.
  @timerThread = Thread.new do
    flushWatchdog(@flush_interval)
  end
end