Class: Logstash::Inputs::DynamoDB::LogStashRecordProcessor
- Inherits:
- Object
- Includes:
- com.amazonaws.services.kinesis.clientlibrary.interfaces::IRecordProcessor
- Defined in:
- lib/logstash/inputs/LogStashRecordProcessor.rb
Instance Attribute Summary
- #queue ⇒ Object
  Returns the value of attribute queue.
- #shard_id ⇒ Object
  Returns the value of attribute shard_id.
Instance Method Summary
- #initialize(queue) ⇒ LogStashRecordProcessor (constructor)
  A new instance of LogStashRecordProcessor.
- #process_records(records, checkpointer) ⇒ Object
- #shutdown(checkpointer, reason) ⇒ Object
Constructor Details
#initialize(queue) ⇒ LogStashRecordProcessor
Returns a new instance of LogStashRecordProcessor.
    # File 'lib/logstash/inputs/LogStashRecordProcessor.rb', line 32
    def initialize(queue)
      # Workaround for IRecordProcessor.initialize(String shardId) interfering with constructor.
      # No good way to overload methods in JRuby, so deciding which was supposed to be called here.
      if (queue.is_a? String)
        @shard_id = queue
        return
      else
        @queue ||= queue
        @logger ||= LogStash::Inputs::DynamoDB.logger
      end
    end
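Because the constructor also serves as the `IRecordProcessor.initialize(String shardId)` callback, the argument type decides which role a given call plays. The following is a minimal sketch of that dispatch in plain Ruby. `SketchProcessor` is a hypothetical stand-in, not the plugin's class (the real processor needs JRuby and the Kinesis client library), and the shard ID string is made up for illustration.

```ruby
require "thread"

# Hypothetical stand-in for LogStashRecordProcessor; not the plugin's class.
class SketchProcessor
  attr_reader :queue, :shard_id

  def initialize(arg)
    if arg.is_a?(String)
      # Treated as the IRecordProcessor.initialize(shardId) callback.
      @shard_id = arg
    else
      # Treated as the ordinary Ruby constructor call.
      @queue ||= arg
    end
  end
end

# Ordinary construction with a queue.
processor = SketchProcessor.new(Queue.new)

# Simulate the library invoking initialize(shardId) on the existing instance.
# `send` is used because initialize is private in Ruby.
processor.send(:initialize, "shardId-000000000000")
```

After both calls the instance holds the queue from construction and the shard ID from the callback, since the String branch never touches `@queue`.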
Instance Attribute Details
#queue ⇒ Object
Returns the value of attribute queue.
    # File 'lib/logstash/inputs/LogStashRecordProcessor.rb', line 30
    def queue
      @queue
    end
#shard_id ⇒ Object
Returns the value of attribute shard_id.
    # File 'lib/logstash/inputs/LogStashRecordProcessor.rb', line 30
    def shard_id
      @shard_id
    end
Instance Method Details
#process_records(records, checkpointer) ⇒ Object
    # File 'lib/logstash/inputs/LogStashRecordProcessor.rb', line 44
    def process_records(records, checkpointer)
      @logger.debug("Processing batch of " + records.size().to_s + " records")
      records.each do |record|
        @queue.push(record)
      end
      # Checkpoint once all of the records have been consumed
      checkpointer.checkpoint()
    end
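The pattern in `process_records` is to forward every record to the queue and then checkpoint a single time for the whole batch. A rough sketch of that flow outside JRuby follows. `CountingCheckpointer` and `forward_batch` are hypothetical names introduced here; the real checkpointer is supplied by the Kinesis client library, and the logger call is omitted.

```ruby
require "thread"

# Hypothetical checkpointer that only counts calls; the real object is
# provided by the Kinesis client library.
class CountingCheckpointer
  attr_reader :calls

  def initialize
    @calls = 0
  end

  def checkpoint
    @calls += 1
  end
end

# Same shape as process_records, without the logger.
def forward_batch(records, queue, checkpointer)
  records.each { |record| queue.push(record) }
  # Checkpoint once, after the whole batch has been queued.
  checkpointer.checkpoint
end

queue = Queue.new
checkpointer = CountingCheckpointer.new
forward_batch(%w[r1 r2 r3], queue, checkpointer)
```

Checkpointing per batch rather than per record keeps checkpoint traffic low, at the cost of re-delivering part of a batch if the worker fails before the checkpoint is written.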
#shutdown(checkpointer, reason) ⇒ Object
    # File 'lib/logstash/inputs/LogStashRecordProcessor.rb', line 53
    def shutdown(checkpointer, reason)
      case reason
      when ShutdownReason::TERMINATE
        checkpointer.checkpoint()
      when ShutdownReason::ZOMBIE
      else
        raise RuntimeError, "Invalid shutdown reason."
      end
      unless @shard_id.nil?
        @logger.info("shutting down record processor with shardId: " + @shard_id + " with reason " + reason.to_s)
      end
    end
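The branch on `reason` decides whether a final checkpoint is taken: `TERMINATE` checkpoints, `ZOMBIE` deliberately does nothing, and anything else raises. The sketch below isolates that decision using hypothetical stand-ins (`SketchShutdownReason`, `checkpoint_on_shutdown?`) in place of the Kinesis client library's `ShutdownReason` enum. The comments on why each reason behaves as it does reflect the usual Kinesis client library guidance and are an assumption here, not something stated in this file.

```ruby
# Hypothetical stand-ins for the Kinesis client library's ShutdownReason values.
module SketchShutdownReason
  TERMINATE = :terminate
  ZOMBIE = :zombie
end

# Returns true when a final checkpoint should be taken for the given reason.
def checkpoint_on_shutdown?(reason)
  case reason
  when SketchShutdownReason::TERMINATE
    # Assumed meaning: the shard is finished, so record final progress.
    true
  when SketchShutdownReason::ZOMBIE
    # Assumed meaning: the lease moved to another worker, so do not checkpoint.
    false
  else
    raise RuntimeError, "Invalid shutdown reason."
  end
end

terminate_result = checkpoint_on_shutdown?(SketchShutdownReason::TERMINATE)
zombie_result = checkpoint_on_shutdown?(SketchShutdownReason::ZOMBIE)
```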