Class: Rack::AMQP::Client::Request
- Inherits:
-
Object
- Object
- Rack::AMQP::Client::Request
- Defined in:
- lib/rack/amqp/client/request.rb
Constant Summary collapse
- SLEEP_INCREMENT =
attr_accessor :routing_key, :request_path, :body, :request_id, :callback_queue, :http_method
Instance Method Summary collapse
- #callback(delivery_info, meta, payload) ⇒ Object
- #headers ⇒ Object
-
#initialize(request_id, http_method, uri, body, headers = {}) ⇒ Request
constructor
A new instance of Request.
- #payload ⇒ Object
- #publishing_options ⇒ Object
- #reply_wait(timeout) ⇒ Object
- #split_uri(uri) ⇒ Object
- #user_agent ⇒ Object
Constructor Details
#initialize(request_id, http_method, uri, body, headers = {}) ⇒ Request
Returns a new instance of Request.
11 12 13 14 15 16 17 18 19 20 21 |
# File 'lib/rack/amqp/client/request.rb', line 11

# Builds a request and prepares the synchronisation primitives used to
# wait for its reply.
#
# @param request_id [Object] identifier kept for this request
# @param http_method [Object] HTTP verb kept for this request
# @param uri [String] target in "queue.name/request/path" form; split into
#   routing key and request path by #split_uri
# @param body [Object] request body
# @param headers [Hash] extra headers for the request
def initialize(request_id, http_method, uri, body, headers={})
  # The callback queue is not a constructor argument. The previous
  # `@callback_queue = callback_queue` merely read this object's own (still
  # unset) accessor, i.e. it always stored nil. Say so explicitly; the queue
  # is assigned afterwards through the `callback_queue=` accessor.
  @callback_queue = nil
  @request_id = request_id
  @http_method = http_method
  @headers = headers
  @body = body
  @routing_key, @request_path = split_uri(uri)
  @response = nil
  # Mutex/condition-variable pair used by #callback and #reply_wait.
  @mutex = Mutex.new
  @resource = ConditionVariable.new
end
Instance Method Details
#callback(delivery_info, meta, payload) ⇒ Object
32 33 34 35 36 37 |
# File 'lib/rack/amqp/client/request.rb', line 32

# Records a reply and signals the condition variable that #reply_wait
# waits on.
#
# @param delivery_info [Object] passed through to Response.new
# @param meta [Object] passed through to Response.new
# @param payload [Object] passed through to Response.new
# @return [Response] the response that was stored
def callback(delivery_info, meta, payload)
  @mutex.synchronize do
    # Signalling before the assignment is safe: a waiter cannot re-acquire
    # the mutex (and so cannot observe @response) until this block ends.
    @resource.signal
    @response = Response.new(meta, payload, delivery_info)
  end
end
#headers ⇒ Object
57 58 59 |
# File 'lib/rack/amqp/client/request.rb', line 57

# Headers for this request: the hash given at construction plus a :path
# entry holding the request path. Returns a new hash; @headers itself is
# not modified.
def headers
  path_entry = { path: request_path }
  @headers.merge(path_entry)
end
#payload ⇒ Object
53 54 55 |
# File 'lib/rack/amqp/client/request.rb', line 53

# The message payload, which is simply the request body.
def payload
  body
end
#publishing_options ⇒ Object
40 41 42 43 44 45 46 47 48 49 50 51 |
# File 'lib/rack/amqp/client/request.rb', line 40

# Options hash describing how this request is to be published.
#
# @return [Hash] message properties built from this request's attributes
def publishing_options
  {
    mandatory: true, # receive an error on routing error
    message_id: request_id,
    reply_to: callback_queue.name,
    type: http_method,
    app_id: user_agent,
    timestamp: Time.now.to_i,
    headers: headers,
    routing_key: routing_key
  }
end
#reply_wait(timeout) ⇒ Object
23 24 25 26 27 28 29 30 |
# File 'lib/rack/amqp/client/request.rb', line 23

# Waits up to +timeout+ seconds for #callback to deliver a response, then
# hands the response over and clears it.
#
# @param timeout [Numeric, nil] maximum time to wait, passed to
#   ConditionVariable#wait
# @return [Object, nil] the stored response, or nil if none arrived
def reply_wait(timeout)
  @mutex.synchronize do
    # Skip the wait when the reply is already here; otherwise a callback
    # that fired before we started waiting would only be noticed after the
    # full timeout, because its signal had no waiter.
    @resource.wait(@mutex, timeout) unless @response
    resp = @response
    # The original assigned to a misspelled @reponse, so the stored
    # response was never actually cleared.
    @response = nil
    resp
  end
end
#split_uri(uri) ⇒ Object
65 66 67 68 69 |
# File 'lib/rack/amqp/client/request.rb', line 65

# Splits a target such as
# "target.queue.name/request/path?params=things&others=stuff" at its first
# slash.
#
# @param uri [String] queue name, optionally followed by "/" and a path
# @return [Array(String, String)] the queue name and the remainder with a
#   leading "/" (just "/" when there is no remainder)
def split_uri(uri)
  queue_name, remainder = uri.split('/', 2)
  [queue_name.to_s, "/#{remainder}"]
end
#user_agent ⇒ Object
61 62 63 |
# File 'lib/rack/amqp/client/request.rb', line 61

# Identifier for this client: a fixed "rack-amqp-client-" prefix followed
# by the VERSION constant.
def user_agent
  "rack-amqp-client-#{VERSION}"
end