Class: LogStash::Inputs::DynamoDB
- Inherits:
- Base
- Object
- Base
- LogStash::Inputs::DynamoDB
- Defined in:
- lib/logstash/inputs/dynamodb.rb
Overview
DynamoDB Streams input plugin: it first scans the DynamoDB table, then consumes the table's stream, and pushes the resulting records into Logstash.
Constant Summary collapse
- USER_AGENT =
" logstash-input-dynamodb/1.0.0".freeze
- LF_DYNAMODB =
"dymamodb".freeze
- LF_JSON_NO_BIN =
"json_drop_binary".freeze
- LF_PLAIN =
"plain".freeze
- LF_JSON_BIN_AS_TEXT =
"json_binary_as_text".freeze
- VT_KEYS_ONLY =
"keys_only".freeze
- VT_OLD_IMAGE =
"old_image".freeze
- VT_NEW_IMAGE =
"new_image".freeze
- VT_ALL_IMAGES =
"new_and_old_images".freeze
Instance Method Summary collapse
- #build_credentials ⇒ Object
- #queue_event(event, logstash_queue, event_host) ⇒ Object
- #register ⇒ Object
- #run(logstash_queue) ⇒ Object
Instance Method Details
#build_credentials ⇒ Object
130 131 132 133 134 135 136 137 138 139 |
# File 'lib/logstash/inputs/dynamodb.rb', line 130

# Builds the AWS credentials provider that is handed to the DynamoDB client.
#
# When both @aws_access_key_id and @aws_secret_access_key are non-empty, a
# static provider wrapping those two values is returned. Otherwise the
# default AWS credentials provider chain is returned.
#
# Only the access key id is logged. The secret access key must never be
# written to the log, since log files are routinely shipped and shared.
#
# @return [Object] an AmazonCredentials::StaticCredentialsProvider, or an
#   AmazonCredentials::DefaultAWSCredentialsProviderChain when either key
#   is missing or empty
def build_credentials
  # to_s makes nil and "" equivalent: either one selects the default chain.
  if @aws_access_key_id.to_s.empty? || @aws_secret_access_key.to_s.empty?
    @logger.info("Using default provider chain")
    return AmazonCredentials::DefaultAWSCredentialsProviderChain.new
  end

  @logger.info("Using static credentials: #{@aws_access_key_id}")
  basic = AmazonCredentials::BasicAWSCredentials.new(@aws_access_key_id, @aws_secret_access_key)
  AmazonCredentials::StaticCredentialsProvider.new(basic)
end
#queue_event(event, logstash_queue, event_host) ⇒ Object
313 314 315 316 317 |
# File 'lib/logstash/inputs/dynamodb.rb', line 313

# Wraps a payload in a LogStash::Event, decorates it, and appends it to the
# given queue.
#
# @param event [Object] value stored under the event's "message" field
# @param logstash_queue [#<<] queue the finished event is appended to
# @param event_host [Object] value stored under the event's "host" field
# @return [Object] whatever `logstash_queue << event` returns
def queue_event(event, logstash_queue, event_host)
  wrapped = LogStash::Event.new("message" => event, "host" => event_host)
  # Decorate before the event becomes visible to queue consumers.
  logstash_queue << wrapped.tap { |pending| decorate(pending) }
end
#register ⇒ Object
142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 |
# File 'lib/logstash/inputs/dynamodb.rb', line 142

# Prepares the plugin before #run is called.
#
# In order, this method: wires up log4j logging, records the local hostname,
# creates the bounded internal queue (capacity 20), builds the AWS
# credentials, rejects unsupported configuration combinations, creates the
# DynamoDB client for @endpoint, reads the table's key attribute names,
# creates the log parser (unless one is already set), and calls
# setup_stream when @perform_stream is truthy.
#
# @raise [LogStash::ConfigurationError] when a scan is requested together
#   with the "old_image" view type, or when the "new_and_old_images" view
#   type is combined with any log format other than "plain"
def register
  LogStash::Logger.setup_log4j(@logger)

  @host = Socket.gethostname
  @logger.info("Tablename: " + @table_name)
  @queue = SizedQueue.new(20)
  @credentials = build_credentials
  @logger.info("Checkpointer: " + @checkpointer)

  # Reject configuration combinations the plugin cannot serve.
  if @perform_scan && @view_type == VT_OLD_IMAGE
    scan_error = "Cannot perform scan with view type: " + @view_type + " configuration"
    raise LogStash::ConfigurationError, scan_error
  end

  if @view_type == VT_ALL_IMAGES && @log_format != LF_PLAIN
    format_error = "Cannot show view_type: " + @view_type + ", with log_format: " + @log_format
    raise LogStash::ConfigurationError, format_error
  end

  # Create the DynamoDB client, appending this plugin's tag to the SDK's
  # default user agent.
  @client_configuration = AmazonDynamoDB::ClientConfiguration.new
  tagged_user_agent = @client_configuration.getUserAgent() + USER_AGENT
  @client_configuration.setUserAgent(tagged_user_agent)
  @dynamodb_client = AmazonDynamoDB::AmazonDynamoDBClient.new(@credentials, @client_configuration)
  @logger.info(@dynamodb_client.to_s)
  @dynamodb_client.setEndpoint(@endpoint)
  @logger.info("DynamoDB endpoint: " + @endpoint)

  # Collect the table's key attribute names for the parser.
  @key_schema = []
  @table_description = @dynamodb_client.describeTable(@table_name).getTable()
  schema_elements = @table_description.getKeySchema().iterator()
  while schema_elements.hasNext()
    element = schema_elements.next()
    @key_schema << element.getAttributeName().to_s
  end

  region = RegionUtils.getRegionByEndpoint(@endpoint)
  @parser ||= Logstash::Inputs::DynamoDB::DynamoDBLogParser.new(@view_type, @log_format, @key_schema, region)

  setup_stream if @perform_stream
end
#run(logstash_queue) ⇒ Object
184 185 186 187 188 189 190 191 192 193 194 195 |
# File 'lib/logstash/inputs/dynamodb.rb', line 184

# Runs the input until Logstash signals shutdown.
#
# Delegates the actual work to run_with_catch. On LogStash::ShutdownSignal
# it calls exit_threads and then drains whatever is still sitting in the
# internal @queue, parsing each remaining item and forwarding it to
# logstash_queue.
#
# @param logstash_queue [Object] queue that receives the produced events
def run(logstash_queue)
  run_with_catch(logstash_queue)
rescue LogStash::ShutdownSignal
  exit_threads
  # Flush events that were buffered but not yet forwarded.
  while !@queue.empty?
    @logger.info("Flushing rest of events in logstash queue")
    leftover = @queue.pop
    parsed = @parser.parse_stream(leftover)
    queue_event(parsed, logstash_queue, @host)
  end
end