Class: Druid::Node::Coordinator
- Inherits:
-
Object
- Object
- Druid::Node::Coordinator
- Defined in:
- lib/druid/node/coordinator.rb
Constant Summary collapse
- DATASOURCES_PATH =
'/druid/coordinator/v1/datasources/'.freeze
Instance Attribute Summary collapse
-
#config ⇒ Object
readonly
Returns the value of attribute config.
-
#zk ⇒ Object
readonly
Returns the value of attribute zk.
Instance Method Summary collapse
-
#connection ⇒ Object
TODO: DRY; copy/paste from broker.
-
#datasource_enabled?(datasource_name) ⇒ Boolean
TODO: This should either be private or moved to datasource.
-
#datasource_has_segments?(datasource_name) ⇒ Boolean
TODO: This should either be private or moved to datasource.
- #datasource_info(datasource_name) ⇒ Object
- #disable_datasource(datasource_name) ⇒ Object
- #disable_segment(datasource_name, segment) ⇒ Object
-
#disable_segments(datasource_name) ⇒ Object
TODO: This should either be private or moved to datasource.
-
#initialize(config, zk) ⇒ Coordinator
constructor
A new instance of Coordinator.
- #issue_kill_task(datasource_name, interval) ⇒ Object
- #list_datasources(url_params = {}) ⇒ Object
- #list_segments(datasource_name) ⇒ Object
Constructor Details
#initialize(config, zk) ⇒ Coordinator
Returns a new instance of Coordinator.
7 8 9 10 |
# File 'lib/druid/node/coordinator.rb', line 7 def initialize(config, zk) @config = config @zk = zk end |
Instance Attribute Details
#config ⇒ Object (readonly)
Returns the value of attribute config.
6 7 8 |
# File 'lib/druid/node/coordinator.rb', line 6 def config @config end |
#zk ⇒ Object (readonly)
Returns the value of attribute zk.
6 7 8 |
# File 'lib/druid/node/coordinator.rb', line 6 def zk @zk end |
Instance Method Details
#connection ⇒ Object
TODO: DRY; copy/paste from broker
13 14 15 16 17 18 |
# File 'lib/druid/node/coordinator.rb', line 13 def connection coordinator = zk.registry["#{config.discovery_path}/druid:coordinator"].first raise Druid::ConnectionError, 'no druid coordinators available' if coordinator.nil? zk.registry["#{config.discovery_path}/druid:coordinator"].rotate! # round-robin load balancing Druid::Connection.new(host: coordinator[:host], port: coordinator[:port]) end |
#datasource_enabled?(datasource_name) ⇒ Boolean
TODO: This should either be private or moved to datasource
38 39 40 |
# File 'lib/druid/node/coordinator.rb', line 38 def datasource_enabled?(datasource_name) list_datasources.include? datasource_name end |
#datasource_has_segments?(datasource_name) ⇒ Boolean
TODO: This should either be private or moved to datasource
43 44 45 |
# File 'lib/druid/node/coordinator.rb', line 43 def datasource_has_segments?(datasource_name) list_segments(datasource_name).any? end |
#datasource_info(datasource_name) ⇒ Object
20 21 22 23 24 |
# File 'lib/druid/node/coordinator.rb', line 20 def datasource_info(datasource_name) response = connection.get(DATASOURCES_PATH + datasource_name.to_s, full: true) raise ConnectionError, 'Unable to retrieve datasource information.' unless response.code.to_i == 200 JSON.parse(response.body) end |
#disable_datasource(datasource_name) ⇒ Object
26 27 28 29 30 31 32 33 34 35 |
# File 'lib/druid/node/coordinator.rb', line 26 def disable_datasource(datasource_name) # response = connection.delete(DATASOURCES_PATH + datasource_name.to_s) # raise ConnectionError, 'Unable to disable datasource' unless response.code.to_i == 200 # return true if response.code.to_i == 200 # This is a workaround for https://github.com/druid-io/druid/issues/3154 disable_segments(datasource_name) bounded_wait_for_segments_disable(datasource_name) true end |
#disable_segment(datasource_name, segment) ⇒ Object
47 48 49 50 51 |
# File 'lib/druid/node/coordinator.rb', line 47 def disable_segment(datasource_name, segment) response = connection.delete(DATASOURCES_PATH + datasource_name + '/segments/' + segment) raise ConnectionError, "Unable to disable #{segment}" unless response.code.to_i == 200 true end |
#disable_segments(datasource_name) ⇒ Object
TODO: This should either be private or moved to datasource
54 55 56 57 |
# File 'lib/druid/node/coordinator.rb', line 54 def disable_segments(datasource_name) segments = list_segments(datasource_name) segments.each{ |segment| disable_segment(datasource_name, segment) } end |
#issue_kill_task(datasource_name, interval) ⇒ Object
59 60 61 62 63 |
# File 'lib/druid/node/coordinator.rb', line 59 def issue_kill_task(datasource_name, interval) response = connection.delete(DATASOURCES_PATH + datasource_name + '/intervals/' + interval) raise ConnectionError, 'Unable to issue kill task.' unless response.code.to_i == 200 true end |
#list_datasources(url_params = {}) ⇒ Object
65 66 67 68 |
# File 'lib/druid/node/coordinator.rb', line 65 def list_datasources(url_params = {}) response = connection.get(DATASOURCES_PATH, url_params) JSON.parse(response.body) if response.code.to_i == 200 end |
#list_segments(datasource_name) ⇒ Object
70 71 72 73 74 75 76 77 78 79 80 |
# File 'lib/druid/node/coordinator.rb', line 70 def list_segments(datasource_name) response = connection.get(DATASOURCES_PATH + datasource_name + '/segments', full: true) case response.code.to_i when 200 JSON.parse(response.body).map{ |segment| segment['identifier'] } when 204 [] else raise ConnectionError, "Unable to list segments for #{datasource_name}" end end |