Class: FakeSQS::Queue

Inherits:
Object
  • Object
show all
Defined in:
lib/fake_sqs/queue.rb

Constant Summary collapse

VISIBILITY_TIMEOUT =
30

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ Queue

Returns a new instance of Queue.



18
19
20
21
22
23
24
25
26
# File 'lib/fake_sqs/queue.rb', line 18

def initialize(options = {})
  @message_factory = options.fetch(:message_factory)

  @name = options.fetch("QueueName")
  @arn = options.fetch("Arn") { "arn:aws:sqs:us-east-1:#{SecureRandom.hex}:#{@name}" }
  @queue_attributes = options.fetch("Attributes") { {} }
  @lock = Monitor.new
  reset
end

Instance Attribute Details

#arnObject (readonly)

Returns the value of attribute arn.



16
17
18
# File 'lib/fake_sqs/queue.rb', line 16

def arn
  @arn
end

#message_factoryObject (readonly)

Returns the value of attribute message_factory.



16
17
18
# File 'lib/fake_sqs/queue.rb', line 16

def message_factory
  @message_factory
end

#nameObject (readonly)

Returns the value of attribute name.



16
17
18
# File 'lib/fake_sqs/queue.rb', line 16

def name
  @name
end

#queue_attributesObject (readonly)

Returns the value of attribute queue_attributes.



16
17
18
# File 'lib/fake_sqs/queue.rb', line 16

def queue_attributes
  @queue_attributes
end

Instance Method Details

#add_queue_attributes(attrs) ⇒ Object



36
37
38
# File 'lib/fake_sqs/queue.rb', line 36

def add_queue_attributes(attrs)
  queue_attributes.merge!(attrs)
end

#attributesObject



40
41
42
43
44
45
46
# File 'lib/fake_sqs/queue.rb', line 40

def attributes
  queue_attributes.merge(
    "QueueArn" => arn,
    "ApproximateNumberOfMessages" => published_size,
    "ApproximateNumberOfMessagesNotVisible" => @messages_in_flight.size,
  )
end

#change_message_visibility(receipt, visibility) ⇒ Object



111
112
113
114
115
116
117
118
119
120
121
122
123
124
# File 'lib/fake_sqs/queue.rb', line 111

def change_message_visibility(receipt, visibility)
  with_lock do
    message = @messages_in_flight[receipt]
    raise MessageNotInflight unless message

    if visibility == 0
      message.expire!
      @messages[receipt] = message
      @messages_in_flight.delete(receipt)
    else
      message.expire_at(visibility)
    end
  end
end

#check_message_for_dlq(message, options = {}) ⇒ Object



126
127
128
129
130
131
132
133
134
135
# File 'lib/fake_sqs/queue.rb', line 126

def check_message_for_dlq(message, options={})
  if redrive_policy = queue_attributes["RedrivePolicy"] && JSON.parse(queue_attributes["RedrivePolicy"])
    dlq = options[:queues].list.find{|queue| queue.arn == redrive_policy["deadLetterTargetArn"]}
    if dlq && message.approximate_receive_count >= redrive_policy["maxReceiveCount"].to_i
      dlq.send_message(message: message)
      message.expire!
      true
    end
  end
end

#default_visibility_timeoutObject



87
88
89
90
91
92
93
# File 'lib/fake_sqs/queue.rb', line 87

def default_visibility_timeout
  if value = attributes['VisibilityTimeout']
    value.to_i
  else
    VISIBILITY_TIMEOUT
  end
end

#delete_message(receipt) ⇒ Object



137
138
139
140
141
142
# File 'lib/fake_sqs/queue.rb', line 137

def delete_message(receipt)
  with_lock do
    @messages.delete(receipt)
    @messages_in_flight.delete(receipt)
  end
end

#expireObject



152
153
154
155
156
157
158
# File 'lib/fake_sqs/queue.rb', line 152

def expire
  with_lock do
    @messages.merge!(@messages_in_flight)
    @messages_in_flight.clear()
    reset_messages_in_flight
  end
end

#messagesObject



167
168
169
# File 'lib/fake_sqs/queue.rb', line 167

def messages
  @messages_view
end

#messages_in_flightObject



171
172
173
# File 'lib/fake_sqs/queue.rb', line 171

def messages_in_flight
  @messages_in_flight_view
end

#published_sizeObject



179
180
181
# File 'lib/fake_sqs/queue.rb', line 179

def published_size
  @messages.values.select { |m| m.published? }.size
end

#receive_message(options = {}) ⇒ Object



58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
# File 'lib/fake_sqs/queue.rb', line 58

def receive_message(options = {})
  amount = Integer options.fetch("MaxNumberOfMessages") { "1" }
  visibility_timeout = Integer options.fetch("VisibilityTimeout") { default_visibility_timeout }

  fail ReadCountOutOfRange, amount if amount > 10

  return {} if @messages.empty?

  result = {}

  with_lock do
    actual_amount = amount > published_size ? published_size : amount
    published_messages = @messages.values.select { |m| m.published? }

    actual_amount.times do
      message = published_messages.delete_at(rand(published_size))
      @messages.delete(message.receipt)
      unless check_message_for_dlq(message, options)
        message.expire_at(visibility_timeout)
        message.receive!
        @messages_in_flight[message.receipt] = message
        result[message.receipt] = message
      end
    end
  end

  result
end

#resetObject



144
145
146
147
148
149
150
# File 'lib/fake_sqs/queue.rb', line 144

def reset
  with_lock do
    @messages = {}
    @messages_view = FakeSQS::CollectionView.new(@messages)
    reset_messages_in_flight
  end
end

#reset_messages_in_flightObject



160
161
162
163
164
165
# File 'lib/fake_sqs/queue.rb', line 160

def reset_messages_in_flight
  with_lock do
    @messages_in_flight = {}
    @messages_in_flight_view = FakeSQS::CollectionView.new(@messages_in_flight)
  end
end

#send_message(options = {}) ⇒ Object



48
49
50
51
52
53
54
55
56
# File 'lib/fake_sqs/queue.rb', line 48

def send_message(options = {})
  with_lock do
    message = options.fetch(:message){ message_factory.new(options) }
    if message
      @messages[message.receipt] = message
    end
    message
  end
end

#sizeObject



175
176
177
# File 'lib/fake_sqs/queue.rb', line 175

def size
  @messages.size
end

#timeout_messages!Object



95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
# File 'lib/fake_sqs/queue.rb', line 95

def timeout_messages!
  with_lock do
    expired = @messages_in_flight.inject({}) do |memo,(receipt,message)|
      if message.expired?
        memo[receipt] = message
      end
      memo
    end
    expired.each do |receipt,message|
      message.expire!
      @messages[receipt] = message
      @messages_in_flight.delete(receipt)
    end
  end
end

#to_yamlObject



28
29
30
31
32
33
34
# File 'lib/fake_sqs/queue.rb', line 28

def to_yaml
  {
    "QueueName" => name,
    "Arn" => arn,
    "Attributes" => queue_attributes,
  }
end

#with_lockObject



183
184
185
186
187
# File 'lib/fake_sqs/queue.rb', line 183

def with_lock
  @lock.synchronize do
    yield
  end
end