Class: Kafka::Consumer

Inherits:
Object
  • Object
show all
Includes:
Enumerable
Defined in:
lib/kafka/consumer.rb,
lib/kafka/consumer/message.rb,
lib/kafka/consumer/version.rb,
lib/kafka/consumer/partition_consumer.rb

Defined Under Namespace

Classes: Message, PartitionConsumer

Constant Summary collapse

VERSION =
"0.1.2"

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(name, subscription, zookeeper: nil, max_wait_ms: 200, initial_offset: :latest_offset, logger: nil) ⇒ Consumer

Returns a new instance of Consumer.

Raises:

  • (ArgumentError)


19
20
21
22
23
24
25
26
27
28
29
30
31
32
# File 'lib/kafka/consumer.rb', line 19

# Builds a consumer that joins (creating it if necessary) the consumer group
# called +name+ and registers a new instance for the given subscription.
#
# @param name [String] consumer group name; must be non-empty
# @param subscription [Object] whatever Kazoo::Subscription.build accepts
# @param zookeeper [String] Zookeeper connection string; must be non-empty
# @param max_wait_ms [Integer] stored as-is and exposed via #max_wait_ms
# @param initial_offset [Symbol] stored as-is and exposed via #initial_offset
# @param logger [Logger, nil] defaults to a Logger writing to $stdout
# @raise [ArgumentError] if +name+ or +zookeeper+ is nil or empty
def initialize(name, subscription, zookeeper: nil, max_wait_ms: 200, initial_offset: :latest_offset, logger: nil)
  raise ArgumentError, "The consumer's name cannot be empty" if name.nil? || name.empty?
  raise ArgumentError, "You have to specify a zookeeper connection string" if zookeeper.nil? || zookeeper.empty?

  @name = name
  @max_wait_ms, @initial_offset = max_wait_ms, initial_offset
  @logger = logger || Logger.new($stdout)

  @cluster = Kazoo::Cluster.new(zookeeper)
  @group = Kazoo::Consumergroup.new(@cluster, name)

  @group.create unless @group.exists?

  # Keep the built subscription: the #subscription reader returns this
  # instance variable and #partitions calls a method on it, so leaving it
  # unset would make both unusable.
  @subscription = Kazoo::Subscription.build(subscription)
  @instance = @group.instantiate(subscription: @subscription).register
end

Instance Attribute Details

#cluster ⇒ Object (readonly)

Returns the value of attribute cluster.



14
15
16
# File 'lib/kafka/consumer.rb', line 14

# Reader for @cluster, which the constructor sets to the Kazoo::Cluster
# built from the zookeeper connection string.
def cluster
  @cluster
end

#group ⇒ Object (readonly)

Returns the value of attribute group.



14
15
16
# File 'lib/kafka/consumer.rb', line 14

# Reader for @group, which the constructor sets to the Kazoo::Consumergroup
# created for this consumer's name.
def group
  @group
end

#initial_offset ⇒ Object (readonly)

Returns the value of attribute initial_offset.



14
15
16
# File 'lib/kafka/consumer.rb', line 14

# Reader for @initial_offset, the value of the constructor's
# +initial_offset:+ keyword (default :latest_offset).
def initial_offset
  @initial_offset
end

#instance ⇒ Object (readonly)

Returns the value of attribute instance.



14
15
16
# File 'lib/kafka/consumer.rb', line 14

# Reader for @instance, the object returned by calling +register+ on the
# instance the constructor obtained from @group.instantiate.
def instance
  @instance
end

#logger ⇒ Object (readonly)

Returns the value of attribute logger.



14
15
16
# File 'lib/kafka/consumer.rb', line 14

# Reader for @logger: the logger passed to the constructor, or a Logger
# writing to $stdout when none was given.
def logger
  @logger
end

#max_wait_ms ⇒ Object (readonly)

Returns the value of attribute max_wait_ms.



14
15
16
# File 'lib/kafka/consumer.rb', line 14

# Reader for @max_wait_ms, the value of the constructor's +max_wait_ms:+
# keyword (default 200).
def max_wait_ms
  @max_wait_ms
end

#subscription ⇒ Object (readonly)

Returns the value of attribute subscription.



14
15
16
# File 'lib/kafka/consumer.rb', line 14

# Reader for the @subscription instance variable.
def subscription
  @subscription
end

Class Method Details

.distribute_partitions(instances, partitions) ⇒ Object



94
95
96
97
98
99
100
101
102
# File 'lib/kafka/consumer.rb', line 94

# Splits +partitions+ into contiguous runs and assigns one run to each
# instance, in order.
#
# The partition at position +index+ goes to the instance at position
# floor(index * instances.length / partitions.length). This is computed with
# integer arithmetic so the result is exact; a floating-point quotient can
# round just below an integer boundary and shift a partition to the
# preceding instance (e.g. 42 partitions over 36 instances).
#
# @param instances [Array] consumer instances, in assignment order
# @param partitions [Array] partitions, in assignment order
# @return [Hash{Object => Array}] instance => partitions assigned to it;
#   instances that receive no partition are absent from the hash, and the
#   hash is empty when +instances+ is empty
def self.distribute_partitions(instances, partitions)
  return {} if instances.empty?

  instance_count = instances.length
  partition_count = partitions.length

  # The block never runs when partitions is empty, so partition_count is
  # never a zero divisor here.
  partitions.group_by.with_index do |_partition, index|
    instances[index * instance_count / partition_count]
  end
end

Instance Method Details

#dead? ⇒ Boolean

Returns:

  • (Boolean)


75
76
77
# File 'lib/kafka/consumer.rb', line 75

# Tells whether the manager thread's status is exactly +false+.
#
# Any other status (a String, or nil) yields false, so the comparison
# against +false+ is deliberate rather than a truthiness check.
#
# @return [Boolean]
def dead?
  manager_status = @consumer_manager.status
  manager_status == false
end

#each(&block) ⇒ Object



79
80
81
82
83
84
85
86
87
88
89
90
91
92
# File 'lib/kafka/consumer.rb', line 79

# Starts consuming and passes every message to the given block.
#
# Block invocations go through a single Mutex, so the block runs for one
# message at a time even if the handler is called from several threads.
# The work is run by manage_partition_consumers on a new thread stored in
# @consumer_manager; that thread has abort_on_exception enabled. The method
# then calls +wait+ and returns its result.
#
# @yieldparam message [Object] whatever manage_partition_consumers hands
#   to the handler
def each(&block)
  lock = Mutex.new
  handler = ->(message) { lock.synchronize { block.call(message) } }

  @consumer_manager = Thread.new do
    Thread.current.abort_on_exception = true
    manage_partition_consumers(handler)
  end

  wait
end

#id ⇒ Object



38
39
40
# File 'lib/kafka/consumer.rb', line 38

# Returns the id of this consumer's instance (delegates to instance.id).
def id
  instance.id
end

#interrupt ⇒ Object



50
51
52
53
54
55
56
57
58
59
60
# File 'lib/kafka/consumer.rb', line 50

# Requests a shutdown without blocking the caller: the work is done on a
# newly spawned thread, and that Thread object is the return value.
#
# The spawned thread logs a message, sets the :interrupted flag on the
# @consumer_manager thread, and then calls +continue+.
def interrupt
  Thread.new do
    # An exception inside this helper thread should not go unnoticed.
    Thread.current.abort_on_exception = true

    logger.info "Stopping partition consumers..."
    # The flag is stored on the manager thread itself; #interrupted? reads it back.
    @consumer_manager[:interrupted] = true

    # Make sure to wake up the manager thread, so it can shut down
    continue
  end
end

#interrupted? ⇒ Boolean

Returns:

  • (Boolean)


62
63
64
# File 'lib/kafka/consumer.rb', line 62

# Reads the :interrupted value stored on the @consumer_manager thread
# (set to true by #interrupt). Returns nil rather than false when the
# value has never been set.
def interrupted?
  @consumer_manager[:interrupted]
end

#name ⇒ Object



34
35
36
# File 'lib/kafka/consumer.rb', line 34

# Returns the consumer group's name (delegates to group.name).
def name
  group.name
end

#partitions ⇒ Object



46
47
48
# File 'lib/kafka/consumer.rb', line 46

# Lists the subscription's partitions for this cluster in a stable order.
#
# Partitions are sorted by preferred leader id, then topic name, then
# partition id.
#
# @return [Array] the sorted partitions
def partitions
  subscribed = subscription.partitions(@cluster)

  subscribed.sort_by do |partition|
    [partition.preferred_leader.id, partition.topic.name, partition.id]
  end
end

#stop ⇒ Object



66
67
68
69
# File 'lib/kafka/consumer.rb', line 66

# Requests a shutdown via #interrupt and then blocks in #wait until the
# manager thread has finished. Returns whatever #wait returns.
def stop
  interrupt
  wait
end

#wait ⇒ Object



71
72
73
# File 'lib/kafka/consumer.rb', line 71

# Blocks until the manager thread has finished.
#
# @return [Thread, nil] the joined thread, or nil when the thread was
#   already finished and no join was needed
def wait
  manager = @consumer_manager
  return unless manager.alive?

  manager.join
end