Class: LogStash::Inputs::CSVFile
- Inherits: File (LogStash::Inputs::CSVFile < File < Object)
- Defined in: lib/logstash/inputs/csvfile.rb
Overview
Subclass of logstash-input-file that parses CSV lines, with support for first-line schemas. Set first_line_defines_columns => true to enable this behavior. Statically defined columns are also supported via the columns param, à la logstash-filter-csv. first_line_defines_columns => true takes precedence, though.
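A minimal pipeline sketch of the first-line-schema mode (assuming the plugin registers under the config name csvfile; path comes from the parent logstash-input-file plugin):

```
input {
  csvfile {
    path => [ "/var/data/*.csv" ]
    first_line_defines_columns => true
  }
}
```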
Since a single plugin instance may read multiple files, each with a distinct schema, this plugin records a schema per source file (keyed by the event's path attribute) in a hash. When it receives an event for a file it doesn't yet know, it reads and parses that file's first line to obtain the schema. This also allows processing to resume mid-file after a Logstash restart.
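The per-file caching scheme described above can be sketched as a small standalone class. The names here are illustrative, not the plugin's actual internals (the plugin keeps two hashes, @fileColumns and @schemaTouchedTimes), and a plain comma split stands in for full CSV parsing:

```ruby
require "time"

# Illustrative sketch of a per-file schema cache keyed by the event's
# "path" attribute.
class SchemaCache
  def initialize
    @columns = {}   # path => array of column names
    @touched = {}   # path => Time the schema was last used
  end

  # Return the cached schema for path, reading the file's first line to
  # build it on a cache miss. A plain split stands in for CSV parsing.
  def for_path(path)
    if (schema = @columns[path])
      @touched[path] = Time.now
      return schema
    end
    first_line = File.open(path, &:gets)
    return [] unless first_line
    schema = first_line.chomp.split(",")
    @columns[path] = schema
    @touched[path] = Time.now
    schema
  end
end
```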
I considered extending logstash-filter-csv to do this, but the only reliable way to support streaming CSV reads is to read the schema explicitly from the file's schema row (and cache it, so that subsequent rows from that file parse quickly). Since we cannot count on a Logstash filter having read access to the file, or even processing events that originate from files, I rejected that approach. By definition, a file input plugin must have read access to the file it sources data from.
This plugin borrows most of its CSV parsing logic from logstash-filter-csv.
This plugin extends logstash-input-file by overriding its decorate method. Note that logstash-input-file 0.0.10, released with Logstash 1.5, doesn't set the event's path attribute before calling decorate (which this plugin requires), so the gemspec insists on logstash-input-file 1.1.0.
Instance Method Summary
- #addSchemaToCache(path, schema) ⇒ Object
- #decorate(event) ⇒ Object
- #getCachedSchemaForFile(path) ⇒ Object
- #getSchemaForFile(event, parsedValues) ⇒ Object
- #readSchemaLineFromFile(path) ⇒ Object
- #register ⇒ Object
- #scrubSchemaCache(event) ⇒ Object
- #touchSchema(path) ⇒ Object
Instance Method Details
#addSchemaToCache(path, schema) ⇒ Object
# File 'lib/logstash/inputs/csvfile.rb', line 187

def addSchemaToCache(path, schema)
  @fileColumns[path] = schema
  touchSchema(path)
end
#decorate(event) ⇒ Object
# File 'lib/logstash/inputs/csvfile.rb', line 94

def decorate(event)
  super(event)

  message = event["message"]
  return if !message

  begin
    values = CSV.parse_line(message, :col_sep => @separator, :quote_char => @quote_char)
    return if values.length == 0

    # Get names for the columns.
    if @first_line_defines_columns
      @logger.debug? && @logger.debug("handling csv in first_line_defines_columns mode", :message => message, :columns => @columns)
      cols = getSchemaForFile(event, values)
    else
      @logger.debug? && @logger.debug("handling csv in explicitly defined columns mode", :message => message, :columns => @columns)
      cols = @columns
    end

    # Determine where to write the new attributes
    if @target.nil?
      # Default is to write to the root of the event.
      dest = event
    else
      dest = event[@target] ||= {}
    end

    # Add the per-column attributes (as long as this isn't the event from the schema-defining row)
    if !event["_csvmetadata"]
      values.each_index do |i|
        field_name = cols[i] || "column#{i+1}"
        dest[field_name] = values[i]
      end
    end
  rescue => e
    event.tag "_csvparsefailure"
    @logger.warn("Trouble parsing csv", :message => message, :exception => e)
    return
  end # begin
end
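The parse step relies on Ruby's standard CSV library; the plugin's separator and quote_char options map directly onto its :col_sep and :quote_char parameters. A small example with a semicolon separator (the ";" is just an illustrative value):

```ruby
require "csv"

# Parsing one line the way decorate does. The quoted field keeps its
# embedded separator character rather than being split on it.
row = CSV.parse_line(%q{1;"smith; john";42}, :col_sep => ";", :quote_char => '"')
# row is ["1", "smith; john", "42"]
```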
#getCachedSchemaForFile(path) ⇒ Object
# File 'lib/logstash/inputs/csvfile.rb', line 183

def getCachedSchemaForFile(path)
  @fileColumns[path]
end
#getSchemaForFile(event, parsedValues) ⇒ Object
Called from decorate().

# File 'lib/logstash/inputs/csvfile.rb', line 136

def getSchemaForFile(event, parsedValues)
  path = event["path"]
  if !path
    @logger.warn("No path in event. Cannot retrieve a schema for this event.")
    return []
  end

  @logger.debug? && @logger.debug("Getting schema for file", :path => path)

  schema = getCachedSchemaForFile(path)
  if schema
    @logger.debug? && @logger.debug("Using cached schema", :cols => schema)
    event["_schemacachetelemetry"] = "cachedEntryUsed" if @add_schema_cache_telemetry_to_event
    touchSchema(path)
    return schema
  end

  @logger.debug? && @logger.debug("Event from unknown file/schema. Reading schema from that file.", :path => path)
  scrubSchemaCache(event) if @max_cached_schema_age_hours > 0

  csvFileLine = readSchemaLineFromFile(path)
  if !csvFileLine || csvFileLine.length == 0
    @logger.warn("No suitable schema row found in file.", :path => path)
    return []
  end

  schema = CSV.parse_line(csvFileLine, :col_sep => @separator, :quote_char => @quote_char)
  addSchemaToCache(path, schema)
  @logger.debug? && @logger.debug("Schema read from file:", :path => path, :cols => schema)

  if @add_schema_cache_telemetry_to_event
    event["_schemacachetelemetry"] = "newEntryCreated"
    event["_cache_touch_time"] = Time.now
  end

  # Special handling for the schema row event: tag _csvmetadata and don't return individual column attributes
  if @fileColumns[path].join == parsedValues.join
    @logger.debug? && @logger.debug("Received the schema row event. Tagging w/ _csvmetadata", :message => event["message"])
    event["_csvmetadata"] = true
    return []
  else
    return schema
  end
end
#readSchemaLineFromFile(path) ⇒ Object
# File 'lib/logstash/inputs/csvfile.rb', line 196

def readSchemaLineFromFile(path)
  csvFileLine = ""
  File.open(path, "r") do |f|
    while csvFileLine.length == 0 and csvFileLine = f.gets
      if @schema_pattern_to_match
        if !csvFileLine.end_with?("\n") or !csvFileLine.match(@schema_pattern_to_match)
          csvFileLine = ""
        end
      end
    end
  end
  csvFileLine
end
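Extracted from its plugin context, the scan above amounts to: return the first complete line (one ending in a newline, so partially written schema rows are never used) that matches the optional schema pattern, or an empty result when no line qualifies. A standalone sketch with an illustrative name:

```ruby
# Standalone version of the schema-line scan. Skips lines that are
# incomplete (no trailing "\n") or that fail the optional pattern;
# returns "" when the file has no qualifying line.
def first_schema_line(path, pattern = nil)
  line = ""
  File.open(path, "r") do |f|
    while line.length == 0 and line = f.gets
      if pattern
        line = "" if !line.end_with?("\n") or !line.match(pattern)
      end
    end
  end
  line || ""
end
```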
#register ⇒ Object
# File 'lib/logstash/inputs/csvfile.rb', line 86

def register
  @fileColumns = Hash.new
  @schemaTouchedTimes = Hash.new
  super()
  @logger.warn("schema cache scrubbing disabled. Memory use will grow over time.") if @max_cached_schema_age_hours <= 0
end
#scrubSchemaCache(event) ⇒ Object
# File 'lib/logstash/inputs/csvfile.rb', line 210

def scrubSchemaCache(event)
  @logger.debug? && @logger.debug("Scrubbing schema cache", :size => @fileColumns.length)
  event["_schemacachetelemetryscrubbedbeforecount"] = @fileColumns.length if @add_schema_cache_telemetry_to_event

  expiringFiles = []
  now = Time.now
  @schemaTouchedTimes.each do |filename, lastReadTime|
    if (lastReadTime + (@max_cached_schema_age_hours * 60 * 60)) < now
      expiringFiles << filename
      @logger.debug? && @logger.debug("Expiring schema for: ", :file => filename, :lastRead => lastReadTime)
    end
  end

  expiringFiles.each do |filename|
    @fileColumns.delete(filename)
    @schemaTouchedTimes.delete(filename)
    @logger.debug? && @logger.debug("Deleted schema for: ", :file => filename)
  end

  event["_schemacachetelemetryscrubbedaftercount"] = @fileColumns.length if @add_schema_cache_telemetry_to_event
  @logger.debug? && @logger.debug("Done scrubbing schema cache", :size => @fileColumns.length)
end
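The expiry rule reduces to: an entry whose last-touch time plus @max_cached_schema_age_hours (converted to seconds) is earlier than now is dropped from both hashes. A standalone sketch with illustrative names:

```ruby
# Drop schema-cache entries older than max_age_hours, mirroring the
# scrub pass over @fileColumns/@schemaTouchedTimes. Returns expired keys.
def scrub_schemas!(columns, touched, max_age_hours, now = Time.now)
  expired = touched.select { |_, t| t + max_age_hours * 60 * 60 < now }.keys
  expired.each do |path|
    columns.delete(path)
    touched.delete(path)
  end
  expired
end
```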
#touchSchema(path) ⇒ Object
# File 'lib/logstash/inputs/csvfile.rb', line 192

def touchSchema(path)
  @schemaTouchedTimes[path] = Time.now
end