Class: LogStash::Inputs::JSQS
- Inherits: Threadable
  - Object
  - Threadable
  - LogStash::Inputs::JSQS
- Defined in: lib/logstash/inputs/jsqs.rb
Overview
Pull events from an Amazon Web Services Simple Queue Service (SQS) queue.
SQS is a simple, scalable queue system that is part of the Amazon Web Services suite of tools.
Although SQS is similar to other queuing systems like AMQP, it uses a custom API and requires that you have an AWS account. See aws.amazon.com/sqs/ for more details on how SQS works, what the pricing schedule looks like, and how to set up a queue.
To use this plugin, you must:
* Have an AWS account
* Set up an SQS queue
* Create an identity that has access to consume from the queue.
The “consumer” identity must have the following permissions on the queue:
* sqs:ChangeMessageVisibility
* sqs:ChangeMessageVisibilityBatch
* sqs:DeleteMessage
* sqs:DeleteMessageBatch
* sqs:GetQueueAttributes
* sqs:GetQueueUrl
* sqs:ListQueues
* sqs:ReceiveMessage
Typically, you should set up an IAM policy, create a user, and apply the IAM policy to the user. A sample policy is as follows:
{
  "Statement": [
    {
      "Action": [
        "sqs:ChangeMessageVisibility",
        "sqs:ChangeMessageVisibilityBatch",
        "sqs:DeleteMessage",
        "sqs:DeleteMessageBatch",
        "sqs:GetQueueAttributes",
        "sqs:GetQueueUrl",
        "sqs:ListQueues",
        "sqs:ReceiveMessage"
      ],
      "Effect": "Allow",
      "Resource": [
        "arn:aws:sqs:us-east-1:123456789012:Logstash"
      ]
    }
  ]
}
See aws.amazon.com/iam/ for more details on setting up AWS identities.
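One way to confirm that a policy document grants every permission in the list above is to compare its "Allow" actions against that list. The following is a minimal sketch, not part of the plugin: `missing_actions` is a hypothetical helper, and it only matches action names literally. It ignores wildcards such as `sqs:*`, "Deny" statements, resources, and conditions, all of which real IAM evaluation takes into account.

```ruby
require "json"

# Actions the overview lists as required for the consumer identity.
REQUIRED_ACTIONS = %w[
  sqs:ChangeMessageVisibility
  sqs:ChangeMessageVisibilityBatch
  sqs:DeleteMessage
  sqs:DeleteMessageBatch
  sqs:GetQueueAttributes
  sqs:GetQueueUrl
  sqs:ListQueues
  sqs:ReceiveMessage
].freeze

# Hypothetical helper (an assumption, not plugin code): returns the required
# actions that no "Allow" statement in the policy grants by exact name.
def missing_actions(policy_json)
  policy = JSON.parse(policy_json)
  # "Statement" may be a single object or an array of objects.
  statements = [policy["Statement"]].flatten.compact
  allowed = statements
    .select { |s| s["Effect"] == "Allow" }
    .flat_map { |s| Array(s["Action"]) } # "Action" may be a string or an array
  REQUIRED_ACTIONS - allowed
end
```

Calling `missing_actions(File.read("policy.json"))` (the file name is illustrative) returns an empty array when every listed action is named explicitly in an "Allow" statement, and otherwise returns the actions still missing.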
Instance Method Details
#register ⇒ Object
# File 'lib/logstash/inputs/jsqs.rb', line 78

def register
  @logger.info("Registering SQS input", :queue => @queue)

  # Client config
  @logger.debug("Creating AWS SQS queue client", :queue => @queue)
  clientConfig = ClientConfiguration.new.withMaxConnections(@max_connections)

  # SQS client
  @sqs = AmazonSQSAsyncClient.new(clientConfig)
  @logger.debug("Amazon SQS Client created")

  # Buffered client config
  queueBufferConfig = QueueBufferConfig.new
    .withMaxBatchOpenMs(@max_batch_open_ms)
    .withMaxInflightReceiveBatches(@max_inflight_receive_batches)
    .withMaxDoneReceiveBatches(@max_done_receive_batches)
  @bufferedSqs = AmazonSQSBufferedAsyncClient.new(@sqs, queueBufferConfig)
  @logger.info("Connected to AWS SQS queue successfully.", :queue => @queue)

  @receiveRequest = ReceiveMessageRequest.new(@queueUrl).withMaxNumberOfMessages()
end
#run(output_queue) ⇒ Object
# File 'lib/logstash/inputs/jsqs.rb', line 100

def run(output_queue)
  @logger.debug("Polling SQS queue", :queue => @queue)

  while running?
    begin
      result = @bufferedSqs.receiveMessage(@receiveRequest)
      deleteEntries = []

      # Process messages (expected 0 - 10 messages)
      # The accessor and block-variable names are missing from this listing;
      # `messages` and `message` are reconstructed from the AWS SDK's
      # ReceiveMessageResult#getMessages.
      result.messages.each_with_index { |message, i|
        @codec.decode(message.body) do |event|
          if event.is_a?(Array)
            event.each do |msg|
              decorate(msg)
              output_queue << msg
            end
          else
            decorate(event)
            output_queue << event
          end
        end # codec.decode

        # Add delete entry for this message
        deleteEntries << DeleteMessageBatchRequestEntry.new.withId(i.to_s).withReceiptHandle(message.getReceiptHandle())
      }

      if deleteEntries.size > 0
        deleteRequest = DeleteMessageBatchRequest.new.withQueueUrl(@queueUrl)
        deleteRequest.setEntries(deleteEntries)

        # Issue delete request
        @bufferedSqs.deleteMessageBatch(deleteRequest)
      end # end if
    rescue Exception => e
      if (@retry_count -= 1) > 0
        @logger.warn("Unable to access SQS queue. Sleeping before retrying.", :error => e.to_s, :queue => @queue)
        sleep(10)
        retry
      else
        @logger.error("Unable to access SQS queue. Aborting.", :error => e.to_s, :queue => @queue)
        teardown
      end # if
    end # begin
  end # polling loop
end
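The receive, decode, and batch-delete pattern in #run can be tried without AWS using in-memory stand-ins. The following is a rough sketch under stated assumptions: `FakeMessage`, `DeleteEntry`, `FakeQueue`, and `poll_once` are hypothetical names, not SDK or plugin classes, and upcasing the body stands in for `@codec.decode` plus `decorate`. The fake queue also has no visibility timeout, which real SQS does.

```ruby
# Stand-ins for the SDK message and delete-entry types (illustration only).
FakeMessage = Struct.new(:body, :receipt_handle)
DeleteEntry = Struct.new(:id, :receipt_handle)

# Hypothetical in-memory queue: returns up to `max` messages per receive and
# drops messages whose receipt handles are passed to delete_batch.
class FakeQueue
  def initialize(bodies)
    @messages = bodies.each_with_index.map { |b, i| FakeMessage.new(b, "rh-#{i}") }
  end

  def receive(max = 10)
    @messages.first(max)
  end

  def delete_batch(entries)
    handles = entries.map(&:receipt_handle)
    @messages.reject! { |m| handles.include?(m.receipt_handle) }
  end

  def size
    @messages.size
  end
end

# One polling pass: process every received message, collect one delete entry
# per message, then delete the whole batch with a single request.
def poll_once(queue, output_queue)
  entries = []
  queue.receive.each_with_index do |message, i|
    output_queue << message.body.upcase # stand-in for decode + decorate
    entries << DeleteEntry.new(i.to_s, message.receipt_handle)
  end
  queue.delete_batch(entries) unless entries.empty?
  entries.size
end
```

The position of each message within the batch serves as the entry id, as in #run. That is sufficient because SQS only requires entry ids to be unique within a single batch request.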
#teardown ⇒ Object
# File 'lib/logstash/inputs/jsqs.rb', line 143

def teardown
  @sqs = nil
  @bufferedSqs = nil
  finished
end