Class: Ktl::ContinousReassigner
- Inherits:
-
Reassigner
show all
- Includes:
- ZkClient::IZkDataListener
- Defined in:
- lib/ktl/continous_reassigner.rb
Instance Attribute Summary
Attributes inherited from Reassigner
#limit
Instance Method Summary
collapse
Methods inherited from Reassigner
#load_overflow, #overflow?, #reassignment_in_progress?
Constructor Details
#initialize(zk_client, options = {}) ⇒ ContinousReassigner
Returns a new instance of ContinousReassigner.
7
8
9
10
11
12
13
|
# File 'lib/ktl/continous_reassigner.rb', line 7
def initialize(zk_client, options={})
super(zk_client, options)
@latch = JavaConcurrent::CountDownLatch.new(1)
@sleeper = options[:sleeper] || java.lang.Thread
@delay = options[:delay] || 5
@shell = options[:shell]
end
|
Instance Method Details
#execute(reassignment) ⇒ Object
15
16
17
18
19
20
21
22
23
24
25
26
|
# File 'lib/ktl/continous_reassigner.rb', line 15
def execute(reassignment)
Signal.trap('SIGINT', proc { @logger.info 'Exiting due to Ctrl-C'; @latch.count_down })
@zk_client.watch_data(zk_utils.class.reassign_partitions_path, self)
if reassignment_in_progress?
@logger.info 'reassignment already in progress, watching for changes...'
progress = ReassignmentProgress.new(@zk_client, logger: @logger, verbose: true)
progress.display(@shell)
else
reassign(reassignment)
end
@latch.await
end
|
#handle_data_change(path, data) ⇒ Object
28
29
30
31
32
33
34
35
36
37
38
|
# File 'lib/ktl/continous_reassigner.rb', line 28
def handle_data_change(path, data)
parsed_data = JSON.parse(data)
if (partitions = parsed_data['partitions'])
partitions = partitions.map { |r| r.values_at('topic', 'partition').join(':') }
@logger.debug sprintf('%d partitions left to reassign (%p)', partitions.size, partitions.size <= 5 ? partitions : '...')
else
@logger.info sprintf('Data without `partitions` key: %p', parsed_data)
end
rescue => e
@logger.error sprintf('Bad data: %p', data)
end
|
#handle_data_deleted(path) ⇒ Object
40
41
42
43
44
45
46
47
48
49
50
51
|
# File 'lib/ktl/continous_reassigner.rb', line 40
def handle_data_deleted(path)
reassignment = load_overflow
if reassignment.empty?
@zk_client.unsubscribe_data(zk_utils.class.reassign_partitions_path, self)
delete_previous_state
@latch.count_down
else
@logger.info sprintf('Waiting %ds before next assignment', @delay)
@sleeper.sleep(@delay * 1000)
reassign(reassignment)
end
end
|