Class: Bisques::Queue
- Inherits: Object
- Defined in: lib/bisques/queue.rb
Overview
An SQS queue
Defined Under Namespace
Classes: QueueError, QueueNotFound
Instance Method Summary
- #==(queue) ⇒ Object
- #attributes(*attributes) ⇒ Object, Hash
  Return attributes for the queue.
- #delete ⇒ AwsResponse
  Delete the queue.
- #delete_message(handle) ⇒ Boolean
  Delete a message from the queue.
- #eql?(queue) ⇒ Boolean
- #hash ⇒ Object
- #initialize(client, url) ⇒ Queue (constructor)
  Queues are created by the Client passing the client itself and the url for the queue.
- #name ⇒ String
  The name of the queue derived from the URL.
- #path ⇒ String (also: #url)
  The path part of the queue URL.
- #post_message(object) ⇒ Object
  Post a message to the queue.
- #post_messages(objects) ⇒ Object
- #retrieve(poll_time = 1) ⇒ Message?
  Retrieve a message from the queue.
- #retrieve_one(poll_time = 5) ⇒ Message
  Retrieve a single message from the queue.
- #return_message(handle) ⇒ AwsResponse
  Return a message to the queue after receiving it.
Constructor Details
Instance Method Details
#==(queue) ⇒ Object
    # File 'lib/bisques/queue.rb', line 54
    def ==(queue)
      hash == queue.hash
    end
#attributes(*attributes) ⇒ Object, Hash
Return attributes for the queue. Pass in the names of the attributes to retrieve, or :All for all attributes. The available attributes are listed at docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/Query_QueryGetQueueAttributes.html
If exactly one attribute is requested, only that attribute's value is returned. If more than one attribute, or :All, is requested, a hash of attribute names and values is returned.
    # File 'lib/bisques/queue.rb', line 81
    def attributes(*attributes)
      return nil if attributes.blank?

      values = {}
      response = client.get_queue_attributes(url, attributes)

      response.doc.xpath("//Attribute").each do |attribute_element|
        name = attribute_element.xpath("Name").text
        value = attribute_element.xpath("Value").text
        value = value.to_i if value =~ /\A\d+\z/
        values[name] = value
      end

      if values.size == 1 && attributes.size == 1
        values.values.first
      else
        values
      end
    end
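The return-shape rule described for #attributes can be tried without any AWS call. The following is a minimal sketch, not the library's code: the helper name `shape_attributes` and the sample values are hypothetical, and `raw` stands in for the name/value pairs parsed from the response. Only the integer-coercion regex and the single-value rule are taken from the listing.

```ruby
# Hypothetical helper mirroring the return-shape rule of #attributes.
# `raw` stands in for the name/value pairs parsed from the SQS response.
def shape_attributes(requested, raw)
  return nil if requested.empty?

  values = {}
  raw.each do |name, value|
    # Purely numeric strings are converted to Integer, as in the listing.
    value = value.to_i if value =~ /\A\d+\z/
    values[name] = value
  end

  # One requested name and one returned value: unwrap the value.
  if values.size == 1 && requested.size == 1
    values.values.first
  else
    values
  end
end

# Made-up attribute values for illustration.
single = shape_attributes(["VisibilityTimeout"], { "VisibilityTimeout" => "30" })
many = shape_attributes(
  [:All],
  { "VisibilityTimeout" => "30", "QueueArn" => "arn:aws:sqs:us-east-1:123456789012:example" }
)
```

Here `single` is a bare Integer while `many` is a Hash, so callers that pass a variable number of names need to handle both shapes.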
#delete ⇒ AwsResponse
Delete the queue.
    # File 'lib/bisques/queue.rb', line 104
    def delete
      client.delete_queue(url)
    end
#delete_message(handle) ⇒ Boolean
Delete a message from the queue. Call this to confirm that the message has been processed; otherwise the message is put back on the queue after a timeout.
    # File 'lib/bisques/queue.rb', line 180
    def delete_message(handle)
      response = client.(url, handle)
      response.success?
    end
#eql?(queue) ⇒ Boolean
    # File 'lib/bisques/queue.rb', line 51
    def eql?(queue)
      hash == queue.hash
    end
#hash ⇒ Object
    # File 'lib/bisques/queue.rb', line 57
    def hash
      @url.hash
    end
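Taken together, #==, #eql? and #hash all delegate to the hash of the queue URL, so two queue objects for the same URL compare equal and collapse to a single Hash key. The sketch below illustrates that with a stand-in class; `SketchQueue` and the example URLs are hypothetical and only copy the three method bodies shown in this section.

```ruby
# Stand-in class (not Bisques::Queue) that copies the three methods above.
class SketchQueue
  def initialize(url)
    @url = url
  end

  def ==(other)
    hash == other.hash
  end

  def eql?(other)
    hash == other.hash
  end

  def hash
    @url.hash
  end
end

# Made-up URLs for illustration.
a = SketchQueue.new("https://sqs.example.com/123456789012/jobs")
b = SketchQueue.new("https://sqs.example.com/123456789012/jobs")
c = SketchQueue.new("https://sqs.example.com/123456789012/other")

same = (a == b)                      # both wrap the same URL
unique_count = [a, b, c].uniq.size   # uniq relies on #hash and #eql?

counts = Hash.new(0)
[a, b, c].each { |queue| counts[queue] += 1 }  # a and b share one key
```

One consequence of comparing by hash value alone is that any object whose #hash matches, including the URL string itself, is treated as equal by these methods.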
#name ⇒ String
Returns the name of the queue, derived from the URL.
    # File 'lib/bisques/queue.rb', line 41
    def name
      @url.split("/").last
    end
#path ⇒ String Also known as: url
Returns the path part of the queue URL.
    # File 'lib/bisques/queue.rb', line 46
    def path
      Addressable::URI.parse(@url).path
    end
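As a quick illustration of what #name and #path extract, the sketch below applies the same two operations to a made-up queue URL. It assumes a URL of the usual account-id/queue-name shape and swaps in the standard library's `URI` for `Addressable::URI` so that it runs without extra gems.

```ruby
require "uri"

# Hypothetical queue URL; the account id and queue name are made up.
url = "https://sqs.us-east-1.amazonaws.com/123456789012/my-queue"

# #name: the last "/"-separated segment of the URL.
name = url.split("/").last

# #path: the path component. The listing uses Addressable::URI;
# the standard library's URI is swapped in here to avoid a gem dependency.
path = URI.parse(url).path
```

With this input, `name` is the final segment and `path` is everything after the host.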
#post_message(object) ⇒ Object
Post a message to the queue. The message must be serializable to JSON (e.g. strings, numbers, arrays, hashes).
    # File 'lib/bisques/queue.rb', line 114
    def post_message(object)
      client.(url, JSON.dump(object))
    end
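Because the object is passed through `JSON.dump`, what travels on the queue is a JSON string. The sketch below, using made-up payloads, shows a round trip and one thing to watch for: symbols do not survive serialization.

```ruby
require "json"

# Made-up payload built only from JSON-friendly types.
payload = { "job" => "resize", "ids" => [1, 2, 3], "opts" => { "width" => 200 } }

body = JSON.dump(payload)      # the String that would become the message body
restored = JSON.parse(body)    # what a consumer would get back after parsing

# Symbol keys and values come back as Strings after a round trip.
symbolic = JSON.parse(JSON.dump({ status: :ok }))
```

Using string keys on the producing side keeps the posted and the received structures identical.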
#post_messages(objects) ⇒ Object
    # File 'lib/bisques/queue.rb', line 118
    def post_messages(objects)
      objects = objects.dup
      group = []
      while objects.any?
        group.push objects.pop

        if group.length == 10
          client.(url, objects.map{|obj| JSON.dump(obj)})
        end
      end

      if group.length > 0
        client.(url, objects.map{|obj| JSON.dump(obj)})
      end

      nil
    end
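The listing appears intended to send messages in groups of ten, which matches the SQS limit of ten entries per batch send. Note that, as shown, it serializes `objects` rather than `group` and never empties `group`. The following is a minimal sketch of the apparent intent, not the library's implementation: `RecordingClient`, `send_batch` and `post_in_batches` are hypothetical names introduced only for illustration.

```ruby
require "json"

# Hypothetical stand-in for the client; it only records what it is asked to send.
class RecordingClient
  attr_reader :batches

  def initialize
    @batches = []
  end

  # Assumed name and signature, for illustration only.
  def send_batch(_url, bodies)
    @batches << bodies
  end
end

# Send objects in groups of at most `batch_size`, each serialized with JSON.dump.
def post_in_batches(client, url, objects, batch_size = 10)
  objects.each_slice(batch_size) do |group|
    client.send_batch(url, group.map { |obj| JSON.dump(obj) })
  end
  nil
end

client = RecordingClient.new
post_in_batches(client, "/123456789012/jobs", (1..25).to_a)
batch_sizes = client.batches.map(&:size)
```

Unlike the `pop`-based loop in the listing, `each_slice` keeps the objects in their original order within and across batches.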
#retrieve(poll_time = 1) ⇒ Message?
Retrieve a message from the queue. Returns nil if no message arrives within the given poll time (in seconds); otherwise returns a Message.
    # File 'lib/bisques/queue.rb', line 142
    def retrieve(poll_time = 1)
      response = client.(url, {"WaitTimeSeconds" => poll_time, "MaxNumberOfMessages" => 1})
      raise QueueNotFound.new(self, "not found at #{url}") if response.http_response.status == 404

      response.doc.xpath("//Message").map do |element|
        attributes = Hash[*element.xpath("Attribute").map do |attr_element|
          [attr_element.xpath("Name").text, attr_element.xpath("Value").text]
        end.flatten]

        Message.new(self,
          element.xpath("MessageId").text,
          element.xpath("ReceiptHandle").text,
          element.xpath("Body").text,
          attributes
        )
      end.first
    end
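The parsing step in #retrieve can be illustrated on a hand-written response. This is a rough sketch under stated assumptions: the XML below is made up and omits namespaces, REXML is swapped in for whatever XML library backs `response.doc`, and plain Hashes stand in for Message objects.

```ruby
require "rexml/document"

# Hand-written sample shaped like a receive-message result.
# Namespaces are omitted and all values are made up.
xml = <<~XML
  <ReceiveMessageResponse>
    <ReceiveMessageResult>
      <Message>
        <MessageId>msg-1</MessageId>
        <ReceiptHandle>handle-1</ReceiptHandle>
        <Body>{"job":"resize"}</Body>
        <Attribute><Name>SenderId</Name><Value>123456789012</Value></Attribute>
        <Attribute><Name>ApproximateReceiveCount</Name><Value>2</Value></Attribute>
      </Message>
    </ReceiveMessageResult>
  </ReceiveMessageResponse>
XML

doc = REXML::Document.new(xml)

messages = REXML::XPath.match(doc, "//Message").map do |element|
  # Build a name => value Hash from the Attribute children.
  attributes = element.get_elements("Attribute").map do |attr|
    [attr.elements["Name"].text, attr.elements["Value"].text]
  end.to_h

  {
    id: element.elements["MessageId"].text,
    handle: element.elements["ReceiptHandle"].text,
    body: element.elements["Body"].text,
    attributes: attributes
  }
end

first = messages.first   # nil when the response contains no Message
```

As in the listing, attribute values are kept as strings here; no integer coercion is applied, in contrast to #attributes.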
#retrieve_one(poll_time = 5) ⇒ Message
Retrieve a single message from the queue. This will block until a message arrives. The message will be of the class Message.
    # File 'lib/bisques/queue.rb', line 165
    def retrieve_one(poll_time = 5)
      object = nil
      while object.nil?
        object = retrieve(poll_time)
      end
      object
    end
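The blocking behaviour comes from repeating #retrieve until it returns something other than nil. The sketch below reproduces that loop against a scripted stand-in; `ScriptedQueue` is hypothetical and simply returns two empty polls followed by a value.

```ruby
# Hypothetical stand-in for a queue: #retrieve hands back scripted results,
# with nil representing a poll that found no message.
class ScriptedQueue
  attr_reader :calls

  def initialize(results)
    @results = results
    @calls = 0
  end

  def retrieve(_poll_time)
    @calls += 1
    @results.shift
  end

  # Same loop as the listing: poll until something other than nil comes back.
  def retrieve_one(poll_time = 5)
    object = nil
    while object.nil?
      object = retrieve(poll_time)
    end
    object
  end
end

queue = ScriptedQueue.new([nil, nil, "hello"])
message = queue.retrieve_one
```

Since the loop has no exit other than a message arriving, callers that need an upper bound on waiting would have to add one themselves.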
#return_message(handle) ⇒ AwsResponse
Return a message to the queue after receiving it. This would typically happen if the receiver decided it couldn’t process the message.
    # File 'lib/bisques/queue.rb', line 191
    def return_message(handle)
      client.(url, handle, 0)
    end
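The intended consumer flow, as the descriptions of #retrieve, #delete_message and #return_message suggest, is: receive a message, then either delete it to confirm processing or return it so it can be delivered again. The sketch below walks through that flow with an in-memory stand-in. `MemoryQueue` and its methods are hypothetical, make no AWS calls, and model "return" as making the message visible again immediately, which is presumably what the trailing `0` in the listing requests.

```ruby
# In-memory stand-in for a queue (hypothetical; no AWS calls are made).
class MemoryQueue
  Message = Struct.new(:handle, :body)

  def initialize(bodies)
    @visible = bodies.each_with_index.map { |body, i| Message.new("h#{i}", body) }
    @in_flight = {}
  end

  # Hand out the next visible message, or nil if there is none.
  def retrieve
    message = @visible.shift
    @in_flight[message.handle] = message if message
    message
  end

  # Confirm processing: the message is removed for good.
  def delete_message(handle)
    !@in_flight.delete(handle).nil?
  end

  # Give up on the message: make it visible again immediately.
  def return_message(handle)
    message = @in_flight.delete(handle)
    @visible.push(message) if message
  end

  def visible_bodies
    @visible.map(&:body)
  end
end

queue = MemoryQueue.new(["ok", "bad"])
processed = []

2.times do
  message = queue.retrieve
  if message.body == "ok"
    processed << message.body
    queue.delete_message(message.handle)   # processed successfully
  else
    queue.return_message(message.handle)   # could not process; hand it back
  end
end
```

After the loop, the processed message is gone and the rejected one is waiting on the queue again for another consumer.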