Class: KafkaRest::Client

Inherits:
Object
  • Object
show all
Defined in:
lib/kafka_rest/client.rb

Constant Summary collapse

DEFAULT_URL =
'http://localhost:8080'.freeze
BROKERS_PATH =
'/brokers'.freeze
TOPICS_PATH =
'/topics'.freeze
CONTENT_JSON =
'application/json'.freeze

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(url: DEFAULT_URL) ⇒ Client

Returns a new instance of Client.



13
14
15
16
17
18
# File 'lib/kafka_rest/client.rb', line 13

# Builds a client with empty broker, topic and consumer caches.
#
# @param url [String] base URL of the REST endpoint; defaults to DEFAULT_URL
def initialize(url: DEFAULT_URL)
  # Caches start out empty and are filled by the list_* / lookup methods.
  @consumers = {}
  @topics = {}
  @brokers = []

  @url = url
end

Instance Attribute Details

#brokers ⇒ Object (readonly)

Returns the value of attribute brokers.



11
12
13
# File 'lib/kafka_rest/client.rb', line 11

# Reader for the broker list cached on this client.
#
# @return [Array] the current value of @brokers: an empty Array after
#   construction, replaced by the result of each #list_brokers call
def brokers
  @brokers
end

#consumers ⇒ Object (readonly)

Returns the value of attribute consumers.



11
12
13
# File 'lib/kafka_rest/client.rb', line 11

# Reader for the consumer cache held by this client.
#
# @return [Hash] the current value of @consumers: an empty Hash after
#   construction; #consumer stores entries in it keyed by group
def consumers
  @consumers
end

#topics ⇒ Object (readonly)

Returns the value of attribute topics.



11
12
13
# File 'lib/kafka_rest/client.rb', line 11

# Reader for the topic cache held by this client.
#
# @return [Hash] the current value of @topics: an empty Hash after
#   construction; #list_topics and #topic store entries in it keyed by name
def topics
  @topics
end

#url ⇒ Object (readonly)

Returns the value of attribute url.



11
12
13
# File 'lib/kafka_rest/client.rb', line 11

# Reader for the base URL given to the constructor.
#
# @return [Object] the current value of @url (DEFAULT_URL unless overridden);
#   #request prepends it to paths that are not absolute URLs
def url
  @url
end

Instance Method Details

#consumer(group, &block) ⇒ Object



37
38
39
# File 'lib/kafka_rest/client.rb', line 37

# Returns the consumer for +group+, creating and caching it on first use.
#
# @param group [Object] key identifying the consumer group
# @param block [Proc] accepted for interface compatibility; not used here
# @return [Consumer] the cached or newly created consumer
def consumer(group, &block)
  cached = @consumers[group]
  return cached if cached

  @consumers[group] = Consumer.new(self, group)
end

#list_brokers ⇒ Object



20
21
22
23
24
# File 'lib/kafka_rest/client.rb', line 20

# Requests BROKERS_PATH and wraps every id found under the 'brokers' key in
# a KafkaRest::Broker. A response without that key yields an empty list.
#
# @return [Array<KafkaRest::Broker>] the new broker list, also stored in @brokers
def list_brokers
  broker_ids = request(BROKERS_PATH).fetch('brokers'.freeze, [])
  @brokers = broker_ids.map { |broker_id| KafkaRest::Broker.new(self, broker_id) }
end

#list_topics ⇒ Object



26
27
28
29
30
# File 'lib/kafka_rest/client.rb', line 26

# Requests TOPICS_PATH and builds a KafkaRest::Topic for every name in the
# response, storing each one in @topics under its name (replacing any entry
# already cached for that name).
#
# @return [Array<KafkaRest::Topic>] the topics built from the response, in order
def list_topics
  topic_names = request(TOPICS_PATH)
  topic_names.map do |topic_name|
    KafkaRest::Topic.new(self, topic_name).tap { |built| @topics[topic_name] = built }
  end
end

#post(path, body = nil, schema = nil, raw_response = false) ⇒ Object



63
64
65
66
67
68
69
# File 'lib/kafka_rest/client.rb', line 63

# Issues a POST through #request.
#
# @param path [String] path or URL handed to #request
# @param body [Object, nil] payload handed to #request
# @param schema [Object, nil] schema handed to #request
# @param raw_response [Boolean] when truthy, return the response object that
#   #request yielded instead of #request's own return value
# @return [Object] #request's return value, or the yielded response
def post(path, body = nil, schema = nil, raw_response = false)
  http_response = nil
  parsed = request(path, verb: Net::HTTP::Post, body: body, schema: schema) { |resp| http_response = resp }
  return http_response if raw_response

  parsed
end

#request(path, verb: Net::HTTP::Get, body: nil, schema: nil, &block) ⇒ Object



41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# File 'lib/kafka_rest/client.rb', line 41

# Performs an HTTP request and returns the JSON-decoded response body.
#
# @param path [String] an absolute URL, or a path that is appended to #url
# @param verb [Class] Net::HTTP request class, e.g. Net::HTTP::Get or Net::HTTP::Post
# @param body [#to_json, nil] payload, serialized with #to_json (nil becomes
#   the JSON literal null); only sent when the verb permits a request body
# @param schema [#content_type, nil] when given, its #content_type is used as
#   the Content-Type header instead of CONTENT_JSON
# @yield [res] the raw response object, before its body is parsed
# @return [Object] the result of JSON.parse on the response body
# @raise [JSON::ParserError] if the response body is not valid JSON
def request(path, verb: Net::HTTP::Get, body: nil, schema: nil, &block)
  uri = URI.parse(path)
  uri = URI.parse(url + path) unless uri.absolute?

  # Enable TLS for https:// endpoints; otherwise Net::HTTP would speak plain
  # HTTP to the TLS port.
  Net::HTTP.start(uri.host, uri.port, use_ssl: uri.scheme == 'https'.freeze) do |http|
    req = verb.new(uri)
    req['User-Agent'.freeze] = user_agent
    req['Accept'.freeze] = CONTENT_JSON

    # `verb` is a request *class*, so `verb.is_a?(Net::HTTP::Post)` can never
    # be true. Ask the request itself whether it may carry a body, so that
    # GET requests are no longer sent with a "null" payload.
    if req.request_body_permitted?
      req['Content-Type'.freeze] = schema ? schema.content_type : CONTENT_JSON
      req.body = body.to_json
      KafkaRest.logger.info { "Post body: #{req.body}" }
    end

    res = http.request(req)
    yield res if block_given?

    JSON.parse(res.body.to_s)
  end
end

#topic(name, schema = nil) ⇒ Object Also known as: []



32
33
34
# File 'lib/kafka_rest/client.rb', line 32

# Returns the topic cached under +name+, building and caching it on first use.
# When a topic is already cached for +name+, it is returned as-is and
# +schema+ is not consulted.
#
# @param name [Object] key identifying the topic
# @param schema [Object, nil] passed to KafkaRest::Topic.new when the topic is built
# @return [KafkaRest::Topic] the cached or newly built topic
def topic(name, schema = nil)
  cached = @topics[name]
  return cached if cached

  @topics[name] = KafkaRest::Topic.new(self, name, EMPTY_STRING, schema)
end