Class: Kafka::Producer::DeliveryReport

Inherits:
Object
  • Object
show all
Defined in:
lib/kafka/producer/delivery_report.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(&block) ⇒ DeliveryReport

Initializes a new DeliveryReport



37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# File 'lib/kafka/producer/delivery_report.rb', line 37

# Builds a report for a delivery whose outcome is not yet known.
#
# @param block [Proc, nil] Optional callback stored on the report; #done
#   calls it with the report itself once the outcome has been recorded.
def initialize(&block)
  # Kept so #done can notify the caller after recording the outcome.
  @callback = block

  # Delivery details stay nil until #done copies them from the message.
  @error = @topic = @offset = @latency = @partition = nil

  # Flipped to true by #done. Lets callers of #wait that arrive after the
  # report was recorded return immediately.
  @done = false

  # Primitives used by #wait and #done to coordinate across threads.
  @mutex = Mutex.new
  @waiter = ConditionVariable.new
end

Instance Attribute Details

#error ⇒ nil, Kafka::ResponseError (readonly)



8
9
10
# File 'lib/kafka/producer/delivery_report.rb', line 8

# Error recorded for the delivery. It is nil until #done has copied the
# value from the delivered message.
#
# @return [nil, Kafka::ResponseError]
def error
  @error
end

#latency ⇒ nil, Integer (readonly)

Note:

Latency is in microseconds (μs) while most other timestamps are in milliseconds.

Returns the number of microseconds since the message was enqueued for delivery until the message was confirmed by the cluster or permanently failed.



31
32
33
# File 'lib/kafka/producer/delivery_report.rb', line 31

# Time in microseconds from the message being enqueued for delivery until
# it was confirmed by the cluster or permanently failed. It is nil until
# #done has copied the value from the delivered message.
#
# @return [nil, Integer]
def latency
  @latency
end

#offset ⇒ nil, Integer (readonly)



16
17
18
# File 'lib/kafka/producer/delivery_report.rb', line 16

# Offset recorded for the delivery. It is nil until #done has copied the
# value from the delivered message.
#
# @return [nil, Integer]
def offset
  @offset
end

#partition ⇒ nil, Integer (readonly)



20
21
22
# File 'lib/kafka/producer/delivery_report.rb', line 20

# Partition recorded for the delivery. It is nil until #done has copied
# the value from the delivered message.
#
# @return [nil, Integer]
def partition
  @partition
end

#topic ⇒ nil, String (readonly)



12
13
14
# File 'lib/kafka/producer/delivery_report.rb', line 12

# Topic recorded for the delivery. It is nil until #done has copied the
# value from the delivered message.
#
# @return [nil, String]
def topic
  @topic
end

Instance Method Details

#done(message) ⇒ Object

Set the response based on the message and notify anyone waiting on the result.



85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
# File 'lib/kafka/producer/delivery_report.rb', line 85

# Records the outcome of a delivery and wakes every thread blocked in #wait.
#
# @param message [#error, #offset, #topic, #partition, #latency] The
#   delivered (or failed) message whose details are copied onto the report.
#
# @return [nil]
def done(message)
  @mutex.synchronize do
    @error = message.error

    @offset = message.offset
    @topic = message.topic
    @partition = message.partition
    @latency = message.latency

    @done = true
    @waiter.broadcast

    # @mutex and @waiter are intentionally kept for the lifetime of the
    # report. A thread in #wait may have passed its @done check but not yet
    # reached the lock; removing the primitives here would make that thread
    # call #synchronize on nil.
  end

  # Invoke the callback outside the lock so it can freely read the report
  # or call #wait without contending with this method.
  @callback&.call(self)

  nil
end

#error? ⇒ Boolean

Returns true if a report has been received and the delivery failed.

See Also:

  • #successful?


67
68
69
# File 'lib/kafka/producer/delivery_report.rb', line 67

# Whether a report has been received and the delivery did not succeed.
#
# @return [Boolean] The falsy result of #received? when no report has
#   arrived yet, otherwise the negation of #successful?.
def error?
  reported = received?
  return reported unless reported

  !successful?
end

#received? ⇒ Boolean

Returns true when the report has been received back from the kafka cluster.



58
59
60
# File 'lib/kafka/producer/delivery_report.rb', line 58

# Whether #done has been called for this report, i.e. the report has come
# back for the delivery.
#
# @return [Boolean]
def received?
  @done
end

#successful? ⇒ Boolean

Returns true if a report has been received and it carries no error.



75
76
77
# File 'lib/kafka/producer/delivery_report.rb', line 75

# Whether a report has been received and it carries no error.
#
# @return [Boolean] The falsy result of #received? when no report has
#   arrived yet, otherwise whether #error is nil.
def successful?
  reported = received?
  return reported unless reported

  error.nil?
end

#wait(timeout: 5000) ⇒ Object

Wait for a report to be received for the delivery from the cluster.

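Raises:

  • (Kafka::ResponseError) — RD_KAFKA_RESP_ERR__TIMED_OUT if no report is received before the timeout elapses.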


114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
# File 'lib/kafka/producer/delivery_report.rb', line 114

# Blocks the calling thread until a delivery report has been recorded by
# #done or the timeout elapses.
#
# @param timeout [Integer, Float, nil] Maximum time to wait in milliseconds
#   (to be consistent with librdkafka APIs). Pass nil to wait without a
#   time limit.
#
# @raise [Kafka::ResponseError] RD_KAFKA_RESP_ERR__TIMED_OUT when no report
#   was received before the timeout elapsed.
#
# @return [nil]
def wait(timeout: 5000)
  # Fast out since the delivery report has already been reported back from
  # the cluster.
  return if @done

  # Work on local references. #done may drop these instance variables, but
  # only after it has set @done, so a missing primitive means the report
  # has already arrived.
  waiter = @waiter
  mutex = @mutex
  return if mutex.nil? || waiter.nil?

  # Convert from milliseconds to seconds to match Ruby's API and track an
  # absolute deadline on the monotonic clock so repeated wakeups cannot
  # extend the total wait.
  deadline = timeout && (Process.clock_gettime(Process::CLOCK_MONOTONIC) + (timeout / 1000.0))

  mutex.synchronize do
    # Re-check @done under the lock: the report may have landed between the
    # fast-out check above and acquiring the mutex, in which case the
    # broadcast has already happened and waiting would miss it. Looping
    # also guards against spurious wakeups of the condition variable.
    until @done
      remaining = deadline && (deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC))

      # No report was received for the message before we timed out waiting.
      if remaining && remaining <= 0
        raise ::Kafka::ResponseError, ::Kafka::FFI::RD_KAFKA_RESP_ERR__TIMED_OUT
      end

      waiter.wait(mutex, remaining)
    end
  end

  nil
end