Class: Kafka::Producer::DeliveryReport

Inherits:
Object
  • Object
show all
Defined in:
lib/kafka/producer/delivery_report.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(&block) ⇒ DeliveryReport

Initializes a new DeliveryReport

Parameters:

  • block (Proc)

    Callback to call with the DeliveryReport when it is received from the cluster.


26
27
28
29
30
31
32
33
34
35
36
37
38
39
# File 'lib/kafka/producer/delivery_report.rb', line 26

def initialize(&block)
  @mutex = Mutex.new
  @waiter = ConditionVariable.new

  @error = nil
  @topic = nil
  @offset = nil
  @partition = nil
  @callback = block

  # Will be set to true by a call to #done. Fast out for any callers to
  # #wait that may come in after done has already been called.
  @done = false
end

Instance Attribute Details

#errornil, Kafka::ResponseError (readonly)

Returns:

  • (nil)

    Delivery was successful or report has not been received yet.

  • (Kafka::ResponseError)

    Error delivering the Message.


8
9
10
# File 'lib/kafka/producer/delivery_report.rb', line 8

def error
  @error
end

#offsetnil, Integer (readonly)

Returns:

  • (nil)

    Report has not been received yet

  • (Integer)

    Offset for the message on partition.


16
17
18
# File 'lib/kafka/producer/delivery_report.rb', line 16

def offset
  @offset
end

#partitionnil, Integer (readonly)

Returns:

  • (nil)

    Report has not been received yet

  • (Integer)

    Partition the message was delivered to.


20
21
22
# File 'lib/kafka/producer/delivery_report.rb', line 20

def partition
  @partition
end

#topicnil, String (readonly)

Returns:

  • (nil)

    Report has not been received yet

  • (String)

    Name of the topic Message was delivered to.


12
13
14
# File 'lib/kafka/producer/delivery_report.rb', line 12

def topic
  @topic
end

Instance Method Details

#done(message) ⇒ Object

Set the response based on the message and notify anyone waiting on the result.

Parameters:


69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
# File 'lib/kafka/producer/delivery_report.rb', line 69

def done(message)
  @mutex.synchronize do
    @error = message.error

    @offset = message.offset
    @topic = message.topic
    @partition = message.partition

    @done = true
    @waiter.broadcast

    remove_instance_variable(:@mutex)
    remove_instance_variable(:@waiter)
  end

  if @callback
    @callback.call(self)
  end
end

#error?Boolean

Returns Is the report for an error?.

Returns:

  • (Boolean)

    Is the report for an error?


51
52
53
# File 'lib/kafka/producer/delivery_report.rb', line 51

def error?
  error.nil?
end

#received?Boolean

Returns true when the report has been received back from the kafka cluster.

Returns:

  • (Boolean)

    True when the server has reported back on the delivery.


46
47
48
# File 'lib/kafka/producer/delivery_report.rb', line 46

def received?
  @done
end

#successful?Boolean

Returns if the delivery was successful

Returns:

  • (Boolean)

    True when the report was delivered to the cluster successfully.


59
60
61
# File 'lib/kafka/producer/delivery_report.rb', line 59

def successful?
  !error
end

#wait(timeout: 5000) ⇒ Object

Wait for a report to be received for the delivery from the cluster.

Parameters:

  • timeout (Integer) (defaults to: 5000)

    Maximum time to wait in milliseconds.

Raises:


95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
# File 'lib/kafka/producer/delivery_report.rb', line 95

def wait(timeout: 5000)
  # Fast out since the delivery report has already been reported back from
  # the cluster.
  if @done
    return
  end

  @mutex.synchronize do
    # Convert from milliseconds to seconds to match Ruby's API. Takes
    # milliseconds to be consistent with librdkafka APIs.
    if timeout
      timeout /= 1000.0
    end

    @waiter.wait(@mutex, timeout)

    # No report was received for the message before we timed out waiting.
    if !@done
      raise ::Kafka::ResponseError, ::Kafka::FFI::RD_KAFKA_RESP_ERR__TIMED_OUT
    end
  end

  nil
end