Module: Hatetepe::Client

Extended by:
VerbMethods
Includes:
VerbMethods, Connection
Defined in:
lib/hatetepe/client.rb

Defined Under Namespace

Modules: VerbMethods Classes: Job

Constant Summary collapse

CONFIG_DEFAULTS =

The default configuration.

{
  :timeout         => 5,
  :connect_timeout => 5
}

Instance Attribute Summary collapse

Attributes included from Connection

#processing_enabled

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Connection

#close_connection, #closed?, #closed_by_connect_timeout?, #closed_by_remote?, #closed_by_self?, #closed_by_timeout?, #comm_inactivity_timeout=, #connected?, #connection_completed, #pending_connect_timeout=, #remote_address, #remote_port, #sockaddr

Instance Attribute Details

#appObject (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

The pipe of middleware and request transmission/response reception.



32
33
34
# File 'lib/hatetepe/client.rb', line 32

def app
  @app
end

#configObject (readonly)

The configuration for this Client instance.



27
28
29
# File 'lib/hatetepe/client.rb', line 27

def config
  @config
end

Class Method Details

.request(verb, uri, headers = {}, body = []) ⇒ Object



189
190
191
192
193
194
195
# File 'lib/hatetepe/client.rb', line 189

def self.request(verb, uri, headers = {}, body = [])
  uri    = URI(uri)
  client = start(host: uri.host, port: uri.port)
  client.request(verb, uri, headers, body)
ensure
  client.stop
end

.start(config) ⇒ Hatetepe::Client

Starts a new Client.

Parameters:

  • config (Hash)

    The :host and :port the Client should connect to.

Returns:



184
185
186
# File 'lib/hatetepe/client.rb', line 184

def self.start(config)
  EM.connect(config[:host], config[:port], self, config)
end

Instance Method Details

#<<(request) ⇒ Object

Sends a request and waits for the response without blocking.

Transmission and reception are performed within a separate Fiber. #succeed and #fail will be called on the request passing the response, depending on whether the response indicates success (100-399) or failure (400-599).

The request will #fail with a nil response if the connection was closed for whatever reason.

TODO find out if there are more cases where the response body

should automatically be closed.


102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
# File 'lib/hatetepe/client.rb', line 102

def <<(request)
  Fiber.new do
    response = @app.call(request)

    if response && (request.verb == "HEAD" || response.status == 204)
      response.body.close_write
    end

    if !response
      request.fail(nil, self)
    elsif response.failure?
      request.fail(response)
    else
      request.succeed(response)
    end
  end.resume
end

#initialize(config) ⇒ Object

Initializes a new Client instance.

Parameters:

  • config (Hash)

    Configuration values that overwrite the defaults.



40
41
42
# File 'lib/hatetepe/client.rb', line 40

def initialize(config)
  @config = CONFIG_DEFAULTS.merge(config)
end

#post_initObject

Initializes the parser, request queue, and middleware pipe.

TODO: Use Rack::Builder for building the app pipe.

See Also:

  • EM::Connection#post_init


51
52
53
54
55
56
57
58
59
60
61
62
63
# File 'lib/hatetepe/client.rb', line 51

def post_init
  @builder, @parser   =  Hatetepe::Builder.new, Hatetepe::Parser.new
  @builder.on_write   << method(:send_data)
  # @builder.on_write {|data| p "|--> #{data}" }
  @parser.on_response << method(:receive_response)

  @queue = []

  @app = proc {|request| send_request(request) }

  self.comm_inactivity_timeout = config[:timeout]
  self.pending_connect_timeout = config[:connect_timeout]
end

#receive_data(data) ⇒ Object

Feeds response data into the parser.

Parameters:

  • data (String)

    The received data that’s gonna be fed into the parser.

See Also:

  • EM::Connection#receive_data


73
74
75
76
# File 'lib/hatetepe/client.rb', line 73

def receive_data(data)
  # p "|<-- #{data}"
  @parser << data
end

#receive_response(response) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Relates an incoming response to the corresponding request.

Supports the response bit of HTTP pipelining by relating responses to requests in the order the requests were sent.

TODO: raise a more meaningful error.

Parameters:

Raises:

  • (RuntimeError)

    There is no request that’s waiting for a response.



251
252
253
254
255
256
257
258
259
260
# File 'lib/hatetepe/client.rb', line 251

def receive_response(response)
  query = proc {|j| j.response.nil? }

  if job = @queue.find(&query)
    job.response = response
    job.fiber.resume
  else
    raise "Received response but didn't expect one: #{response.status}"
  end
end

#request(verb, uri, headers = {}, body = []) ⇒ Hatetepe::Response?

Builds a Request, sends it, and blocks while waiting for the response.

Parameters:

  • verb (Symbol, String)

    The HTTP method verb, e.g. :get or “PUT”.

  • uri (String, URI)

    The request URI.

  • headers (Hash) (defaults to: {})

    (optional) The request headers.

  • body (#each) (defaults to: [])

    (optional) A request body object whose #each method yields objects that respond to #to_s.

Returns:



135
136
137
138
139
# File 'lib/hatetepe/client.rb', line 135

def request(verb, uri, headers = {}, body = [])
  request =  Hatetepe::Request.new(verb, URI(uri), headers, body)
  self    << request
  EM::Synchrony.sync(request)
end

#send_request(request) ⇒ Hatetepe::Response?

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Feeds the request into the builder and blocks while waiting for the response to arrive.

Supports the request bit of HTTP pipelining by waiting until the previous request has been sent.

Parameters:

Returns:

  • (Hatetepe::Response, nil)

    The received response or nil if the connection has been closed before receiving a response.



211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
# File 'lib/hatetepe/client.rb', line 211

def send_request(request)
  previous =  @queue.last
  current  =  Job.new(Fiber.current, request, false)
  @queue   << current

  # wait for the previous request to be sent
  while previous && !previous.sent
    return if Fiber.yield == :kill
  end

  # send the request
  self.comm_inactivity_timeout = 0
  @builder.request(request.to_a)
  current.sent = true
  self.comm_inactivity_timeout = config[:timeout]

  # wait for the response
  while !current.response
    return if Fiber.yield == :kill
  end

  # clean up and return response
  @queue.delete(current)
  current.response
end

#stopObject

Gracefully stops the client.

Waits for all requests to finish and then stops the client.



146
147
148
149
# File 'lib/hatetepe/client.rb', line 146

def stop
  wait
  stop!
end

#stop!Object

Immediately stops the client by closing the connection.

This will lead to EventMachine’s event loop calling #unbind, which fail all outstanding requests.

See Also:



159
160
161
# File 'lib/hatetepe/client.rb', line 159

def stop!
  close_connection
end

#unbind(reason) ⇒ Object

Aborts all outstanding requests.

See Also:

  • EM::Connection#unbind


83
84
85
86
# File 'lib/hatetepe/client.rb', line 83

def unbind(reason)
  super
  @queue.each {|job| job.fiber.resume(:kill) }
end

#waitObject

Blocks until the last request has finished receiving its response.

Returns immediately if there are no outstanding requests.



168
169
170
171
172
173
# File 'lib/hatetepe/client.rb', line 168

def wait
  if job = @queue.last
    EM::Synchrony.sync(job.request)
    EM::Synchrony.sync(job.response.body) if job.response
  end
end