Class: Logstash::Inputs::DynamoDB::LogStashRecordProcessor

Inherits:
Object
  • Object
show all
Includes:
comcom.amazonawscom.amazonaws.servicescom.amazonaws.services.kinesiscom.amazonaws.services.kinesis.clientlibrarycom.amazonaws.services.kinesis.clientlibrary.interfacescom.amazonaws.services.kinesis.clientlibrary.interfaces::IRecordProcessor
Defined in:
lib/logstash/inputs/LogStashRecordProcessor.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(queue) ⇒ LogStashRecordProcessor

Returns a new instance of LogStashRecordProcessor.



32
33
34
35
36
37
38
39
40
41
42
# File 'lib/logstash/inputs/LogStashRecordProcessor.rb', line 32

def initialize(queue)
  # Workaround for IRecordProcessor.initialize(String shardId) interfering with constructor.
  # No good way to overload methods in JRuby, so deciding which was supposed to be called here.
  if (queue.is_a? String)
    @shard_id  = queue
    return
  else
    @queue ||= queue
    @logger ||= LogStash::Inputs::DynamoDB.logger
  end
end

Instance Attribute Details

#queueObject

Returns the value of attribute queue.



30
31
32
# File 'lib/logstash/inputs/LogStashRecordProcessor.rb', line 30

def queue
  @queue
end

#shard_idObject

Returns the value of attribute shard_id.



30
31
32
# File 'lib/logstash/inputs/LogStashRecordProcessor.rb', line 30

def shard_id
  @shard_id
end

Instance Method Details

#process_records(records, checkpointer) ⇒ Object



44
45
46
47
48
49
50
51
# File 'lib/logstash/inputs/LogStashRecordProcessor.rb', line 44

def process_records(records, checkpointer)
  @logger.debug("Processing batch of " + records.size().to_s + " records")
  records.each do |record|
    @queue.push(record)
  end
  #checkpoint once all of the records have been consumed
  checkpointer.checkpoint()
end

#shutdown(checkpointer, reason) ⇒ Object



53
54
55
56
57
58
59
60
61
62
63
64
# File 'lib/logstash/inputs/LogStashRecordProcessor.rb', line 53

def shutdown(checkpointer, reason)
  case reason
  when ShutdownReason::TERMINATE
    checkpointer.checkpoint()
  when ShutdownReason::ZOMBIE
  else
    raise RuntimeError, "Invalid shutdown reason."
  end
  unless @shard_id.nil?
    @logger.info("shutting down record processor with shardId: " + @shard_id + " with reason " + reason.to_s)
  end
end