Method: Dataflow::Adapters::MongoDbAdapter#all_paginated

Defined in:
lib/dataflow/adapters/mongo_db_adapter.rb

#all_paginated(where: {}, fields: [], cursor: nil) ⇒ Hash

Helper that supports paginating through the whole dataset at fixed performance. Unlike offset/skip, which requires reading through all the skipped content (high CPU usage), this uses the internal MongoDB cursor to fetch batches of results.

Returns:

  • (Hash)

    with 2 fields: 'data' (the batch of documents) and 'next_cursor' (the cursor to pass to the next call; empty when there is no more data)



# File 'lib/dataflow/adapters/mongo_db_adapter.rb', line 80

def all_paginated(where: {}, fields: [], cursor: nil)
  cursor = cursor.to_i
  data = []

  # If there is no cursor, we make the initial query
  # get the first batch of data and get the cursor id.
  if cursor.zero?
    all(where: where, fields: fields) do |res|
      results = res.initial_query
      data = results.documents
      cursor = res.cursor.id
    end
  end

  # The first query's result batch is a small 101 set of results
  # so we want to get one more batch of data.
  # However, there might be queries whose results are very small
  # and the resulting cursor is 0. In such case there is no more
  # data to be fetched.
  unless cursor.zero?
    # send a getMore command on the cursor id
    command = { getMore: cursor, collection: read_dataset_name }
    result = client.database.command(command).documents[0]
    cursor = result['cursor']['id']
    data += result['cursor']['nextBatch']
  end

  # We want to return the cursor as a string.
  # If there is no cursor (zero) then make it empty
  cursor = '' if cursor.zero?

  { 'data' => data, 'next_cursor' => cursor.to_s }
rescue Mongo::Error::OperationFailure
  { 'data' => data, 'next_cursor' => '' }
end
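A caller typically loops until `next_cursor` comes back empty, accumulating each batch. The sketch below uses a hypothetical in-memory stand-in for the adapter (no MongoDB required) purely to illustrate the `{ 'data' => ..., 'next_cursor' => ... }` contract; the class and batch size are invented for this example:

```ruby
# Hypothetical in-memory stand-in for MongoDbAdapter, used only to
# illustrate the pagination contract of #all_paginated.
class FakePaginatedAdapter
  def initialize(records, batch_size: 2)
    @records = records
    @batch_size = batch_size
  end

  # Mirrors all_paginated's interface: returns a batch of documents and
  # a cursor string that is empty once the dataset is exhausted.
  def all_paginated(where: {}, fields: [], cursor: nil)
    offset = cursor.to_i
    batch = @records[offset, @batch_size] || []
    next_offset = offset + batch.size
    next_cursor = next_offset < @records.size ? next_offset.to_s : ''
    { 'data' => batch, 'next_cursor' => next_cursor }
  end
end

adapter = FakePaginatedAdapter.new((1..5).map { |i| { 'id' => i } })

# The canonical consumption loop: keep calling until next_cursor is empty.
all_docs = []
cursor = nil
loop do
  page = adapter.all_paginated(cursor: cursor)
  all_docs.concat(page['data'])
  cursor = page['next_cursor']
  break if cursor.empty?
end
```

The same loop shape works against the real adapter: pass `nil` (or omit the cursor) on the first call, then feed each returned `next_cursor` back in until it is empty.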