Module: Hatetepe::Client
Defined Under Namespace
Modules: VerbMethods Classes: Job
Constant Summary collapse
- CONFIG_DEFAULTS =
The default configuration.
{ :timeout => 5, :connect_timeout => 5 }
Instance Attribute Summary collapse
-
#app ⇒ Object
readonly
private
The pipe of middleware and request transmission/response reception.
-
#config ⇒ Object
readonly
The configuration for this Client instance.
Attributes included from Connection
Class Method Summary collapse
- .request(verb, uri, headers = {}, body = []) ⇒ Object
-
.start(config) ⇒ Hatetepe::Client
Starts a new Client.
Instance Method Summary collapse
-
#<<(request) ⇒ Object
Sends a request and waits for the response without blocking.
-
#initialize(config) ⇒ Object
Initializes a new Client instance.
-
#post_init ⇒ Object
Initializes the parser, request queue, and middleware pipe.
-
#receive_data(data) ⇒ Object
Feeds response data into the parser.
-
#receive_response(response) ⇒ Object
private
Relates an incoming response to the corresponding request.
-
#request(verb, uri, headers = {}, body = []) ⇒ Hatetepe::Response?
Builds a
Request
, sends it, and blocks while waiting for the response. -
#send_request(request) ⇒ Hatetepe::Response?
private
Feeds the request into the builder and blocks while waiting for the response to arrive.
-
#stop ⇒ Object
Gracefully stops the client.
-
#stop! ⇒ Object
Immediately stops the client by closing the connection.
-
#unbind(reason) ⇒ Object
Aborts all outstanding requests.
-
#wait ⇒ Object
Blocks until the last request has finished receiving its response.
Methods included from Connection
#close_connection, #closed?, #closed_by_connect_timeout?, #closed_by_remote?, #closed_by_self?, #closed_by_timeout?, #comm_inactivity_timeout=, #connected?, #connection_completed, #pending_connect_timeout=, #remote_address, #remote_port, #sockaddr
Instance Attribute Details
#app ⇒ Object (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
The pipe of middleware and request transmission/response reception.
32 33 34 |
# File 'lib/hatetepe/client.rb', line 32 def app @app end |
#config ⇒ Object (readonly)
The configuration for this Client instance.
27 28 29 |
# File 'lib/hatetepe/client.rb', line 27 def config @config end |
Class Method Details
.request(verb, uri, headers = {}, body = []) ⇒ Object
189 190 191 192 193 194 195 |
# File 'lib/hatetepe/client.rb', line 189 def self.request(verb, uri, headers = {}, body = []) uri = URI(uri) client = start(host: uri.host, port: uri.port) client.request(verb, uri, headers, body) ensure client.stop end |
.start(config) ⇒ Hatetepe::Client
Starts a new Client.
184 185 186 |
# File 'lib/hatetepe/client.rb', line 184 def self.start(config) EM.connect(config[:host], config[:port], self, config) end |
Instance Method Details
#<<(request) ⇒ Object
Sends a request and waits for the response without blocking.
Transmission and reception are performed within a separate Fiber
. #succeed
and #fail
will be called on the request
passing the response, depending on whether the response indicates success (100-399) or failure (400-599).
The request will #fail
with a nil
response if the connection was closed for whatever reason.
TODO find out if there are more cases where the response body
should automatically be closed.
102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 |
# File 'lib/hatetepe/client.rb', line 102 def <<(request) Fiber.new do response = @app.call(request) if response && (request.verb == "HEAD" || response.status == 204) response.body.close_write end if !response request.fail(nil, self) elsif response.failure? request.fail(response) else request.succeed(response) end end.resume end |
#initialize(config) ⇒ Object
Initializes a new Client instance.
40 41 42 |
# File 'lib/hatetepe/client.rb', line 40 def initialize(config) @config = CONFIG_DEFAULTS.merge(config) end |
#post_init ⇒ Object
Initializes the parser, request queue, and middleware pipe.
TODO: Use Rack::Builder
for building the app pipe.
51 52 53 54 55 56 57 58 59 60 61 62 63 |
# File 'lib/hatetepe/client.rb', line 51 def post_init @builder, @parser = Hatetepe::Builder.new, Hatetepe::Parser.new @builder.on_write << method(:send_data) # @builder.on_write {|data| p "|--> #{data}" } @parser.on_response << method(:receive_response) @queue = [] @app = proc {|request| send_request(request) } self.comm_inactivity_timeout = config[:timeout] self.pending_connect_timeout = config[:connect_timeout] end |
#receive_data(data) ⇒ Object
Feeds response data into the parser.
73 74 75 76 |
# File 'lib/hatetepe/client.rb', line 73 def receive_data(data) # p "|<-- #{data}" @parser << data end |
#receive_response(response) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Relates an incoming response to the corresponding request.
Supports the response bit of HTTP pipelining by relating responses to requests in the order the requests were sent.
TODO: raise a more meaningful error.
251 252 253 254 255 256 257 258 259 260 |
# File 'lib/hatetepe/client.rb', line 251 def receive_response(response) query = proc {|j| j.response.nil? } if job = @queue.find(&query) job.response = response job.fiber.resume else raise "Received response but didn't expect one: #{response.status}" end end |
#request(verb, uri, headers = {}, body = []) ⇒ Hatetepe::Response?
Builds a Request
, sends it, and blocks while waiting for the response.
135 136 137 138 139 |
# File 'lib/hatetepe/client.rb', line 135 def request(verb, uri, headers = {}, body = []) request = Hatetepe::Request.new(verb, URI(uri), headers, body) self << request EM::Synchrony.sync(request) end |
#send_request(request) ⇒ Hatetepe::Response?
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Feeds the request into the builder and blocks while waiting for the response to arrive.
Supports the request bit of HTTP pipelining by waiting until the previous request has been sent.
211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 |
# File 'lib/hatetepe/client.rb', line 211 def send_request(request) previous = @queue.last current = Job.new(Fiber.current, request, false) @queue << current # wait for the previous request to be sent while previous && !previous.sent return if Fiber.yield == :kill end # send the request self.comm_inactivity_timeout = 0 @builder.request(request.to_a) current.sent = true self.comm_inactivity_timeout = config[:timeout] # wait for the response while !current.response return if Fiber.yield == :kill end # clean up and return response @queue.delete(current) current.response end |
#stop ⇒ Object
Gracefully stops the client.
Waits for all requests to finish and then stops the client.
146 147 148 149 |
# File 'lib/hatetepe/client.rb', line 146 def stop wait stop! end |
#stop! ⇒ Object
Immediately stops the client by closing the connection.
This will lead to EventMachine’s event loop calling #unbind, which fail all outstanding requests.
159 160 161 |
# File 'lib/hatetepe/client.rb', line 159 def stop! close_connection end |
#unbind(reason) ⇒ Object
Aborts all outstanding requests.
83 84 85 86 |
# File 'lib/hatetepe/client.rb', line 83 def unbind(reason) super @queue.each {|job| job.fiber.resume(:kill) } end |
#wait ⇒ Object
Blocks until the last request has finished receiving its response.
Returns immediately if there are no outstanding requests.
168 169 170 171 172 173 |
# File 'lib/hatetepe/client.rb', line 168 def wait if job = @queue.last EM::Synchrony.sync(job.request) EM::Synchrony.sync(job.response.body) if job.response end end |