Class: PredictionIO::Connection
- Inherits: Object (Object > PredictionIO::Connection)
- Defined in: lib/predictionio/connection.rb
Overview
This class handles multithreading and asynchronous requests transparently for the REST client.
Instance Attribute Summary
- #connections ⇒ Object (readonly): Number of connections active.
- #packages ⇒ Object (readonly): Queue of pending asynchronous request and response packages.
- #timeout ⇒ Object (readonly): Timeout in seconds.
Instance Method Summary
- #adelete(areq) ⇒ Object: Shortcut to create an asynchronous DELETE request with the response object returned.
- #aget(areq) ⇒ Object: Shortcut to create an asynchronous GET request with the response object returned.
- #apost(areq) ⇒ Object: Shortcut to create an asynchronous POST request with the response object returned.
- #initialize(uri, threads = 1, timeout = 60) ⇒ Connection (constructor): Spawns a number of threads, each with a persistent HTTP connection to the specified URI.
- #request(method, request) ⇒ Object: Creates an asynchronous request and response package, puts it in the pending queue, and returns the response object.
Constructor Details
#initialize(uri, threads = 1, timeout = 60) ⇒ Connection
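Spawns the given number of worker threads, each holding a persistent HTTP connection to the specified URI. The timeout defaults to 60 seconds and is applied as the read timeout of each connection. A thread whose connection fails sleeps for one second and then reconnects.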
    # File 'lib/predictionio/connection.rb', line 32

    def initialize(uri, threads = 1, timeout = 60)
      @packages = Queue.new
      @counter_lock = Mutex.new
      @connections = 0
      @timeout = timeout
      threads.times do
        Thread.new do
          begin
            Net::HTTP.start(uri.host, uri.port, use_ssl: uri.scheme == 'https') do |http|
              @counter_lock.synchronize do
                @connections += 1
              end
              catch(:exit) do
                http.read_timeout = @timeout
                loop do
                  package = @packages.pop
                  request = package[:request]
                  response = package[:response]
                  case package[:method]
                  when 'get'
                    http_req = Net::HTTP::Get.new("#{uri.path}#{request.qpath}")
                    begin
                      response.set(http.request(http_req))
                    rescue Exception => details
                      response.set(details)
                    end
                  when 'post'
                    if request.params.is_a?(Hash)
                      http_req = Net::HTTP::Post.new("#{uri.path}#{request.path}")
                      http_req.set_form_data(request.params)
                    else
                      http_req = Net::HTTP::Post.new("#{uri.path}#{request.path}", initheader = { 'Content-Type' => 'application/json; charset=utf-8' })
                      http_req.body = request.params
                    end
                    begin
                      response.set(http.request(http_req))
                    rescue Exception => details
                      response.set(details)
                    end
                  when 'delete'
                    http_req = Net::HTTP::Delete.new("#{uri.path}#{request.qpath}")
                    begin
                      response.set(http.request(http_req))
                    rescue Exception => details
                      response.set(details)
                    end
                  when 'exit'
                    @counter_lock.synchronize do
                      @connections -= 1
                    end
                    throw :exit
                  end
                end
              end
            end
          rescue Exception => detail
            @counter_lock.synchronize do
              if @connections == 0 then
                # Use non-blocking pop to avoid dead-locking the current
                # thread when there is no request, and give it a chance to re-connect.
                begin
                  package = @packages.pop(true)
                  response = package[:response]
                  response.set(detail)
                rescue Exception
                end
              end
            end
            sleep(1)
            retry
          end
        end
      end
    end
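The worker loop in the constructor follows a queue-draining pattern: each thread blocks on a shared queue, dispatches on the package's :method, and leaves its loop when it receives an 'exit' package. The following is a minimal sketch of that pattern only, assuming a lambda job in place of the real HTTP call. The names packages, results, active, and the 'work' method string are illustrative and not part of PredictionIO::Connection.

```ruby
packages = Queue.new      # pending work, shared by all workers
results = Queue.new       # stand-in for the response objects
counter_lock = Mutex.new  # guards the active-worker counter
active = 0

workers = Array.new(2) do
  Thread.new do
    counter_lock.synchronize { active += 1 }
    catch(:exit) do
      loop do
        package = packages.pop # blocks until a package arrives
        case package[:method]
        when 'work'
          # Stand-in for http.request: run the job and hand back any
          # error as a value, much like response.set(details) does.
          begin
            results.push(package[:job].call)
          rescue StandardError => e
            results.push(e)
          end
        when 'exit'
          counter_lock.synchronize { active -= 1 }
          throw :exit # unwinds out of loop and catch
        end
      end
    end
  end
end

packages.push({ method: 'work', job: -> { 21 * 2 } })
packages.push({ method: 'work', job: -> { raise ArgumentError, 'boom' } })
2.times { packages.push({ method: 'exit' }) } # one exit package per worker
workers.each(&:join)

outcomes = Array.new(2) { results.pop }
```

After the join, outcomes holds one integer result and one ArgumentError instance, in whichever order the workers finished. Returning the exception as a value lets the caller decide how to handle a failed request instead of losing it inside the worker thread.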
Instance Attribute Details
#connections ⇒ Object (readonly)
Number of connections active.
    # File 'lib/predictionio/connection.rb', line 25

    def connections
      @connections
    end
#packages ⇒ Object (readonly)
Queue of pending asynchronous request and response packages. The reader returns the queue itself; call size on it for the number of pending packages.
    # File 'lib/predictionio/connection.rb', line 22

    def packages
      @packages
    end
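Because the constructor assigns Queue.new to @packages, this reader hands back a standard Ruby Queue. The sketch below uses only the core Queue class, with placeholder symbols standing in for the request and response objects. It shows how a pending count can be read and how the non-blocking pop used in the constructor's rescue branch behaves on an empty queue.

```ruby
packages = Queue.new
packages.push({ method: 'get', request: :placeholder, response: :placeholder })

pending = packages.size      # number of packages waiting for a worker
package = packages.pop(true) # non-blocking pop returns the waiting package

# With the queue now empty, a non-blocking pop raises ThreadError
# instead of blocking the calling thread.
empty_error =
  begin
    packages.pop(true)
    nil
  rescue ThreadError => e
    e
  end
```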
#timeout ⇒ Object (readonly)
Timeout in seconds.
    # File 'lib/predictionio/connection.rb', line 28

    def timeout
      @timeout
    end
Instance Method Details
#adelete(areq) ⇒ Object
Shortcut to create an asynchronous DELETE request with the response object returned.
    # File 'lib/predictionio/connection.rb', line 125

    def adelete(areq)
      request('delete', areq)
    end
#aget(areq) ⇒ Object
Shortcut to create an asynchronous GET request with the response object returned.
    # File 'lib/predictionio/connection.rb', line 115

    def aget(areq)
      request('get', areq)
    end
#apost(areq) ⇒ Object
Shortcut to create an asynchronous POST request with the response object returned.
    # File 'lib/predictionio/connection.rb', line 120

    def apost(areq)
      request('post', areq)
    end
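A 'post' package is turned into an HTTP request in one of two ways by the worker loop in the constructor: Hash params are sent as form data, and anything else is sent as a JSON body. The sketch below reproduces only that branch with Net::HTTP request objects and sends nothing over the network. The helper name build_post and the path /events.json are assumptions made for illustration.

```ruby
require 'net/http'
require 'json'

# Hypothetical helper mirroring the 'post' branch of the worker loop.
def build_post(path, params)
  if params.is_a?(Hash)
    req = Net::HTTP::Post.new(path)
    req.set_form_data(params) # URL-encodes the hash into the body
  else
    req = Net::HTTP::Post.new(path, { 'Content-Type' => 'application/json; charset=utf-8' })
    req.body = params # assumed to be an already serialized JSON string
  end
  req
end

form_req = build_post('/events.json', { 'event' => 'rate', 'rating' => '5' })
json_req = build_post('/events.json', JSON.generate({ 'event' => 'rate', 'rating' => 5 }))
```

Here form_req carries a URL-encoded body with the form content type, while json_req keeps the string body unchanged and declares it as JSON. The caller is therefore responsible for serializing non-Hash params before calling apost.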
#request(method, request) ⇒ Object
Create an asynchronous request and response package, put it in the pending queue, and return the response object.
    # File 'lib/predictionio/connection.rb', line 108

    def request(method, request)
      response = AsyncResponse.new(request)
      @packages.push(method: method, request: request, response: response)
      response
    end
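The method returns immediately, and the response object is filled in later by a worker thread. AsyncResponse itself is not shown on this page, so the sketch below uses a hypothetical stand-in, SketchResponse, that assumes only a set method (which the worker loop calls) and a blocking get method (an assumption). The real class may differ.

```ruby
# Hypothetical stand-in for AsyncResponse; the real class is defined
# elsewhere in the gem and may behave differently.
class SketchResponse
  attr_reader :request

  def initialize(request)
    @request = request
    @slot = Queue.new # a one-element queue acts as a blocking mailbox
  end

  # Called by the worker thread once a result (or an exception) is known.
  def set(value)
    @slot.push(value)
  end

  # Called by the client; blocks until the worker has called #set.
  def get
    @slot.pop
  end
end

packages = Queue.new

# Same shape as Connection#request: wrap, enqueue, return at once.
enqueue = lambda do |method, request|
  response = SketchResponse.new(request)
  packages.push({ method: method, request: request, response: response })
  response
end

worker = Thread.new do
  package = packages.pop
  package[:response].set("handled #{package[:method]} #{package[:request]}")
end

response = enqueue.call('get', '/status')
result = response.get # blocks until the worker thread answers
worker.join
```

The client thread holds only the response object, so it can enqueue several requests first and collect the results afterwards.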