Class: Druid::Node::Overlord

Inherits:
Object
  • Object
show all
Defined in:
lib/druid/node/overlord.rb

Constant Summary collapse

INDEXER_PATH =
'/druid/indexer/v1/'.freeze
RUNNING_TASKS_PATH =
(INDEXER_PATH + 'runningTasks').freeze
TASK_PATH =
INDEXER_PATH + 'task/'

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(config, zk) ⇒ Overlord

Returns a new instance of Overlord.



9
10
11
12
# File 'lib/druid/node/overlord.rb', line 9

# Builds an overlord client from its two collaborators.
#
# @param config [Object] stored as-is and exposed through #config
# @param zk [Object] stored as-is and exposed through #zk
def initialize(config, zk)
  @config, @zk = config, zk
end

Instance Attribute Details

#config ⇒ Object (readonly)

Returns the value of attribute config.



8
9
10
# File 'lib/druid/node/overlord.rb', line 8

# Reader for the configuration object handed to the constructor.
#
# @return [Object] the value currently held in @config
def config
  @config
end

#zk ⇒ Object (readonly)

Returns the value of attribute zk.



8
9
10
# File 'lib/druid/node/overlord.rb', line 8

# Reader for the zk object handed to the constructor.
#
# @return [Object] the value currently held in @zk
def zk
  @zk
end

Instance Method Details

#connection ⇒ Object

TODO: DRY: copy/paste



15
16
17
18
19
20
# File 'lib/druid/node/overlord.rb', line 15

# Selects an overlord from the registry exposed by `zk` and builds a
# connection to it.
#
# The entries stored under "<discovery_path>/druid:overlord" are used in
# round-robin order: the entry at the head of the list is picked, then the
# list is rotated in place so the next call picks the following entry.
#
# @return [Druid::Connection] connection built from the entry's :host and :port
# @raise [Druid::ConnectionError] if the registry has no overlord entry
def connection
  overlords = zk.registry["#{config.discovery_path}/druid:overlord"]
  # A registry without any list for this path is reported like an empty list,
  # instead of failing with NoMethodError on nil.
  overlord = overlords && overlords.first
  raise Druid::ConnectionError, 'no druid overlords available' if overlord.nil?

  overlords.rotate! # round-robin load balancing
  Druid::Connection.new(host: overlord[:host], port: overlord[:port])
end

#running_tasks(datasource_name = nil) ⇒ Object

Raises:

  • (ConnectionError) — if the running-tasks request does not return response code 200



22
23
24
25
26
27
28
# File 'lib/druid/node/overlord.rb', line 22

# Lists the ids of the tasks reported by the running-tasks endpoint.
#
# The response body is parsed as JSON and the 'id' of every element is
# collected. When a datasource name is given, only ids that contain that name
# as a substring are kept.
# NOTE(review): substring matching also keeps ids of datasources whose name
# merely contains `datasource_name` - confirm this is intended.
#
# @param datasource_name [String, nil] optional substring filter on task ids
# @return [Array] task ids; empty when nothing matches
# @raise [Druid::ConnectionError] if the response code is not 200
def running_tasks(datasource_name = nil)
  response = connection.get(RUNNING_TASKS_PATH)
  # Qualified name, matching the error raised by #connection.
  raise Druid::ConnectionError, 'Could not retrieve running tasks' unless response.code.to_i == 200

  task_ids = JSON.parse(response.body).map { |task| task['id'] }
  return task_ids unless datasource_name

  task_ids.select { |task_id| task_id.include?(datasource_name) }
end

#shutdown_task(task) ⇒ Object

Raises:

  • (ConnectionError) — if the shutdown request does not return response code 200



30
31
32
33
34
# File 'lib/druid/node/overlord.rb', line 30

# Asks the overlord to shut down one task, then waits for it to stop.
#
# Posts to "<TASK_PATH><task>/shutdown" and, on a 200 response, hands the
# task to bounded_wait_for_shutdown (defined outside this view).
#
# @param task [String] id of the task to shut down
# @return [Object] whatever bounded_wait_for_shutdown returns
# @raise [Druid::ConnectionError] if the response code is not 200
def shutdown_task(task)
  response = connection.post("#{TASK_PATH}#{task}/shutdown")
  # Qualified name, matching the error raised by #connection.
  raise Druid::ConnectionError, 'Unable to shutdown task' unless response.code.to_i == 200

  bounded_wait_for_shutdown(task)
end

#shutdown_tasks(datasource_name = nil) ⇒ Object



36
37
38
39
# File 'lib/druid/node/overlord.rb', line 36

# Shuts down every running task, optionally limited to one datasource.
#
# The ids come from running_tasks(datasource_name); each one is passed to
# shutdown_task in order, so an error raised for one task stops the loop.
#
# @param datasource_name [String, nil] forwarded unchanged to running_tasks
# @return [Array] the task ids that were iterated
def shutdown_tasks(datasource_name = nil)
  running_tasks(datasource_name).each do |task_id|
    shutdown_task(task_id)
  end
end