Class: Ktl::Reassigner
- Inherits:
-
Object
- Object
- Ktl::Reassigner
- Defined in:
- lib/ktl/reassigner.rb
Direct Known Subclasses
Instance Attribute Summary collapse
-
#limit ⇒ Object
readonly
Returns the value of attribute limit.
Instance Method Summary collapse
- #execute(reassignment) ⇒ Object
-
#initialize(zk_client, options = {}) ⇒ Reassigner
constructor
A new instance of Reassigner.
- #load_overflow ⇒ Object
- #overflow? ⇒ Boolean
- #reassignment_in_progress? ⇒ Boolean
Constructor Details
#initialize(zk_client, options = {}) ⇒ Reassigner
Returns a new instance of Reassigner.
8 9 10 11 12 13 14 15 16 |
# File 'lib/ktl/reassigner.rb', line 8 def initialize(zk_client, ={}) @zk_client = zk_client @limit = [:limit] @overflow_path = '/ktl/overflow' @state_path = '/ktl/reassign' @logger = [:logger] || NullLogger.new @log_assignments = !![:log_assignments] @multi_step_migration = [:multi_step_migration] end |
Instance Attribute Details
#limit ⇒ Object (readonly)
Returns the value of attribute limit.
6 7 8 |
# File 'lib/ktl/reassigner.rb', line 6 def limit @limit end |
Instance Method Details
#execute(reassignment) ⇒ Object
43 44 45 |
# File 'lib/ktl/reassigner.rb', line 43 def execute(reassignment) reassign(reassignment) end |
#load_overflow ⇒ Object
30 31 32 33 34 35 36 37 38 39 40 41 |
# File 'lib/ktl/reassigner.rb', line 30 def load_overflow overflow = Scala::Collection::Map.empty overflow_nodes = @zk_client.get_children(@overflow_path) overflow_nodes.foreach do |index| overflow_json = @zk_client.read_data(overflow_path(index)).first data = parse_reassignment_json(overflow_json) overflow = overflow.send('++', data) end overflow rescue ZkClient::Exception::ZkNoNodeException Scala::Collection::Map.empty end |
#overflow? ⇒ Boolean
23 24 25 26 27 28 |
# File 'lib/ktl/reassigner.rb', line 23 def overflow? overflow_znodes = @zk_client.get_children(@overflow_path) overflow_znodes.size > 0 rescue ZkClient::Exception::ZkNoNodeException false end |
#reassignment_in_progress? ⇒ Boolean
18 19 20 21 |
# File 'lib/ktl/reassigner.rb', line 18 def reassignment_in_progress? partitions = @zk_client.partitions_being_reassigned partitions.size > 0 end |