Module: Cassie::Statements::Execution::BatchedFetching

Extended by:
ActiveSupport::Concern
Defined in:
lib/cassie/statements/execution/batched_fetching.rb

Instance Method Summary

  • #fetch_each(opts = {}) ⇒ Object
  • #fetch_in_batches(opts = {}) ⇒ Object

Instance Method Details

#fetch_each(opts = {}) ⇒ Object

Enumerates all records by calling #fetch_in_batches with a batch size of 1000 (or the value of the :batch_size option), fetching in batches to limit resource consumption.

If you do not provide a block to #fetch_each, it will return an Enumerator for chaining with other methods.

UsersByPositionQuery.fetch_each.with_index do |user, index|
  user.position = index
  UserMapper.update_position(user)
end

Options

  • :batch_size - Specifies the size of each batch. Defaults to 1000.

NOTE: Any limit specified on the query still applies to the batched result set; the batch size only sets the page size. Cassandra's internal paging is used for batching.



# File 'lib/cassie/statements/execution/batched_fetching.rb', line 27

def fetch_each(opts={})
  return to_enum(:fetch_each, opts) unless block_given?

  fetch_in_batches(opts) do |records|
    records.each { |record| yield record }
  end
end
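
The delegation above can be tried without a Cassandra cluster. The following is a minimal sketch, assuming a hypothetical in-memory class `InMemoryQuery` (not part of Cassie) whose `fetch_in_batches` slices an array instead of paging a real query. It shows how `:batch_size` is passed through and how the returned Enumerator chains with other methods.

```ruby
# Hypothetical in-memory stand-in for a Cassie query; not part of Cassie.
class InMemoryQuery
  def initialize(records)
    @records = records
  end

  # Yields arrays of at most :batch_size records (assumed default: 1000).
  def fetch_in_batches(opts = {})
    return to_enum(:fetch_in_batches, opts) unless block_given?

    @records.each_slice(opts.fetch(:batch_size, 1000)) { |batch| yield batch }
  end

  # Same delegation as the documented #fetch_each.
  def fetch_each(opts = {})
    return to_enum(:fetch_each, opts) unless block_given?

    fetch_in_batches(opts) do |records|
      records.each { |record| yield record }
    end
  end
end

query = InMemoryQuery.new((1..7).to_a)

# Seven records in batches of three: two full batches and one partial batch.
batch_sizes = query.fetch_in_batches(batch_size: 3).map(&:size)

# Without a block, fetch_each returns an Enumerator that can be chained.
indexed = query.fetch_each(batch_size: 3).with_index.map { |record, index| [index, record] }
evens = query.fetch_each(batch_size: 3).select(&:even?)
```

The caller sees one flat stream of records; the batch size only changes how many records are fetched at a time.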

#fetch_in_batches(opts = {}) ⇒ Object

Yields each batch of records fetched by the query as an array.

If you do not provide a block to #fetch_in_batches, it will return an Enumerator for chaining with other methods.

query.fetch_in_batches do |records|
  # compare the scores themselves so the score, not the record, is printed
  puts "max score in group: #{records.map(&:score).max}"
end

"max score in group: 26"

Options

  • :batch_size - Specifies the size of each batch. Defaults to 1000.

NOTE: Any limit specified on the query still applies to the batched result set; the batch size only sets the page size. Cassandra's internal paging is used for batching.
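
To illustrate how a limit interacts with the batch size, here is a small sketch. It assumes a hypothetical stand-in class `LimitedQuery` (not Cassie's API) in which `limit` caps the total number of rows and `:batch_size` only decides how those rows are grouped.

```ruby
# Hypothetical stand-in; `limit` caps total rows, :batch_size only sets page size.
class LimitedQuery
  def initialize(rows, limit: nil)
    @rows = limit ? rows.first(limit) : rows
  end

  def fetch_in_batches(opts = {})
    return to_enum(:fetch_in_batches, opts) unless block_given?

    @rows.each_slice(opts.fetch(:batch_size, 1000)) { |batch| yield batch }
  end
end

scores = [12, 26, 7, 19, 3, 22, 15, 9, 30, 1]

# Only the first five scores are visible, regardless of the batch size.
limited = LimitedQuery.new(scores, limit: 5)
batches = limited.fetch_in_batches(batch_size: 2).to_a
maxima = batches.map(&:max)
```

With a limit of 5 and a batch size of 2, the stand-in yields three batches covering five rows in total, not all ten.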



# File 'lib/cassie/statements/execution/batched_fetching.rb', line 50

def fetch_in_batches(opts={})
  opts[:batch_size] ||= 1000

  # spawn the new query as soon as the enumerable is created
  # rather than waiting until the first iteration is executed.
  # The client could mutate the object between these moments,
  # however we don't want to spawn twice if a block isn't passed.
  paged_query = opts.delete(:_paged_query) || self.clone

  return to_enum(:fetch_in_batches, opts.merge(_paged_query: paged_query)) unless block_given?

  # use Cassandra internal paging
  # but clone the query to isolate it
  # and allow all paging queries
  # to execute within a Cassie::Query
  # for use of other features, like logging
  #
  # note: stateless page size is independent from limit
  paged_query.stateless_page_size = opts[:batch_size]
  paged_query.paging_state = nil

  loop do
    # done if the previous result was the last page
    break if paged_query.result && paged_query.result.last_page?
    raise page_size_changed_error(opts[:batch_size]) if opts[:batch_size] != paged_query.stateless_page_size

    batch = paged_query.fetch
    paged_query.paging_state = paged_query.result.paging_state

    yield batch
  end
end
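
The paging loop above can be sketched in isolation. This is an illustration under stated assumptions, not Cassie's implementation: `Page`, `PagedSource`, and `each_page` are hypothetical names, and an integer offset stands in for Cassandra's opaque paging state token.

```ruby
# Hypothetical result object; `final` marks the last page.
Page = Struct.new(:rows, :paging_state, :final) do
  def last_page?
    final
  end
end

# Hypothetical paged source backed by an array.
class PagedSource
  attr_accessor :paging_state, :page_size
  attr_reader :result

  def initialize(rows)
    @rows = rows
  end

  # Returns the next page, starting at the offset stored in paging_state.
  def fetch
    offset = paging_state || 0
    rows = @rows[offset, page_size] || []
    next_offset = offset + rows.size
    @result = Page.new(rows, next_offset, next_offset >= @rows.size)
    rows
  end
end

# Same loop shape as #fetch_in_batches: stop once the previous result
# was the last page, otherwise fetch and carry the paging state forward.
def each_page(source, page_size)
  source.page_size = page_size
  source.paging_state = nil

  loop do
    break if source.result && source.result.last_page?

    batch = source.fetch
    source.paging_state = source.result.paging_state
    yield batch
  end
end

pages = []
each_page(PagedSource.new(%w[a b c d e]), 2) { |batch| pages << batch }
```

The last-page check runs before each fetch, so the final partial page is still yielded and the loop stops on the following iteration.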