Class: FakeSQS::Queue
- Inherits:
-
Object
- Object
- FakeSQS::Queue
- Defined in:
- lib/fake_sqs/queue.rb
Constant Summary collapse
- VISIBILITY_TIMEOUT = 30
Instance Attribute Summary collapse
-
#arn ⇒ Object
readonly
Returns the value of attribute arn.
-
#message_factory ⇒ Object
readonly
Returns the value of attribute message_factory.
-
#name ⇒ Object
readonly
Returns the value of attribute name.
-
#queue_attributes ⇒ Object
readonly
Returns the value of attribute queue_attributes.
Instance Method Summary collapse
- #add_queue_attributes(attrs) ⇒ Object
- #attributes ⇒ Object
- #change_message_visibility(receipt, visibility) ⇒ Object
- #check_message_for_dlq(message, options = {}) ⇒ Object
- #default_visibility_timeout ⇒ Object
- #delete_message(receipt) ⇒ Object
- #expire ⇒ Object
-
#initialize(options = {}) ⇒ Queue
constructor
A new instance of Queue.
- #messages ⇒ Object
- #messages_in_flight ⇒ Object
- #published_size ⇒ Object
- #receive_message(options = {}) ⇒ Object
- #reset ⇒ Object
- #reset_messages_in_flight ⇒ Object
- #send_message(options = {}) ⇒ Object
- #size ⇒ Object
- #timeout_messages! ⇒ Object
- #to_yaml ⇒ Object
- #with_lock ⇒ Object
Constructor Details
#initialize(options = {}) ⇒ Queue
Returns a new instance of Queue.
18 19 20 21 22 23 24 25 26 |
# File 'lib/fake_sqs/queue.rb', line 18 def initialize(options = {}) @message_factory = options.fetch(:message_factory) @name = options.fetch("QueueName") @arn = options.fetch("Arn") { "arn:aws:sqs:us-east-1:#{SecureRandom.hex}:#{@name}" } @queue_attributes = options.fetch("Attributes") { {} } @lock = Monitor.new reset end |
Instance Attribute Details
#arn ⇒ Object (readonly)
Returns the value of attribute arn.
16 17 18 |
# File 'lib/fake_sqs/queue.rb', line 16 def arn @arn end |
#message_factory ⇒ Object (readonly)
Returns the value of attribute message_factory.
16 17 18 |
# File 'lib/fake_sqs/queue.rb', line 16 def message_factory @message_factory end |
#name ⇒ Object (readonly)
Returns the value of attribute name.
16 17 18 |
# File 'lib/fake_sqs/queue.rb', line 16 def name @name end |
#queue_attributes ⇒ Object (readonly)
Returns the value of attribute queue_attributes.
16 17 18 |
# File 'lib/fake_sqs/queue.rb', line 16 def queue_attributes @queue_attributes end |
Instance Method Details
#add_queue_attributes(attrs) ⇒ Object
36 37 38 |
# File 'lib/fake_sqs/queue.rb', line 36 def add_queue_attributes(attrs) queue_attributes.merge!(attrs) end |
#attributes ⇒ Object
40 41 42 43 44 45 46 |
# File 'lib/fake_sqs/queue.rb', line 40 def attributes queue_attributes.merge( "QueueArn" => arn, "ApproximateNumberOfMessages" => published_size, "ApproximateNumberOfMessagesNotVisible" => @messages_in_flight.size, ) end |
#change_message_visibility(receipt, visibility) ⇒ Object
111 112 113 114 115 116 117 118 119 120 121 122 123 124 |
# File 'lib/fake_sqs/queue.rb', line 111 def change_message_visibility(receipt, visibility) with_lock do message = @messages_in_flight[receipt] raise MessageNotInflight unless message if visibility == 0 message.expire! @messages[receipt] = message @messages_in_flight.delete(receipt) else message.expire_at(visibility) end end end |
#check_message_for_dlq(message, options = {}) ⇒ Object
126 127 128 129 130 131 132 133 134 135 |
# File 'lib/fake_sqs/queue.rb', line 126 def check_message_for_dlq(message, options={}) if redrive_policy = queue_attributes["RedrivePolicy"] && JSON.parse(queue_attributes["RedrivePolicy"]) dlq = options[:queues].list.find{|queue| queue.arn == redrive_policy["deadLetterTargetArn"]} if dlq && message.approximate_receive_count >= redrive_policy["maxReceiveCount"].to_i dlq.send_message(message: message) message.expire! true end end end |
#default_visibility_timeout ⇒ Object
87 88 89 90 91 92 93 |
# File 'lib/fake_sqs/queue.rb', line 87 def default_visibility_timeout if value = attributes['VisibilityTimeout'] value.to_i else VISIBILITY_TIMEOUT end end |
#delete_message(receipt) ⇒ Object
137 138 139 140 141 142 |
# File 'lib/fake_sqs/queue.rb', line 137 def delete_message(receipt) with_lock do @messages.delete(receipt) @messages_in_flight.delete(receipt) end end |
#expire ⇒ Object
152 153 154 155 156 157 158 |
# File 'lib/fake_sqs/queue.rb', line 152 def expire with_lock do @messages.merge!(@messages_in_flight) @messages_in_flight.clear() reset_messages_in_flight end end |
#messages ⇒ Object
167 168 169 |
# File 'lib/fake_sqs/queue.rb', line 167 def messages @messages_view end |
#messages_in_flight ⇒ Object
171 172 173 |
# File 'lib/fake_sqs/queue.rb', line 171 def messages_in_flight @messages_in_flight_view end |
#published_size ⇒ Object
179 180 181 |
# File 'lib/fake_sqs/queue.rb', line 179 def published_size @messages.values.select { |m| m.published? }.size end |
#receive_message(options = {}) ⇒ Object
58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 |
# File 'lib/fake_sqs/queue.rb', line 58 def receive_message(options = {}) amount = Integer options.fetch("MaxNumberOfMessages") { "1" } visibility_timeout = Integer options.fetch("VisibilityTimeout") { default_visibility_timeout } fail ReadCountOutOfRange, amount if amount > 10 return {} if @messages.empty? result = {} with_lock do actual_amount = amount > published_size ? published_size : amount published_messages = @messages.values.select { |m| m.published? } actual_amount.times do message = published_messages.delete_at(rand(published_size)) @messages.delete(message.receipt) unless check_message_for_dlq(message, options) message.expire_at(visibility_timeout) message.receive! @messages_in_flight[message.receipt] = message result[message.receipt] = message end end end result end |
#reset ⇒ Object
144 145 146 147 148 149 150 |
# File 'lib/fake_sqs/queue.rb', line 144 def reset with_lock do @messages = {} @messages_view = FakeSQS::CollectionView.new(@messages) reset_messages_in_flight end end |
#reset_messages_in_flight ⇒ Object
160 161 162 163 164 165 |
# File 'lib/fake_sqs/queue.rb', line 160 def reset_messages_in_flight with_lock do @messages_in_flight = {} @messages_in_flight_view = FakeSQS::CollectionView.new(@messages_in_flight) end end |
#send_message(options = {}) ⇒ Object
48 49 50 51 52 53 54 55 56 |
# File 'lib/fake_sqs/queue.rb', line 48 def send_message(options = {}) with_lock do message = options.fetch(:message){ message_factory.new(options) } if message @messages[message.receipt] = message end message end end |
#size ⇒ Object
175 176 177 |
# File 'lib/fake_sqs/queue.rb', line 175 def size @messages.size end |
#timeout_messages! ⇒ Object
95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 |
# File 'lib/fake_sqs/queue.rb', line 95 def timeout_messages! with_lock do expired = @messages_in_flight.inject({}) do |memo,(receipt,message)| if message.expired? memo[receipt] = message end memo end expired.each do |receipt,message| message.expire! @messages[receipt] = message @messages_in_flight.delete(receipt) end end end |
#to_yaml ⇒ Object
28 29 30 31 32 33 34 |
# File 'lib/fake_sqs/queue.rb', line 28 def to_yaml { "QueueName" => name, "Arn" => arn, "Attributes" => queue_attributes, } end |
#with_lock ⇒ Object
183 184 185 186 187 |
# File 'lib/fake_sqs/queue.rb', line 183 def with_lock @lock.synchronize do yield end end |