Class: KafkaReplicator::OffsetsSync

Inherits:
Object
  • Object
show all
Defined in:
lib/kafka_replicator/offsets_sync.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(source_brokers:, destination_brokers:, consumer_group:) ⇒ OffsetsSync

Returns a new instance of OffsetsSync.



10
11
12
13
14
15
16
# File 'lib/kafka_replicator/offsets_sync.rb', line 10

# Builds a syncer for one consumer group. The constructor only stores its
# arguments and prepares the bookkeeping hash and the logger.
#
# @param source_brokers kept in @source_brokers
# @param destination_brokers kept in @destination_brokers
# @param consumer_group kept in @consumer_group
def initialize(source_brokers:, destination_brokers:, consumer_group:)
  # Auto-vivifying hash: reading an unknown key stores and returns a fresh {}.
  @topics = Hash.new { |store, topic| store[topic] = {} }
  @logger = Logger.new(STDOUT)

  @consumer_group = consumer_group
  @source_brokers = source_brokers
  @destination_brokers = destination_brokers
end

Instance Attribute Details

#consumer_group ⇒ Object (readonly)

Returns the value of attribute consumer_group.



3
4
5
# File 'lib/kafka_replicator/offsets_sync.rb', line 3

# Reader for the consumer group that was passed to the constructor.
#
# @return [Object] the current value of @consumer_group
def consumer_group
  @consumer_group
end

#destination_consumer ⇒ Object (readonly)

Returns the value of attribute destination_consumer.



3
4
5
# File 'lib/kafka_replicator/offsets_sync.rb', line 3

# Reader for @destination_consumer, the object that is subscribed, sought
# and iterated when the destination offsets are applied.
# NOTE(review): the assignment of @destination_consumer is not visible in
# this listing (the constructor shown does not set it) — confirm where it
# is built.
#
# @return [Object] the current value of @destination_consumer
def destination_consumer
  @destination_consumer
end

#destination_kafka ⇒ Object (readonly)

Returns the value of attribute destination_kafka.



3
4
5
# File 'lib/kafka_replicator/offsets_sync.rb', line 3

# Reader for @destination_kafka, the object asked for the destination
# cluster's last offsets.
# NOTE(review): the assignment of @destination_kafka is not visible in this
# listing (the constructor shown only keeps the broker list) — confirm
# where it is built.
#
# @return [Object] the current value of @destination_kafka
def destination_kafka
  @destination_kafka
end

#logger ⇒ Object (readonly)

Returns the value of attribute logger.



3
4
5
# File 'lib/kafka_replicator/offsets_sync.rb', line 3

# Reader for the logger; the constructor sets it to a Logger writing to
# STDOUT.
#
# @return [Object] the current value of @logger
def logger
  @logger
end

#source_kafka ⇒ Object (readonly)

Returns the value of attribute source_kafka.



3
4
5
# File 'lib/kafka_replicator/offsets_sync.rb', line 3

# Reader for @source_kafka, the object asked for the source cluster's last
# offsets and for the consumer group's coordinator.
# NOTE(review): the assignment of @source_kafka is not visible in this
# listing (the constructor shown only keeps the broker list) — confirm
# where it is built.
#
# @return [Object] the current value of @source_kafka
def source_kafka
  @source_kafka
end

#topics ⇒ Object (readonly)

Returns the value of attribute topics.



3
4
5
# File 'lib/kafka_replicator/offsets_sync.rb', line 3

# Reader for the offset bookkeeping hash. The constructor creates it so
# that reading an unknown key stores an empty hash; the loaders fill it as
# topic => partition => { offset fields }.
#
# @return [Hash] the current value of @topics
def topics
  @topics
end

Instance Method Details

#calculate_destination_consumer_offsets ⇒ Object



80
81
82
83
84
85
86
87
88
89
# File 'lib/kafka_replicator/offsets_sync.rb', line 80

# Derives, for every partition entry in @topics, the offset the destination
# consumer should be moved to and stores it under
# :destination_consumer_offsets.
#
# The result is the source consumer offset shifted by the difference
# between the source and destination producer offsets, so all three of
# those fields must already be present on each entry.
#
# @return [Hash] the @topics hash
def calculate_destination_consumer_offsets
  logger.info "calculate_destination_consumer_offsets"

  @topics.each_value do |partition_map|
    partition_map.each_value do |entry|
      shift = entry[:source_producer_offset] - entry[:destination_producer_offset]
      entry[:destination_consumer_offsets] = entry[:source_consumer_offset] - shift
    end
  end
end

#load_destination_producer_offsets ⇒ Object



70
71
72
73
74
75
76
77
78
# File 'lib/kafka_replicator/offsets_sync.rb', line 70

# Records the destination cluster's last offset for every tracked partition
# under :destination_producer_offset.
#
# Only partitions that already have an entry in @topics are updated.
# Partitions reported by the cluster but not tracked are skipped instead of
# raising NoMethodError on a nil entry.
#
# @return [Object] whatever the iteration over the fetched offsets returns
def load_destination_producer_offsets
  # Log this method's own name; it previously announced the source loader.
  logger.info "load_destination_producer_offsets"

  destination_kafka.last_offsets_for(*@topics.keys).each do |topic, partitions|
    partitions.each do |partition, offset|
      info = @topics[topic][partition]
      next unless info # partition is not tracked, nothing to annotate

      info[:destination_producer_offset] = offset
    end
  end
end

#load_source_consumer_offsets ⇒ Object



50
51
52
53
54
55
56
57
58
# File 'lib/kafka_replicator/offsets_sync.rb', line 50

# Seeds @topics from the offsets returned by #source_consumer_offsets.
#
# Each topic/partition pair gets a fresh entry holding only
# :source_consumer_offset, replacing whatever was stored for that partition.
#
# @return [Object] the topics collection of the fetched offsets
def load_source_consumer_offsets
  logger.info "load_source_consumer_offsets"

  source_consumer_offsets.topics.each do |topic_name, committed|
    committed.each do |partition_id, details|
      @topics[topic_name][partition_id] = { source_consumer_offset: details.offset }
    end
  end
end

#load_source_producer_offsets ⇒ Object



60
61
62
63
64
65
66
67
68
# File 'lib/kafka_replicator/offsets_sync.rb', line 60

# Records the source cluster's last offset for every tracked partition
# under :source_producer_offset.
#
# Only partitions that already have an entry in @topics are updated.
# Partitions reported by the cluster but not tracked are skipped instead of
# raising NoMethodError on a nil entry.
#
# @return [Object] whatever the iteration over the fetched offsets returns
def load_source_producer_offsets
  # Log this method's own name; it previously announced the destination loader.
  logger.info "load_source_producer_offsets"

  source_kafka.last_offsets_for(*@topics.keys).each do |topic, partitions|
    partitions.each do |partition, offset|
      info = @topics[topic][partition]
      next unless info # partition is not tracked, nothing to annotate

      info[:source_producer_offset] = offset
    end
  end
end

#set_destination_consumer_offsets ⇒ Object



91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
# File 'lib/kafka_replicator/offsets_sync.rb', line 91

# Applies the calculated offsets to the destination consumer.
#
# Every topic in @topics is subscribed and each of its partitions is sought
# to the value stored under :destination_consumer_offsets. The consumer is
# then iterated until the first message arrives, at which point the loop is
# left.
#
# @return [nil] the message loop is left via `break`
def set_destination_consumer_offsets
  logger.info "set_destination_consumer_offsets"

  @topics.each do |topic, partitions|
    destination_consumer.subscribe(topic)
    partitions.each do |partition, info|
      offset = info[:destination_consumer_offsets]
      # Use the loop variables here: no message object exists at this point,
      # and referring to one raised NameError.
      logger.info "Seeking consumer offset for: #{topic}/#{partition} to #{offset}"
      destination_consumer.seek(topic, partition, offset)
    end
  end

  # Passed as a real keyword so the call also works on Ruby 3, where a
  # positional Hash is no longer converted into keyword arguments.
  destination_consumer.each_message(automatically_mark_as_processed: false) do |m|
    logger.info "Setting consumer offset for: #{m.topic}/#{m.partition}"
    break
  end
end

#source_consumer_offsets ⇒ Object



45
46
47
48
# File 'lib/kafka_replicator/offsets_sync.rb', line 45

# Fetches the offsets of #consumer_group from the source group coordinator.
#
# Before fetching, OffsetFetchRequest#api_version is redefined to return 2.
# This happens on every call and affects the class globally.
# NOTE(review): presumably version 2 is what allows `topics: nil` to mean
# "all topics" — confirm against the Kafka protocol documentation.
#
# @return [Object] the coordinator's response to fetch_offsets
def source_consumer_offsets
  request_class = Kafka::Protocol::OffsetFetchRequest
  request_class.send(:define_method, "api_version") { 2 }

  source_group_cordinator.fetch_offsets(group_id: consumer_group, topics: nil)
end

#source_group_cordinator ⇒ Object



38
39
40
41
42
43
# File 'lib/kafka_replicator/offsets_sync.rb', line 38

# Looks up the group coordinator for #consumer_group on the source side.
#
# Reads the @cluster instance variable out of #source_kafka and invokes
# get_group_coordinator on it through `send`, which bypasses method
# visibility.
#
# @return [Object] whatever get_group_coordinator returns
def source_group_cordinator
  cluster = source_kafka.instance_variable_get('@cluster')
  cluster.send(:get_group_coordinator, group_id: consumer_group)
end

#sync ⇒ Object



110
111
112
113
114
115
116
117
# File 'lib/kafka_replicator/offsets_sync.rb', line 110

# Runs the whole offset synchronisation: load the three offset sets,
# compute the destination consumer offsets, then apply them.
#
# @return [Object] the result of #set_destination_consumer_offsets
def sync
  # Order matters: the consumer offsets are loaded first because that step
  # creates the per-partition entries the later steps write into.
  %i[
    load_source_consumer_offsets
    load_destination_producer_offsets
    load_source_producer_offsets
    calculate_destination_consumer_offsets
  ].each { |step| send(step) }

  set_destination_consumer_offsets
end