Module: Cassie::Statements::Execution::BatchedFetching
- Extended by: ActiveSupport::Concern
- Defined in: lib/cassie/statements/execution/batched_fetching.rb
Instance Method Summary
- #fetch_each(opts = {}) ⇒ Object
  Uses #fetch_in_batches with a batch size of 1000 (or as specified by the :batch_size option) to enumerate through all records, while using batches to limit resource consumption.
- #fetch_in_batches(opts = {}) ⇒ Object
  Yields each batch of records that was found by the options as an array.
Instance Method Details
#fetch_each(opts = {}) ⇒ Object
Uses #fetch_in_batches with a batch size of 1000 (or as specified by the :batch_size
option) to enumerate through all records, while using batches to limit resource consumption.
If you do not provide a block to #fetch_each, it will return an Enumerator for chaining with other methods.
UsersByPositionQuery.fetch_each.with_index do |user, index|
  user.position = index
  UserMapper.update_position(user)
end
Options
- :batch_size - Specifies the size of each batch. Defaults to 1000.
NOTE: Any limit specified on the query still applies to the full set being batched (the batch size is independent of the limit). Cassandra internal paging is used for batching.
# File 'lib/cassie/statements/execution/batched_fetching.rb', line 27

def fetch_each(opts={})
  return to_enum(:fetch_each, opts) unless block_given?

  fetch_in_batches(opts) do |records|
    records.each { |record| yield record }
  end
end
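The `to_enum` pattern in #fetch_each can be tried without a Cassandra cluster. The sketch below assumes a hypothetical `InMemoryBatchedQuery` class that slices a plain array with `each_slice` instead of paging through Cassandra; only its #fetch_each mirrors the source above, and the data is made up for illustration.

```ruby
# Hypothetical stand-in for a Cassie query: it batches an in-memory array
# instead of paging through Cassandra, but exposes the same two methods.
class InMemoryBatchedQuery
  def initialize(rows)
    @rows = rows
  end

  # Yields arrays of at most :batch_size rows (default 1000),
  # or returns an Enumerator when no block is given.
  def fetch_in_batches(opts = {})
    return to_enum(:fetch_in_batches, opts) unless block_given?

    batch_size = opts.fetch(:batch_size, 1000)
    @rows.each_slice(batch_size) { |batch| yield batch }
  end

  # Same shape as Cassie's fetch_each: flattens batches into single records.
  def fetch_each(opts = {})
    return to_enum(:fetch_each, opts) unless block_given?

    fetch_in_batches(opts) do |records|
      records.each { |record| yield record }
    end
  end
end

query = InMemoryBatchedQuery.new(%w[ann bob cat dan eve])

# Without a block, fetch_each returns an Enumerator, so with_index chains on it.
positions = query.fetch_each(batch_size: 2).with_index.map { |name, index| [name, index] }
# positions == [["ann", 0], ["bob", 1], ["cat", 2], ["dan", 3], ["eve", 4]]
```

The caller sees one continuous stream of records even though they were produced two at a time, which is the point of layering #fetch_each over #fetch_in_batches.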
#fetch_in_batches(opts = {}) ⇒ Object
Yields each batch of records that was found by the options as an array.
If you do not provide a block to #fetch_in_batches, it will return an Enumerator for chaining with other methods.
query.fetch_in_batches do |records|
  puts "max score in group: #{records.map(&:score).max}"
end
# prints, e.g.: max score in group: 26
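A per-batch aggregate like the one above can be sketched without a database by letting `each_slice` stand in for the batches #fetch_in_batches would yield. The `Score` struct and its sample data are hypothetical; real queries yield whatever objects the query maps rows to.

```ruby
# Hypothetical record type used only for this illustration.
Score = Struct.new(:player, :score)

records = [
  Score.new("a", 12), Score.new("b", 26), Score.new("c", 7),
  Score.new("d", 19), Score.new("e", 3)
]

# each_slice(3) plays the role of fetch_in_batches(batch_size: 3).
batch_maxima = records.each_slice(3).map do |batch|
  # max_by returns the whole record, so .score extracts the numeric value.
  batch.max_by(&:score).score
end
# batch_maxima == [26, 19]
```

Note that `Enumerable#max` with a comparison block also returns the record rather than the score, so the value still has to be extracted afterwards.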
Options
- :batch_size - Specifies the size of each batch. Defaults to 1000.
NOTE: Any limit specified on the query still applies to the full set being batched (the batch size is independent of the limit). Cassandra internal paging is used for batching.
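The interaction between a query limit and :batch_size can be illustrated with simple arithmetic. This is an idealized sketch, assuming the limit first caps the number of rows and the page size then splits that capped set; `expected_batch_sizes` is a hypothetical helper, and it does not model driver-specific details such as whether an extra empty page may be returned.

```ruby
# Idealized batch sizes for a query matching `matching_rows` rows.
# Assumption: the LIMIT caps the total, then paging splits it into batches.
def expected_batch_sizes(matching_rows, batch_size: 1000, limit: nil)
  total = limit ? [matching_rows, limit].min : matching_rows
  full, rest = total.divmod(batch_size)
  Array.new(full, batch_size) + (rest.positive? ? [rest] : [])
end

expected_batch_sizes(2500)                               # [1000, 1000, 500]
expected_batch_sizes(2500, limit: 1200)                  # [1000, 200]
expected_batch_sizes(2500, batch_size: 500, limit: 1200) # [500, 500, 200]
```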
# File 'lib/cassie/statements/execution/batched_fetching.rb', line 50

def fetch_in_batches(opts={})
  opts[:batch_size] ||= 1000

  # spawn the new query as soon as the enumerable is created
  # rather than waiting until the first iteration is executed.
  # The client could mutate the object between these moments,
  # however we don't want to spawn twice if a block isn't passed.
  paged_query = opts.delete(:_paged_query) || self.clone

  return to_enum(:fetch_in_batches, opts.merge(_paged_query: paged_query)) unless block_given?

  # use Cassandra internal paging
  # but clone the query to isolate it
  # and allow all paging queries
  # to execute within a Cassie::Query
  # for use of other features, like logging
  #
  # note: stateless page size is independent from limit
  paged_query.stateless_page_size = opts[:batch_size]
  paged_query.paging_state = nil

  loop do
    # done if the previous result was the last page
    break if paged_query.result && paged_query.result.last_page?
    raise page_size_changed_error(opts[:batch_size]) if opts[:batch_size] != paged_query.stateless_page_size

    batch = paged_query.fetch
    paged_query.paging_state = paged_query.result.paging_state
    yield batch
  end
end
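The paging loop in #fetch_in_batches threads each result's paging state into the next fetch and stops once a result reports it is the last page. The sketch below reproduces that control flow against hypothetical `FakeResult` and `FakePagedQuery` classes. As an assumption for illustration, the paging state is a plain integer offset; with Cassandra it is an opaque token returned by the server.

```ruby
# Hypothetical stand-in for a Cassandra result page.
FakeResult = Struct.new(:rows, :paging_state) do
  # The last page carries no paging state to continue from.
  def last_page?
    paging_state.nil?
  end
end

# Hypothetical stand-in for the cloned, paged Cassie query.
class FakePagedQuery
  attr_accessor :stateless_page_size, :paging_state
  attr_reader :result

  def initialize(rows)
    @rows = rows
  end

  # Returns one page starting at the offset held in paging_state.
  def fetch
    offset = paging_state || 0
    page = @rows[offset, stateless_page_size] || []
    next_offset = offset + page.size
    @result = FakeResult.new(page, next_offset < @rows.size ? next_offset : nil)
    page
  end
end

# Same control flow as the loop in fetch_in_batches.
def each_page(query, batch_size)
  query.stateless_page_size = batch_size
  query.paging_state = nil
  loop do
    break if query.result && query.result.last_page?

    batch = query.fetch
    query.paging_state = query.result.paging_state
    yield batch
  end
end

pages = []
each_page(FakePagedQuery.new((1..7).to_a), 3) { |batch| pages << batch }
# pages == [[1, 2, 3], [4, 5, 6], [7]]
```

Checking `result` before fetching is what lets the first iteration run: a fresh query has no result yet, so the loop only stops after at least one page has been seen.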