Class: LogStash::Inputs::DynamoDB

Inherits:
Base
  • Object
show all
Defined in:
lib/logstash/inputs/dynamodb.rb

Overview

DynamoDB Streams input plugin that first scans the DynamoDB table, then consumes the table's stream, and pushes those records into Logstash.

Constant Summary collapse

# Suffix that #register appends to the AWS client configuration's user agent.
# The leading space is part of the literal.
USER_AGENT =
" logstash-input-dynamodb/1.0.0".freeze
# Log format names. #register compares @log_format against LF_PLAIN; the
# other LF_* names are presumably the remaining accepted values of that
# setting -- confirm against the config declaration.
# NOTE(review): the value below is spelled "dymamodb", not "dynamodb". It is
# a runtime string, so confirm what users configure before changing it.
LF_DYNAMODB =
"dymamodb".freeze
LF_JSON_NO_BIN =
"json_drop_binary".freeze
LF_PLAIN =
"plain".freeze
LF_JSON_BIN_AS_TEXT =
"json_binary_as_text".freeze
# View type names. #register compares @view_type against VT_OLD_IMAGE and
# VT_ALL_IMAGES; the other VT_* names are presumably the remaining accepted
# values of that setting -- confirm against the config declaration.
VT_KEYS_ONLY =
"keys_only".freeze
VT_OLD_IMAGE =
"old_image".freeze
VT_NEW_IMAGE =
"new_image".freeze
VT_ALL_IMAGES =
"new_and_old_images".freeze

Instance Method Summary collapse

Instance Method Details

#build_credentials ⇒ Object



130
131
132
133
134
135
136
137
138
139
# File 'lib/logstash/inputs/dynamodb.rb', line 130

# Builds the AWS credentials provider from the plugin settings.
#
# When both @aws_access_key_id and @aws_secret_access_key are non-empty they
# are wrapped in a static credentials provider; otherwise the default AWS
# provider chain is used.
#
# Only the access key id is logged. The secret access key must never be
# written to the log, where anyone with log access could read it.
#
# @return [Object] an AmazonCredentials::StaticCredentialsProvider or an
#   AmazonCredentials::DefaultAWSCredentialsProviderChain
def build_credentials
  access_key_id = @aws_access_key_id.to_s
  secret_access_key = @aws_secret_access_key.to_s

  if access_key_id.empty? || secret_access_key.empty?
    @logger.info("Using default provider chain")
    return AmazonCredentials::DefaultAWSCredentialsProviderChain.new
  end

  # Deliberately omit the secret from the message.
  @logger.info("Using static credentials for access key id: " + access_key_id)
  basic = AmazonCredentials::BasicAWSCredentials.new(@aws_access_key_id, @aws_secret_access_key)
  AmazonCredentials::StaticCredentialsProvider.new(basic)
end

#queue_event(event, logstash_queue, event_host) ⇒ Object



313
314
315
316
317
# File 'lib/logstash/inputs/dynamodb.rb', line 313

# Wraps a record in a LogStash::Event, decorates it and appends it to the
# given queue.
#
# @param event the payload stored under the "message" key
# @param logstash_queue the queue that receives the event via <<
# @param event_host the value stored under the "host" key
# @return whatever logstash_queue's << returns
def queue_event(event, logstash_queue, event_host)
  fields = { "message" => event, "host" => event_host }
  wrapped = LogStash::Event.new(fields).tap { |created| decorate(created) }
  logstash_queue << wrapped
end

#register ⇒ Object



142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
# File 'lib/logstash/inputs/dynamodb.rb', line 142

# Prepares the plugin: sets up logging, the internal queue and credentials,
# validates the configured option combination, creates the DynamoDB client,
# reads the table's key schema, builds the log parser and, when
# @perform_stream is set, calls setup_stream.
#
# @raise [LogStash::ConfigurationError] when a scan is requested together
#   with the old-image view type, or when the new-and-old-images view type
#   is combined with a log format other than plain
# @return the result of setup_stream when @perform_stream is truthy, else nil
def register
  LogStash::Logger.setup_log4j(@logger)

  @host = Socket.gethostname
  @logger.info("Tablename: " + @table_name)
  @queue = SizedQueue.new(20)
  @credentials = build_credentials
  @logger.info("Checkpointer: " + @checkpointer)

  # Reject option combinations the plugin refuses to run with.
  if @perform_scan && @view_type == VT_OLD_IMAGE
    scan_message = "Cannot perform scan with view type: " + @view_type + " configuration"
    raise LogStash::ConfigurationError, scan_message
  end
  if @view_type == VT_ALL_IMAGES && @log_format != LF_PLAIN
    format_message = "Cannot show view_type: " + @view_type + ", with log_format: " + @log_format
    raise LogStash::ConfigurationError, format_message
  end

  # Create the DynamoDB client, tagging its user agent with this plugin.
  @client_configuration = AmazonDynamoDB::ClientConfiguration.new
  tagged_agent = @client_configuration.getUserAgent + USER_AGENT
  @client_configuration.setUserAgent(tagged_agent)
  @dynamodb_client = AmazonDynamoDB::AmazonDynamoDBClient.new(@credentials, @client_configuration)

  @logger.info(@dynamodb_client.to_s)

  @dynamodb_client.setEndpoint(@endpoint)
  @logger.info("DynamoDB endpoint: " + @endpoint)

  # Collect the attribute names of the table's key schema, in iterator order.
  @key_schema = []
  @table_description = @dynamodb_client.describeTable(@table_name).getTable
  schema_elements = @table_description.getKeySchema.iterator
  while schema_elements.hasNext
    element = schema_elements.next
    @key_schema << element.getAttributeName.to_s
  end
  region = RegionUtils.getRegionByEndpoint(@endpoint)

  @parser ||= Logstash::Inputs::DynamoDB::DynamoDBLogParser.new(@view_type, @log_format, @key_schema, region)

  setup_stream if @perform_stream
end

#run(logstash_queue) ⇒ Object



184
185
186
187
188
189
190
191
192
193
194
195
# File 'lib/logstash/inputs/dynamodb.rb', line 184

# Runs the input by delegating to run_with_catch. When a
# LogStash::ShutdownSignal is raised, calls exit_threads and then flushes
# every event still held in @queue to the Logstash queue.
#
# @param logstash_queue the queue that parsed events are pushed onto
# @return the result of run_with_catch, or nil after a shutdown flush
def run(logstash_queue)
  run_with_catch(logstash_queue)
rescue LogStash::ShutdownSignal
  exit_threads
  # Drain whatever is still buffered in the internal queue.
  until @queue.empty?
    @logger.info("Flushing rest of events in logstash queue")
    pending = @queue.pop
    parsed = @parser.parse_stream(pending)
    queue_event(parsed, logstash_queue, @host)
  end
end