Class: Klunk::Queue
- Inherits: Object
  - Object
    - Klunk::Queue
- Defined in:
- lib/klunk/queue.rb
Class Method Summary
- .build(queue_options) ⇒ Object
- .build_attributes(queue, attributes = {}) ⇒ Object
- .client ⇒ Object
- .create(queue_name, attributes, subscriptions) ⇒ Object
- .create_deadletter(queue_name) ⇒ Object
- .deadletter_message_retention_period ⇒ Object
- .get_attributes(queue_url, attribute_names = ['All']) ⇒ Object
- .name_for(queue_name, deadletter = false) ⇒ Object
- .queues ⇒ Object
- .queues_for_shoryuken_config ⇒ Object
- .resource ⇒ Object
Class Method Details
.build(queue_options) ⇒ Object
14 15 16 17 18 19 20 21 22 23 24 |
# File 'lib/klunk/queue.rb', line 14
#
# Builds a queue together with its dead-letter queue and links the two through
# the main queue's redrive policy.
#
# @param queue_options [Hash] queue definition; +:name+ and +:subscribes+ are
#   extracted here and the remaining keys are handed to +build_attributes+.
# @return [Object] whatever +create+ returns for the main queue.
def build(queue_options)
  # Work on a copy so the caller's hash keeps its :name and :subscribes keys.
  options = queue_options.dup
  queue_name = options.delete(:name)
  subscriptions = options.delete(:subscribes)

  deadletter_queue = create_deadletter(queue_name)
  deadletter_attributes = get_attributes(deadletter_queue[:queue_url])

  attributes = build_attributes(options)
  attributes[:RedrivePolicy][:deadLetterTargetArn] = deadletter_attributes['QueueArn']
  # The redrive policy is handed on as a JSON string rather than a nested hash.
  attributes[:RedrivePolicy] = attributes[:RedrivePolicy].to_json
  create(queue_name, attributes, subscriptions)
end
.build_attributes(queue, attributes = {}) ⇒ Object
67 68 69 70 71 72 73 74 75 76 77 78 |
# File 'lib/klunk/queue.rb', line 67
#
# Assembles the attribute hash used when creating a queue.
#
# @param queue [Hash] queue definition; +:retries_limit+ and
#   +:message_retention_period+ override the values from +Klunk.configuration+.
# @param attributes [Hash] extra entries merged into the +:RedrivePolicy+ hash.
# @return [Hash] +:MessageRetentionPeriod+ (as a String) and +:RedrivePolicy+
#   (a Hash containing +:maxReceiveCount+ plus the merged +attributes+).
def build_attributes(queue, attributes = {})
  max_receive_count = queue[:retries_limit] || Klunk.configuration.retries_limit
  # NOTE(review): the configuration reader name is reconstructed from the
  # per-queue key of the same name; confirm against Klunk.configuration.
  message_retention_period =
    queue[:message_retention_period] || Klunk.configuration.message_retention_period
  {
    MessageRetentionPeriod: message_retention_period.to_s,
    RedrivePolicy: { maxReceiveCount: max_receive_count }.merge(attributes)
  }
end
.client ⇒ Object
84 85 86 |
# File 'lib/klunk/queue.rb', line 84
#
# Returns the SQS client, creating it on first use and reusing it afterwards.
#
# @return [Aws::SQS::Client]
def client
  return @client if @client

  @client = Aws::SQS::Client.new
end
.create(queue_name, attributes, subscriptions) ⇒ Object
26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 |
# File 'lib/klunk/queue.rb', line 26
#
# Creates the queue and subscribes it to each of the given topics.
#
# When the client raises QueueAlreadyExists, a notice is printed and the queue
# is requested again by name only (without attributes).
#
# @param queue_name [Object] unprefixed queue name, expanded via +name_for+.
# @param attributes [Hash] attributes passed to +create_queue+.
# @param subscriptions [#to_a, nil] subscription definitions; each entry is
#   indexed with +:name+ and also passed whole to +Topic.name_for+.
# @return [Object] the +create_queue+ response for the queue.
def create(queue_name, attributes, subscriptions)
  queue =
    begin
      client.create_queue(queue_name: name_for(queue_name), attributes: attributes)
    rescue Aws::SQS::Errors::QueueAlreadyExists
      puts "#{queue_name} already exists.".green
      client.create_queue(queue_name: name_for(queue_name))
    end

  subscriptions.to_a.each do |subscription|
    topic = Topic.create(Topic.name_for(subscription[:name], subscription))
    ap Topic.subscribe(queue.queue_url, topic.topic_arn)
  end

  queue
end
.create_deadletter(queue_name) ⇒ Object
58 59 60 61 62 63 64 65 |
# File 'lib/klunk/queue.rb', line 58
#
# Creates the dead-letter queue that accompanies +queue_name+.
#
# @param queue_name [Object] unprefixed queue name; the dead-letter variant of
#   the name is produced by +name_for(queue_name, true)+.
# @return [Object] the +create_queue+ response for the dead-letter queue.
def create_deadletter(queue_name)
  client.create_queue(
    queue_name: name_for(queue_name, true),
    attributes: {
      # The retention period is sent as a String.
      MessageRetentionPeriod: deadletter_message_retention_period.to_s
    }
  )
end
.deadletter_message_retention_period ⇒ Object
92 93 94 |
# File 'lib/klunk/queue.rb', line 92
#
# Returns the message retention period applied to dead-letter queues.
#
# NOTE(review): the configuration reader name is reconstructed from this
# method's own name; confirm against Klunk.configuration.
#
# @return [Object] the value read from +Klunk.configuration+.
def deadletter_message_retention_period
  Klunk.configuration.deadletter_message_retention_period
end
.get_attributes(queue_url, attribute_names = ['All']) ⇒ Object
44 45 46 47 48 49 50 |
# File 'lib/klunk/queue.rb', line 44
#
# Fetches attributes of the queue at +queue_url+.
#
# @param queue_url [Object] URL of the queue to inspect.
# @param attribute_names [Array] names of the attributes to request
#   (defaults to +['All']+).
# @return [Object] the +:attributes+ entry of the client response.
def get_attributes(queue_url, attribute_names = ['All'])
  response = client.get_queue_attributes(queue_url: queue_url, attribute_names: attribute_names)
  response[:attributes]
end
.name_for(queue_name, deadletter = false) ⇒ Object
52 53 54 55 56 |
# File 'lib/klunk/queue.rb', line 52
#
# Builds the full queue name by joining the configured prefix, the +EB_ENV+
# environment variable and +queue_name+ with underscores, skipping nil and
# blank segments.
#
# @param queue_name [Object] unprefixed queue name.
# @param deadletter [Boolean] when truthy, the configured dead-letter suffix
#   is appended as a final segment.
# @return [String] the joined name.
def name_for(queue_name, deadletter = false)
  segments = [Klunk.configuration.prefix, ENV['EB_ENV'], queue_name]
  segments.push(Klunk.configuration.deadletter_suffix) if deadletter
  segments.compact.reject { |segment| segment.blank? }.join('_')
end
.queues ⇒ Object
6 7 8 9 10 11 12 |
# File 'lib/klunk/queue.rb', line 6
#
# Loads the queue definitions from +config/queues.yml+ (relative to the
# current working directory).
#
# @return [Array<Hash>] one hash per queue with deeply symbolized keys, or an
#   empty array when the file does not exist.
def queues
  path = 'config/queues.yml'
  # File.exists? was removed in Ruby 3.2; File.exist? works on every version.
  return [] unless File.exist?(path)

  ::YAML.load_file(path).map(&:deep_symbolize_keys)
end
.queues_for_shoryuken_config ⇒ Object
80 81 82 |
# File 'lib/klunk/queue.rb', line 80
#
# Maps every configured queue to a +[full_name, priority]+ pair.
#
# @return [Array<Array>] pairs of the name produced by +name_for+ and the
#   queue's +:priority+ entry.
def queues_for_shoryuken_config
  Klunk::Queue.queues.map do |queue|
    [name_for(queue[:name]), queue[:priority]]
  end
end
.resource ⇒ Object
88 89 90 |
# File 'lib/klunk/queue.rb', line 88
#
# Builds a new SQS resource object backed by +client+. A fresh resource is
# constructed on every call.
#
# @return [Aws::SQS::Resource]
def resource
  sqs_client = client
  Aws::SQS::Resource.new(client: sqs_client)
end