Class: KafkaReplicator::TopicsReplicator

Inherits:
Object
  • Object
show all
Defined in:
lib/kafka_replicator/topics_replicator.rb

Constant Summary collapse

SKIP_TOPICS =
['__consumer_offse', '__consumer_offsets', '_schemas']

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(source_brokers:, destination_brokers:, skip_topics: []) ⇒ TopicsReplicator

Returns a new instance of TopicsReplicator.



14
15
16
17
18
19
# File 'lib/kafka_replicator/topics_replicator.rb', line 14

def initialize(source_brokers:, destination_brokers:, skip_topics: [])
  @source_brokers = source_brokers
  @destination_brokers = destination_brokers
  @skip_topics = SKIP_TOPICS | skip_topics
  @logger = Logger.new(STDOUT)
end

Instance Attribute Details

#destination_kafkaObject (readonly)

Returns the value of attribute destination_kafka.



5
6
7
# File 'lib/kafka_replicator/topics_replicator.rb', line 5

def destination_kafka
  @destination_kafka
end

#destination_producerObject (readonly)

Returns the value of attribute destination_producer.



5
6
7
# File 'lib/kafka_replicator/topics_replicator.rb', line 5

def destination_producer
  @destination_producer
end

#loggerObject (readonly)

Returns the value of attribute logger.



5
6
7
# File 'lib/kafka_replicator/topics_replicator.rb', line 5

def logger
  @logger
end

#replicated_topicsObject (readonly)

Returns the value of attribute replicated_topics.



5
6
7
# File 'lib/kafka_replicator/topics_replicator.rb', line 5

def replicated_topics
  @replicated_topics
end

#skip_topicsObject (readonly)

Returns the value of attribute skip_topics.



5
6
7
# File 'lib/kafka_replicator/topics_replicator.rb', line 5

def skip_topics
  @skip_topics
end

#source_consumerObject (readonly)

Returns the value of attribute source_consumer.



5
6
7
# File 'lib/kafka_replicator/topics_replicator.rb', line 5

def source_consumer
  @source_consumer
end

#source_kafkaObject (readonly)

Returns the value of attribute source_kafka.



5
6
7
# File 'lib/kafka_replicator/topics_replicator.rb', line 5

def source_kafka
  @source_kafka
end

#stoppedObject (readonly)

Returns the value of attribute stopped.



5
6
7
# File 'lib/kafka_replicator/topics_replicator.rb', line 5

def stopped
  @stopped
end

Instance Method Details

#setupObject



21
22
23
24
25
26
# File 'lib/kafka_replicator/topics_replicator.rb', line 21

def setup
  @stopped = false
  @replicated_topics = Set[]
  @source_consumer = nil
  @destination_producer = nil
end

#startObject



50
51
52
53
54
55
56
57
58
59
60
61
62
63
# File 'lib/kafka_replicator/topics_replicator.rb', line 50

def start
  loop do
    break if stopped

    logger.info 'Setting up configuration...'
    setup

    logger.info 'Adding topics for replication...'
    subscribe_to_source_topics

    logger.info 'Starting replication...'
    replicate
  end
end

#stopObject



72
73
74
75
76
# File 'lib/kafka_replicator/topics_replicator.rb', line 72

def stop
  logger.info 'Stopping replication...'
  source_consumer.stop
  @stopped = true
end