Class: EM::Mongo::EMConnection

Inherits:
Connection
  • Object
show all
Includes:
Deferrable
Defined in:
lib/em-mongo/connection.rb

Defined Under Namespace

Classes: Error

Constant Summary collapse

MAX_RETRIES =
5
RESERVED =
0
STANDARD_HEADER_SIZE =
16
RESPONSE_HEADER_SIZE =
20

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ EMConnection

EM hooks



111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
# File 'lib/em-mongo/connection.rb', line 111

# Sets up per-connection state before the socket is established (EM hook).
#
# @param options [Hash] recognised keys, as read below:
#   :host, :port       - server address (fall back to DEFAULT_IP / DEFAULT_PORT)
#   :unbind_cb         - callable invoked by #unbind (defaults to a no-op proc)
#   :reconnect_in      - delay handed to EM.add_timer by #unbind, or false
#   :slave_ok          - flag exposed through #slave_ok?
#   :timeout           - when given, forwarded to +timeout+
def initialize(options={})
  # Target server and caller-supplied behaviour.
  @host         = options[:host]         || DEFAULT_IP
  @port         = options[:port]         || DEFAULT_PORT
  @slave_ok     = options[:slave_ok]     || false
  @reconnect_in = options[:reconnect_in] || false
  @on_unbind    = options[:unbind_cb]    || proc {}

  # Bookkeeping that starts from a clean slate for every connection.
  @is_connected = false
  @responses    = {}
  @request_id   = 0
  @retries      = 0

  # Until #close replaces it, a failure of the deferrable raises.
  @on_close = proc do
    raise Error, "failure with mongodb server #{@host}:#{@port}"
  end

  connect_timeout = options[:timeout]
  timeout connect_timeout if connect_timeout
  errback { @on_close.call }
end

Instance Attribute Details

#connection ⇒ Object (readonly)

Returns the value of attribute connection.



47
48
49
# File 'lib/em-mongo/connection.rb', line 47

# Reader for the @connection instance variable.
# NOTE(review): none of the methods visible here assign @connection, so this
# may simply return nil — confirm where (or whether) it is set.
def connection
  @connection
end

Class Method Details

.connect(host = DEFAULT_IP, port = DEFAULT_PORT, timeout = nil, opts = nil) ⇒ Object



129
130
131
132
# File 'lib/em-mongo/connection.rb', line 129

# Opens an EventMachine connection that uses this class as its handler.
#
# @param host [String] server address (defaults to DEFAULT_IP)
# @param port [Integer] server port (defaults to DEFAULT_PORT)
# @param timeout [Object, nil] stored under :timeout in the handler options
# @param opts [Hash, nil] extra handler options; they override the defaults
#   built from the positional arguments. May be omitted.
# @return [Object] whatever EM.connect returns
def self.connect(host = DEFAULT_IP, port = DEFAULT_PORT, timeout = nil, opts = nil)
  defaults = {:host => host, :port => port, :timeout => timeout, :reconnect_in => false}
  # opts defaults to nil and Hash#merge(nil) raises TypeError, so fall back
  # to an empty hash when no extra options were supplied.
  EM.connect(host, port, self, defaults.merge(opts || {}))
end

Instance Method Details

#build_last_error_message(message, db_name, opts) ⇒ Object

Constructs a getlasterror message. This method is used by Connection#send_message_with_safe_check.

Because it modifies message by reference, we don’t need to return it.



222
223
224
225
226
227
228
229
230
231
232
233
234
235
# File 'lib/em-mongo/connection.rb', line 222

# Writes the body of a getlasterror command query into +message+ (in place).
#
# @param message [#put_int, #put_binary] buffer that receives the body
# @param db_name [String] database whose "$cmd" collection is addressed
# @param opts [Hash, Object] when a Hash, its keys are validated against
#   :w, :wtimeout, :fsync and merged into the command; anything else is ignored
# @return [nil] the result is delivered through +message+
def build_last_error_message(message, db_name, opts)
  # Body layout: one int, the "<db>.$cmd" cstring, two ints (0 and -1),
  # then the serialized command document.
  # NOTE(review): these presumably map to the OP_QUERY flags / skip / limit
  # fields of the MongoDB wire protocol — confirm against the protocol docs.
  message.put_int(0)
  BSON::BSON_RUBY.serialize_cstr(message, "#{db_name}.$cmd")
  [0, -1].each { |value| message.put_int(value) }

  command = BSON::OrderedHash.new
  command[:getlasterror] = 1
  if opts.is_a?(Hash)
    opts.assert_valid_keys(:w, :wtimeout, :fsync)
    command.merge!(opts)
  end

  serialized = BSON::BSON_CODER.serialize(command, false)
  message.put_binary(serialized.to_s)
  nil
end

#close ⇒ Object



209
210
211
212
213
214
215
216
# File 'lib/em-mongo/connection.rb', line 209

# Requests that the connection be closed.
#
# Replaces the @on_close handler with one that yields to the optional block.
# If no responses are outstanding the socket is closed once pending writes
# are flushed; otherwise the close is deferred by setting @close_pending
# (which #receive_data checks).
#
# @yield invoked when the @on_close handler is called
def close
  @on_close = proc { yield if block_given? }
  return close_connection_after_writing if @responses.empty?

  @close_pending = true
end

#connected? ⇒ Boolean

Returns:

  • (Boolean)


53
54
55
# File 'lib/em-mongo/connection.rb', line 53

# Returns the @is_connected flag: set to true in #connection_completed and
# reset to false in #initialize and #unbind.
def connected?
  @is_connected
end

#connection_completed ⇒ Object



134
135
136
137
138
139
# File 'lib/em-mongo/connection.rb', line 134

# EM hook run once the connection is established: installs a fresh receive
# buffer, clears the retry counter, marks the connection as up and finally
# marks the deferrable as succeeded (so callbacks queued by #send_command run).
def connection_completed
  @buffer = BSON::ByteBuffer.new
  @retries = 0
  @is_connected = true
  succeed
end

#message_headers(operation, request_id, message) ⇒ Object



90
91
92
93
94
95
96
97
# File 'lib/em-mongo/connection.rb', line 90

# Builds the four-int header that precedes a message body.
#
# @param operation [Integer] opcode written as the fourth int
# @param request_id [Integer] identifier written as the second int
# @param message [#size] body whose size is added to the header size
# @return [BSON::ByteBuffer] buffer containing only the header
def message_headers(operation, request_id, message)
  headers = BSON::ByteBuffer.new
  # Total length = header + body; use the class constant instead of a bare 16.
  headers.put_int(STANDARD_HEADER_SIZE + message.size)
  headers.put_int(request_id)
  # NOTE(review): third field is presumably the wire protocol's responseTo,
  # which is 0 for client requests — confirm against the protocol docs.
  headers.put_int(0)
  headers.put_int(operation)
  headers
end

#message_received?(buffer) ⇒ Boolean

Returns:

  • (Boolean)


141
142
143
144
# File 'lib/em-mongo/connection.rb', line 141

# Reports whether +buffer+ holds at least one complete message starting at
# its current position.
#
# @param buffer [#size, #position, #get_int] buffer to inspect
# @return [Boolean] true when more than STANDARD_HEADER_SIZE unread bytes
#   remain and they cover the length announced by the next message
def message_received?(buffer)
  # Inspect the buffer that was passed in; the previous version ignored the
  # parameter and always read @buffer.
  unread = remaining_bytes(buffer)
  unread > STANDARD_HEADER_SIZE && unread >= peek_size(buffer)
end

#new_request_id ⇒ Object



57
58
59
# File 'lib/em-mongo/connection.rb', line 57

# Advances the per-connection request counter by one.
#
# @return [Integer] the new counter value
def new_request_id
  @request_id = @request_id.succ
end

#next_response ⇒ Object



182
183
184
# File 'lib/em-mongo/connection.rb', line 182

# Wraps the receive buffer in a ServerResponse, passing this connection along.
# NOTE(review): presumably ServerResponse.new reads one message from @buffer
# and advances its position — confirm in ServerResponse.
#
# @return [ServerResponse]
def next_response
  ServerResponse.new(@buffer, self)
end

#peek_size(buffer) ⇒ Object



150
151
152
153
154
155
# File 'lib/em-mongo/connection.rb', line 150

# Reads the int at the buffer's current position without consuming it.
#
# @param buffer [#position, #position=, #get_int]
# @return [Integer] the value returned by +get_int+; the position is put back
#   to where it was before the read
def peek_size(buffer)
  saved_position = buffer.position
  buffer.get_int.tap { buffer.position = saved_position }
end

#prepare_message(op, message, options = {}) ⇒ Object

MongoDB Commands



67
68
69
70
71
72
# File 'lib/em-mongo/connection.rb', line 67

# Frames +message+ for sending: prepends a header carrying a fresh request id
# and, when options[:safe] is truthy, lets #prepare_safe_message extend it.
#
# @param op [Integer] opcode passed to #message_headers
# @param message [#prepend!, #to_s] body buffer; modified in place
# @param options [Hash] :safe switches on the #prepare_safe_message step; the
#   whole hash is forwarded to it
# @return [Array(Integer, String)] the request id to wait on (the id returned
#   by #prepare_safe_message in safe mode) and the serialized bytes
def prepare_message(op, message, options={})
  request_id = new_request_id
  header = message_headers(op, request_id, message)
  message.prepend!(header)

  if options[:safe]
    request_id = prepare_safe_message(message, options)
  end

  [request_id, message.to_s]
end

#prepare_safe_message(message, options) ⇒ Object



74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
# File 'lib/em-mongo/connection.rb', line 74

# Appends a getlasterror query (with its own header and request id) to
# +message+ so the write can be acknowledged.
#
# @param message [#append!] already-framed message; modified in place
# @param options [Hash] :db_name is required; :last_error_params is passed to
#   #build_last_error_message (false when absent)
# @return [Integer] request id of the appended getlasterror query
# @raise [ArgumentError] when :db_name is missing
def prepare_safe_message(message, options)
  db_name = options[:db_name]
  raise ArgumentError, "You must include the :db_name option when :safe => true" unless db_name

  gle_params = options[:last_error_params] || false
  gle_message = BSON::ByteBuffer.new
  build_last_error_message(gle_message, db_name, gle_params)

  # The follow-up query gets its own id; that is the one the caller waits on.
  gle_id = new_request_id
  gle_header = message_headers(EM::Mongo::OP_QUERY, gle_id, gle_message)
  gle_message.prepend!(gle_header)

  message.append!(gle_message)
  gle_id
end

#receive_data(data) ⇒ Object



157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
# File 'lib/em-mongo/connection.rb', line 157

# EM hook called with each chunk of bytes read from the socket.
#
# Appends the chunk to @buffer, dispatches every complete message to the
# callback registered under its response_to id, keeps any unread tail for the
# next call, and completes a close deferred by #close once no responses are
# outstanding.
#
# @param data [String] bytes handed over by EventMachine
def receive_data(data)

  # Add the new bytes to whatever is still buffered from earlier calls.
  @buffer.append!(data)

  # Scan from the start of the buffer.
  @buffer.rewind
  while message_received?(@buffer)
    # NOTE(review): presumably building the ServerResponse consumes one whole
    # message and advances @buffer's position (otherwise this loop could not
    # terminate) — confirm in ServerResponse.
    response = next_response
    # Look up and remove the callback for the request this answers; responses
    # without a registered callback are dropped.
    callback = @responses.delete(response.response_to)
    callback.call(response) if callback
  end

  if @buffer.more?
    # Unread bytes remain (an incomplete message). If anything was consumed,
    # copy just the unread tail into a fresh buffer positioned at its start.
    if @buffer.position > 0
      remaining_bytes= @buffer.size-@buffer.position
      @buffer = BSON::ByteBuffer.new(@buffer.to_s[@buffer.position,remaining_bytes])
      @buffer.rewind
    end
  else
    # Everything was consumed; reuse the buffer.
    @buffer.clear
  end

  # Finish a close that #close deferred while responses were outstanding.
  close_connection if @close_pending && @responses.empty?

end

#remaining_bytes(buffer) ⇒ Object



146
147
148
# File 'lib/em-mongo/connection.rb', line 146

# Number of bytes between the buffer's current position and its end.
#
# @param buffer [#size, #position]
# @return [Integer]
def remaining_bytes(buffer)
  total = buffer.size
  total - buffer.position
end

#responses_pending? ⇒ Boolean

Returns:

  • (Boolean)


49
50
51
# File 'lib/em-mongo/connection.rb', line 49

# True while at least one response callback is still registered in @responses.
#
# @return [Boolean]
def responses_pending?
  !@responses.empty?
end

#send_command(op, message, options = {}, &cb) ⇒ Object



99
100
101
102
103
104
105
106
107
108
# File 'lib/em-mongo/connection.rb', line 99

# Frames a message and queues it for sending via the deferrable's +callback+.
#
# @param op [Integer] opcode forwarded to #prepare_message
# @param message [Object] body buffer forwarded to #prepare_message
# @param options [Hash] forwarded to #prepare_message
# @yield [response] optional handler stored in @responses under the request
#   id; #receive_data calls it with the reply and #unbind with :disconnected
# @return [Integer] the request id under which a reply is expected
def send_command(op, message, options={}, &cb)
  request_id, payload = prepare_message(op, message, options)

  # Registered as a deferrable callback rather than written directly.
  callback { send_data(payload) }

  @responses[request_id] = cb unless cb.nil?
  request_id
end

#slave_ok? ⇒ Boolean

Returns:

  • (Boolean)


61
62
63
# File 'lib/em-mongo/connection.rb', line 61

# Returns the @slave_ok flag taken from options[:slave_ok] in #initialize
# (false when the option was not given).
def slave_ok?
  @slave_ok
end

#unbind ⇒ Object



186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
# File 'lib/em-mongo/connection.rb', line 186

# EM hook run when the connection is closed or could not be established.
#
# If the connection had been up, every pending response handler is called
# with :disconnected and the request bookkeeping is reset. The deferrable
# status is then cleared, and one of three things happens: a reconnect is
# scheduled (when @reconnect_in is set and fewer than MAX_RETRIES attempts
# were made), the @on_unbind handler is called, or an error is raised.
#
# @raise [RuntimeError] when no reconnect applies and @on_unbind is unset
def unbind
  if @is_connected
    # Iterate over a snapshot (.values) so handlers that register new
    # requests do not modify the hash being walked.
    pending_handlers = @responses.values
    pending_handlers.each { |handler| handler.call(:disconnected) }

    @request_id = 0
    @responses = {}
  end

  @is_connected = false
  set_deferred_status(nil)

  may_reconnect = @reconnect_in && @retries < MAX_RETRIES
  if may_reconnect
    EM.add_timer(@reconnect_in) { reconnect(@host, @port) }
  elsif @on_unbind
    @on_unbind.call
  else
    raise "Connection to Mongo Lost"
  end

  # Counted after scheduling; #connection_completed resets it on success.
  @retries += 1
end