Class: KafkaRest::Topic
- Inherits:
-
Object
- Object
- KafkaRest::Topic
- Defined in:
- lib/kafka_rest/topic.rb
Instance Attribute Summary collapse
-
#client ⇒ Object
readonly
Returns the value of attribute client.
-
#name ⇒ Object
readonly
Returns the value of attribute name.
-
#partitions ⇒ Object
readonly
Returns the value of attribute partitions.
-
#raw ⇒ Object
readonly
Returns the value of attribute raw.
-
#schema ⇒ Object
readonly
Returns the value of attribute schema.
Instance Method Summary collapse
- #get ⇒ Object
-
#initialize(client, name, raw = EMPTY_STRING, schema = nil) ⇒ Topic
constructor
A new instance of Topic.
- #list_partitions ⇒ Object
- #partition(id) ⇒ Object (also: #[])
- #produce(*messages) ⇒ Object
- #produce_async(*messages) ⇒ Object
- #to_s ⇒ Object
Constructor Details
#initialize(client, name, raw = EMPTY_STRING, schema = nil) ⇒ Topic
Returns a new instance of Topic.
7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
# File 'lib/kafka_rest/topic.rb', line 7 def initialize(client, name, raw = EMPTY_STRING, schema = nil) @client = client @name = name @schema = schema @raw = raw @partitions = [] @retry_count = 3 @running = true @queue = Queue.new @cond = ConditionVariable.new @mutex = Mutex.new @thread = thread_start end |
Instance Attribute Details
#client ⇒ Object (readonly)
Returns the value of attribute client.
5 6 7 |
# File 'lib/kafka_rest/topic.rb', line 5 def client @client end |
#name ⇒ Object (readonly)
Returns the value of attribute name.
5 6 7 |
# File 'lib/kafka_rest/topic.rb', line 5 def name @name end |
#partitions ⇒ Object (readonly)
Returns the value of attribute partitions.
5 6 7 |
# File 'lib/kafka_rest/topic.rb', line 5 def partitions @partitions end |
#raw ⇒ Object (readonly)
Returns the value of attribute raw.
5 6 7 |
# File 'lib/kafka_rest/topic.rb', line 5 def raw @raw end |
#schema ⇒ Object (readonly)
Returns the value of attribute schema.
5 6 7 |
# File 'lib/kafka_rest/topic.rb', line 5 def schema @schema end |
Instance Method Details
#get ⇒ Object
23 24 25 |
# File 'lib/kafka_rest/topic.rb', line 23 def get client.request(topic_path).tap { |res| @raw = res } end |
#list_partitions ⇒ Object
32 33 34 35 36 |
# File 'lib/kafka_rest/topic.rb', line 32 def list_partitions client.request(partitions_path).map do |raw| Partition.new(client, self, raw['partition'], raw) end.tap { |p| @partitions = p } end |
#partition(id) ⇒ Object Also known as: []
27 28 29 |
# File 'lib/kafka_rest/topic.rb', line 27 def partition(id) partitions[id] ||= Partition.new(client, self, id) end |
#produce(*messages) ⇒ Object
38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 |
# File 'lib/kafka_rest/topic.rb', line 38 def produce(*) payload = { records: format() } if schema if schema.id payload[:value_schema_id] = schema.id else payload[:value_schema] = schema.serialized end end res = client.post(topic_path, payload, schema, true) if schema && schema_id = JSON.parse(res.body.to_s)['value_schema_id'] schema.update_id(schema_id) end res end |
#produce_async(*messages) ⇒ Object
58 59 60 61 |
# File 'lib/kafka_rest/topic.rb', line 58 def produce_async(*) @queue << format() @cond.signal end |
#to_s ⇒ Object
63 64 65 |
# File 'lib/kafka_rest/topic.rb', line 63 def to_s "Topic{name=#{name}}".freeze end |