Class: LogStash::Outputs::SQS

Inherits:
Base
  • Object
Includes:
PluginMixins::AwsConfig::V2
Defined in:
lib/logstash/outputs/sqs.rb

Overview

Push events to an Amazon Web Services (AWS) Simple Queue Service (SQS) queue.

SQS is a simple, scalable queue system that is part of the Amazon Web Services suite of tools. Although SQS is similar to other queuing systems, such as those based on the Advanced Message Queuing Protocol (AMQP), it uses a custom API and requires that you have an AWS account. See aws.amazon.com/sqs/ for more details on how SQS works, what the pricing schedule looks like, and how to set up a queue.

The identity used by this output (the message producer) must have the following permissions on the queue:

* `sqs:GetQueueUrl`
* `sqs:SendMessage`
* `sqs:SendMessageBatch`

Typically, you should set up an IAM policy, create a user, and apply the IAM policy to that user. See aws.amazon.com/iam/ for more details on setting up AWS identities. A sample policy is as follows:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "sqs:GetQueueUrl",
        "sqs:SendMessage",
        "sqs:SendMessageBatch"
      ],
      "Resource": "arn:aws:sqs:us-east-1:123456789012:my-sqs-queue"
    }
  ]
}
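
If the policy needs to be produced for several queues, it can be generated rather than hand-edited. The following is a minimal sketch using only Ruby's standard `json` library; the helper name `sqs_output_policy` and its keyword arguments are hypothetical and are not part of this plugin.

```ruby
require 'json'

# Hypothetical helper (not part of the plugin): builds the sample policy
# shown above for a single queue identified by region, account, and name.
def sqs_output_policy(region:, account_id:, queue_name:)
  {
    'Version' => '2012-10-17',
    'Statement' => [
      {
        'Effect' => 'Allow',
        'Action' => %w[sqs:GetQueueUrl sqs:SendMessage sqs:SendMessageBatch],
        'Resource' => "arn:aws:sqs:#{region}:#{account_id}:#{queue_name}"
      }
    ]
  }
end

# Print the policy as JSON, using the same placeholder values as the sample.
puts JSON.pretty_generate(
  sqs_output_policy(region: 'us-east-1',
                    account_id: '123456789012',
                    queue_name: 'my-sqs-queue')
)
```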

Batch Publishing

This output publishes messages to SQS in batches in order to optimize event throughput and increase performance. This is done using the [`SendMessageBatch`](docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_SendMessageBatch.html) API. When publishing messages to SQS in batches, the following service limits must be respected (see [Limits in Amazon SQS](docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/limits-messages.html)):

* The maximum allowed individual message size is 256KiB.
* The maximum total payload size (i.e. the sum of the sizes of all
  individual messages within a batch) is also 256KiB.

This plugin will dynamically adjust the size of the batch published to SQS in order to ensure that the total payload size does not exceed 256KiB.

WARNING: This output cannot currently handle messages larger than 256KiB. Any single message exceeding this size will be dropped.
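
The batching behavior described above can be illustrated with a small sketch. This is not the plugin's actual implementation: the helper `build_batches` and the two constants are hypothetical, and the sketch measures only the message body, whereas the real service limit may also count other parts of the request. It groups encoded messages so that no batch exceeds 10 entries or 256KiB in total, and it skips any single message that is too large to send.

```ruby
# Hypothetical constants, taken from the limits quoted in this section.
MAX_BATCH_BYTES  = 256 * 1024 # 256KiB total payload per batch
MAX_BATCH_EVENTS = 10         # maximum number of events per batch

# Hypothetical helper (not the plugin's code): splits an array of encoded
# message strings into batches that respect both limits.
def build_batches(messages, max_events: MAX_BATCH_EVENTS, max_bytes: MAX_BATCH_BYTES)
  batches = []
  current = []
  current_bytes = 0

  messages.each do |message|
    size = message.bytesize

    # A single message above the limit can never be sent, so it is dropped.
    next if size > max_bytes

    # Close the current batch if adding this message would break a limit.
    if current.length >= max_events || current_bytes + size > max_bytes
      batches << current
      current = []
      current_bytes = 0
    end

    current << message
    current_bytes += size
  end

  batches << current unless current.empty?
  batches
end
```

Each resulting batch could then be passed to a single `SendMessageBatch` call. Lowering `max_events` trades throughput for smaller requests.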

Instance Method Details

#multi_receive_encoded(encoded_events) ⇒ Object



# File 'lib/logstash/outputs/sqs.rb', line 113

def multi_receive_encoded(encoded_events)
  if @batch_events > 1
    multi_receive_encoded_batch(encoded_events)
  else
    multi_receive_encoded_single(encoded_events)
  end
end

#register ⇒ Object



# File 'lib/logstash/outputs/sqs.rb', line 90

def register
  @sqs = Aws::SQS::Client.new(aws_options_hash)

  if @batch_events > 10
    raise LogStash::ConfigurationError, 'The maximum batch size is 10 events'
  elsif @batch_events < 1
    raise LogStash::ConfigurationError, 'The batch size must be greater than 0'
  end

  begin
    params = { queue_name: @queue }
    params[:queue_owner_aws_account_id] = @queue_owner_aws_account_id if @queue_owner_aws_account_id

    @logger.debug('Connecting to SQS queue', params.merge(region: region))
    @queue_url = @sqs.get_queue_url(params)[:queue_url]
    @logger.info('Connected to SQS queue successfully', params.merge(region: region))
  rescue Aws::SQS::Errors::ServiceError => e
    @logger.error('Failed to connect to SQS', :error => e)
    raise LogStash::ConfigurationError, 'Verify the SQS queue name and your credentials'
  end
end
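
The validation and parameter-building steps in `register` can be exercised without an AWS connection. The sketch below extracts them into stand-alone helpers. The names `validate_batch_events!` and `queue_url_params` are hypothetical, and the local `ConfigurationError` class is only a stand-in for `LogStash::ConfigurationError` so that the example runs outside Logstash.

```ruby
# Stand-in for LogStash::ConfigurationError so the sketch is self-contained.
class ConfigurationError < StandardError; end

# Hypothetical helper mirroring the batch size checks in #register:
# only values from 1 to 10 inclusive are accepted.
def validate_batch_events!(batch_events)
  raise ConfigurationError, 'The maximum batch size is 10 events' if batch_events > 10
  raise ConfigurationError, 'The batch size must be greater than 0' if batch_events < 1

  batch_events
end

# Hypothetical helper mirroring how #register builds the parameters it
# passes to get_queue_url. The owner account ID is included only when set.
def queue_url_params(queue, owner_account_id = nil)
  params = { queue_name: queue }
  params[:queue_owner_aws_account_id] = owner_account_id if owner_account_id
  params
end

p queue_url_params('my-sqs-queue')
p queue_url_params('my-sqs-queue', '123456789012')
```

Keeping these checks separate from the `Aws::SQS::Client` call means a misconfigured batch size fails before any network request is made, which matches the order of operations in `register`.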