Class: Bisques::Queue

Inherits:
Object
  • Object
show all
Defined in:
lib/bisques/queue.rb

Overview

An SQS queue

Defined Under Namespace

Classes: QueueError, QueueNotFound

Instance Method Summary collapse

Constructor Details

#initialize(client, url) ⇒ Queue

Queues are created by the Client passing the client itself and the url for the queue.

Parameters:

  • client (Client)
  • url (String)


36
37
38
# File 'lib/bisques/queue.rb', line 36

def initialize(client, url)
  @client, @url = client, url
end

Instance Method Details

#==(queue) ⇒ Object



54
55
56
# File 'lib/bisques/queue.rb', line 54

def ==(queue)
  hash == queue.hash
end

#attributes(*attributes) ⇒ Object, Hash

Return attributes for the queue. Pass in the names of the attributes to retrieve, or :All for all attributes. The available attributes can be found at docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/Query_QueryGetQueueAttributes.html

If 1 attribute is requested then just that attributes value is returned. If more than one, or all, attributes are requested then a hash of attribute names and values is returned.

Examples:

with one attribute


queue.attributes(:ApproximateNumberOfMessages) == 10

with multiple attributes


queue.attributes(:ApproximateNumberOfMessages, :ApproximateNumberOfMessagesDelayed) == {:ApproximateNumberOfMessages => 10, :ApproximateNumberOfMessagesDelayed => 5}

Parameters:

  • attributes (String)

Returns:

  • (Object, Hash)


81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
# File 'lib/bisques/queue.rb', line 81

def attributes(*attributes)
  return nil if attributes.blank?

  values = {}
  response = client.get_queue_attributes(url, attributes)

  response.doc.xpath("//Attribute").each do |attribute_element|
    name = attribute_element.xpath("Name").text
    value = attribute_element.xpath("Value").text
    value = value.to_i if value =~ /\A\d+\z/
    values[name] = value
  end

  if values.size == 1 && attributes.size == 1
    values.values.first
  else
    values
  end
end

#deleteAwsResponse

Delete the queue

Returns:

Raises:



104
105
106
# File 'lib/bisques/queue.rb', line 104

def delete
  client.delete_queue(url)
end

#delete_message(handle) ⇒ Boolean

Delete a message from the queue. This should be called to confirm that the message has been processed. If it is not called then the message will get put back on the queue after a timeout.

Parameters:

  • handle (String)

Returns:

  • (Boolean)

    true if the message was deleted.

Raises:



180
181
182
183
# File 'lib/bisques/queue.rb', line 180

def delete_message(handle)
  response = client.delete_message(url, handle)
  response.success?
end

#eql?(queue) ⇒ Boolean

Returns:

  • (Boolean)


51
52
53
# File 'lib/bisques/queue.rb', line 51

def eql?(queue)
  hash == queue.hash
end

#hashObject



57
58
59
# File 'lib/bisques/queue.rb', line 57

def hash
  @url.hash
end

#nameString

Returns The name of the queue derived from the URL.

Returns:

  • (String)

    The name of the queue derived from the URL.



41
42
43
# File 'lib/bisques/queue.rb', line 41

def name
  @url.split("/").last
end

#pathString Also known as: url

Returns The path part of the queue URL.

Returns:

  • (String)

    The path part of the queue URL



46
47
48
# File 'lib/bisques/queue.rb', line 46

def path
  Addressable::URI.parse(@url).path
end

#post_message(object) ⇒ Object

Post a message to the queue. The message must be serializable (i.e. strings, numbers, arrays, hashes).

Parameters:

  • object (String, Fixnum, Array, Hash)

Raises:



114
115
116
# File 'lib/bisques/queue.rb', line 114

def post_message(object)
  client.send_message(url, JSON.dump(object))
end

#post_messages(objects) ⇒ Object



118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
# File 'lib/bisques/queue.rb', line 118

def post_messages(objects)
  objects = objects.dup
  group = []
  while objects.any?
    group.push objects.pop

    if group.length == 10
      client.send_message_batch(url, objects.map{|obj| JSON.dump(obj)})
    end
  end

  if group.length > 0
    client.send_message_batch(url, objects.map{|obj| JSON.dump(obj)})
  end

  nil
end

#retrieve(poll_time = 1) ⇒ Message?

Retrieve a message from the queue. Returns nil if no message is waiting in the given poll time. Otherwise it returns a Message.

Parameters:

  • poll_time (Fixnum) (defaults to: 1)

Returns:

Raises:



142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
# File 'lib/bisques/queue.rb', line 142

def retrieve(poll_time = 1)
  response = client.receive_message(url, {"WaitTimeSeconds" => poll_time, "MaxNumberOfMessages" => 1})
  raise QueueNotFound.new(self, "not found at #{url}") if response.http_response.status == 404

  response.doc.xpath("//Message").map do |element|
    attributes = Hash[*element.xpath("Attribute").map do |attr_element|
      [attr_element.xpath("Name").text, attr_element.xpath("Value").text]
    end.flatten]

    Message.new(self, element.xpath("MessageId").text,
                element.xpath("ReceiptHandle").text,
                element.xpath("Body").text,
                attributes
               )
  end.first
end

#retrieve_one(poll_time = 5) ⇒ Message

Retrieve a single message from the queue. This will block until a message arrives. The message will be of the class Message.

Parameters:

  • poll_time (Fixnum) (defaults to: 5)

Returns:

Raises:



165
166
167
168
169
170
171
# File 'lib/bisques/queue.rb', line 165

def retrieve_one(poll_time = 5)
  object = nil
  while object.nil?
    object = retrieve(poll_time)
  end
  object
end

#return_message(handle) ⇒ AwsResponse

Return a message to the queue after receiving it. This would typically happen if the receiver decided it couldn’t process.

Parameters:

  • handle (String)

Returns:

Raises:



191
192
193
# File 'lib/bisques/queue.rb', line 191

def return_message(handle)
  client.change_message_visibility(url, handle, 0)
end