Class: KafkaReplicator::TopicsReplicator
- Inherits:
-
Object
- Object
- KafkaReplicator::TopicsReplicator
- Defined in:
- lib/kafka_replicator/topics_replicator.rb
Constant Summary collapse
- SKIP_TOPICS =
['__consumer_offse', '__consumer_offsets', '_schemas']
Instance Attribute Summary collapse
-
#destination_kafka ⇒ Object
readonly
Returns the value of attribute destination_kafka.
-
#destination_producer ⇒ Object
readonly
Returns the value of attribute destination_producer.
-
#logger ⇒ Object
readonly
Returns the value of attribute logger.
-
#replicated_topics ⇒ Object
readonly
Returns the value of attribute replicated_topics.
-
#skip_topics ⇒ Object
readonly
Returns the value of attribute skip_topics.
-
#source_consumer ⇒ Object
readonly
Returns the value of attribute source_consumer.
-
#source_kafka ⇒ Object
readonly
Returns the value of attribute source_kafka.
-
#stopped ⇒ Object
readonly
Returns the value of attribute stopped.
Instance Method Summary collapse
-
#initialize(source_brokers:, destination_brokers:, skip_topics: []) ⇒ TopicsReplicator
constructor
A new instance of TopicsReplicator.
- #setup ⇒ Object
- #start ⇒ Object
- #stop ⇒ Object
Constructor Details
#initialize(source_brokers:, destination_brokers:, skip_topics: []) ⇒ TopicsReplicator
Returns a new instance of TopicsReplicator.
14 15 16 17 18 19 |
# File 'lib/kafka_replicator/topics_replicator.rb', line 14 def initialize(source_brokers:, destination_brokers:, skip_topics: []) @source_brokers = source_brokers @destination_brokers = destination_brokers @skip_topics = SKIP_TOPICS | skip_topics @logger = Logger.new(STDOUT) end |
Instance Attribute Details
#destination_kafka ⇒ Object (readonly)
Returns the value of attribute destination_kafka.
5 6 7 |
# File 'lib/kafka_replicator/topics_replicator.rb', line 5 def destination_kafka @destination_kafka end |
#destination_producer ⇒ Object (readonly)
Returns the value of attribute destination_producer.
5 6 7 |
# File 'lib/kafka_replicator/topics_replicator.rb', line 5 def destination_producer @destination_producer end |
#logger ⇒ Object (readonly)
Returns the value of attribute logger.
5 6 7 |
# File 'lib/kafka_replicator/topics_replicator.rb', line 5 def logger @logger end |
#replicated_topics ⇒ Object (readonly)
Returns the value of attribute replicated_topics.
5 6 7 |
# File 'lib/kafka_replicator/topics_replicator.rb', line 5 def replicated_topics @replicated_topics end |
#skip_topics ⇒ Object (readonly)
Returns the value of attribute skip_topics.
5 6 7 |
# File 'lib/kafka_replicator/topics_replicator.rb', line 5 def skip_topics @skip_topics end |
#source_consumer ⇒ Object (readonly)
Returns the value of attribute source_consumer.
5 6 7 |
# File 'lib/kafka_replicator/topics_replicator.rb', line 5 def source_consumer @source_consumer end |
#source_kafka ⇒ Object (readonly)
Returns the value of attribute source_kafka.
5 6 7 |
# File 'lib/kafka_replicator/topics_replicator.rb', line 5 def source_kafka @source_kafka end |
#stopped ⇒ Object (readonly)
Returns the value of attribute stopped.
5 6 7 |
# File 'lib/kafka_replicator/topics_replicator.rb', line 5 def stopped @stopped end |
Instance Method Details
#setup ⇒ Object
21 22 23 24 25 26 |
# File 'lib/kafka_replicator/topics_replicator.rb', line 21 def setup @stopped = false @replicated_topics = Set[] @source_consumer = nil @destination_producer = nil end |
#start ⇒ Object
50 51 52 53 54 55 56 57 58 59 60 61 62 63 |
# File 'lib/kafka_replicator/topics_replicator.rb', line 50 def start loop do break if stopped logger.info 'Setting up configuration...' setup logger.info 'Adding topics for replication...' subscribe_to_source_topics logger.info 'Starting replication...' replicate end end |
#stop ⇒ Object
72 73 74 75 76 |
# File 'lib/kafka_replicator/topics_replicator.rb', line 72 def stop logger.info 'Stopping replication...' source_consumer.stop @stopped = true end |