Class: EM::Mongo::Cursor

Inherits:
Object
  • Object
show all
Includes:
Conversions
Defined in:
lib/em-mongo/cursor.rb

Overview

A cursor over query results. Returned objects are hashes.

Constant Summary

Constants included from Conversions

EM::Mongo::Conversions::ASCENDING_CONVERSION, EM::Mongo::Conversions::DESCENDING_CONVERSION

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Conversions

#array_as_sort_parameters, #sort_value, #string_as_sort_parameters

Constructor Details

#initialize(collection, opts = {}) ⇒ Cursor

Create a new cursor.

Note: cursors are created when executing queries using [Collection#find] and other similar methods. Application developers shouldn’t have to create cursors manually.



36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
# File 'lib/em-mongo/cursor.rb', line 36

def initialize(collection, opts={})
  @cursor_id  = nil

  @db         = collection.db
  @collection = collection
  @connection = @db.connection
  #@logger     = @connection.logger

  # Query selector
  @selector   = opts[:selector] || {}

  # Special operators that form part of $query
  @order      = opts[:order]
  @explain    = opts[:explain]
  @hint       = opts[:hint]
  @snapshot   = opts[:snapshot]
  @max_scan   = opts.fetch(:max_scan, nil)
  @return_key = opts.fetch(:return_key, nil)
  @show_disk_loc = opts.fetch(:show_disk_loc, nil)

  # Wire-protocol settings
  @fields     = convert_fields_for_query(opts[:fields])
  @skip       = opts[:skip]     || 0
  @limit      = opts[:limit]    || 0
  @tailable   = opts[:tailable] || false
  @timeout    = opts.fetch(:timeout, true)

  # Use this socket for the query
  #@socket     = opts[:socket]

  @closed       = false
  @query_run    = false

  @transformer = opts[:transformer]
  batch_size(opts[:batch_size] || 0)

  @full_collection_name = "#{@collection.db.name}.#{@collection.name}"
  @cache        = []
  @returned     = 0

  if @collection.name =~ /^\$cmd/ || @collection.name =~ /^system/
    @command = true
  else
    @command = false
  end
end

Instance Attribute Details

#collectionObject (readonly)

include Enumerable



24
25
26
# File 'lib/em-mongo/cursor.rb', line 24

def collection
  @collection
end

#fieldsObject (readonly)

include Enumerable



24
25
26
# File 'lib/em-mongo/cursor.rb', line 24

def fields
  @fields
end

#full_collection_nameObject (readonly)

include Enumerable



24
25
26
# File 'lib/em-mongo/cursor.rb', line 24

def full_collection_name
  @full_collection_name
end

#hintObject (readonly)

include Enumerable



24
25
26
# File 'lib/em-mongo/cursor.rb', line 24

def hint
  @hint
end

#orderObject (readonly)

include Enumerable



24
25
26
# File 'lib/em-mongo/cursor.rb', line 24

def order
  @order
end

#selectorObject (readonly)

include Enumerable



24
25
26
# File 'lib/em-mongo/cursor.rb', line 24

def selector
  @selector
end

#snapshotObject (readonly)

include Enumerable



24
25
26
# File 'lib/em-mongo/cursor.rb', line 24

def snapshot
  @snapshot
end

#timeoutObject (readonly)

include Enumerable



24
25
26
# File 'lib/em-mongo/cursor.rb', line 24

def timeout
  @timeout
end

#transformerObject (readonly)

include Enumerable



24
25
26
# File 'lib/em-mongo/cursor.rb', line 24

def transformer
  @transformer
end

Instance Method Details

#batch_size(size = 0) ⇒ Cursor

Set the batch size for server responses.

Note that the batch size will take effect only on queries where the number to be returned is greater than 100.

Parameters:

  • size (Integer) (defaults to: 0)

    either 0 or some integer greater than 1. If 0, the server will determine the batch size.

Returns:



247
248
249
250
251
252
253
254
255
256
# File 'lib/em-mongo/cursor.rb', line 247

def batch_size(size=0)
  check_modifiable
  if size < 0 || size == 1
    raise ArgumentError, "Invalid value for batch_size #{size}; must be 0 or > 1."
  else
    @batch_size = size > @limit ? @limit : size
  end

  self
end

#closeTrue

Close the cursor.

Note: if a cursor is read until exhausted (read until EM::Mongo::Constants::OP_QUERY or EM::Mongo::Constants::OP_GETMORE returns zero for the cursor id), there is no need to close it manually.

Note also: Collection#find takes an optional block argument which can be used to ensure that your cursors get closed.

Returns:

  • (True)


351
352
353
354
355
356
357
358
359
360
361
# File 'lib/em-mongo/cursor.rb', line 351

def close
  if @cursor_id && @cursor_id != 0
    @cursor_id = 0
    @closed    = true
    message = BSON::ByteBuffer.new([0, 0, 0, 0])
    message.put_int(1)
    message.put_long(@cursor_id)
    @connection.send_command(EM::Mongo::OP_KILL_CURSORS, message)
  end
  true
end

#closed?Boolean

Is this cursor closed?

Returns:

  • (Boolean)


366
# File 'lib/em-mongo/cursor.rb', line 366

def closed?; @closed; end

#count(skip_and_limit = false) ⇒ EM::Mongo::RequestResponse

Get the size of the result set for this query.

Parameters:

  • whether (Boolean)

    of not to take notice of skip and limit

Returns:

Raises:



153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
# File 'lib/em-mongo/cursor.rb', line 153

def count(skip_and_limit = false)
  response = RequestResponse.new
  command = BSON::OrderedHash["count",  @collection.name, "query",  @selector]

  if skip_and_limit
    command.merge!(BSON::OrderedHash["limit", @limit]) if @limit != 0
    command.merge!(BSON::OrderedHash["skip", @skip]) if @skip != 0
  end

  command.merge!(BSON::OrderedHash["fields", @fields])

  cmd_resp = @db.command(command)

  cmd_resp.callback { |doc| response.succeed( doc['n'].to_i ) }
  cmd_resp.errback do |err|
    if err[1] =~ /ns missing/
      response.succeed(0)
    else
      response.fail([OperationFailure, "Count failed: #{err[1]}"])
    end
  end

  response
end

#defer_as_aEM::Mongo::RequestResponse Also known as: to_a

Receive all the documents from this cursor as an array of hashes.

Notes:

If you’ve already started iterating over the cursor, the array returned by this method contains only the remaining documents. See Cursor#rewind! if you need to reset the cursor.

Use of this method is discouraged - in most cases, it’s much more efficient to retrieve documents as you need them by iterating over the cursor.

Returns:



302
303
304
305
306
307
308
309
310
311
312
313
314
315
# File 'lib/em-mongo/cursor.rb', line 302

def defer_as_a
  response = RequestResponse.new
  items = []
  self.each do |doc,err|
    if doc == :error
      response.fail(err)
    elsif doc
      items << doc
    else
      response.succeed(items)
    end
  end
  response
end

#each { ... } ⇒ Object

Iterate over each document in this cursor, yielding it to the given block.

Iterating over an entire cursor will close it.

Examples:

if ‘comments’ represents a collection of comments:

comments.find.each do |doc|
  if doc
    puts doc['user']
  end
end

Yields:

  • passes each document to a block for processing. When the cursor is empty, each will yield a nil value



272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
# File 'lib/em-mongo/cursor.rb', line 272

def each(&blk)
  raise "A callback block is required for #each" unless blk
  EM.next_tick do
    next_doc_resp = next_document
    next_doc_resp.callback do |doc|
      blk.call(doc)
      doc.nil? ? close : self.each(&blk)
    end
    next_doc_resp.errback do |err|
      if blk.arity > 1
        blk.call(:error, err)
      else
        blk.call(:error)
      end
    end
  end
end

#explainEM::Mongo::RequestResponse

Get the explain plan for this cursor.

Returns:



325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
# File 'lib/em-mongo/cursor.rb', line 325

def explain
  response = RequestResponse.new
  c = Cursor.new(@collection, query_options_hash.merge(:limit => -@limit.abs, :explain => true))

  exp_response = c.next_document
  exp_response.callback do |explanation|
    c.close
    response.succeed(explanation)
  end
  exp_response.errback do |err|
    c.close
    response.fail(err)
  end
  response
end

#has_next?EM::Mongo::RequestResponse

Determine whether this cursor has any remaining results.



138
139
140
141
142
143
144
# File 'lib/em-mongo/cursor.rb', line 138

def has_next?
  response = RequestResponse.new
  num_resp = num_remaining
  num_resp.callback { |num| response.succeed( num > 0 ) }
  num_resp.errback { |err| response.fail err }
  response
end

#inspectObject

Clean output for inspect.



400
401
402
403
# File 'lib/em-mongo/cursor.rb', line 400

def inspect
  "<EM::Mongo::Cursor:0x#{object_id.to_s} namespace='#{@db.name}.#{@collection.name}' " +
    "@selector=#{@selector.inspect}>"
end

#limit(number_to_return = nil) ⇒ Integer

Limit the number of results to be returned by this cursor.

This method overrides any limit specified in the Collection#find method, and only the last limit applied has an effect.

Returns:

  • (Integer)

    the current number_to_return if no parameter is given.

Raises:



213
214
215
216
217
218
219
# File 'lib/em-mongo/cursor.rb', line 213

def limit(number_to_return=nil)
  return @limit unless number_to_return
  check_modifiable

  @limit = number_to_return
  self
end

#next_documentEM::Mongo::RequestResponse Also known as: next

Get the next document specified the cursor options.

Returns:



86
87
88
89
90
91
92
93
94
95
96
# File 'lib/em-mongo/cursor.rb', line 86

def next_document
  response = RequestResponse.new
  if @cache.length == 0
    refresh.callback do
      check_and_transform_document(@cache.shift, response)
    end
  else
    check_and_transform_document(@cache.shift, response)
  end
  response
end

#query_options_hashHash

Get the query options for this Cursor.

Returns:



385
386
387
388
389
390
391
392
393
394
395
396
397
# File 'lib/em-mongo/cursor.rb', line 385

def query_options_hash
  { :selector => @selector,
    :fields   => @fields,
    :skip     => @skip,
    :limit    => @limit,
    :order    => @order,
    :hint     => @hint,
    :snapshot => @snapshot,
    :timeout  => @timeout,
    :max_scan => @max_scan,
    :return_key => @return_key,
    :show_disk_loc => @show_disk_loc }
end

#query_optsInteger

Returns an integer indicating which query options have been selected.

The MongoDB wire protocol.



374
375
376
377
378
379
380
# File 'lib/em-mongo/cursor.rb', line 374

def query_opts
  opts     = 0
  opts    |= EM::Mongo::OP_QUERY_NO_CURSOR_TIMEOUT unless @timeout
  opts    |= EM::Mongo::OP_QUERY_SLAVE_OK if @connection.slave_ok?
  opts    |= EM::Mongo::OP_QUERY_TAILABLE if @tailable
  opts
end

#rewind!Object

Reset this cursor on the server. Cursor options, such as the query string and the values for skip and limit, are preserved.



126
127
128
129
130
131
132
133
# File 'lib/em-mongo/cursor.rb', line 126

def rewind!
  close
  @cache.clear
  @cursor_id  = nil
  @closed     = false
  @query_run  = false
  @n_received = nil
end

#skip(number_to_skip = nil) ⇒ Integer

Skips the first number_to_skip results of this cursor. Returns the current number_to_skip if no parameter is given.

This method overrides any skip specified in the Collection#find method, and only the last skip applied has an effect.

Returns:

  • (Integer)

Raises:



230
231
232
233
234
235
236
# File 'lib/em-mongo/cursor.rb', line 230

def skip(number_to_skip=nil)
  return @skip unless number_to_skip
  check_modifiable

  @skip = number_to_skip
  self
end

#sort(key_or_list, direction = nil) ⇒ Object

Sort this cursor’s results.

This method overrides any sort order specified in the Collection#find method, and only the last sort applied has an effect.

Parameters:

  • key_or_list (Symbol, Array)

    either 1) a key to sort by or 2) an array of [key, direction] pairs to sort by. Direction should be specified as EM::Mongo::ASCENDING (or :ascending / :asc) or EM::Mongo::DESCENDING (or :descending / :desc)

Raises:



190
191
192
193
194
195
196
197
198
199
200
201
# File 'lib/em-mongo/cursor.rb', line 190

def sort(key_or_list, direction=nil)
  check_modifiable

  if !direction.nil?
    order = [[key_or_list, direction]]
  else
    order = key_or_list
  end

  @order = order
  self
end