Class: Rack::AMQP::Client::Request

Inherits:
Object
  • Object
show all
Defined in:
lib/rack/amqp/client/request.rb

Constant Summary collapse

SLEEP_INCREMENT =
attr_accessor :routing_key, :request_path, :body, :request_id, :callback_queue, :http_method

Instance Method Summary collapse

Constructor Details

#initialize(request_id, http_method, uri, body, headers = {}) ⇒ Request

Returns a new instance of Request.



11
12
13
14
15
16
17
18
19
20
21
# File 'lib/rack/amqp/client/request.rb', line 11

# Builds a request ready to be published and awaited.
#
# @param request_id [Object] stored as-is; sent as +message_id+ by #publishing_options
# @param http_method [Object] stored as-is; sent as the message +type+ by #publishing_options
# @param uri [#split] "<routing key>/<request path>"; divided by #split_uri
# @param body [Object] stored as-is; returned by #payload
# @param headers [#merge] extra headers; #headers merges the request path into them
def initialize(request_id, http_method, uri, body, headers = {})
  # No reply queue is known at construction time. The previous code assigned
  # the (still unset) accessor to itself, which always produced nil; spell
  # that out. A queue must be assigned through #callback_queue= before
  # #publishing_options is called, because that method reads its +name+.
  @callback_queue             = nil
  @request_id                 = request_id
  @http_method                = http_method
  @headers                    = headers
  @body                       = body
  @routing_key, @request_path = split_uri(uri)
  @response                   = nil
  # The mutex guards @response; the condition variable is signalled by
  # #callback and waited on by #reply_wait.
  @mutex                      = Mutex.new
  @resource                   = ConditionVariable.new
end

Instance Method Details

#callback(delivery_info, meta, payload) ⇒ Object



32
33
34
35
36
37
# File 'lib/rack/amqp/client/request.rb', line 32

# Records a reply and wakes a thread blocked in #reply_wait.
#
# The signal and the assignment both happen while holding @mutex, so a
# waiter cannot resume until the response has been stored.
#
# @param delivery_info [Object] passed through to Response.new as its third argument
# @param meta [Object] passed through to Response.new as its first argument
# @param payload [Object] passed through to Response.new as its second argument
# @return [Response] the response that was just stored
def callback(delivery_info, meta, payload)
  @mutex.synchronize do
    @resource.signal
    reply = Response.new(meta, payload, delivery_info)
    @response = reply
  end
end

#headers ⇒ Object



57
58
59
# File 'lib/rack/amqp/client/request.rb', line 57

# Headers to send with the message: the headers given at construction with
# the request path added under the +:path+ key. The stored headers are not
# modified; a merged copy is returned.
#
# @return [Hash] result of merging +{ path: request_path }+ into @headers
def headers
  path_header = { path: request_path }
  @headers.merge(path_header)
end

#payload ⇒ Object



53
54
55
# File 'lib/rack/amqp/client/request.rb', line 53

# The message payload: the request body, returned unchanged.
#
# @return [Object] the current value of #body
def payload
  self.body
end

#publishing_options ⇒ Object



40
41
42
43
44
45
46
47
48
49
50
51
# File 'lib/rack/amqp/client/request.rb', line 40

# Assembles the options hash that accompanies the published message.
#
# Reads +callback_queue.name+, so a callback queue must have been assigned
# before this is called.
#
# @return [Hash] options keyed by :mandatory, :message_id, :reply_to, :type,
#   :app_id, :timestamp, :headers and :routing_key (in that order)
def publishing_options
  options = {}
  options[:mandatory]   = true # receive an error on routing error
  options[:message_id]  = request_id
  options[:reply_to]    = callback_queue.name
  options[:type]        = http_method
  options[:app_id]      = user_agent
  options[:timestamp]   = Time.now.to_i
  options[:headers]     = headers
  options[:routing_key] = routing_key
  options
end

#reply_wait(timeout) ⇒ Object



23
24
25
26
27
28
29
30
# File 'lib/rack/amqp/client/request.rb', line 23

# Blocks until #callback has stored a response or +timeout+ elapses, then
# hands the response over and clears it.
#
# Differences from the previous implementation:
# * the stored response is really cleared (the old code assigned to a
#   misspelled @reponse, so a stale reply could be returned again);
# * a response that arrived before the wait started is returned at once
#   instead of sleeping for the whole timeout after a missed signal;
# * @response is read and cleared while holding @mutex;
# * the wait is re-entered after a wake-up that did not deliver a response.
#
# @param timeout [Numeric, nil] maximum seconds to wait; nil waits until a
#   response arrives
# @return [Object, nil] the response stored by #callback, or nil on timeout
def reply_wait(timeout)
  # Monotonic clock: the deadline must not move if the wall clock is adjusted.
  deadline = timeout && (Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout)

  @mutex.synchronize do
    until @response
      remaining = deadline && (deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC))
      break if remaining && remaining <= 0

      @resource.wait(@mutex, remaining)
    end

    reply = @response
    @response = nil
    reply
  end
end

#split_uri(uri) ⇒ Object



65
66
67
68
69
# File 'lib/rack/amqp/client/request.rb', line 65

# Splits a URI of the form
#   target.queue.name/request/path?params=things&others=stuff
# at its first "/" into the queue name and the request path.
#
# The path always starts with "/"; when the URI has no "/" (or is empty) the
# path is just "/".
#
# @param uri [#split] the combined "<queue>/<path>" string
# @return [Array(String, String)] +[queue_name, request_path]+
def split_uri(uri)
  queue_name, path = uri.split('/', 2)
  [queue_name.to_s, "/#{path}"]
end

#user_agent ⇒ Object



61
62
63
# File 'lib/rack/amqp/client/request.rb', line 61

# Identifier sent as the message's +app_id+ (see #publishing_options):
# the literal prefix "rack-amqp-client-" followed by VERSION.
#
# @return [String]
def user_agent
  format('rack-amqp-client-%s', VERSION)
end