Class: Druid::Node::Overlord
- Inherits:
-
Object
- Object
- Druid::Node::Overlord
- Defined in:
- lib/druid/node/overlord.rb
Constant Summary collapse
# Base path of the indexer API; the other path constants are built on it.
INDEXER_PATH = '/druid/indexer/v1/'.freeze
# Path requested by #running_tasks to list tasks.
RUNNING_TASKS_PATH = (INDEXER_PATH + 'runningTasks').freeze
# Prefix for per-task paths; #shutdown_task appends "<task>/shutdown".
# Frozen like its siblings so the shared constant cannot be mutated in place.
TASK_PATH = (INDEXER_PATH + 'task/').freeze
Instance Attribute Summary collapse
-
#config ⇒ Object
readonly
Returns the value of attribute config.
-
#zk ⇒ Object
readonly
Returns the value of attribute zk.
Instance Method Summary collapse
-
#connection ⇒ Object
TODO: DRY — this method is copy/pasted and should be extracted into a shared helper.
-
#initialize(config, zk) ⇒ Overlord
constructor
A new instance of Overlord.
- #running_tasks(datasource_name = nil) ⇒ Object
- #shutdown_task(task) ⇒ Object
- #shutdown_tasks(datasource_name = nil) ⇒ Object
Constructor Details
#initialize(config, zk) ⇒ Overlord
Returns a new instance of Overlord.
9 10 11 12 |
# File 'lib/druid/node/overlord.rb', line 9
#
# Stores the two collaborators for later use by the instance.
#
# @param config [Object] exposed through #config
# @param zk [Object] exposed through #zk
def initialize(config, zk)
  @zk = zk
  @config = config
end
Instance Attribute Details
#config ⇒ Object (readonly)
Returns the value of attribute config.
8 9 10 |
# File 'lib/druid/node/overlord.rb', line 8
#
# @return [Object] the value held in @config
def config
  @config
end
#zk ⇒ Object (readonly)
Returns the value of attribute zk.
8 9 10 |
# File 'lib/druid/node/overlord.rb', line 8
#
# @return [Object] the value held in @zk
def zk
  @zk
end
Instance Method Details
#connection ⇒ Object
TODO: DRY — this method is copy/pasted and should be extracted into a shared helper.
15 16 17 18 19 20 |
# File 'lib/druid/node/overlord.rb', line 15
#
# Builds a connection to the overlord currently at the head of the registry
# list, then rotates that list so the next call picks a different overlord.
#
# @return [Druid::Connection] connection built from the entry's :host and :port
# @raise [Druid::ConnectionError] when the registry has no overlord entry
def connection
  # Look the list up once so the entry we read and the list we rotate are the same object.
  overlords = zk.registry["#{config.discovery_path}/druid:overlord"]
  # A missing registry key must surface as ConnectionError, not NoMethodError on nil.
  overlord = overlords.first if overlords
  raise Druid::ConnectionError, 'no druid overlords available' if overlord.nil?

  overlords.rotate! # round-robin load balancing
  Druid::Connection.new(host: overlord[:host], port: overlord[:port])
end
#running_tasks(datasource_name = nil) ⇒ Object
22 23 24 25 26 27 28 |
# File 'lib/druid/node/overlord.rb', line 22
#
# Fetches the ids of the tasks listed at RUNNING_TASKS_PATH.
#
# @param datasource_name [String, nil] when given, only ids that contain this
#   substring are returned
# @return [Array] task ids (the 'id' field of each parsed entry); empty when
#   nothing is listed or nothing matches
# @raise [Druid::ConnectionError] when the response code is not 200
def running_tasks(datasource_name = nil)
  response = connection.get(RUNNING_TASKS_PATH)
  # Qualified to match the error class raised by #connection.
  raise Druid::ConnectionError, 'Could not retrieve running tasks' unless response.code.to_i == 200

  task_ids = JSON.parse(response.body).map { |task| task['id'] }
  return task_ids unless datasource_name

  task_ids.select { |task_id| task_id.include?(datasource_name) }
end
#shutdown_task(task) ⇒ Object
30 31 32 33 34 |
# File 'lib/druid/node/overlord.rb', line 30
#
# Posts a shutdown request for one task, then hands off to
# bounded_wait_for_shutdown (defined outside this listing).
#
# @param task [String] task id, placed between TASK_PATH and '/shutdown'
# @return [Object] whatever bounded_wait_for_shutdown returns
# @raise [Druid::ConnectionError] when the response code is not 200
def shutdown_task(task)
  response = connection.post("#{TASK_PATH}#{task}/shutdown")
  # Qualified to match the error class raised by #connection.
  raise Druid::ConnectionError, 'Unable to shutdown task' unless response.code.to_i == 200

  bounded_wait_for_shutdown(task)
end
#shutdown_tasks(datasource_name = nil) ⇒ Object
36 37 38 39 |
# File 'lib/druid/node/overlord.rb', line 36
#
# Calls #shutdown_task for every id returned by #running_tasks.
#
# @param datasource_name [String, nil] forwarded unchanged to #running_tasks
# @return [Array] the task ids that were iterated
def shutdown_tasks(datasource_name = nil)
  running_tasks(datasource_name).each do |task_id|
    shutdown_task(task_id)
  end
end