Class: LogStash::Inputs::CSVFile

Inherits: File < Object
Defined in:
lib/logstash/inputs/csvfile.rb

Overview

Subclass of logstash-input-file that parses CSV lines, with support for first-line schemas. Set first_line_defines_columns => true to enable this behavior. Statically defined columns are also supported, à la logstash-filter-csv, via the columns param, though first_line_defines_columns => true takes precedence.

Since multiple files may be read concurrently by the same plugin instance, and each can have a distinct schema, this plugin records a schema per source file (keyed by the event's path attribute) in a hash. When it receives an event for a file it doesn't yet know, it reads and parses that file's first line to obtain the schema. This approach also supports resuming processing after Logstash restarts mid-file.
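For illustration, the cache this implies is shaped roughly as follows (the paths and schemas here are hypothetical; the actual hash, @fileColumns, is created in #register below):

@fileColumns = {
  "/var/log/a.csv" => ["host", "status", "latency"],  # schema read from a.csv's first line
  "/var/log/b.csv" => ["ts", "user", "action"],
}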

I considered extending logstash-filter-csv to do this, but felt that the only reliable way to support streaming CSV reads was to read the schema explicitly from the file's schema row (and cache it, so that subsequent rows from that file parse quickly). Since we cannot count on a logstash filter having read access to the source file, or even on its events originating from files at all, I rejected that approach. By definition, a file input plugin must have read access to the file it's sourcing data from.

This plugin borrows most of its csv parsing logic from logstash-filter-csv.

This plugin extends logstash-input-file by overriding its decorate method. Note that logstash-input-file 0.0.10, released with Logstash 1.5, doesn't set the event's path attribute before calling decorate (which this plugin requires), so the gemspec insists on logstash-input-file 1.1.0.
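For example, a minimal pipeline configuration might look like the sketch below. The file paths are illustrative, and the plugin's config name is assumed to follow the usual gem-to-config naming for logstash-input-csvfile:

input {
  csvfile {
    path => ["/var/log/reports/*.csv"]
    first_line_defines_columns => true   # read each file's first line as its schema
  }
}

Or, with a statically defined schema, à la logstash-filter-csv:

input {
  csvfile {
    path => ["/var/log/fixed/*.csv"]
    columns => ["host", "status", "latency"]
  }
}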

Instance Method Summary

  #addSchemaToCache(path, schema) ⇒ Object
  #decorate(event) ⇒ Object
  #getCachedSchemaForFile(path) ⇒ Object
  #getSchemaForFile(event, parsedValues) ⇒ Object
  #readSchemaLineFromFile(path) ⇒ Object
  #register ⇒ Object
  #scrubSchemaCache(event) ⇒ Object
  #touchSchema(path) ⇒ Object

Instance Method Details

#addSchemaToCache(path, schema) ⇒ Object



# File 'lib/logstash/inputs/csvfile.rb', line 187

def addSchemaToCache(path, schema)
  @fileColumns[path] = schema
  touchSchema(path)
end

#decorate(event) ⇒ Object



# File 'lib/logstash/inputs/csvfile.rb', line 94

def decorate(event)
  super(event)

  message = event["message"]
  return if !message
  
  begin
    values = CSV.parse_line(message, :col_sep => @separator, :quote_char => @quote_char)
    return if !values || values.length == 0  # CSV.parse_line returns nil for an empty line

    # Get names for the columns.

    if @first_line_defines_columns
      @logger.debug? && @logger.debug("handling csv in first_line_defines_columns mode", :message => message, :columns => @columns)
      cols = getSchemaForFile(event, values)
    else
      @logger.debug? && @logger.debug("handling csv in explicitly defined columns mode", :message => message, :columns => @columns)
      cols = @columns 
    end

    # Determine where to write the new attributes

    if @target.nil?
      # Default is to write to the root of the event.

      dest = event
    else
      dest = event[@target] ||= {}
    end

    # Add the per-column attributes (as long as this isn't the event from the schema-defining row)

    if !event["_csvmetadata"]
      values.each_index do |i|
        field_name = cols[i] || "column#{i+1}"
        dest[field_name] = values[i]
      end
    end

  rescue => e
    event.tag "_csvparsefailure"
    @logger.warn("Trouble parsing csv", :message => message, :exception => e)
    return
  end # begin

end
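To make the fallback naming concrete, here is a standalone sketch (schema and input line are made up) of the column-assignment loop above; values beyond the known schema get generated columnN names:

require "csv"

cols   = ["host", "status"]                 # hypothetical schema
values = CSV.parse_line("web01,200,0.25")   # one more value than columns

dest = {}
values.each_index do |i|
  field_name = cols[i] || "column#{i + 1}"  # fall back to a positional name
  dest[field_name] = values[i]
end
dest  # => {"host"=>"web01", "status"=>"200", "column3"=>"0.25"}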

#getCachedSchemaForFile(path) ⇒ Object



# File 'lib/logstash/inputs/csvfile.rb', line 183

def getCachedSchemaForFile(path)
  @fileColumns[path]
end

#getSchemaForFile(event, parsedValues) ⇒ Object

See also: #decorate



# File 'lib/logstash/inputs/csvfile.rb', line 136

def getSchemaForFile(event, parsedValues)
  path = event["path"]
  if !path
    @logger.warn("No path in event.  Cannot retrieve a schema for this event.")
    return []
  end

  @logger.debug? && @logger.debug("Getting schema for file", :path => path)

  schema = getCachedSchemaForFile(path)
  if schema   
    @logger.debug? && @logger.debug("Using cached schema", :cols => schema)
    event["_schemacachetelemetry"]="cachedEntryUsed" if @add_schema_cache_telemetry_to_event  
    touchSchema(path)
    return schema
  end

  @logger.debug? && @logger.debug("Event from unknown file/schema.  Reading schema from that file.", :path => path)

  scrubSchemaCache(event) if @max_cached_schema_age_hours > 0

  csvFileLine = readSchemaLineFromFile(path)
  if !csvFileLine || csvFileLine.length == 0 
    @logger.warn("No suitable schema row found in file.", :path => path)
    return []
  end

  schema = CSV.parse_line(csvFileLine, :col_sep => @separator, :quote_char => @quote_char)
  addSchemaToCache(path, schema)
  @logger.debug? && @logger.debug("Schema read from file:", :path => path, :cols => schema)
  
  if @add_schema_cache_telemetry_to_event 
    event["_schemacachetelemetry"]="newEntryCreated"
    event["_cache_touch_time"]=Time.now
  end
   
  # Special handling for the schema row event: tag _csvmetadata and don't return individual column attributes

  if @fileColumns[path].join == parsedValues.join
    @logger.debug? && @logger.debug("Received the schema row event.  Tagging w/ _csvmetadata", :message => message)
    event["_csvmetadata"] = true        
    return []
  else 
    return schema
  end
  
end
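The schema-row check above compares joined strings, so the event that is itself the header row yields no per-column attributes. A tiny sketch with hypothetical values:

schema       = ["host", "status", "latency"]   # just read from the file
parsedValues = ["host", "status", "latency"]   # this event is the header row itself

schema.join == parsedValues.join   # => true: tag _csvmetadata, return []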

#readSchemaLineFromFile(path) ⇒ Object



# File 'lib/logstash/inputs/csvfile.rb', line 196

def readSchemaLineFromFile(path)
  csvFileLine = ""
  File.open(path, "r") do |f|
    # Read until we find a non-empty line that qualifies as the schema row.
    while csvFileLine.length == 0 and csvFileLine = f.gets
      if @schema_pattern_to_match
        # Reject partial (not newline-terminated) lines and lines that
        # don't match the configured schema pattern.
        if !csvFileLine.end_with?("\n") or !csvFileLine.match(@schema_pattern_to_match)
          csvFileLine = ""
        end
      end
    end
  end
  csvFileLine
end
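A small illustration (the file contents and pattern are hypothetical) of how the loop above skips preamble lines until one matches schema_pattern_to_match:

require "tempfile"

sample = Tempfile.new(["sample", ".csv"])
sample.write("# exported 2015-06-01\nhost,status,latency\nweb01,200,0.25\n")
sample.close

pattern = /,/   # stand-in for @schema_pattern_to_match
line = ""
File.open(sample.path, "r") do |f|
  while line.length == 0 and line = f.gets
    line = "" if !line.end_with?("\n") or !line.match(pattern)
  end
end
line  # => "host,status,latency\n"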

#register ⇒ Object



# File 'lib/logstash/inputs/csvfile.rb', line 86

def register
  @fileColumns = Hash.new        # path => cached schema (array of column names)
  @schemaTouchedTimes = Hash.new # path => last time that schema was used
  super()

  @logger.warn("schema cache scrubbing disabled.  Memory use will grow over time.") if @max_cached_schema_age_hours <= 0
end

#scrubSchemaCache(event) ⇒ Object



# File 'lib/logstash/inputs/csvfile.rb', line 210

def scrubSchemaCache(event)
  @logger.debug? && @logger.debug("Scrubbing schema cache", :size => @fileColumns.length)
  event["_schemacachetelemetryscrubbedbeforecount"]=@fileColumns.length if @add_schema_cache_telemetry_to_event 
  
  expiringFiles = []
  now = Time.now
  @schemaTouchedTimes.each do |filename, lastReadTime|
    if (lastReadTime + (@max_cached_schema_age_hours * 60 * 60)) < now
      expiringFiles << filename 
      @logger.debug? && @logger.debug("Expiring schema for: ", :file => filename, :lastRead => lastReadTime)
    end
  end
  
  expiringFiles.each do |filename|
    @fileColumns.delete(filename)
    @schemaTouchedTimes.delete(filename)
    @logger.debug? && @logger.debug("Deleted schema for: ", :file => filename)
  end

  event["_schemacachetelemetryscrubbedaftercount"]=@fileColumns.length if @add_schema_cache_telemetry_to_event 
  @logger.debug? && @logger.debug("Done scrubbing schema cache", :size => @fileColumns.length)
  
end

#touchSchema(path) ⇒ Object



# File 'lib/logstash/inputs/csvfile.rb', line 192

def touchSchema(path)
  @schemaTouchedTimes[path] = Time.now
end