Class: Druid::Node::Broker
- Inherits:
-
Object
- Object
- Druid::Node::Broker
- Defined in:
- lib/druid/node/broker.rb
Constant Summary collapse
- QUERY_PATH =
'/druid/v2'.freeze
Instance Attribute Summary collapse
-
#config ⇒ Object
readonly
Returns the value of attribute config.
-
#zk ⇒ Object
readonly
Returns the value of attribute zk.
Instance Method Summary collapse
-
#connection ⇒ Object
TODO: Would caching connections be beneficial?
-
#initialize(config, zk) ⇒ Broker
constructor
A new instance of Broker.
- #query(query_object) ⇒ Object
Constructor Details
#initialize(config, zk) ⇒ Broker
Returns a new instance of Broker.
7 8 9 10 |
# File 'lib/druid/node/broker.rb', line 7
# Builds a broker node around the two collaborators it is given; both are
# stored as-is and exposed through the read-only #config and #zk attributes.
#
# @param config [Object] stored in @config
# @param zk [Object] stored in @zk
def initialize(config, zk)
  @zk = zk
  @config = config
end
Instance Attribute Details
#config ⇒ Object (readonly)
Returns the value of attribute config.
6 7 8 |
# File 'lib/druid/node/broker.rb', line 6
# Read-only accessor for the configuration handed to the constructor.
#
# @return [Object] the current value of @config
def config
  @config
end
#zk ⇒ Object (readonly)
Returns the value of attribute zk.
6 7 8 |
# File 'lib/druid/node/broker.rb', line 6
# Read-only accessor for the zk object handed to the constructor.
#
# @return [Object] the current value of @zk
def zk
  @zk
end
Instance Method Details
#connection ⇒ Object
TODO: Would caching connections be beneficial?
13 14 15 16 17 18 |
# File 'lib/druid/node/broker.rb', line 13
# TODO: Would caching connections be beneficial?
#
# Picks the broker at the head of the registry entry
# "<discovery_path>/druid:broker", rotates that entry so the next call picks
# a different broker, and returns a new connection to the picked one.
#
# @return [Druid::Connection] connection built from the broker's :host and :port
# @raise [Druid::ConnectionError] when the registry entry has no first element
def connection
  # The registry is looked up afresh for each step, exactly as before.
  brokers = -> { zk.registry["#{config.discovery_path}/druid:broker"] }

  target = brokers.call.first
  raise Druid::ConnectionError, 'no druid brokers available' if target.nil?

  # Round-robin load balancing: the broker just picked moves to the back.
  brokers.call.rotate!
  Druid::Connection.new(host: target[:host], port: target[:port])
end
#query(query_object) ⇒ Object
20 21 22 23 24 25 26 27 28 29 30 31 32 |
# File 'lib/druid/node/broker.rb', line 20
# Posts +query_object+ to QUERY_PATH on a broker obtained from #connection
# and returns the parsed JSON body of the response.
#
# Fail-over: there is one attempt per broker currently listed in the registry
# (and always at least one attempt). An attempt that raises
# Druid::ConnectionError moves on to the next attempt. Once a connection
# error has occurred, only a 200 response ends the loop early; a response
# obtained without any prior connection error is taken as-is.
#
# @param query_object [Object] payload passed unchanged to the connection's #post
# @return [Object] result of JSON.parse on the response body
# @raise [Druid::ConnectionError] when no attempt produced a response
# @raise [QueryError] when the response code is not 200
def query(query_object)
  broker_count = zk.registry["#{config.discovery_path}/druid:broker"].size
  response = nil
  last_error = nil

  # At least one attempt, so an empty registry surfaces as the
  # ConnectionError raised by #connection rather than a nil response.
  [broker_count, 1].max.times do
    begin
      response = connection.post(QUERY_PATH, query_object)
    rescue Druid::ConnectionError => e
      last_error = e
      next
    end

    break if last_error.nil? || response.code.to_i == 200
  end

  # Every attempt failed to connect: report that, not a NoMethodError on nil.
  raise last_error if response.nil?
  raise QueryError unless response.code.to_i == 200

  JSON.parse(response.body)
end