Class: Kafka::Consumer::PartitionConsumer
- Inherits:
-
Object
- Object
- Kafka::Consumer::PartitionConsumer
- Defined in:
- lib/kafka/consumer/partition_consumer.rb
Instance Attribute Summary collapse
-
#commit_interval ⇒ Object
readonly
Returns the value of attribute commit_interval.
-
#consumer ⇒ Object
readonly
Returns the value of attribute consumer.
-
#handler ⇒ Object
readonly
Returns the value of attribute handler.
-
#initial_offset ⇒ Object
readonly
Returns the value of attribute initial_offset.
-
#last_committed_offset ⇒ Object
readonly
Returns the value of attribute last_committed_offset.
-
#last_processed_offset ⇒ Object
readonly
Returns the value of attribute last_processed_offset.
-
#max_wait_ms ⇒ Object
readonly
Returns the value of attribute max_wait_ms.
-
#partition ⇒ Object
readonly
Returns the value of attribute partition.
Instance Method Summary collapse
- #background_committer ⇒ Object
- #claim_partition ⇒ Object
- #commit_last_offset ⇒ Object
- #continue ⇒ Object
-
#initialize(consumer, partition, handler: nil, max_wait_ms: 100, initial_offset: :latest_offset, commit_interval: 5.0) ⇒ PartitionConsumer
constructor
A new instance of PartitionConsumer.
- #interrupt ⇒ Object
- #interrupted? ⇒ Boolean
- #manage_partition_consumer ⇒ Object
- #stop ⇒ Object
- #wait ⇒ Object
Constructor Details
#initialize(consumer, partition, handler: nil, max_wait_ms: 100, initial_offset: :latest_offset, commit_interval: 5.0) ⇒ PartitionConsumer
Returns a new instance of PartitionConsumer.
8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
# File 'lib/kafka/consumer/partition_consumer.rb', line 8
#
# Stores the collaborators and settings, then starts two threads: one running
# manage_partition_consumer (kept in @consumer_thread) and one running
# background_committer. Both threads enable abort_on_exception on themselves.
#
# @param consumer the owning consumer (used for its logger, group, cluster and instance)
# @param partition the partition this object consumes
# @param handler callable invoked with every message (via handler.call)
# @param max_wait_ms value forwarded to each fetch call
# @param initial_offset offset used when no committed offset is available
# @param commit_interval value passed to sleep between background commits
def initialize(consumer, partition, handler: nil, max_wait_ms: 100, initial_offset: :latest_offset, commit_interval: 5.0)
  @consumer = consumer
  @partition = partition
  @handler = handler
  @initial_offset = initial_offset
  @max_wait_ms = max_wait_ms
  @commit_interval = commit_interval
  @commit_mutex = Mutex.new

  # Starts a thread that aborts on any unhandled exception and runs the
  # named instance method.
  start_thread = lambda do |task|
    Thread.new do
      Thread.current.abort_on_exception = true
      send(task)
    end
  end

  @consumer_thread = start_thread.call(:manage_partition_consumer)
  start_thread.call(:background_committer)
end
Instance Attribute Details
#commit_interval ⇒ Object (readonly)
Returns the value of attribute commit_interval.
5 6 7 |
# File 'lib/kafka/consumer/partition_consumer.rb', line 5
#
# Value of the commit_interval: constructor keyword (default 5.0).
# background_committer passes it to sleep between commit attempts.
def commit_interval
  @commit_interval
end
#consumer ⇒ Object (readonly)
Returns the value of attribute consumer.
5 6 7 |
# File 'lib/kafka/consumer/partition_consumer.rb', line 5
#
# The consumer object given to the constructor. The other methods use it to
# reach the logger, the group, the cluster and the running instance.
def consumer
  @consumer
end
#handler ⇒ Object (readonly)
Returns the value of attribute handler.
5 6 7 |
# File 'lib/kafka/consumer/partition_consumer.rb', line 5
#
# The handler: constructor keyword (default nil). manage_partition_consumer
# invokes it as handler.call(message) for every consumed message.
def handler
  @handler
end
#initial_offset ⇒ Object (readonly)
Returns the value of attribute initial_offset.
5 6 7 |
# File 'lib/kafka/consumer/partition_consumer.rb', line 5
#
# The initial_offset: constructor keyword (default :latest_offset). Used as
# the starting offset when no committed offset was retrieved, and as the
# fallback after an OffsetOutOfRange error.
def initial_offset
  @initial_offset
end
#last_committed_offset ⇒ Object (readonly)
Returns the value of attribute last_committed_offset.
5 6 7 |
# File 'lib/kafka/consumer/partition_consumer.rb', line 5
#
# Offset bookkeeping for commits: initialised from the group's stored offset
# in manage_partition_consumer and set to last_processed_offset + 1 after a
# successful commit. nil until one of those has happened.
def last_committed_offset
  @last_committed_offset
end
#last_processed_offset ⇒ Object (readonly)
Returns the value of attribute last_processed_offset.
5 6 7 |
# File 'lib/kafka/consumer/partition_consumer.rb', line 5
#
# Offset of the most recent message for which handler.call has returned;
# nil until the first message has been handled.
def last_processed_offset
  @last_processed_offset
end
#max_wait_ms ⇒ Object (readonly)
Returns the value of attribute max_wait_ms.
5 6 7 |
# File 'lib/kafka/consumer/partition_consumer.rb', line 5
#
# The max_wait_ms: constructor keyword (default 100), forwarded as the
# max_wait_ms: option of every fetch call.
def max_wait_ms
  @max_wait_ms
end
#partition ⇒ Object (readonly)
Returns the value of attribute partition.
5 6 7 |
# File 'lib/kafka/consumer/partition_consumer.rb', line 5
#
# The partition object given to the constructor. The other methods read its
# topic.name and id.
def partition
  @partition
end
Instance Method Details
#background_committer ⇒ Object
79 80 81 82 83 84 |
# File 'lib/kafka/consumer/partition_consumer.rb', line 79
#
# Commit loop run by the second thread started in the constructor: until the
# consumer is interrupted, commit the latest processed offset and then sleep
# for commit_interval.
#
# @return [nil]
def background_committer
  until interrupted?
    commit_last_offset
    sleep(commit_interval)
  end
end
#claim_partition ⇒ Object
48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 |
# File 'lib/kafka/consumer/partition_consumer.rb', line 48
#
# Tries to claim the partition for this consumer instance, waiting for any
# other instance to release its claim first.
#
# @return [Boolean] true once the partition has been claimed, false when the
#   consumer was interrupted before a claim could be obtained
# @raise [Kazoo::Error] if the partition is already claimed by this instance
def claim_partition
  consumer.logger.info "Claiming partition #{partition.topic.name}/#{partition.id}..."

  begin
    # The block is handed to watch_partition_claim so the sleeping consumer
    # thread can be resumed via #continue.
    other_instance, change = consumer.group.watch_partition_claim(partition) { continue }
    if other_instance.nil?
      consumer.instance.claim_partition(partition)
    elsif other_instance == consumer.instance
      raise Kazoo::Error, "Already claimed this partition myself. That should not happen"
    else
      consumer.logger.warn "Partition #{partition.topic.name}/#{partition.id} is still claimed by instance #{other_instance.id}. Waiting for the claim to be released..."
      # NOTE(review): if the change completes between this check and
      # Thread.stop, #continue may not see the thread as sleeping yet.
      Thread.stop unless change.completed?

      return false if interrupted?
      raise Kazoo::PartitionAlreadyClaimed
    end
  rescue Kazoo::PartitionAlreadyClaimed
    # Previously an interrupted consumer fell through to the trailing `true`
    # here, reporting a claim it never obtained.
    return false if interrupted?
    retry
  end

  true
end
#commit_last_offset ⇒ Object
70 71 72 73 74 75 76 77 |
# File 'lib/kafka/consumer/partition_consumer.rb', line 70
#
# Commits the offset of the last processed message to the consumer group,
# if there is anything new to commit. Serialised through @commit_mutex.
#
# last_committed_offset holds the NEXT offset to consume (it is set to
# processed + 1 below and used as the start offset when consuming resumes),
# so a message is still uncommitted when committed == processed. The
# comparison is therefore `<=`; with `<` the first message after a resume or
# after a previous commit was never committed on its own.
#
# @return [Integer, nil] the new last_committed_offset, or nil when nothing
#   was committed
def commit_last_offset
  @commit_mutex.synchronize do
    # Read once: the consumer thread may advance the processed offset while
    # the commit is in flight, and the recorded value must match what was
    # actually committed.
    processed = last_processed_offset
    next if processed.nil?

    committed = last_committed_offset
    next unless committed.nil? || committed <= processed

    consumer.group.commit_offset(partition, processed)
    @last_committed_offset = processed + 1
  end
end
#continue ⇒ Object
44 45 46 |
# File 'lib/kafka/consumer/partition_consumer.rb', line 44
#
# Wakes the consumer thread if, and only if, it is currently sleeping.
#
# @return [Thread, nil] the result of Thread#run, or nil when the thread was
#   not sleeping
def continue
  return unless @consumer_thread.status == 'sleep'

  @consumer_thread.run
end
#interrupt ⇒ Object
29 30 31 32 |
# File 'lib/kafka/consumer/partition_consumer.rb', line 29
#
# Flags the consumer thread as interrupted (via its :interrupted thread-local)
# and then wakes it through #continue so it can notice the flag.
#
# @return whatever #continue returns
def interrupt
  worker = @consumer_thread
  worker[:interrupted] = true
  continue
end
#interrupted? ⇒ Boolean
34 35 36 |
# File 'lib/kafka/consumer/partition_consumer.rb', line 34
#
# Whether #interrupt has flagged the consumer thread. Returns the raw
# :interrupted thread-local, so the result is nil (not false) while the flag
# has never been set.
def interrupted?
  worker = @consumer_thread
  worker[:interrupted]
end
#manage_partition_consumer ⇒ Object
86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 |
# File 'lib/kafka/consumer/partition_consumer.rb', line 86
#
# Body of the consumer thread: claims the partition, consumes messages until
# interrupted, then commits the last offset and releases the claim.
#
# Every fetched message is wrapped in a Message and handed to handler.call;
# @last_processed_offset is advanced after each handler call returns. Does
# nothing when the partition could not be claimed.
#
# @return [void]
def manage_partition_consumer
  # First, we will try to claim the partition in Zookeeper to ensure there's
  # only one consumer for it simultaneously.
  return unless claim_partition

  @last_committed_offset = consumer.group.retrieve_offset(partition)
  case start_offset = last_committed_offset || initial_offset
  when :earliest_offset, -2
    consumer.logger.info "Starting consumer for #{partition.topic.name}/#{partition.id} at the earliest available offset..."
  when :latest_offset, -1
    consumer.logger.info "Starting consumer for #{partition.topic.name}/#{partition.id} for new messages..."
  else
    consumer.logger.info "Starting consumer for #{partition.topic.name}/#{partition.id} at offset #{start_offset}..."
  end

  pc = nil
  begin
    pc = Poseidon::PartitionConsumer.consumer_for_partition(
      consumer.group.name,
      consumer.cluster.brokers.values.map(&:addr),
      partition.topic.name,
      partition.id,
      start_offset
    )

    until interrupted?
      pc.fetch(max_wait_ms: max_wait_ms).each do |fetched|
        message = Message.new(partition.topic.name, partition.id, fetched)
        handler.call(message)
        @last_processed_offset = message.offset
      end
    end
  rescue Poseidon::Errors::OffsetOutOfRange
    # Close the stale consumer and forget it, so it is never closed twice and
    # a failure while building the replacement does not touch it again.
    pc.close if pc
    pc = nil

    consumer.logger.warn "Offset #{start_offset} is no longer available for #{partition.topic.name}/#{partition.id}!"

    case initial_offset
    when :earliest_offset, -2
      consumer.logger.warn "Instead, start consuming #{partition.topic.name}/#{partition.id} at the earliest available offset."
    when :latest_offset, -1
      consumer.logger.warn "Instead, start consuming #{partition.topic.name}/#{partition.id} for new messages only."
    end

    # NOTE(review): this retry is unbounded; if initial_offset is itself a
    # numeric offset that is out of range, it will presumably keep failing.
    start_offset = initial_offset
    retry
  ensure
    consumer.logger.debug "Stopping consumer for #{partition.topic.name}/#{partition.id}..."
    # pc is nil when consumer_for_partition itself raised; calling close on
    # it would replace the real error with a NoMethodError.
    pc.close if pc
  end

  commit_last_offset
  consumer.logger.info "Committed offset #{last_committed_offset - 1} for #{partition.topic.name}/#{partition.id}..." if last_committed_offset

  consumer.instance.release_partition(partition)
  consumer.logger.debug "Released claim for partition #{partition.topic.name}/#{partition.id}."
end
#stop ⇒ Object
38 39 40 41 42 |
# File 'lib/kafka/consumer/partition_consumer.rb', line 38
#
# Stops consuming: interrupts the consumer thread, waits for it to finish and
# then logs that the consumer for this partition has stopped.
def stop
  interrupt
  wait

  notice = "Consumer for #{partition.topic.name}/#{partition.id} stopped."
  consumer.logger.info notice
end
#wait ⇒ Object
25 26 27 |
# File 'lib/kafka/consumer/partition_consumer.rb', line 25
#
# Blocks until the consumer thread has finished. Returns immediately when the
# thread is no longer alive.
#
# @return [Thread, nil] the joined thread, or nil when it had already finished
def wait
  return unless @consumer_thread.alive?

  @consumer_thread.join
end