Class: Klunk::Topic
- Inherits:
-
Object
- Object
- Klunk::Topic
- Defined in:
- lib/klunk/topic.rb
Class Method Summary collapse
- .add_policy(queue_url, topic_arn, previous_policy) ⇒ Object
- .build_policy(queue_url, topic_arn) ⇒ Object
- .build_statement(queue_url, topic_arn) ⇒ Object
- .client ⇒ Object
- .create(topic_name) ⇒ Object
- .describe(topic_name, options = {}) ⇒ Object
- .name_for(topic_name, options = {}) ⇒ Object
- .publish(topic_name, message) ⇒ Object
- .subscribe(queue_url, topic_arn, previous_policy = nil) ⇒ Object
- .topic_arn(topic_name, options = {}) ⇒ Object
- .topics ⇒ Object
Class Method Details
.add_policy(queue_url, topic_arn, previous_policy) ⇒ Object
62 63 64 65 66 67 68 69 70 71 72 73 |
# File 'lib/klunk/topic.rb', line 62 def add_policy(queue_url, topic_arn, previous_policy) previous_policy ||= build_policy(queue_url, topic_arn) Queue.client.set_queue_attributes( queue_url: queue_url, attributes: { Policy: previous_policy.tap do |p| (p['Statement'] ||= []) << build_statement(queue_url, topic_arn) p['Statement'].uniq! end.to_json } ) end |
.build_policy(queue_url, topic_arn) ⇒ Object
91 92 93 94 95 96 97 98 99 100 |
# File 'lib/klunk/topic.rb', line 91 def build_policy(queue_url, topic_arn) queue_arn = Klunk::Queue.get_attributes(queue_url)['QueueArn'] { 'Version': '2012-10-17', 'Id': "#{queue_arn}/SQSDefaultPolicy", 'Statement': [ build_statement(queue_url, topic_arn) ] } end |
.build_statement(queue_url, topic_arn) ⇒ Object
75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 |
# File 'lib/klunk/topic.rb', line 75 def build_statement(queue_url, topic_arn) queue_arn = Klunk::Queue.get_attributes(queue_url)['QueueArn'] queue_name = queue_arn.split(':').last topic_name = topic_arn.split(':').last { 'Sid': "#{queue_name.camelize}_Send_#{topic_name.camelize}", 'Effect': 'Allow', 'Principal': { 'AWS': '*' }, 'Action': 'SQS:SendMessage', 'Resource': queue_arn, 'Condition': { 'ArnEquals': { 'aws:SourceArn': topic_arn } } } end |
.client ⇒ Object
102 103 104 |
# File 'lib/klunk/topic.rb', line 102 def client @client ||= Aws::SNS::Client.new end |
.create(topic_name) ⇒ Object
20 21 22 23 24 |
# File 'lib/klunk/topic.rb', line 20 def create(topic_name) topic = client.create_topic(name: name_for(topic_name)) puts "Topic created: #{topic.topic_arn}".cyan topic end |
.describe(topic_name, options = {}) ⇒ Object
52 53 54 55 56 57 58 59 60 |
# File 'lib/klunk/topic.rb', line 52 def describe(topic_name, = {}) puts topic_arn(topic_name, ) { topic: topic_arn(topic_name, ), subscriptions: client.list_subscriptions_by_topic( topic_arn: topic_arn(topic_name, ) ).subscriptions.map { |topic| topic[:endpoint] } } end |
.name_for(topic_name, options = {}) ⇒ Object
14 15 16 17 18 |
# File 'lib/klunk/topic.rb', line 14 def name_for(topic_name, = {}) system_name = [:system] || Klunk.configuration.prefix [system_name, ENV['EB_ENV'], topic_name] .compact.reject(&:blank?).join('_') end |
.publish(topic_name, message) ⇒ Object
26 27 28 29 30 |
# File 'lib/klunk/topic.rb', line 26 def publish(topic_name, ) topic_arn = topic_arn(topic_name) puts "Publishing to #{topic_arn}: #{message}" client.publish(topic_arn: topic_arn(topic_name), message: ) end |
.subscribe(queue_url, topic_arn, previous_policy = nil) ⇒ Object
36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 |
# File 'lib/klunk/topic.rb', line 36 def subscribe(queue_url, topic_arn, previous_policy = nil) queue_attributes = Klunk::Queue.get_attributes(queue_url) queue_arn = queue_attributes['QueueArn'] subscription = client.subscribe( topic_arn: topic_arn, protocol: 'sqs', endpoint: queue_arn ) client.set_subscription_attributes( subscription_arn: subscription.subscription_arn, attribute_name: 'RawMessageDelivery', attribute_value: 'true' ) if queue_attributes.key?('Policy') previous_policy = JSON.parse(queue_attributes['Policy']) end add_policy(queue_url, topic_arn, previous_policy) end |
.topic_arn(topic_name, options = {}) ⇒ Object
32 33 34 |
# File 'lib/klunk/topic.rb', line 32 def topic_arn(topic_name, = {}) "arn:aws:sns:#{ENV['AWS_REGION']}:#{ENV['AWS_ACCOUNT_ID']}:#{name_for(topic_name, options)}" end |
.topics ⇒ Object
6 7 8 9 10 11 12 |
# File 'lib/klunk/topic.rb', line 6 def topics if File.exists?('config/topics.yml') ::YAML.load_file('config/topics.yml').map(&:deep_symbolize_keys) else [] end end |