Class: Druid::Node::Coordinator

Inherits:
Object
  • Object
show all
Defined in:
lib/druid/node/coordinator.rb

Constant Summary collapse

DATASOURCES_PATH =
'/druid/coordinator/v1/datasources/'.freeze

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(config, zk) ⇒ Coordinator

Returns a new instance of Coordinator.



7
8
9
10
# File 'lib/druid/node/coordinator.rb', line 7

# Builds a coordinator client. Nothing is contacted here; both arguments are
# simply stored for later use.
#
# @param config [Object] configuration; #connection reads +discovery_path+ from it
# @param zk [Object] registry holder; #connection reads +registry+ from it
def initialize(config, zk)
  @config, @zk = config, zk
end

Instance Attribute Details

#config ⇒ Object (readonly)

Returns the value of attribute config.



6
7
8
# File 'lib/druid/node/coordinator.rb', line 6

# Read-only accessor for the configuration object passed to #initialize.
# #connection reads +discovery_path+ from it.
#
# @return [Object] the stored configuration
def config
  @config
end

#zk ⇒ Object (readonly)

Returns the value of attribute zk.



6
7
8
# File 'lib/druid/node/coordinator.rb', line 6

# Read-only accessor for the +zk+ object passed to #initialize.
# #connection looks up coordinator nodes through its +registry+.
#
# @return [Object] the stored zk object
def zk
  @zk
end

Instance Method Details

#connection ⇒ Object

TODO: DRY; copy/paste from broker



13
14
15
16
17
18
# File 'lib/druid/node/coordinator.rb', line 13

# Picks the coordinator node currently at the front of the registry list and
# returns a connection to it, rotating the list so that the next call picks a
# different node.
#
# TODO: DRY; copy/paste from broker
#
# @return [Druid::Connection] connection to the selected coordinator
# @raise [Druid::ConnectionError] when the registry lists no coordinator
def connection
  registry_key = "#{config.discovery_path}/druid:coordinator"
  node = zk.registry[registry_key].first
  if node.nil?
    raise Druid::ConnectionError, 'no druid coordinators available'
  end

  # Round-robin load balancing: shift the list so another node is first next time.
  zk.registry[registry_key].rotate!
  Druid::Connection.new(host: node[:host], port: node[:port])
end

#datasource_enabled?(datasource_name) ⇒ Boolean

TODO: This should either be private or moved to datasource

Returns:

  • (Boolean)


38
39
40
# File 'lib/druid/node/coordinator.rb', line 38

# Tells whether the coordinator currently lists the given datasource.
#
# TODO: This should either be private or moved to datasource
#
# The name is compared as a String, so a Symbol is accepted just as it is by
# #datasource_info.
#
# @param datasource_name [String, Symbol] datasource to look for
# @return [Boolean] true when the name appears in #list_datasources
# @raise [ConnectionError] when #list_datasources yields nil (it does so for
#   any non-200 response), instead of failing with NoMethodError on nil
def datasource_enabled?(datasource_name)
  datasources = list_datasources
  raise ConnectionError, 'Unable to list datasources.' if datasources.nil?

  datasources.include?(datasource_name.to_s)
end

#datasource_has_segments?(datasource_name) ⇒ Boolean

TODO: This should either be private or moved to datasource

Returns:

  • (Boolean)


43
44
45
# File 'lib/druid/node/coordinator.rb', line 43

# Tells whether #list_segments reports at least one segment for the datasource.
#
# TODO: This should either be private or moved to datasource
#
# @param datasource_name [String] datasource to inspect
# @return [Boolean] true when the segment list has a truthy entry
def datasource_has_segments?(datasource_name)
  segment_ids = list_segments(datasource_name)
  segment_ids.any?
end

#datasource_info(datasource_name) ⇒ Object

Raises:

  • (ConnectionError)


20
21
22
23
24
# File 'lib/druid/node/coordinator.rb', line 20

# Fetches the coordinator's description of one datasource and returns it as
# parsed JSON.
#
# @param datasource_name [#to_s] datasource to describe
# @return [Object] the parsed JSON response body
# @raise [ConnectionError] when the response code is not 200
def datasource_info(datasource_name)
  path = DATASOURCES_PATH + datasource_name.to_s
  response = connection.get(path, full: true)
  if response.code.to_i != 200
    raise ConnectionError, 'Unable to retrieve datasource information.'
  end

  JSON.parse(response.body)
end

#disable_datasource(datasource_name) ⇒ Object



26
27
28
29
30
31
32
33
34
35
# File 'lib/druid/node/coordinator.rb', line 26

# Disables a datasource by disabling its segments one by one (see
# #disable_segments) rather than by a single DELETE on the datasource itself;
# the commented-out lines below are that direct approach, kept for reference.
#
# @param datasource_name [String] datasource to disable
# @return [true] when both steps complete without raising
def disable_datasource(datasource_name)
  # response = connection.delete(DATASOURCES_PATH + datasource_name.to_s)
  # raise ConnectionError, 'Unable to disable datasource' unless response.code.to_i == 200
  # return true if response.code.to_i == 200

  # This is a workaround for https://github.com/druid-io/druid/issues/3154
  disable_segments(datasource_name)
  # NOTE(review): bounded_wait_for_segments_disable is not shown in this
  # excerpt; presumably it waits, up to some limit, for the segments to be
  # reported as disabled — confirm against its definition.
  bounded_wait_for_segments_disable(datasource_name)
  true
end

#disable_segment(datasource_name, segment) ⇒ Object

Raises:

  • (ConnectionError)


47
48
49
50
51
# File 'lib/druid/node/coordinator.rb', line 47

# Sends a DELETE for a single segment of a datasource.
#
# The path is built by interpolation, so a non-String name (e.g. a Symbol)
# is accepted here just as it is by #datasource_info.
#
# @param datasource_name [#to_s] datasource that owns the segment
# @param segment [#to_s] segment identifier, as returned by #list_segments
# @return [true] when the response code is 200
# @raise [ConnectionError] when the response code is not 200
def disable_segment(datasource_name, segment)
  path = "#{DATASOURCES_PATH}#{datasource_name}/segments/#{segment}"
  response = connection.delete(path)
  raise ConnectionError, "Unable to disable #{segment}" unless response.code.to_i == 200

  true
end

#disable_segments(datasource_name) ⇒ Object

TODO: This should either be private or moved to datasource



54
55
56
57
# File 'lib/druid/node/coordinator.rb', line 54

# Disables every segment that #list_segments reports for the datasource, one
# #disable_segment call per identifier.
#
# TODO: This should either be private or moved to datasource
#
# @param datasource_name [String] datasource whose segments are disabled
# @return [Array] the segment identifiers that were iterated
def disable_segments(datasource_name)
  list_segments(datasource_name).each do |segment_id|
    disable_segment(datasource_name, segment_id)
  end
end

#issue_kill_task(datasource_name, interval) ⇒ Object

Raises:

  • (ConnectionError)


59
60
61
62
63
# File 'lib/druid/node/coordinator.rb', line 59

# Sends a DELETE for one interval of a datasource.
#
# The path is built by interpolation, so a non-String name or interval
# (anything responding to +to_s+) is accepted, as in #datasource_info.
#
# @param datasource_name [#to_s] datasource the interval belongs to
# @param interval [#to_s] interval, appended to the path verbatim
# @return [true] when the response code is 200
# @raise [ConnectionError] when the response code is not 200
def issue_kill_task(datasource_name, interval)
  path = "#{DATASOURCES_PATH}#{datasource_name}/intervals/#{interval}"
  response = connection.delete(path)
  raise ConnectionError, 'Unable to issue kill task.' unless response.code.to_i == 200

  true
end

#list_datasources(url_params = {}) ⇒ Object



65
66
67
68
# File 'lib/druid/node/coordinator.rb', line 65

# Requests the datasource listing from the coordinator.
#
# @param url_params [Hash] passed through unchanged as the second argument
#   of the connection's +get+
# @return [Object, nil] the parsed JSON body for a 200 response; nil for any
#   other response code
def list_datasources(url_params = {})
  response = connection.get(DATASOURCES_PATH, url_params)
  return unless response.code.to_i == 200

  JSON.parse(response.body)
end

#list_segments(datasource_name) ⇒ Object



70
71
72
73
74
75
76
77
78
79
80
# File 'lib/druid/node/coordinator.rb', line 70

# Lists the identifiers of a datasource's segments.
#
# The path is built by interpolation, so a non-String name (e.g. a Symbol)
# is accepted here just as it is by #datasource_info and by this method's
# own error message.
#
# @param datasource_name [#to_s] datasource whose segments are listed
# @return [Array] the +identifier+ of every entry in a 200 response, or an
#   empty array for a 204 response
# @raise [ConnectionError] for any response code other than 200 or 204
def list_segments(datasource_name)
  path = "#{DATASOURCES_PATH}#{datasource_name}/segments"
  response = connection.get(path, full: true)
  case response.code.to_i
  when 200
    JSON.parse(response.body).map { |segment| segment['identifier'] }
  when 204
    # A 204 response is treated as "no segments", not as an error.
    []
  else
    raise ConnectionError, "Unable to list segments for #{datasource_name}"
  end
end