Class: Avro2Kafka
- Inherits:
-
Object
show all
- Defined in:
- lib/avro2kafka.rb,
lib/avro2kafka/version.rb,
lib/avro2kafka/avro_reader.rb,
lib/avro2kafka/kafka_publisher.rb
Defined Under Namespace
Classes: AvroReader, KafkaPublisher
Constant Summary
collapse
- VERSION =
"0.4.1"
Instance Attribute Summary collapse
Class Method Summary
collapse
Instance Method Summary
collapse
Constructor Details
#initialize(options) ⇒ Avro2Kafka
Returns a new instance of Avro2Kafka.
14
15
16
17
|
# File 'lib/avro2kafka.rb', line 14
# Creates a publisher for the input named on the command line.
#
# The input path is not a parameter: it is taken from the first
# command-line argument, so it is +nil+ when ARGV is empty.
#
# @param options [Object] kept as-is and exposed through #options; the
#   other methods of this class call +fetch+ on it with symbol keys
def initialize(options)
  @path = ARGV[0]
  @options = options
end
|
Instance Attribute Details
#options ⇒ Object
Returns the value of attribute options.
8
9
10
|
# File 'lib/avro2kafka.rb', line 8
# Reader for the options object handed to the constructor.
#
# @return [Object] the value stored in +@options+
def options
  @options
end
|
Class Method Details
.logger ⇒ Object
10
11
12
|
# File 'lib/avro2kafka.rb', line 10
# Shared logger, created on first use and reused afterwards.
#
# @return [Logr::Logger] logger constructed with the name 'avro2kafka'
def self.logger
  return @logger if @logger

  @logger = Logr::Logger.new('avro2kafka')
end
|
Instance Method Details
#broker_list ⇒ Object
45
46
47
|
# File 'lib/avro2kafka.rb', line 45
# Broker addresses taken from the required +:broker_list+ option.
#
# The option is a comma-separated string. Whitespace around each entry is
# stripped, so "a:9092, b:9092" yields clean addresses; the +:key+ option
# is treated the same way in #kafka_options.
#
# @return [Array<String>] one entry per comma-separated item
# @raise [KeyError] when options is a Hash without +:broker_list+
def broker_list
  options.fetch(:broker_list).split(',').map(&:strip)
end
|
49
50
51
52
53
54
|
# File 'lib/avro2kafka.rb', line 49
# Turns the optional +:data+ option (a list of "key=value" strings) into a
# Hash.
#
# NOTE(review): the method name was missing from this listing; `extra_data`
# is a reconstruction - confirm against lib/avro2kafka.rb.
#
# Each entry is split on the first "=" only, so a value may itself contain
# "=" (for example a URL with a query string). An entry without "=" maps
# its whole text to nil; an entry ending in "=" maps to an empty string.
#
# @return [Hash{String => String, nil}] empty when +:data+ is not given
def extra_data
  options.fetch(:data, []).each_with_object({}) do |entry, pairs|
    key, value = entry.split('=', 2)
    pairs[key] = value
  end
end
|
#filename ⇒ Object
37
38
39
|
# File 'lib/avro2kafka.rb', line 37
# Name of the input file without its directory part.
#
# @return [String] the last component of +@path+
def filename
  File.basename(@path)
end
|
#kafka_options ⇒ Object
56
57
58
59
60
61
62
63
|
# File 'lib/avro2kafka.rb', line 56
# Keyword arguments handed to KafkaPublisher.new by #publish.
#
# NOTE(review): the value of `data:` was missing from this listing. It is
# restored as a call to the helper that parses the +:data+ option, assumed
# to be named `extra_data` - confirm against lib/avro2kafka.rb.
#
# @return [Hash] with the keys +:broker_list+, +:topic+, +:data+ and
#   +:keys+; +:keys+ is the optional comma-separated +:key+ option split
#   into stripped names (empty array when the option is absent)
def kafka_options
  {
    broker_list: broker_list,
    topic: topic,
    data: extra_data,
    keys: options.fetch(:key, '').split(',').map(&:strip),
  }
end
|
#publish ⇒ Object
23
24
25
26
27
28
29
30
31
32
33
34
35
|
# File 'lib/avro2kafka.rb', line 23
# Reads the records from the input and publishes them to Kafka, logging an
# event before and after.
#
# Steps, in order:
# 1. log a 'started_publishing' event carrying the filename, topic and the
#    extra data fields;
# 2. read the records through AvroReader from #reader;
# 3. publish them with a KafkaPublisher built from #kafka_options;
# 4. log a 'finished_publishing' event with a 'lines_processed' metric set
#    to the record count.
#
# NOTE(review): the argument to `merge` was missing from this listing in
# both event calls. It is restored as the helper that parses the +:data+
# option, assumed to be named `extra_data` - confirm against
# lib/avro2kafka.rb.
#
# @return [Object] whatever the final logger call returns
def publish
  Avro2Kafka.logger.event('started_publishing', { filename: filename, topic: topic }.merge(extra_data))
            .monitored("Started publishing #{filename}", "Started publishing #{filename} to the #{topic} Kafka topic.")
            .info("Started publishing #{filename}")

  records = AvroReader.new(reader).read
  KafkaPublisher.new(**kafka_options).publish(records)

  Avro2Kafka.logger.event('finished_publishing', { filename: filename, topic: topic }.merge(extra_data))
            .monitored("Finished publishing #{filename}", "Finished publishing #{filename} to the #{topic} Kafka topic.")
            .metric('lines_processed', records.count)
            .info("Finished publishing #{filename}")
end
|
#reader ⇒ Object
19
20
21
|
# File 'lib/avro2kafka.rb', line 19
# Input stream for the Avro data.
#
# Rewinds ARGF before handing it out, then returns ARGF itself.
#
# @return [ARGF] the rewound ARGF stream
def reader
  ARGF.tap(&:rewind)
end
|
#topic ⇒ Object
41
42
43
|
# File 'lib/avro2kafka.rb', line 41
# Kafka topic taken from the required +:topic+ option.
#
# @return [Object] the value stored under +:topic+
# @raise [KeyError] when options is a Hash without +:topic+
def topic
  options.fetch(:topic)
end
|