Class: LogStash::Inputs::Kinesis::Worker

Inherits:
Object
  • Object
show all
Includes:
com.amazonaws.services.kinesis.clientlibrary.interfaces.v2::IRecordProcessor
Defined in:
lib/logstash/inputs/kinesis/worker.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(*args) ⇒ Worker



13
14
15
16
17
18
19
20
21
22
# File 'lib/logstash/inputs/kinesis/worker.rb', line 13

# Doubles as Ruby's constructor and as the `initialize` method of
# IRecordProcessor, since both share the same name (a nasty but deliberate hack).
#
# Construction call: args are, in order, codec, output_queue, decorator,
# checkpoint_interval and logger.
# Any later call: args[0] must respond to `shardId`; the shard id is read and
# returned, and no instance state is changed.
def initialize(*args)
  # Already constructed, so this is the record-processor hook, not `new`.
  return args.first.shardId if @constructed

  @codec, @output_queue, @decorator, @checkpoint_interval, @logger = args
  # Backdated so the `Time.now >= @next_checkpoint` test in processRecords
  # is already satisfied the first time it runs.
  @next_checkpoint = Time.now - 600
  @constructed = true
end

Instance Attribute Details

#checkpoint_interval ⇒ Object (readonly)

Returns the value of attribute checkpoint_interval.



5
6
7
# File 'lib/logstash/inputs/kinesis/worker.rb', line 5

# Reader for @checkpoint_interval, which #initialize takes from its fourth
# constructor argument. #processRecords adds it to Time.now when scheduling
# the next checkpoint, so it is treated as a number of seconds.
#
# @return [Object] the stored interval (presumably Numeric — confirm against the caller)
def checkpoint_interval
  @checkpoint_interval
end

#codec ⇒ Object (readonly)

Returns the value of attribute codec.



5
6
7
# File 'lib/logstash/inputs/kinesis/worker.rb', line 5

# Reader for @codec, which #initialize takes from its first constructor
# argument. How the codec is used is not visible in this listing.
#
# @return [Object] the stored codec
def codec
  @codec
end

#decorator ⇒ Object (readonly)

Returns the value of attribute decorator.



5
6
7
# File 'lib/logstash/inputs/kinesis/worker.rb', line 5

# Reader for @decorator, which #initialize takes from its third constructor
# argument. How the decorator is used is not visible in this listing.
#
# @return [Object] the stored decorator
def decorator
  @decorator
end

#logger ⇒ Object (readonly)

Returns the value of attribute logger.



5
6
7
# File 'lib/logstash/inputs/kinesis/worker.rb', line 5

# Reader for @logger, which #initialize takes from its fifth constructor
# argument.
#
# @return [Object] the stored logger
def logger
  @logger
end

Instance Method Details

#processRecords(records_input) ⇒ Object



25
26
27
28
29
30
31
# File 'lib/logstash/inputs/kinesis/worker.rb', line 25

# Handles one batch of records: passes every record to process_record, then
# checkpoints if the scheduled checkpoint time has been reached.
#
# @param records_input [#records, #checkpointer] batch wrapper; `records` is
#   iterated with `each`, `checkpointer` is handed to `checkpoint`
# @return [Time, nil] the newly scheduled checkpoint time, or nil when no
#   checkpoint was due
def processRecords(records_input)
  records_input.records.each do |record|
    process_record(record)
  end

  # Checkpoint not due yet; nothing more to do for this batch.
  return if Time.now < @next_checkpoint

  checkpoint(records_input.checkpointer)
  @next_checkpoint = Time.now + @checkpoint_interval
end

#shutdown(shutdown_input) ⇒ Object



33
34
35
36
37
# File 'lib/logstash/inputs/kinesis/worker.rb', line 33

# Shutdown hook: checkpoints only when the reported shutdown reason equals
# ShutdownReason::TERMINATE; for any other reason nothing is done.
#
# @param shutdown_input [#shutdown_reason, #checkpointer] shutdown details;
#   `checkpointer` is handed to `checkpoint` when the reason is TERMINATE
# @return [Object, nil] whatever `checkpoint` returns, or nil when skipped
def shutdown(shutdown_input)
  terminate_reason = com.amazonaws.services.kinesis.clientlibrary.lib.worker::ShutdownReason::TERMINATE
  return unless shutdown_input.shutdown_reason == terminate_reason

  checkpoint(shutdown_input.checkpointer)
end