Class: LogStash::Inputs::Kinesis::Worker
- Inherits:
- Object
- Object
- LogStash::Inputs::Kinesis::Worker
- Includes:
- com.amazonaws.services.kinesis.clientlibrary.interfaces.v2::IRecordProcessor
- Defined in:
- lib/logstash/inputs/kinesis/worker.rb
Instance Attribute Summary
- #checkpoint_interval ⇒ Object (readonly)
  Returns the value of attribute checkpoint_interval.
- #codec ⇒ Object (readonly)
  Returns the value of attribute codec.
- #decorator ⇒ Object (readonly)
  Returns the value of attribute decorator.
- #logger ⇒ Object (readonly)
  Returns the value of attribute logger.
Instance Method Summary
- #initialize(*args) ⇒ Worker (constructor)
  A new instance of Worker.
- #processRecords(records_input) ⇒ Object
- #shutdown(shutdown_input) ⇒ Object
Constructor Details
#initialize(*args) ⇒ Worker
    # File 'lib/logstash/inputs/kinesis/worker.rb', line 13

    def initialize(*args)
      # nasty hack, because this is the name of a method on IRecordProcessor, but also ruby's constructor
      if !@constructed
        @codec, @output_queue, @decorator, @checkpoint_interval, @logger = args
        @next_checkpoint = Time.now - 600
        @constructed = true
      else
        _shard_id = args[0].shardId
      end
    end
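The constructor above answers two callers: Ruby's `new`, and the `initialize` method that `IRecordProcessor` requires. The following is a minimal plain-Ruby sketch of that dual-use pattern, not the plugin's code. `DualInitialize` and `FakeInitializationInput` are hypothetical names, and `send` is used only because `initialize` is private in Ruby; in the real worker the second call arrives through the Java interface.

```ruby
# Hypothetical stand-in for the initialization input; only shardId is modeled.
FakeInitializationInput = Struct.new(:shardId)

# Hypothetical class illustrating one initialize method with two roles.
class DualInitialize
  attr_reader :settings, :shard_id

  def initialize(*args)
    if !@constructed
      # First call: Ruby's constructor, reached via DualInitialize.new
      @settings = args
      @constructed = true
    else
      # Any later call: the interface-style initialize carrying shard details
      @shard_id = args[0].shardId
    end
  end
end

processor = DualInitialize.new(:codec, :queue)
# Simulates the library invoking initialize a second time on the same object.
processor.send(:initialize, FakeInitializationInput.new("shardId-000000000000"))
```

The `@constructed` flag is what distinguishes the two calls, so the second call leaves the settings captured by the first untouched.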
Instance Attribute Details
#checkpoint_interval ⇒ Object (readonly)
Returns the value of attribute checkpoint_interval.
    # File 'lib/logstash/inputs/kinesis/worker.rb', line 5

    def checkpoint_interval
      @checkpoint_interval
    end
#codec ⇒ Object (readonly)
Returns the value of attribute codec.
    # File 'lib/logstash/inputs/kinesis/worker.rb', line 5

    def codec
      @codec
    end
#decorator ⇒ Object (readonly)
Returns the value of attribute decorator.
    # File 'lib/logstash/inputs/kinesis/worker.rb', line 5

    def decorator
      @decorator
    end
#logger ⇒ Object (readonly)
Returns the value of attribute logger.
    # File 'lib/logstash/inputs/kinesis/worker.rb', line 5

    def logger
      @logger
    end
Instance Method Details
#processRecords(records_input) ⇒ Object
    # File 'lib/logstash/inputs/kinesis/worker.rb', line 25

    def processRecords(records_input)
      records_input.records.each { |record| process_record(record) }
      if Time.now >= @next_checkpoint
        checkpoint(records_input.checkpointer)
        @next_checkpoint = Time.now + @checkpoint_interval
      end
    end
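`processRecords` checkpoints at most once per `checkpoint_interval`, and because the constructor sets `@next_checkpoint` 600 seconds in the past, the first batch always checkpoints. A minimal sketch of that timing logic in plain Ruby follows. `IntervalCheckpointer`, its injectable `clock`, and the `checkpoints` counter are hypothetical and exist only to make the behavior observable without the Kinesis client library.

```ruby
# Hypothetical class that mirrors the interval-based checkpoint timing.
class IntervalCheckpointer
  attr_reader :checkpoints, :handled

  # clock is an assumption added for testability; it defaults to Time.now.
  def initialize(interval_seconds, clock: -> { Time.now })
    @interval = interval_seconds
    @clock = clock
    @next_checkpoint = @clock.call - 600 # in the past, so the first batch checkpoints
    @checkpoints = 0
    @handled = []
  end

  def process_batch(records)
    records.each { |record| @handled << record }
    if @clock.call >= @next_checkpoint
      @checkpoints += 1 # stand-in for the real checkpoint call
      @next_checkpoint = @clock.call + @interval
    end
  end
end

now = Time.at(0)
worker = IntervalCheckpointer.new(60, clock: -> { now })
worker.process_batch([1, 2]) # first batch: checkpoints
now += 30
worker.process_batch([3])    # only 30 s elapsed: no checkpoint
now += 30
worker.process_batch([4])    # 60 s elapsed: checkpoints again
```

Checkpointing by elapsed time rather than per batch trades fewer checkpoint writes for the possibility of reprocessing up to one interval of records after a restart.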
#shutdown(shutdown_input) ⇒ Object
    # File 'lib/logstash/inputs/kinesis/worker.rb', line 33

    def shutdown(shutdown_input)
      if shutdown_input.shutdown_reason == com.amazonaws.services.kinesis.clientlibrary.lib.worker::ShutdownReason::TERMINATE
        checkpoint(shutdown_input.checkpointer)
      end
    end
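`shutdown` checkpoints only when the reason is `TERMINATE` and does nothing for any other reason. A minimal plain-Ruby sketch of that conditional follows. The symbols `:terminate` and `:zombie`, `FakeShutdownInput`, `FakeCheckpointer`, and `handle_shutdown` are hypothetical stand-ins for the client library's enum and objects, not its API.

```ruby
# Hypothetical stand-ins for the shutdown input and its checkpointer.
FakeShutdownInput = Struct.new(:shutdown_reason, :checkpointer)

class FakeCheckpointer
  attr_reader :calls

  def initialize
    @calls = 0
  end

  def checkpoint
    @calls += 1
  end
end

# Checkpoint only on :terminate; any other reason is ignored.
def handle_shutdown(input)
  input.checkpointer.checkpoint if input.shutdown_reason == :terminate
end

finished = FakeCheckpointer.new
handle_shutdown(FakeShutdownInput.new(:terminate, finished)) # checkpoints once

lost = FakeCheckpointer.new
handle_shutdown(FakeShutdownInput.new(:zombie, lost))        # no checkpoint
```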