Class: Ktl::ZookeeperClient
- Inherits:
-
Object
- Object
- Ktl::ZookeeperClient
- Defined in:
- lib/ktl/zookeeper_client.rb
Instance Attribute Summary collapse
-
#utils ⇒ Object
readonly
Returns the value of attribute utils.
Instance Method Summary collapse
- #all_partitions ⇒ Object
- #all_topics ⇒ Object
- #broker_ids ⇒ Object
- #brokers ⇒ Object
- #close ⇒ Object
- #create_znode(path, data = '') ⇒ Object
- #delete_znode(path, options = {}) ⇒ Object
- #exists?(path) ⇒ Boolean
- #get_children(path) ⇒ Object
-
#initialize(uri, options = {}) ⇒ ZookeeperClient
constructor
A new instance of ZookeeperClient.
- #leader_and_isr_for(partitions) ⇒ Object
- #partitions_being_reassigned ⇒ Object
- #partitions_for_topics(topics) ⇒ Object
- #raw_client ⇒ Object
- #read_data(path) ⇒ Object
- #reassign_partitions(json) ⇒ Object
- #replica_assignment_for_topics(topics) ⇒ Object
- #setup ⇒ Object
- #unsubscribe_data(path, listener) ⇒ Object
- #watch_child(path, listener) ⇒ Object
- #watch_data(path, listener) ⇒ Object
- #watch_state(path, listener) ⇒ Object
Constructor Details
#initialize(uri, options = {}) ⇒ ZookeeperClient
Returns a new instance of ZookeeperClient.
7 8 9 10 11 |
# File 'lib/ktl/zookeeper_client.rb', line 7 def initialize(uri, ={}) @uri = uri @threadpool = [:threadpool] || JavaConcurrent::Executors.new_fixed_thread_pool(CONCURRENCY) @utils = [:utils] || Kafka::Utils::ZkUtils.apply(@uri, 5000, 5000, false) end |
Instance Attribute Details
#utils ⇒ Object (readonly)
Returns the value of attribute utils.
5 6 7 |
# File 'lib/ktl/zookeeper_client.rb', line 5 def utils @utils end |
Instance Method Details
#all_partitions ⇒ Object
27 28 29 |
# File 'lib/ktl/zookeeper_client.rb', line 27 def all_partitions @utils.get_all_partitions end |
#all_topics ⇒ Object
31 32 33 |
# File 'lib/ktl/zookeeper_client.rb', line 31 def all_topics @utils.get_all_topics end |
#broker_ids ⇒ Object
39 40 41 |
# File 'lib/ktl/zookeeper_client.rb', line 39 def broker_ids @utils.get_sorted_broker_list end |
#brokers ⇒ Object
35 36 37 |
# File 'lib/ktl/zookeeper_client.rb', line 35 def brokers @utils.get_all_brokers_in_cluster end |
#close ⇒ Object
18 19 20 21 |
# File 'lib/ktl/zookeeper_client.rb', line 18 def close @threadpool.shutdown_now if @threadpool @utils.close end |
#create_znode(path, data = '') ⇒ Object
63 64 65 |
# File 'lib/ktl/zookeeper_client.rb', line 63 def create_znode(path, data='') @utils.create_persistent_path(path, data, no_acl) end |
#delete_znode(path, options = {}) ⇒ Object
67 68 69 70 71 72 73 |
# File 'lib/ktl/zookeeper_client.rb', line 67 def delete_znode(path, ={}) if [:recursive] @utils.delete_path_recursive(path) else @utils.delete_path(path) end end |
#exists?(path) ⇒ Boolean
83 84 85 |
# File 'lib/ktl/zookeeper_client.rb', line 83 def exists?(path) @utils.path_exists(path) end |
#get_children(path) ⇒ Object
79 80 81 |
# File 'lib/ktl/zookeeper_client.rb', line 79 def get_children(path) @utils.get_children(path) end |
#leader_and_isr_for(partitions) ⇒ Object
43 44 45 |
# File 'lib/ktl/zookeeper_client.rb', line 43 def leader_and_isr_for(partitions) @utils.get_partition_leader_and_isr_for_topics(@utils.class.create_zk_client(@uri, 5_000, 5_000), partitions) end |
#partitions_being_reassigned ⇒ Object
55 56 57 |
# File 'lib/ktl/zookeeper_client.rb', line 55 def partitions_being_reassigned @utils.get_partitions_being_reassigned end |
#partitions_for_topics(topics) ⇒ Object
47 48 49 |
# File 'lib/ktl/zookeeper_client.rb', line 47 def partitions_for_topics(topics) request(:get_partitions_for_topics, topics) end |
#raw_client ⇒ Object
23 24 25 |
# File 'lib/ktl/zookeeper_client.rb', line 23 def raw_client @utils end |
#read_data(path) ⇒ Object
75 76 77 |
# File 'lib/ktl/zookeeper_client.rb', line 75 def read_data(path) @utils.read_data(path) end |
#reassign_partitions(json) ⇒ Object
59 60 61 |
# File 'lib/ktl/zookeeper_client.rb', line 59 def reassign_partitions(json) @utils.create_persistent_path(@utils.class.reassign_partitions_path, json, no_acl) end |
#replica_assignment_for_topics(topics) ⇒ Object
51 52 53 |
# File 'lib/ktl/zookeeper_client.rb', line 51 def replica_assignment_for_topics(topics) request(:get_replica_assignment_for_topics, topics) end |
#setup ⇒ Object
13 14 15 16 |
# File 'lib/ktl/zookeeper_client.rb', line 13 def setup @submit = @threadpool.java_method(:submit, [java.lang.Class.for_name('java.util.concurrent.Callable')]) self end |
#unsubscribe_data(path, listener) ⇒ Object
99 100 101 |
# File 'lib/ktl/zookeeper_client.rb', line 99 def unsubscribe_data(path, listener) zk_client.unsubscribe_data_changes(path, listener) end |
#watch_child(path, listener) ⇒ Object
95 96 97 |
# File 'lib/ktl/zookeeper_client.rb', line 95 def watch_child(path, listener) zk_client.subscribe_child_changes(path, listener) end |
#watch_data(path, listener) ⇒ Object
91 92 93 |
# File 'lib/ktl/zookeeper_client.rb', line 91 def watch_data(path, listener) zk_client.subscribe_data_changes(path, listener) end |
#watch_state(path, listener) ⇒ Object
87 88 89 |
# File 'lib/ktl/zookeeper_client.rb', line 87 def watch_state(path, listener) zk_client.subscribe_state_changes(path, listener) end |