Method: Dataflow::Adapters::MongoDbAdapter#ordered_system_id_queries

Defined in:
lib/dataflow/adapters/mongo_db_adapter.rb

#ordered_system_id_queries(batch_size:, where: {}) ⇒ Object

Create queries that permit processing the whole dataset in parallel without using offsets.



117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
# File 'lib/dataflow/adapters/mongo_db_adapter.rb', line 117

# Builds range queries over the system id so that the whole dataset can be
# processed in parallel without relying on offsets.
#
# All matching ids are fetched through #all (sorted ascending on SYSTEM_ID and
# converted to strings), then cut into consecutive windows of +batch_size+
# ids. Each window becomes one query of the form
# <tt>{ SYSTEM_ID => { '>=' => from, '<' => to } }</tt>; the final window uses
# <tt>'<='</tt> instead so that the very last id is included.
#
# @param batch_size [Numeric] number of ids covered by each query; must be > 0.
# @param where [Hash] filter forwarded to #all when collecting the ids. It is
#   NOT merged into the returned queries, which only constrain SYSTEM_ID.
# @return [Array<Hash>] one query per batch, in ascending id order; empty when
#   no id matches.
# @raise [ArgumentError] if +batch_size+ is not a positive number.
def ordered_system_id_queries(batch_size:, where: {})
  # Without this guard a zero/nil batch size surfaces as a FloatDomainError
  # (Infinity/NaN#ceil) and a negative one as "negative array size".
  unless batch_size.is_a?(Numeric) && batch_size > 0
    raise ArgumentError, "batch_size must be a positive number, got #{batch_size.inspect}"
  end

  ids = all(fields: [SYSTEM_ID], where: where, sort: { SYSTEM_ID => 1 }).map { |x| x[SYSTEM_ID].to_s }
  queries_count = (ids.size / batch_size.to_f).ceil

  Array.new(queries_count) do |i|
    lower = ids[i * batch_size]
    # The first id of the next batch is the exclusive upper bound. Only the
    # last batch has no successor, so it falls back to the last id instead.
    upper = ids[(i + 1) * batch_size] || ids.last
    upper_operator = i == queries_count - 1 ? '<=' : '<'

    { SYSTEM_ID => { '>=' => lower, upper_operator => upper } }
  end
end