Class: Klunk::Queue

Inherits:
Object
  • Object
show all
Defined in:
lib/klunk/queue.rb

Class Method Summary collapse

Class Method Details

.build(queue_options) ⇒ Object



14
15
16
17
18
19
20
21
22
23
24
# File 'lib/klunk/queue.rb', line 14

# Builds a queue together with its dead-letter queue.
#
# The dead-letter queue is created first; its 'QueueArn' attribute is then
# stored as the redrive policy's deadLetterTargetArn before the main queue
# is created through `create`.
#
# @param queue_options [Hash] queue definition. :name and :subscribes are
#   consumed here; the remaining entries are handed to `build_attributes`.
#   The hash passed in is left untouched.
# @return [Object] whatever `create` returns
def build(queue_options)
  # Work on a shallow copy so the caller's hash keeps :name and :subscribes.
  options = queue_options.dup
  queue_name = options.delete(:name)
  subscriptions = options.delete(:subscribes)

  deadletter_queue = create_deadletter(queue_name)
  deadletter_attributes = get_attributes(deadletter_queue[:queue_url])

  attributes = build_attributes(options)
  attributes[:RedrivePolicy][:deadLetterTargetArn] =
    deadletter_attributes['QueueArn']
  # The redrive policy is passed on as a JSON string, not a nested hash.
  attributes[:RedrivePolicy] = attributes[:RedrivePolicy].to_json
  create(queue_name, attributes, subscriptions)
end

.build_attributes(queue, attributes = {}) ⇒ Object



67
68
69
70
71
72
73
74
75
76
77
78
# File 'lib/klunk/queue.rb', line 67

# Assembles the attribute hash used when creating a queue.
#
# @param queue [Hash] queue options; :retries_limit and
#   :message_retention_period take precedence over the corresponding
#   Klunk.configuration values
# @param attributes [Hash] extra entries merged into the redrive policy
#   (they win over :maxReceiveCount on key collision)
# @return [Hash] :MessageRetentionPeriod as a String and :RedrivePolicy as
#   a Hash
def build_attributes(queue, attributes = {})
  receive_limit = queue[:retries_limit]
  receive_limit ||= Klunk.configuration.retries_limit

  retention = queue[:message_retention_period]
  retention ||= Klunk.configuration.message_retention_period

  result = { MessageRetentionPeriod: retention.to_s }
  result[:RedrivePolicy] = { maxReceiveCount: receive_limit }.merge(attributes)
  result
end

.client ⇒ Object



84
85
86
# File 'lib/klunk/queue.rb', line 84

# Returns the SQS client, building it on first use and caching it in
# @client for later calls.
#
# @return [Aws::SQS::Client]
def client
  return @client if @client

  @client = Aws::SQS::Client.new
end

.create(queue_name, attributes, subscriptions) ⇒ Object



26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# File 'lib/klunk/queue.rb', line 26

# Creates the queue and subscribes it to every listed topic.
#
# If SQS raises QueueAlreadyExists, a notice is printed and create_queue is
# called again with only the queue name to obtain the queue.
#
# @param queue_name [Object] unprefixed name, expanded through `name_for`
# @param attributes [Hash] attributes passed to create_queue
# @param subscriptions [#to_a, nil] topic descriptions; each entry's :name
#   is read and the entry itself is passed to Topic.name_for
# @return [Object] the create_queue response
def create(queue_name, attributes, subscriptions)
  queue =
    begin
      client.create_queue(
        queue_name: name_for(queue_name),
        attributes: attributes
      )
    rescue Aws::SQS::Errors::QueueAlreadyExists
      puts "#{queue_name} already exists.".green
      client.create_queue(queue_name: name_for(queue_name))
    end

  subscriptions.to_a.each do |entry|
    topic = Topic.create(Topic.name_for(entry[:name], entry))
    ap Topic.subscribe(queue.queue_url, topic.topic_arn)
  end

  queue
end

.create_deadletter(queue_name) ⇒ Object



58
59
60
61
62
63
64
65
# File 'lib/klunk/queue.rb', line 58

# Creates the dead-letter queue that accompanies +queue_name+.
#
# The queue name comes from `name_for` with the deadletter flag set, and the
# only attribute sent is MessageRetentionPeriod.
#
# @param queue_name [Object] unprefixed name of the main queue
# @return [Object] the create_queue response
def create_deadletter(queue_name)
  deadletter_name = name_for(queue_name, true)
  deadletter_attributes = {
    MessageRetentionPeriod: deadletter_message_retention_period.to_s
  }
  client.create_queue(
    queue_name: deadletter_name,
    attributes: deadletter_attributes
  )
end

.deadletter_message_retention_period ⇒ Object



92
93
94
# File 'lib/klunk/queue.rb', line 92

# Reads the dead-letter retention period from the Klunk configuration.
#
# @return [Object] Klunk.configuration.deadletter_message_retention_period
def deadletter_message_retention_period
  configuration = Klunk.configuration
  configuration.deadletter_message_retention_period
end

.get_attributes(queue_url, attribute_names = ['All']) ⇒ Object



44
45
46
47
48
49
50
# File 'lib/klunk/queue.rb', line 44

# Fetches attributes of the queue at +queue_url+.
#
# @param queue_url [Object] passed to get_queue_attributes as :queue_url
# @param attribute_names [Array] attribute names to request; defaults to
#   ['All']
# @return [Object] the :attributes entry of the get_queue_attributes response
def get_attributes(queue_url, attribute_names = ['All'])
  request = { queue_url: queue_url, attribute_names: attribute_names }
  response = client.get_queue_attributes(request)
  response[:attributes]
end

.name_for(queue_name, deadletter = false) ⇒ Object



52
53
54
55
56
# File 'lib/klunk/queue.rb', line 52

# Builds the full queue name by joining the configured prefix, the EB_ENV
# environment variable and +queue_name+ with underscores. Nil or blank
# segments are left out.
#
# @param queue_name [String] unprefixed queue name
# @param deadletter [Boolean] when truthy, the configured deadletter suffix
#   is appended as a final segment
# @return [String] the underscore-joined name
def name_for(queue_name, deadletter = false)
  parts = [Klunk.configuration.prefix, ENV['EB_ENV'], queue_name]
  parts.push(Klunk.configuration.deadletter_suffix) if deadletter
  parts.reject { |part| part.nil? || part.blank? }.join('_')
end

.queues ⇒ Object



6
7
8
9
10
11
12
# File 'lib/klunk/queue.rb', line 6

# Loads the queue definitions from config/queues.yml (relative to the
# current working directory).
#
# @return [Array<Hash>] one entry per definition with deep-symbolized keys;
#   an empty array when the file is missing or holds no YAML document
def queues
  path = 'config/queues.yml'
  # File.exists? was deprecated and removed in Ruby 3.2; use File.exist?.
  return [] unless File.exist?(path)

  # An empty YAML file parses to nil/false instead of an array.
  definitions = ::YAML.load_file(path) || []
  definitions.map(&:deep_symbolize_keys)
end

.queues_for_shoryuken_config ⇒ Object



80
81
82
# File 'lib/klunk/queue.rb', line 80

# Maps every configured queue to a [full name, priority] pair.
#
# @return [Array<Array>] pairs of the `name_for` result for the entry's
#   :name and the entry's :priority value
def queues_for_shoryuken_config
  Klunk::Queue.queues.map do |definition|
    [name_for(definition[:name]), definition[:priority]]
  end
end

.resource ⇒ Object



88
89
90
# File 'lib/klunk/queue.rb', line 88

# Wraps the SQS client in a new Aws::SQS::Resource. A new resource object
# is constructed on every call.
#
# @return [Aws::SQS::Resource]
def resource
  sqs_client = client
  Aws::SQS::Resource.new(client: sqs_client)
end