Class: KafkaRestClient::AvroProducer
- Inherits:
-
Object
- Object
- KafkaRestClient::AvroProducer
- Defined in:
- lib/kafka_rest_client/avro_producer.rb
Constant Summary collapse
- AVRO_AS_JSON =
'application/vnd.kafka.avro.v1+json'.freeze
Instance Method Summary collapse
- #enabled? ⇒ Boolean
-
#initialize(config = Configuration.current) ⇒ AvroProducer
constructor
A new instance of AvroProducer.
- #produce(topic, events, options = {}) ⇒ Object
Constructor Details
#initialize(config = Configuration.current) ⇒ AvroProducer
Returns a new instance of AvroProducer.
13 14 15 |
# File 'lib/kafka_rest_client/avro_producer.rb', line 13 def initialize(config = Configuration.current) @config = config end |
Instance Method Details
#enabled? ⇒ Boolean
17 18 19 20 21 22 23 |
# File 'lib/kafka_rest_client/avro_producer.rb', line 17 def enabled? if config.kafka_proxy_url && config.schema_registry_url true else false end end |
#produce(topic, events, options = {}) ⇒ Object
25 26 27 28 29 30 31 32 33 34 35 36 |
# File 'lib/kafka_rest_client/avro_producer.rb', line 25 def produce(topic, events, = {}) if enabled? events = [events].flatten.map { |event| event_to_hash event } if [:annotate_optional_fields] serialized_events = annotate_events_optional_fields(topic, events, ) end payload = build_event_payload(topic, (serialized_events || events), ) response = post_event_to_kafka(topic, payload) config.logger.debug("Produced to Kafka topic #{topic}: #{payload.inspect}") response end end |