Class: KafkaReplicator::OffsetsSync
- Inherits:
-
Object
- Object
- KafkaReplicator::OffsetsSync
- Defined in:
- lib/kafka_replicator/offsets_sync.rb
Instance Attribute Summary collapse
-
#consumer_group ⇒ Object
readonly
Returns the value of attribute consumer_group.
-
#destination_consumer ⇒ Object
readonly
Returns the value of attribute destination_consumer.
-
#destination_kafka ⇒ Object
readonly
Returns the value of attribute destination_kafka.
-
#logger ⇒ Object
readonly
Returns the value of attribute logger.
-
#source_kafka ⇒ Object
readonly
Returns the value of attribute source_kafka.
-
#topics ⇒ Object
readonly
Returns the value of attribute topics.
Instance Method Summary collapse
- #calculate_destination_consumer_offsets ⇒ Object
-
#initialize(source_brokers:, destination_brokers:, consumer_group:) ⇒ OffsetsSync
constructor
A new instance of OffsetsSync.
- #load_destination_producer_offsets ⇒ Object
- #load_source_consumer_offsets ⇒ Object
- #load_source_producer_offsets ⇒ Object
- #set_destination_consumer_offsets ⇒ Object
- #source_consumer_offsets ⇒ Object
- #source_group_cordinator ⇒ Object
- #sync ⇒ Object
Constructor Details
#initialize(source_brokers:, destination_brokers:, consumer_group:) ⇒ OffsetsSync
Returns a new instance of OffsetsSync.
10 11 12 13 14 15 16 |
# File 'lib/kafka_replicator/offsets_sync.rb', line 10

# Builds a new OffsetsSync.
#
# Stores the three keyword arguments in instance variables, prepares an
# empty per-topic registry and creates a logger that writes to STDOUT.
#
# @param source_brokers [Object] kept as-is in @source_brokers
#   (presumably the source cluster's broker list -- confirm with callers)
# @param destination_brokers [Object] kept as-is in @destination_brokers
#   (presumably the destination cluster's broker list -- confirm with callers)
# @param consumer_group [Object] kept as-is in @consumer_group
def initialize(source_brokers:, destination_brokers:, consumer_group:)
  @consumer_group = consumer_group
  @destination_brokers = destination_brokers
  @source_brokers = source_brokers

  # Looking up an unknown topic creates and stores a fresh empty Hash for it.
  @topics = Hash.new { |registry, topic| registry[topic] = {} }

  @logger = Logger.new(STDOUT)
end
Instance Attribute Details
#consumer_group ⇒ Object (readonly)
Returns the value of attribute consumer_group.
3 4 5 |
# File 'lib/kafka_replicator/offsets_sync.rb', line 3

# Read-only accessor for the consumer group.
#
# @return [Object] the current value of @consumer_group
def consumer_group
  @consumer_group
end
#destination_consumer ⇒ Object (readonly)
Returns the value of attribute destination_consumer.
3 4 5 |
# File 'lib/kafka_replicator/offsets_sync.rb', line 3

# Read-only accessor for the destination consumer.
#
# NOTE(review): @destination_consumer is not assigned in the code shown on
# this page; confirm where it is set.
#
# @return [Object] the current value of @destination_consumer
def destination_consumer
  @destination_consumer
end
#destination_kafka ⇒ Object (readonly)
Returns the value of attribute destination_kafka.
3 4 5 |
# File 'lib/kafka_replicator/offsets_sync.rb', line 3

# Read-only accessor for the destination Kafka client.
#
# NOTE(review): @destination_kafka is not assigned in the code shown on this
# page; confirm where it is set.
#
# @return [Object] the current value of @destination_kafka
def destination_kafka
  @destination_kafka
end
#logger ⇒ Object (readonly)
Returns the value of attribute logger.
3 4 5 |
# File 'lib/kafka_replicator/offsets_sync.rb', line 3

# Read-only accessor for the logger.
#
# @return [Object] the current value of @logger
def logger
  @logger
end
#source_kafka ⇒ Object (readonly)
Returns the value of attribute source_kafka.
3 4 5 |
# File 'lib/kafka_replicator/offsets_sync.rb', line 3

# Read-only accessor for the source Kafka client.
#
# NOTE(review): @source_kafka is not assigned in the code shown on this page;
# confirm where it is set.
#
# @return [Object] the current value of @source_kafka
def source_kafka
  @source_kafka
end
#topics ⇒ Object (readonly)
Returns the value of attribute topics.
3 4 5 |
# File 'lib/kafka_replicator/offsets_sync.rb', line 3

# Read-only accessor for the per-topic offset registry.
#
# @return [Object] the current value of @topics
def topics
  @topics
end
Instance Method Details
#calculate_destination_consumer_offsets ⇒ Object
80 81 82 83 84 85 86 87 88 89 |
# File 'lib/kafka_replicator/offsets_sync.rb', line 80

# Derives the destination consumer offset for every partition in @topics.
#
# For each partition entry, the gap between :source_producer_offset and
# :destination_producer_offset is subtracted from :source_consumer_offset,
# and the result is written back under :destination_consumer_offsets.
#
# @return [Object] the result of iterating @topics (the Hash itself)
def calculate_destination_consumer_offsets
  logger.info "calculate_destination_consumer_offsets"

  @topics.each_value do |partitions|
    partitions.each_value do |info|
      producer_gap = info[:source_producer_offset] - info[:destination_producer_offset]
      info[:destination_consumer_offsets] = info[:source_consumer_offset] - producer_gap
    end
  end
end
#load_destination_producer_offsets ⇒ Object
70 71 72 73 74 75 76 77 78 |
# File 'lib/kafka_replicator/offsets_sync.rb', line 70

# Records the destination cluster's last offset for every tracked partition.
#
# Asks destination_kafka for the last offsets of all topics currently keyed
# in @topics and stores each one under :destination_producer_offset in the
# matching partition entry.
#
# NOTE(review): a partition returned by last_offsets_for that has no entry
# in @topics[topic] raises NoMethodError on nil here -- confirm that every
# partition always has a source consumer offset loaded first.
#
# @return [Object] the result of iterating the offsets returned by
#   destination_kafka.last_offsets_for
def load_destination_producer_offsets
  # Log this method's own name (the listing logged the source variant's name).
  logger.info "load_destination_producer_offsets"

  destination_kafka.last_offsets_for(*@topics.keys).each do |topic, partitions|
    partitions.each do |partition, offset|
      @topics[topic][partition][:destination_producer_offset] = offset
    end
  end
end
#load_source_consumer_offsets ⇒ Object
50 51 52 53 54 55 56 57 58 |
# File 'lib/kafka_replicator/offsets_sync.rb', line 50

# Seeds @topics with the source consumer group's offsets.
#
# Walks the topics/partitions exposed by source_consumer_offsets and replaces
# each partition entry in @topics with a fresh Hash holding only
# :source_consumer_offset (taken from the partition info's #offset).
#
# @return [Object] the result of iterating source_consumer_offsets.topics
def load_source_consumer_offsets
  logger.info "load_source_consumer_offsets"

  source_consumer_offsets.topics.each do |topic, partitions|
    # Iterating purely for the side effect, so `each` rather than `map`
    # (which would build and discard an array).
    partitions.each do |partition, info|
      @topics[topic][partition] = { source_consumer_offset: info.offset }
    end
  end
end
#load_source_producer_offsets ⇒ Object
60 61 62 63 64 65 66 67 68 |
# File 'lib/kafka_replicator/offsets_sync.rb', line 60

# Records the source cluster's last offset for every tracked partition.
#
# Asks source_kafka for the last offsets of all topics currently keyed in
# @topics and stores each one under :source_producer_offset in the matching
# partition entry.
#
# NOTE(review): a partition returned by last_offsets_for that has no entry
# in @topics[topic] raises NoMethodError on nil here -- confirm that every
# partition always has a source consumer offset loaded first.
#
# @return [Object] the result of iterating the offsets returned by
#   source_kafka.last_offsets_for
def load_source_producer_offsets
  # Log this method's own name (the listing logged the destination variant's name).
  logger.info "load_source_producer_offsets"

  source_kafka.last_offsets_for(*@topics.keys).each do |topic, partitions|
    partitions.each do |partition, offset|
      @topics[topic][partition][:source_producer_offset] = offset
    end
  end
end
#set_destination_consumer_offsets ⇒ Object
91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 |
# File 'lib/kafka_replicator/offsets_sync.rb', line 91

# Moves the destination consumer to the calculated offsets.
#
# Subscribes destination_consumer to every topic in @topics, seeks each
# partition to its :destination_consumer_offsets value, then starts message
# iteration and leaves it after the first message.
#
# @return [nil] the iteration block exits via a bare `break`
def set_destination_consumer_offsets
  logger.info "set_destination_consumer_offsets"

  @topics.each do |topic, partitions|
    destination_consumer.subscribe(topic)

    partitions.each do |partition, info|
      offset = info[:destination_consumer_offsets]
      # Use the loop's own topic/partition: no message object exists in this
      # scope, so interpolating one raised NameError on the first partition.
      logger.info "Seeking consumer offset for: #{topic}/#{partition} to #{offset}"
      destination_consumer.seek(topic, partition, offset)
    end
  end

  # NOTE(review): the listing read `destination_consumer.(opts)`, which calls
  # #call. Given the subscribe/seek usage above this is assumed to be a
  # ruby-kafka consumer, whose iteration method is each_message -- confirm.
  # Keyword form also avoids the Ruby 3 positional-Hash/keyword split.
  destination_consumer.each_message(automatically_mark_as_processed: false) do |m|
    logger.info "Setting consumer offset for: #{m.topic}/#{m.partition}"
    break
  end
end
#source_consumer_offsets ⇒ Object
45 46 47 48 |
# File 'lib/kafka_replicator/offsets_sync.rb', line 45

# Fetches the consumer group's committed offsets from the source cluster.
#
# Before every fetch this redefines
# Kafka::Protocol::OffsetFetchRequest#api_version to return 2. That is a
# process-wide change to the class, not something scoped to this call.
# NOTE(review): presumably version 2 is needed because `topics: nil` asks for
# all topics of the group -- confirm against the Kafka protocol docs.
#
# @return [Object] whatever the group coordinator's fetch_offsets returns
def source_consumer_offsets
  request_class = Kafka::Protocol::OffsetFetchRequest
  request_class.send(:define_method, "api_version") { 2 }

  coordinator = source_group_cordinator
  coordinator.fetch_offsets(group_id: consumer_group, topics: nil)
end
#source_group_cordinator ⇒ Object
38 39 40 41 42 43 |
# File 'lib/kafka_replicator/offsets_sync.rb', line 38

# Looks up the group coordinator for consumer_group on the source cluster.
#
# Reads source_kafka's @cluster instance variable directly and invokes
# get_group_coordinator on it through `send`, so this depends on the client's
# internals rather than on a public method of source_kafka.
#
# @return [Object] whatever get_group_coordinator returns
def source_group_cordinator
  cluster = source_kafka.instance_variable_get('@cluster')
  cluster.send(:get_group_coordinator, group_id: consumer_group)
end
#sync ⇒ Object
110 111 112 113 114 115 116 117 |
# File 'lib/kafka_replicator/offsets_sync.rb', line 110

# Runs the whole offset synchronisation, one step after another.
#
# The order matters: the producer-offset loaders write into partition entries
# that load_source_consumer_offsets creates, and the calculation reads the
# values all three loaders stored.
#
# @return [Object] the result of set_destination_consumer_offsets
def sync
  # 1. Collect the inputs.
  load_source_consumer_offsets
  load_destination_producer_offsets
  load_source_producer_offsets

  # 2. Derive the destination offsets from them.
  calculate_destination_consumer_offsets

  # 3. Apply the result to the destination consumer.
  set_destination_consumer_offsets
end