KafkaRestClient 
A Ruby client to interact with Kafka REST Proxy
Current Version: 0.4.0
Supported Ruby versions: 2.0, 2.1, 2.2
Installation
Add this line to your application's Gemfile:
gem 'kafka_rest_client', git: 'git@github.com:FundingCircle/kafka_rest_client.git', tag: 'v0.3.0'
Usage
Configuration
# Configure global settings
# The client is disabled if kafka_proxy_url and schema_registry_url are not set
KafkaRestClient.configure do |config|
config.kafka_proxy_url = ENV['KAFKA_PROXY_URL']
config.schema_registry_url = ENV['SCHEMA_REGISTRY_URL']
config.timeout = 10
config.logger = Rails.logger
end
Producing events
require 'kafka_rest_client'
producer = KafkaRestClient::AvroProducer.new
# Produce a single event using the topic name and payload
# The schema_id will be fetched from the schema registry by looking up a schema
# with the "#{topic}-value" name
producer.produce('ice-cream-melted', { temperature: 35, unit: 'celsius' })
# Produce an event using the topic name, payload and a partition key
# The partition key should be the name of a field in the message
producer.produce('ice-cream-melted', { temperature: 35, unit: 'celsius' }, { key: :temperature })
# This would post a request to the REST Proxy, e.g.:
# {"id": 1, "temperature": 32, "unit": "celsius"}
# Produce multiple events
# The schema_id will be fetched from the schema registry by looking up a schema
# with the "#{topic}-value" name
producer.produce('ice-cream-melted', [{ temperature: 35, unit: 'celsius' }])
# Produce an event providing the schema id
producer.produce('ice-cream-melted', { temperature: 35, unit: 'celsius' }, value_schema_id: 1)
# Produce an event providing the full schema as a JSON string
schema = {
type: 'record',
name: 'IceCreamMelted',
fields: [{ name: 'Temperature', type: 'string' }, { name: 'Unit', type: 'string' }]
}
producer.produce('ice-cream-melted', { temperature: 35, unit: 'celsius' }, value_schema: schema.to_json)
# Produce event objects by relying on #to_json and #as_json when using Rails ActiveSupport
# Example event object that can be handed to the producer.
# It exposes its payload through #as_json, returning a plain Hash.
class IceCreamMeltedEvent
  attr_reader :temperature, :unit

  # @param temperature [Object] value stored under the :temperature key
  # @param unit [Object] value stored under the :unit key
  def initialize(temperature, unit)
    @temperature = temperature
    @unit = unit
  end

  # Builds the Hash representation of the event.
  #
  # @param _options [Object, nil] accepted so callers may pass serialization
  #   options; it is ignored here
  # @return [Hash] the :temperature and :unit values
  def as_json(_options = nil)
    { temperature: temperature, unit: unit }
  end
end
producer.produce('ice-cream-melted', [IceCreamMeltedEvent.new(35, 'celsius')])
# All of the above examples expect events to have union types explicitly defined.
# For example, if you have a nullable string field, the type annotation needs to be:
# {"field_name": { "string": "field_value"}}
# You can provide the following option to automatically annotate nullable fields.
# ⚠️ This does not currently support nested documents
producer.produce('ice-cream-melted',
{ temperature: 35, unit: 'celsius' },
annotate_optional_fields: true,
value_schema: {"namespace": "com.fundingcircle",
"type": "record",
"name": "IceCreamMelted",
"fields": [{"name": "id", "type": "string"},
{"name": "temperature","type": "int"},
{"name": "unit",
"type": ["null", "string"]
}]
})
# This would post a request to the REST Proxy with the correct type annotation, e.g.:
# {"id": 1, "temperature": 32, "unit": { "string": "celsius"}}
# Exception handling
begin
producer.produce('ice-cream-melted', { temperature: 35, unit: 'celsius' })
rescue KafkaRestClient::SchemaNotFoundError => e
# Schema has not been registered in the schema registry
rescue KafkaRestClient::TopicNotFoundError => e
# Topic does not exist in Kafka, create it with confluent tools
rescue Net::ReadTimeout => e
# Rescue read timeout errors
end
Contributing
Bug reports and pull requests are welcome on GitHub at https://github.com/fundingcircle/kafka_rest_client.