Class: EM::Mongo::EMConnection
- Inherits:
-
Connection
- Object
- Connection
- EM::Mongo::EMConnection
- Includes:
- Deferrable
- Defined in:
- lib/em-mongo/connection.rb
Defined Under Namespace
Classes: Error
Constant Summary collapse
- MAX_RETRIES =
5
- RESERVED =
0
- STANDARD_HEADER_SIZE =
16
- RESPONSE_HEADER_SIZE =
20
Instance Attribute Summary collapse
-
#connection ⇒ Object
readonly
Returns the value of attribute connection.
Class Method Summary collapse
Instance Method Summary collapse
-
#build_last_error_message(message, db_name, opts) ⇒ Object
Constructs a getlasterror message. See Connection#send_message_with_safe_check.
- #close ⇒ Object
- #connected? ⇒ Boolean
- #connection_completed ⇒ Object
-
#initialize(options = {}) ⇒ EMConnection
constructor
EM hooks.
- #message_headers(operation, request_id, message) ⇒ Object
- #message_received?(buffer) ⇒ Boolean
- #new_request_id ⇒ Object
- #next_response ⇒ Object
- #peek_size(buffer) ⇒ Object
-
#prepare_message(op, message, options = {}) ⇒ Object
MongoDB Commands.
- #prepare_safe_message(message, options) ⇒ Object
- #receive_data(data) ⇒ Object
- #remaining_bytes(buffer) ⇒ Object
- #responses_pending? ⇒ Boolean
- #send_command(op, message, options = {}, &cb) ⇒ Object
- #slave_ok? ⇒ Boolean
- #unbind ⇒ Object
Constructor Details
#initialize(options = {}) ⇒ EMConnection
EM hooks
111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 |
# File 'lib/em-mongo/connection.rb', line 111
# EM hooks
#
# Sets up the per-connection state from +options+.
#
# @param options [Hash] keys read here:
#   :host, :port   - stored as @host / @port (default DEFAULT_IP / DEFAULT_PORT)
#   :unbind_cb     - callable stored as @on_unbind (default: a no-op proc)
#   :reconnect_in  - stored as @reconnect_in (default false)
#   :slave_ok      - stored as @slave_ok (default false)
#   :timeout       - when truthy, handed to #timeout
def initialize(options = {})
  @request_id   = 0
  @retries      = 0
  @responses    = {}
  @is_connected = false

  @host         = options[:host]         || DEFAULT_IP
  @port         = options[:port]         || DEFAULT_PORT
  @on_unbind    = options[:unbind_cb]    || proc {}
  @reconnect_in = options[:reconnect_in] || false
  @slave_ok     = options[:slave_ok]     || false

  # Default failure handler; #close swaps this proc for one that yields instead.
  @on_close = proc { raise Error, "failure with mongodb server #{@host}:#{@port}" }

  timeout options[:timeout] if options[:timeout]
  errback { @on_close.call }
end
Instance Attribute Details
#connection ⇒ Object (readonly)
Returns the value of attribute connection.
47 48 49 |
# File 'lib/em-mongo/connection.rb', line 47
# Reader for the @connection instance variable.
#
# @return [Object] whatever is currently stored in @connection
def connection
  @connection
end
Class Method Details
.connect(host = DEFAULT_IP, port = DEFAULT_PORT, timeout = nil, opts = nil) ⇒ Object
129 130 131 132 |
# File 'lib/em-mongo/connection.rb', line 129
# Opens a connection through EM.connect, using this class as the handler.
#
# @param host [String] server address (default DEFAULT_IP)
# @param port [Integer] server port (default DEFAULT_PORT)
# @param timeout [Numeric, nil] forwarded to the handler as :timeout
# @param opts [Hash, nil] extra options; they override the defaults built here
# @return [Object] whatever EM.connect returns
def self.connect(host = DEFAULT_IP, port = DEFAULT_PORT, timeout = nil, opts = nil)
  defaults = { :host => host, :port => port, :timeout => timeout, :reconnect_in => false }
  # opts defaults to nil, and Hash#merge(nil) raises TypeError, so fall back to {}.
  EM.connect(host, port, self, defaults.merge(opts || {}))
end
Instance Method Details
#build_last_error_message(message, db_name, opts) ⇒ Object
Constructs a getlasterror message. See Connection#send_message_with_safe_check.
Because it modifies message by reference, we don’t need to return it.
222 223 224 225 226 227 228 229 230 231 232 233 234 235 |
# File 'lib/em-mongo/connection.rb', line 222
# Writes a getlasterror command against "<db_name>.$cmd" into +message+.
# Because it modifies message by reference, we don't need to return it.
#
# @param message [#put_int, #put_binary] buffer that receives the command
# @param db_name [String] database whose $cmd collection is addressed
# @param opts [Hash, Object] when a Hash, validated against :w, :wtimeout,
#   :fsync and merged into the command; any other value is ignored
# @return [nil]
def build_last_error_message(message, db_name, opts)
  message.put_int(0)
  BSON::BSON_RUBY.serialize_cstr(message, "#{db_name}.$cmd")
  message.put_int(0)
  message.put_int(-1)

  cmd = BSON::OrderedHash.new
  cmd[:getlasterror] = 1
  if opts.is_a?(Hash)
    # NOTE(review): assert_valid_keys is not core Ruby; presumably supplied by
    # a project extension or ActiveSupport - confirm.
    opts.assert_valid_keys(:w, :wtimeout, :fsync)
    cmd.merge!(opts)
  end

  message.put_binary(BSON::BSON_CODER.serialize(cmd, false).to_s)
  nil
end
#close ⇒ Object
209 210 211 212 213 214 215 216 |
# File 'lib/em-mongo/connection.rb', line 209
# Requests shutdown of the connection.
#
# Replaces @on_close with a proc that yields to the block given here (if any).
# With no responses outstanding the socket is closed after pending writes;
# otherwise @close_pending is set so #receive_data can close it later.
#
# @yield optional block, run when @on_close is later called
def close
  @on_close = proc { yield if block_given? }

  if @responses.empty?
    close_connection_after_writing
  else
    @close_pending = true
  end
end
#connected? ⇒ Boolean
53 54 55 |
# File 'lib/em-mongo/connection.rb', line 53
# @return [Boolean] the current value of @is_connected
def connected?
  @is_connected
end
#connection_completed ⇒ Object
134 135 136 137 138 139 |
# File 'lib/em-mongo/connection.rb', line 134
# Resets connection state: fresh receive buffer, connected flag set, retry
# counter cleared, then marks the deferrable as succeeded via #succeed.
def connection_completed
  @buffer       = BSON::ByteBuffer.new
  @is_connected = true
  @retries      = 0
  succeed
end
#message_headers(operation, request_id, message) ⇒ Object
90 91 92 93 94 95 96 97 |
# File 'lib/em-mongo/connection.rb', line 90
# Builds the four-int header that precedes +message+ on the wire.
#
# @param operation [Integer] opcode written as the last header field
# @param request_id [Integer] id written as the second header field
# @param message [#size] payload; its size plus the header size is the first field
# @return [BSON::ByteBuffer] buffer holding the header only
def message_headers(operation, request_id, message)
  headers = BSON::ByteBuffer.new
  headers.put_int(STANDARD_HEADER_SIZE + message.size) # total length, header included
  headers.put_int(request_id)
  headers.put_int(RESERVED)
  headers.put_int(operation)
  headers
end
#message_received?(buffer) ⇒ Boolean
141 142 143 144 |
# File 'lib/em-mongo/connection.rb', line 141
# Tells whether +buffer+ holds at least one complete message from its current
# position: more than STANDARD_HEADER_SIZE unread bytes, and at least as many
# as the length reported by #peek_size.
#
# @param buffer [#size, #position] buffer to inspect (previously ignored in
#   favour of @buffer; the visible caller passes @buffer)
# @return [Boolean]
def message_received?(buffer)
  unread = remaining_bytes(buffer)
  unread > STANDARD_HEADER_SIZE && unread >= peek_size(buffer)
end
#new_request_id ⇒ Object
57 58 59 |
# File 'lib/em-mongo/connection.rb', line 57
# Increments the @request_id counter.
#
# @return [Integer] the incremented value
def new_request_id
  @request_id += 1
end
#next_response ⇒ Object
182 183 184 |
# File 'lib/em-mongo/connection.rb', line 182
# Wraps the receive buffer in a ServerResponse, passing this connection along.
#
# @return [ServerResponse]
def next_response
  ServerResponse.new(@buffer, self)
end
#peek_size(buffer) ⇒ Object
150 151 152 153 154 155 |
# File 'lib/em-mongo/connection.rb', line 150
# Reads the int at the buffer's current position without consuming it.
#
# @param buffer [#position, #position=, #get_int]
# @return [Integer] the value returned by buffer.get_int
def peek_size(buffer)
  saved_position = buffer.position
  begin
    buffer.get_int
  ensure
    # Put the cursor back even when get_int raises, so a failed peek
    # does not leave the buffer advanced.
    buffer.position = saved_position
  end
end
#prepare_message(op, message, options = {}) ⇒ Object
MongoDB Commands
67 68 69 70 71 72 |
# File 'lib/em-mongo/connection.rb', line 67
# MongoDB Commands
#
# Prefixes +message+ with its header and, when options[:safe] is set, lets
# #prepare_safe_message extend it.
#
# @param op [Integer] opcode for the header
# @param message [#prepend!, #to_s] payload buffer; mutated in place
# @param options [Hash] :safe switches on the safe path; the whole hash is
#   forwarded to #prepare_safe_message
# @return [Array(Integer, String)] id to wait on and the serialized bytes.
#   In safe mode the id is the one returned by #prepare_safe_message.
def prepare_message(op, message, options = {})
  req_id = new_request_id
  message.prepend!(message_headers(op, req_id, message))
  req_id = prepare_safe_message(message, options) if options[:safe]
  [req_id, message.to_s]
end
#prepare_safe_message(message, options) ⇒ Object
74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 |
# File 'lib/em-mongo/connection.rb', line 74
# Appends a getlasterror query to +message+.
#
# @param message [#append!] outgoing buffer; mutated in place
# @param options [Hash] :db_name is required; :last_error_params is passed to
#   #build_last_error_message (false when absent)
# @return [Integer] request id assigned to the appended getlasterror query
# @raise [ArgumentError] when options[:db_name] is missing
def prepare_safe_message(message, options)
  db_name = options[:db_name]
  unless db_name
    raise(ArgumentError, "You must include the :db_name option when :safe => true")
  end

  last_error_params  = options[:last_error_params] || false
  last_error_message = BSON::ByteBuffer.new

  build_last_error_message(last_error_message, db_name, last_error_params)
  last_error_id = new_request_id
  last_error_message.prepend!(message_headers(EM::Mongo::OP_QUERY, last_error_id, last_error_message))
  message.append!(last_error_message.to_s)
  last_error_id
end
#receive_data(data) ⇒ Object
157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 |
# File 'lib/em-mongo/connection.rb', line 157
# Appends +data+ to the receive buffer, dispatches every complete response to
# the callback registered under its response_to id, then compacts the buffer.
#
# @param data [String] bytes appended to @buffer
def receive_data(data)
  @buffer.append!(data)
  @buffer.rewind

  while message_received?(@buffer)
    response = next_response
    callback = @responses.delete(response.response_to)
    callback.call(response) if callback
  end

  if @buffer.more?
    if @buffer.position > 0
      # Keep only the unread tail (a partial message) in a fresh buffer.
      remaining_bytes = @buffer.size - @buffer.position
      @buffer = BSON::ByteBuffer.new(@buffer.to_s[@buffer.position, remaining_bytes])
      @buffer.rewind
    end
  else
    @buffer.clear
  end

  # A #close requested while replies were outstanding is completed here.
  close_connection if @close_pending && @responses.empty?
end
#remaining_bytes(buffer) ⇒ Object
146 147 148 |
# File 'lib/em-mongo/connection.rb', line 146
# @param buffer [#size, #position]
# @return [Integer] bytes between the buffer's position and its end
def remaining_bytes(buffer)
  buffer.size - buffer.position
end
#responses_pending? ⇒ Boolean
49 50 51 |
# File 'lib/em-mongo/connection.rb', line 49
# @return [Boolean] true while at least one callback is registered in @responses
def responses_pending?
  !@responses.empty?
end
#send_command(op, message, options = {}, &cb) ⇒ Object
99 100 101 102 103 104 105 106 107 108 |
# File 'lib/em-mongo/connection.rb', line 99
# Serializes a command, schedules it for sending, and registers the reply
# callback (if a block is given) under the request id.
#
# @param op [Integer] opcode, forwarded to #prepare_message
# @param message [Object] payload buffer, forwarded to #prepare_message
# @param options [Hash] forwarded to #prepare_message
# @yield [response] stored in @responses; #receive_data calls it with the reply
# @return [Integer] the request id returned by #prepare_message
def send_command(op, message, options = {}, &cb)
  request_id, buffer = prepare_message(op, message, options)

  # The write is handed to #callback rather than performed inline.
  callback { send_data buffer }

  @responses[request_id] = cb if cb
  request_id
end
#slave_ok? ⇒ Boolean
61 62 63 |
# File 'lib/em-mongo/connection.rb', line 61
# @return [Boolean] the current value of @slave_ok
def slave_ok?
  @slave_ok
end
#unbind ⇒ Object
186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 |
# File 'lib/em-mongo/connection.rb', line 186
# Handles loss of the connection: fails outstanding requests, resets state,
# then either schedules a reconnect or runs the unbind callback.
#
# @raise [RuntimeError] "Connection to Mongo Lost" when no reconnect is due
#   and @on_unbind is unset
def unbind
  if @is_connected
    # Iterate a snapshot (values) so a callback that registers a new request
    # cannot mutate the hash mid-iteration.
    @responses.values.each { |pending| pending.call(:disconnected) }

    @request_id = 0
    @responses  = {}
  end

  @is_connected = false
  set_deferred_status(nil)

  if @reconnect_in && @retries < MAX_RETRIES
    EM.add_timer(@reconnect_in) { reconnect(@host, @port) }
  elsif @on_unbind
    @on_unbind.call
  else
    raise "Connection to Mongo Lost"
  end

  # Counted after the branch, so the MAX_RETRIES check sees the prior count.
  @retries += 1
end