Class: KafkaRest::Topic

Inherits:
Object
  • Object
show all
Defined in:
lib/kafka_rest/topic.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(client, name, raw = EMPTY_STRING, schema = nil) ⇒ Topic

Returns a new instance of Topic.



7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# File 'lib/kafka_rest/topic.rb', line 7

# Builds a topic handle bound to +client+.
#
# @param client [Object] object used by the other methods here to issue requests
# @param name [Object] the topic name (interpolated into #to_s)
# @param raw [Object] initial raw value for the topic; defaults to EMPTY_STRING
# @param schema [Object, nil] optional schema consulted by #produce
def initialize(client, name, raw = EMPTY_STRING, schema = nil)
  # Identity and cached metadata.
  @name = name
  @client = client
  @raw = raw
  @schema = schema
  @partitions = []

  # Coordination state; @queue and @cond are used by #produce_async.
  # NOTE(review): @mutex, @running and @retry_count are not read in the
  # visible code - presumably the background worker uses them; confirm.
  @queue = Queue.new
  @mutex = Mutex.new
  @cond = ConditionVariable.new
  @running = true
  @retry_count = 3

  # Kept last so every instance variable above is set before thread_start runs.
  # thread_start is defined elsewhere - presumably it spawns the worker thread.
  @thread = thread_start
end

Instance Attribute Details

#client ⇒ Object (readonly)

Returns the value of attribute client.



5
6
7
# File 'lib/kafka_rest/topic.rb', line 5

# Reader for the client handed to the constructor.
#
# @return [Object] the value held in @client
def client
  @client
end

#name ⇒ Object (readonly)

Returns the value of attribute name.



5
6
7
# File 'lib/kafka_rest/topic.rb', line 5

# Reader for the topic name handed to the constructor.
#
# @return [Object] the value held in @name (presumably a String - confirm)
def name
  @name
end

#partitions ⇒ Object (readonly)

Returns the value of attribute partitions.



5
6
7
# File 'lib/kafka_rest/topic.rb', line 5

# Reader for the partitions known so far.
#
# The array starts empty, is replaced wholesale by #list_partitions, and is
# filled lazily slot by slot by #partition.
#
# @return [Array] the value held in @partitions
def partitions
  @partitions
end

#raw ⇒ Object (readonly)

Returns the value of attribute raw.



5
6
7
# File 'lib/kafka_rest/topic.rb', line 5

# Reader for the raw topic data.
#
# Holds the +raw+ constructor argument (EMPTY_STRING by default) until #get
# overwrites it with the response it receives.
#
# @return [Object] the value held in @raw
def raw
  @raw
end

#schema ⇒ Object (readonly)

Returns the value of attribute schema.



5
6
7
# File 'lib/kafka_rest/topic.rb', line 5

# Reader for the schema handed to the constructor.
#
# @return [Object, nil] the value held in @schema; nil when none was given
def schema
  @schema
end

Instance Method Details

#get ⇒ Object



23
24
25
# File 'lib/kafka_rest/topic.rb', line 23

# Requests topic_path through the client and remembers the response.
#
# @return [Object] whatever client.request returned; the same object is
#   stored in @raw and is therefore visible through #raw afterwards
def get
  @raw = client.request(topic_path)
end

#list_partitions ⇒ Object



32
33
34
35
36
# File 'lib/kafka_rest/topic.rb', line 32

# Requests partitions_path through the client and wraps every returned entry
# in a Partition, replacing the previously held partitions.
#
# Each entry is indexed with the 'partition' key to obtain the id passed to
# Partition.new; the entry itself is passed along as the fourth argument.
#
# @return [Array<Partition>] the freshly built list, also stored in @partitions
def list_partitions
  rows = client.request(partitions_path)
  @partitions = rows.map { |row| Partition.new(client, self, row['partition'], row) }
end

#partition(id) ⇒ Object Also known as: []



27
28
29
# File 'lib/kafka_rest/topic.rb', line 27

# Returns the Partition held at slot +id+ of #partitions, creating and storing
# one first when that slot is empty (nil or false).
#
# @param id [Integer] index into the partitions array, also passed to Partition.new
# @return [Partition] the stored or newly created partition
def partition(id)
  known = partitions
  cached = known[id]
  return cached if cached

  known[id] = Partition.new(client, self, id)
end

#produce(*messages) ⇒ Object



38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
# File 'lib/kafka_rest/topic.rb', line 38

# Posts +messages+ to topic_path through the client and returns the response.
#
# The payload always carries the formatted messages under :records. When a
# schema is present it is referenced by id if it has one, otherwise its
# serialized form is sent under :value_schema. After the post, a
# 'value_schema_id' found in the JSON response body is handed to
# schema.update_id.
#
# @param messages [Array<Object>] messages, passed through format
# @return [Object] the response returned by client.post
# @raise [JSON::ParserError] if a schema is set and the response body is not valid JSON
def produce(*messages)
  payload = { records: format(messages) }

  if schema
    known_id = schema.id
    if known_id
      payload[:value_schema_id] = known_id
    else
      payload[:value_schema] = schema.serialized
    end
  end

  response = client.post(topic_path, payload, schema, true)
  # Without a schema the response body is never parsed.
  return response unless schema

  assigned_id = JSON.parse(response.body.to_s)['value_schema_id']
  schema.update_id(assigned_id) if assigned_id

  response
end

#produce_async(*messages) ⇒ Object



58
59
60
61
# File 'lib/kafka_rest/topic.rb', line 58

# Formats +messages+ and enqueues them on @queue, then signals @cond.
#
# NOTE(review): the signal is sent without holding @mutex; whether a wakeup
# can be missed depends on the consumer loop, which is not visible here.
#
# @param messages [Array<Object>] messages, passed through format
# @return [Object] whatever @cond.signal returns
def produce_async(*messages)
  records = format(messages)
  @queue.push(records)
  @cond.signal
end

#to_s ⇒ Object



63
64
65
# File 'lib/kafka_rest/topic.rb', line 63

# Human-readable label for this topic.
#
# @return [String] a frozen string of the form "Topic{name=<name>}"
def to_s
  label = "Topic{name=#{name}}"
  label.freeze
end