Class: Ktl::ContinousReassigner

Inherits:
Reassigner show all
Includes:
ZkClient::IZkDataListener
Defined in:
lib/ktl/continous_reassigner.rb

Instance Attribute Summary

Attributes inherited from Reassigner

#limit

Instance Method Summary collapse

Methods inherited from Reassigner

#load_overflow, #overflow?, #reassignment_in_progress?

Constructor Details

#initialize(zk_client, options = {}) ⇒ ContinousReassigner

Returns a new instance of ContinousReassigner.



7
8
9
10
11
12
13
# File 'lib/ktl/continous_reassigner.rb', line 7

# Builds a reassigner that blocks in #execute until its latch is released.
#
# Recognised option keys read here (the parent initializer receives the
# same hash and may read others):
# * +:sleeper+ - object responding to +sleep+; falls back to java.lang.Thread
# * +:delay+   - pause between successive reassignments; falls back to 5
# * +:shell+   - handed to the progress display in #execute; may be nil
#
# @param zk_client [Object] forwarded unchanged to the parent initializer
# @param options [Hash] see the option keys above
def initialize(zk_client, options = {})
  super(zk_client, options)

  # Collaborators and tunables taken from the options hash.
  @shell = options[:shell]
  @delay = options[:delay] || 5
  @sleeper = options[:sleeper] || java.lang.Thread

  # Single-count latch: one count_down releases whoever is waiting on it.
  @latch = JavaConcurrent::CountDownLatch.new(1)
end

Instance Method Details

#execute(reassignment) ⇒ Object



15
16
17
18
19
20
21
22
23
24
25
26
# File 'lib/ktl/continous_reassigner.rb', line 15

# Starts (or resumes watching) a reassignment and blocks until the latch is
# released.
#
# Steps, in order:
# 1. installs a SIGINT handler that logs and counts the latch down,
# 2. registers this object as data listener on the reassign-partitions path,
# 3. if a reassignment is already in progress, logs that fact and displays
#    its progress; otherwise starts the given reassignment,
# 4. waits on the latch.
#
# @param reassignment [Object] passed to +reassign+ when nothing is in progress
# @return [Object] whatever +@latch.await+ returns
def execute(reassignment)
  on_interrupt = proc do
    @logger.info 'Exiting due to Ctrl-C'
    @latch.count_down
  end
  Signal.trap('SIGINT', on_interrupt)

  @zk_client.watch_data(zk_utils.class.reassign_partitions_path, self)

  if reassignment_in_progress?
    @logger.info 'reassignment already in progress, watching for changes...'
    ReassignmentProgress.new(@zk_client, logger: @logger, verbose: true).display(@shell)
  else
    reassign(reassignment)
  end

  @latch.await
end

#handle_data_change(path, data) ⇒ Object



28
29
30
31
32
33
34
35
36
37
38
# File 'lib/ktl/continous_reassigner.rb', line 28

# Logs the content of a data-change notification.
#
# +data+ is parsed as JSON. If the parsed document has a truthy
# `partitions` entry, every element is rendered as "topic:partition" and the
# number of entries is logged at debug level; the rendered list is included
# only when it has at most five entries, otherwise '...' stands in for it.
# If there is no such entry, the parsed document is logged at info level.
# Any StandardError raised on the way (for instance unparsable input) is
# logged as bad data at error level and is not re-raised.
#
# @param path [Object] not used by this method
# @param data [String] JSON text
# @return [Object] the result of whichever logger call ran
def handle_data_change(path, data)
  document = JSON.parse(data)
  entries = document['partitions']
  return @logger.info(sprintf('Data without `partitions` key: %p', document)) unless entries

  labels = entries.map { |entry| entry.values_at('topic', 'partition').join(':') }
  shown = labels.size <= 5 ? labels : '...'
  @logger.debug sprintf('%d partitions left to reassign (%p)', labels.size, shown)
rescue StandardError
  @logger.error sprintf('Bad data: %p', data)
end

#handle_data_deleted(path) ⇒ Object



40
41
42
43
44
45
46
47
48
49
50
51
# File 'lib/ktl/continous_reassigner.rb', line 40

# Reacts to the watched node being deleted.
#
# Loads the overflow reassignment. When it is non-empty, logs the wait,
# asks the sleeper to sleep for +@delay * 1000+ and then starts the next
# reassignment with it. When it is empty, unsubscribes this listener from
# the reassign-partitions path, deletes the previous state and counts the
# latch down.
#
# @param path [Object] not used by this method
# @return [Object] the result of +reassign+ or of +@latch.count_down+
def handle_data_deleted(path)
  pending = load_overflow

  unless pending.empty?
    @logger.info sprintf('Waiting %ds before next assignment', @delay)
    # The sleeper is handed the delay multiplied by 1000.
    @sleeper.sleep(@delay * 1000)
    return reassign(pending)
  end

  # Nothing left to do: stop listening, clean up and release the latch.
  @zk_client.unsubscribe_data(zk_utils.class.reassign_partitions_path, self)
  delete_previous_state
  @latch.count_down
end