Class: Trace::ZipkinKafkaSender
- Inherits: ZipkinSenderBase
  - Object
  - ZipkinSenderBase
  - Trace::ZipkinKafkaSender
- Defined in: lib/zipkin-tracer/zipkin_kafka_sender.rb
Overview
This class sends information to Zipkin through Kafka. Spans are encoded using Thrift.
Constant Summary
- DEFAULT_KAFKA_TOPIC = "zipkin".freeze
- IP_FORMAT = :i32
Instance Method Summary
- #flush! ⇒ Object
- #initialize(options = {}) ⇒ ZipkinKafkaSender (constructor): A new instance of ZipkinKafkaSender.
Methods inherited from ZipkinSenderBase
#end_span, #skip_flush?, #start_span, #with_new_span
Constructor Details
#initialize(options = {}) ⇒ ZipkinKafkaSender
Returns a new instance of ZipkinKafkaSender.
    # File 'lib/zipkin-tracer/zipkin_kafka_sender.rb', line 19

    def initialize(options = {})
      @topic = options[:topic] || DEFAULT_KAFKA_TOPIC

      if options[:producer] && options[:producer].respond_to?(:push)
        @producer = options[:producer]
      elsif options[:zookeepers]
        initialize_hermann_producer(options[:zookeepers])
      else
        raise ArgumentError, "No (kafka) :producer option (accepting #push) and no :zookeeper option provided."
      end
      super()
    end
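The constructor accepts any :producer object that responds to #push, so a stand-in producer can be passed in place of a real Kafka client. The following is a minimal sketch under that assumption: InMemoryProducer and the "zipkin-test" topic are hypothetical names introduced for illustration, and the sender is only constructed when the zipkin-tracer gem is already loaded.

```ruby
# Hypothetical in-memory producer used only for illustration. Any object that
# responds to #push(message, topic:) should pass the constructor's check.
class InMemoryProducer
  attr_reader :messages

  def initialize
    @messages = []
  end

  # Records the payload instead of sending it to Kafka, then returns it.
  def push(message, topic:)
    @messages << [topic, message]
    message
  end
end

producer = InMemoryProducer.new

# Only build the real sender when the zipkin-tracer gem has been loaded;
# otherwise this sketch still runs on its own.
if defined?(Trace::ZipkinKafkaSender)
  sender = Trace::ZipkinKafkaSender.new(producer: producer, topic: "zipkin-test")
end

# The same duck-typing check the constructor applies to the :producer option.
puts producer.respond_to?(:push)
```

If neither :producer nor :zookeepers is supplied, the constructor raises ArgumentError, so one of the two options has to be present.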
Instance Method Details
#flush! ⇒ Object
    # File 'lib/zipkin-tracer/zipkin_kafka_sender.rb', line 32

    def flush!
      resolved_spans = ::ZipkinTracer::HostnameResolver.new.spans_with_ips(spans, IP_FORMAT)
      resolved_spans.each do |span|
        buf = ''
        trans = Thrift::MemoryBufferTransport.new(buf)
        oprot = Thrift::BinaryProtocol.new(trans)
        span.to_thrift.write(oprot)

        retval = @producer.push(buf, topic: @topic)

        # If @producer#push returns a promise/promise-like object, block until it
        # resolves
        retval.value! if retval.respond_to?(:value!)
        retval
      end
    rescue Exception
      # Ignore socket errors, etc
    end
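The promise handling in #flush! relies on duck typing: if the value returned by #push responds to #value!, the method blocks on it, and otherwise the value is used as is. The sketch below isolates that pattern in plain Ruby. FakePromise and await_if_promise are hypothetical names introduced here and are not part of zipkin-tracer or of any Kafka client.

```ruby
# Hypothetical promise-like result standing in for whatever a producer's
# #push might return. Only #value! matters for the check below.
class FakePromise
  def initialize(&work)
    @thread = Thread.new(&work)
  end

  # Blocks until the background work finishes and returns its result.
  def value!
    @thread.value
  end
end

# Blocks on promise-like results and passes plain values through unchanged.
# As in #flush!, the original object is returned, not the resolved value.
def await_if_promise(retval)
  retval.value! if retval.respond_to?(:value!)
  retval
end

sync_result  = await_if_promise(:sent)
async_result = await_if_promise(FakePromise.new { :delivered })

puts sync_result.inspect
puts async_result.value!.inspect
```

Blocking per span keeps delivery ordered and simple, at the cost of waiting on each push before encoding the next span.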