Class: Kafka::Producer::DeliveryReport
- Inherits: Object
- Defined in: lib/kafka/producer/delivery_report.rb
Instance Attribute Summary
- #error ⇒ nil, Kafka::ResponseError (readonly)
- #offset ⇒ nil, Integer (readonly)
- #partition ⇒ nil, Integer (readonly)
- #topic ⇒ nil, String (readonly)
Instance Method Summary
- #done(message) ⇒ Object
  Set the response based on the message and notify anyone waiting on the result.
- #error? ⇒ Boolean
  Whether the report is for an error.
- #initialize(&block) ⇒ DeliveryReport (constructor)
  Initializes a new DeliveryReport.
- #received? ⇒ Boolean
  Returns true when the report has been received back from the Kafka cluster.
- #successful? ⇒ Boolean
  Returns true if the delivery was successful.
- #wait(timeout: 5000) ⇒ Object
  Wait for a report to be received for the delivery from the cluster.
Constructor Details
#initialize(&block) ⇒ DeliveryReport
Initializes a new DeliveryReport. If a block is given, it is stored as a callback and called with the report when #done runs.
    # File 'lib/kafka/producer/delivery_report.rb', line 26
    def initialize(&block)
      @mutex = Mutex.new
      @waiter = ConditionVariable.new

      @error = nil
      @topic = nil
      @offset = nil
      @partition = nil
      @callback = block

      # Will be set to true by a call to #done. Fast out for any callers to
      # #wait that may come in after done has already been called.
      @done = false
    end
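The state that the constructor sets up (a mutex, a condition variable, nil result fields, an optional callback block, and a done flag) can be tried out without the gem. The sketch below is a minimal illustration under stated assumptions: `SketchReport` and its `callback?` helper are hypothetical names written for this example, not part of the library.

```ruby
# Hypothetical stand-in that mirrors the state set up by the listed
# constructor. It is not Kafka::Producer::DeliveryReport itself.
class SketchReport
  attr_reader :error, :topic, :offset, :partition

  def initialize(&block)
    @mutex = Mutex.new
    @waiter = ConditionVariable.new
    @error = nil
    @topic = nil
    @offset = nil
    @partition = nil
    @callback = block # nil when no block is given
    @done = false
  end

  def received?
    @done
  end

  # Helper added for this sketch only, to show whether a block was stored.
  def callback?
    !@callback.nil?
  end
end

plain = SketchReport.new
with_callback = SketchReport.new { |report| puts "delivered to #{report.topic}" }

puts plain.received? # false until a report arrives
puts plain.callback?
puts with_callback.callback?
```

A freshly built report has no result fields yet, so callers should check `received?` (or wait) before reading `offset` or `partition`.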
Instance Attribute Details
#error ⇒ nil, Kafka::ResponseError (readonly)
    # File 'lib/kafka/producer/delivery_report.rb', line 8
    def error
      @error
    end
#offset ⇒ nil, Integer (readonly)
    # File 'lib/kafka/producer/delivery_report.rb', line 16
    def offset
      @offset
    end
#partition ⇒ nil, Integer (readonly)
    # File 'lib/kafka/producer/delivery_report.rb', line 20
    def partition
      @partition
    end
#topic ⇒ nil, String (readonly)
    # File 'lib/kafka/producer/delivery_report.rb', line 12
    def topic
      @topic
    end
Instance Method Details
#done(message) ⇒ Object
Set the response based on the message and notify anyone waiting on the result.
    # File 'lib/kafka/producer/delivery_report.rb', line 69
    def done(message)
      @mutex.synchronize do
        @error = message.error
        @offset = message.offset
        @topic = message.topic
        @partition = message.partition
        @done = true

        @waiter.broadcast

        remove_instance_variable(:@mutex)
        remove_instance_variable(:@waiter)
      end

      if @callback
        @callback.call(self)
      end
    end
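The flow of #done (copy the result fields under the lock, wake any waiters, then run the callback outside the lock) can be sketched in isolation. This is an illustration under assumptions, not the gem's code: `SketchMessage` and `SketchReport` are hypothetical stand-ins, and unlike the listing this sketch keeps its mutex and condition variable after completion.

```ruby
# Hypothetical stand-ins: SketchMessage plays the role of the message passed
# to #done, and SketchReport mirrors the fields the listing copies from it.
SketchMessage = Struct.new(:error, :topic, :partition, :offset)

class SketchReport
  attr_reader :error, :topic, :offset, :partition

  def initialize(&block)
    @mutex = Mutex.new
    @waiter = ConditionVariable.new
    @callback = block
    @done = false
  end

  def received?
    @done
  end

  # Copy the result fields under the lock, wake any waiters, then run the
  # callback after the lock has been released.
  def done(message)
    @mutex.synchronize do
      @error = message.error
      @offset = message.offset
      @topic = message.topic
      @partition = message.partition
      @done = true
      @waiter.broadcast
    end

    @callback.call(self) if @callback
  end
end

seen = []
report = SketchReport.new { |r| seen << [r.topic, r.partition, r.offset] }
report.done(SketchMessage.new(nil, "events", 0, 42))

p seen
p report.received?
```

Running the callback after the synchronized block means a slow callback does not hold the lock while other threads are trying to read the result.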
#error? ⇒ Boolean
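Whether the report is for an error. Note that the listed body returns error.nil?, which is true when no error is set, so it appears inverted relative to this description.
    # File 'lib/kafka/producer/delivery_report.rb', line 51
    def error?
      error.nil?
    end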
#received? ⇒ Boolean
Returns true when the report has been received back from the Kafka cluster.
    # File 'lib/kafka/producer/delivery_report.rb', line 46
    def received?
      @done
    end
#successful? ⇒ Boolean
Returns true if the delivery was successful, that is, if no error was reported.
    # File 'lib/kafka/producer/delivery_report.rb', line 59
    def successful?
      !error
    end
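To see how the listed bodies of #error? and #successful? relate, the sketch below copies both expressions into standalone helpers and evaluates them for a nil error and for a non-nil one. The helper names `listed_error?` and `listed_successful?` are hypothetical, and a plain `StandardError` stands in for a `Kafka::ResponseError`.

```ruby
# Hypothetical helpers that copy the listed bodies of #error? and
# #successful? so their results can be compared side by side.
def listed_error?(error)
  error.nil?
end

def listed_successful?(error)
  !error
end

# Any non-nil object stands in for a Kafka::ResponseError here.
failure = StandardError.new("delivery failed")

[nil, failure].each do |error|
  puts "error=#{error.inspect} " \
       "error?=#{listed_error?(error)} " \
       "successful?=#{listed_successful?(error)}"
end
```

As listed, both expressions give the same answer for each input, so code that needs to branch on failure may be safer checking `successful?` or the `error` attribute directly.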
#wait(timeout: 5000) ⇒ Object
Wait for a report to be received for the delivery from the cluster. The timeout is given in milliseconds. If no report arrives in time, a Kafka::ResponseError is raised with RD_KAFKA_RESP_ERR__TIMED_OUT. Returns nil once the report has been received.
    # File 'lib/kafka/producer/delivery_report.rb', line 95
    def wait(timeout: 5000)
      # Fast out since the delivery report has already been reported back from
      # the cluster.
      if @done
        return
      end

      @mutex.synchronize do
        # Convert from milliseconds to seconds to match Ruby's API. Takes
        # milliseconds to be consistent with librdkafka APIs.
        if timeout
          timeout /= 1000.0
        end

        @waiter.wait(@mutex, timeout)

        # No report was received for the message before we timed out waiting.
        if !@done
          raise ::Kafka::ResponseError, ::Kafka::FFI::RD_KAFKA_RESP_ERR__TIMED_OUT
        end
      end

      nil
    end
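The waiting pattern (block on a condition variable with a millisecond timeout, return nil on completion, raise on timeout) can be exercised with a small standalone sketch. It is a variation rather than the gem's implementation: `SketchReport` and `SketchTimeout` are hypothetical names, the flag is rechecked in a loop under the lock so an early wake-up is not mistaken for a timeout, and the mutex is never removed.

```ruby
# Hypothetical stand-ins for this sketch; neither name exists in the gem.
class SketchTimeout < StandardError; end

class SketchReport
  def initialize
    @mutex = Mutex.new
    @waiter = ConditionVariable.new
    @done = false
  end

  def received?
    @done
  end

  def done
    @mutex.synchronize do
      @done = true
      @waiter.broadcast
    end
  end

  # timeout is in milliseconds, as in the listing; nil waits indefinitely.
  def wait(timeout: 5000)
    now = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    deadline = timeout && now + timeout / 1000.0

    @mutex.synchronize do
      until @done
        now = Process.clock_gettime(Process::CLOCK_MONOTONIC)
        remaining = deadline && deadline - now
        raise SketchTimeout, "no report within #{timeout} ms" if remaining && remaining <= 0

        # ConditionVariable#wait takes seconds; nil means no timeout.
        @waiter.wait(@mutex, remaining)
      end
    end

    nil
  end
end

report = SketchReport.new
reporter = Thread.new do
  sleep 0.05
  report.done
end

result = report.wait(timeout: 2000)
reporter.join
p result

begin
  SketchReport.new.wait(timeout: 50)
rescue SketchTimeout => e
  puts "timed out: #{e.message}"
end
```

Passing the timeout in milliseconds keeps the call site consistent with librdkafka, while the conversion to seconds happens once, next to the `ConditionVariable#wait` call that needs it.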