9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
# File 'lib/kafka/fetched_offset_resolver.rb', line 9
def resolve!(broker, topics)
pending_topics = filter_pending_topics(topics)
return topics if pending_topics.empty?
response = broker.list_offsets(topics: pending_topics)
pending_topics.each do |topic, partitions|
partitions.each do |options|
partition = options.fetch(:partition)
resolved_offset = response.offset_for(topic, partition)
@logger.debug "Offset for #{topic}/#{partition} is #{resolved_offset.inspect}"
topics[topic][partition][:fetch_offset] = resolved_offset || 0
end
end
end
|