# Logstash Plugin: logstash-filter-kafka_time_machine

This is a filter plugin for Logstash.
## Description

This filter plugin generates new events in the Logstash pipeline. These events are built from fields that are extracted from the original event and passed to the filter. The new events carry metrics for log events that have traversed multiple Kafka and Logstash blocks, so that dwell time can be aggregated.

The typical flow for log events:

```
Service Log ---> kafka_shipper <--- logstash_shipper ---> | ott_network_link | ---> kafka_indexer <--- logstash_indexer ---> elastic_search
```

The filter leverages metadata inserted into the log event on both `logstash_shipper` and `logstash_indexer` nodes to track dwell time of log events through this pipeline.
## Kafka Time Machine Result

When the `kafka_time_machine` executes, it returns an InfluxDB Line Protocol formatted metric, i.e.:

```
ktm,datacenter=kafka_datacenter_shipper-test,es_cluster=some_cluster_name,es_cluster_index=some_cluster_index_name,lag_type=total,owner=ktm_test@cisco.com lag_ms=300i,payload_size_bytes=40i 1634662795000000000
```

The plugin will also emit a metric if an error was encountered, i.e.:

```
ktm_error,datacenter=kafka_datacenter_shipper-test,es_cluster=some_cluster_name,es_cluster_index=some_cluster_index_name,owner=ktm_test@cisco.com,source=shipper count=1i 1634662795000000000
```
To ensure a Logstash `output{}` block can properly route this metric, the new events are tagged with a `[@metadata][ktm_tags][ktm_metric]` field, i.e.:

```
{
    "ktm_metric" => "ktm,datacenter=kafka_datacenter_shipper-test,lag_type=total,es_cluster=some_cluster_name,es_cluster_index=some_cluster_index_name,owner=ktm_test@cisco.com lag_ms=300i,payload_size_bytes=40i 1634662795000000000",
    "@timestamp" => 2021-10-20T23:46:24.704Z,
     "@metadata" => {
        "ktm_tags" => {
            "ktm_metric" => "true"
        }
    },
      "@version" => "1"
}
```
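For instance, a minimal `output {}` sketch that routes on this tag might look like the following. The conditional mirrors the tagging above; the `http` output, its URL, and the Elasticsearch fallback are placeholder assumptions, not part of the plugin:

```
output {
  if [@metadata][ktm_tags][ktm_metric] == "true" {
    # Metric events: the Line Protocol text lives in the [ktm_metric] field,
    # so any output that can POST a raw string will do.
    http {
      url => "http://influxdb.example.com:8086/write?db=metrics"  # placeholder URL
      http_method => "post"
      format => "message"
      message => "%{ktm_metric}"
    }
  } else {
    # Ordinary log events continue to their normal destination.
    elasticsearch {
      hosts => ["localhost:9200"]  # placeholder host
    }
  }
}
```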
## Metric Event Breakdown

The `kafka_time_machine` can insert one or more new events in the pipeline. The `ktm_metric` created will be one of:

- `ktm`
- `ktm_error`

In the case of `ktm` the metric breakdown is:
| Line Protocol Element | Line Protocol Type | Description |
|---|---|---|
| `datacenter` | tag | Echo of `kafka_datacenter_shipper` |
| `es_cluster` | tag | Echo of `elasticsearch_cluster` |
| `es_cluster_index` | tag | Echo of `elasticsearch_cluster_index` |
| `lag_type` | tag | Calculated lag type |
| `owner` | tag | Echo of `event_owner` |
| `lag_ms` | field | Calculated lag in milliseconds |
| `payload_size_bytes` | field | Calculated size of `payload` field in bytes |
Meaning of `lag_type`:

- `total`: Lag calculated includes dwell time on both shipper and indexer.
- `indexer`: Lag calculated is dwell time for indexer only; insufficient data was provided for shipper to compute `total` lag.
- `shipper`: Lag calculated is dwell time for shipper only; insufficient data was provided for indexer to compute `total` lag.
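As a worked illustration (the exact field pairing is an assumption inferred from the setting names, not the plugin's documented formula): a `total` lag spans from append on the shipper side to read on the indexer side, so with

```
kafka_append_time_shipper        = 1634662794700   # ms
logstash_kafka_read_time_indexer = 1634662795000   # ms
lag_ms = 1634662795000 - 1634662794700 = 300
```

the emitted metric would carry `lag_ms=300i`, matching the example above.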
In the case of `ktm_error` the metric breakdown is:
| Line Protocol Element | Line Protocol Type | Description |
|---|---|---|
| `datacenter` | tag | Echo of `kafka_datacenter_shipper` |
| `es_cluster` | tag | Echo of `elasticsearch_cluster` |
| `es_cluster_index` | tag | Echo of `elasticsearch_cluster_index` |
| `source` | tag | Source of the error metric |
| `owner` | tag | Echo of `event_owner` |
| `count` | field | Count to track error; not cumulative |
Meaning of `source`:

- `indexer`: Insufficient data was provided for indexer to compute `total` lag.
- `shipper`: Insufficient data was provided for shipper to compute `total` lag.
- `insufficient_data`: Insufficient data was provided for both indexer and shipper to compute `total` lag.
- `unknown`: Unknown error encountered.
## Metric Event Timestamp

When the `kafka_time_machine` generates the InfluxDB Line Protocol metric, it must also set the timestamp on the event. To ensure the caller of the filter has control of this, the `event_time_ms` configuration is used to set the metric timestamp. The millisecond value is carried into the metric as nanoseconds, per Line Protocol convention.

For example, if `event_time_ms` is provided as `1634662795000`, the resulting metric would be:

```
ktm,datacenter=kafka_datacenter_shipper-test,lag_type=total,owner=ktm_test@cisco.com lag_ms=300i,payload_size_bytes=40i 1634662795000000000
```
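If the pipeline has no ready-made millisecond timestamp to hand in, one way to supply `event_time_ms` is a small `ruby` filter that stamps processing time into a metadata field first (the field name `[@metadata][event_time_ms]` is an illustrative choice, not required by the plugin):

```
filter {
  ruby {
    # Current wall-clock time as EPOCH milliseconds, stored as a string
    # to match the setting's declared value type.
    code => "event.set('[@metadata][event_time_ms]', (Time.now.to_f * 1000).to_i.to_s)"
  }
  kafka_time_machine {
    # ... other required settings ...
    event_time_ms => "%{[@metadata][event_time_ms]}"
  }
}
```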
## Kafka Time Machine Configuration Options

This plugin requires the following configurations:

| Setting | Input Type | Required |
|---|---|---|
| `kafka_datacenter_shipper` | string | Yes |
| `kafka_topic_shipper` | string | Yes |
| `kafka_consumer_group_shipper` | string | Yes |
| `kafka_append_time_shipper` | string | Yes |
| `logstash_kafka_read_time_shipper` | string | Yes |
| `kafka_topic_indexer` | string | Yes |
| `kafka_consumer_group_indexer` | string | Yes |
| `kafka_append_time_indexer` | string | Yes |
| `logstash_kafka_read_time_indexer` | string | Yes |
| `event_owner` | string | Yes |
| `event_time_ms` | string | Yes |
| `elasticsearch_cluster` | string | Yes |
| `elasticsearch_cluster_index` | string | Yes |
### Why are all settings required?

This was a design decision based on the use case: tracking a Kafka "lag by time" metric without knowing the topic and consumer group would be essentially useless. By leveraging the Kafka input's `decorate_events` feature, we know we'll always have the required fields.

While the settings are required, they can be passed as empty strings. The plugin will handle these cases; for example, if the `kafka_consumer_group_shipper` name is an empty string, only `indexer` results are returned.
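A minimal end-to-end sketch of that wiring follows. The `[@metadata][kafka][...]` fields come from `decorate_events`; everything else (the topic, the group, the hypothetical `[ktm]` fields embedded by `logstash_shipper`, and the ruby read-time stamp) is an illustrative assumption:

```
input {
  kafka {
    topics => ["logs"]              # placeholder topic
    group_id => "logstash_indexer"  # placeholder consumer group
    decorate_events => true         # adds [@metadata][kafka][topic|consumer_group|timestamp|...]
  }
}

filter {
  # Hypothetical: record when this Logstash instance read the event from Kafka.
  ruby {
    code => "event.set('[@metadata][read_time_ms]', (Time.now.to_f * 1000).to_i.to_s)"
  }

  kafka_time_machine {
    # Shipper-side values are assumed to have been embedded in the event by
    # logstash_shipper under a hypothetical [ktm] field.
    kafka_datacenter_shipper         => "%{[ktm][datacenter]}"
    kafka_topic_shipper              => "%{[ktm][topic]}"
    kafka_consumer_group_shipper     => "%{[ktm][consumer_group]}"
    kafka_append_time_shipper        => "%{[ktm][kafka_append_time]}"
    logstash_kafka_read_time_shipper => "%{[ktm][logstash_read_time]}"

    # Indexer-side values come from decorate_events metadata and the ruby stamp above.
    kafka_topic_indexer              => "%{[@metadata][kafka][topic]}"
    kafka_consumer_group_indexer     => "%{[@metadata][kafka][consumer_group]}"
    kafka_append_time_indexer        => "%{[@metadata][kafka][timestamp]}"
    logstash_kafka_read_time_indexer => "%{[@metadata][read_time_ms]}"

    event_owner                      => "%{[owner]}"
    event_time_ms                    => "%{[@metadata][read_time_ms]}"
    elasticsearch_cluster            => "some_cluster_name"
    elasticsearch_cluster_index      => "some_cluster_index_name"
  }
}
```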
### kafka_datacenter_shipper

- Value type is string
- There is no default value for this setting.

Provide the datacenter the log event originated from, i.e., the datacenter `kafka_shipper` is in. Field values can be static or dynamic:

```
filter {
  kafka_time_machine {
    kafka_datacenter_shipper => "static_field"
  }
}
```

```
filter {
  kafka_time_machine {
    kafka_datacenter_shipper => "%{[dynamic_field]}"
  }
}
```
### kafka_topic_shipper

- Value type is string
- There is no default value for this setting.

Provide the Kafka topic the log event was read from on the shipper. Field values can be static or dynamic:

```
filter {
  kafka_time_machine {
    kafka_topic_shipper => "static_field"
  }
}
```

```
filter {
  kafka_time_machine {
    kafka_topic_shipper => "%{[dynamic_field]}"
  }
}
```
### kafka_consumer_group_shipper

- Value type is string
- There is no default value for this setting.

Provide the Kafka consumer group the log event was read from on the shipper. Field values can be static or dynamic:

```
filter {
  kafka_time_machine {
    kafka_consumer_group_shipper => "static_field"
  }
}
```

```
filter {
  kafka_time_machine {
    kafka_consumer_group_shipper => "%{[dynamic_field]}"
  }
}
```
### kafka_append_time_shipper

- Value type is string
- There is no default value for this setting.

Provide the EPOCH time in milliseconds the log event was added to `kafka_shipper`. Field values can be static or dynamic:

```
filter {
  kafka_time_machine {
    kafka_append_time_shipper => "1624394191000"
  }
}
```

```
filter {
  kafka_time_machine {
    kafka_append_time_shipper => "%{[dynamic_field]}"
  }
}
```
### logstash_kafka_read_time_shipper

- Value type is string
- There is no default value for this setting.

Provide the EPOCH time in milliseconds the log event was read from `kafka_shipper`. Field values can be static or dynamic:

```
filter {
  kafka_time_machine {
    logstash_kafka_read_time_shipper => "1624394191000"
  }
}
```

```
filter {
  kafka_time_machine {
    logstash_kafka_read_time_shipper => "%{[dynamic_field]}"
  }
}
```
### kafka_topic_indexer

- Value type is string
- There is no default value for this setting.

Provide the Kafka topic the log event was read from on the indexer. Field values can be static or dynamic:

```
filter {
  kafka_time_machine {
    kafka_topic_indexer => "static_field"
  }
}
```

```
filter {
  kafka_time_machine {
    kafka_topic_indexer => "%{[dynamic_field]}"
  }
}
```
### kafka_consumer_group_indexer

- Value type is string
- There is no default value for this setting.

Provide the Kafka consumer group the log event was read from on the indexer. Field values can be static or dynamic:

```
filter {
  kafka_time_machine {
    kafka_consumer_group_indexer => "static_field"
  }
}
```

```
filter {
  kafka_time_machine {
    kafka_consumer_group_indexer => "%{[dynamic_field]}"
  }
}
```
### kafka_append_time_indexer

- Value type is string
- There is no default value for this setting.

Provide the EPOCH time in milliseconds the log event was added to `kafka_indexer`. Field values can be static or dynamic:

```
filter {
  kafka_time_machine {
    kafka_append_time_indexer => "1624394191000"
  }
}
```

```
filter {
  kafka_time_machine {
    kafka_append_time_indexer => "%{[dynamic_field]}"
  }
}
```
### logstash_kafka_read_time_indexer

- Value type is string
- There is no default value for this setting.

Provide the EPOCH time in milliseconds the log event was read from `kafka_indexer`. Field values can be static or dynamic:

```
filter {
  kafka_time_machine {
    logstash_kafka_read_time_indexer => "1624394191000"
  }
}
```

```
filter {
  kafka_time_machine {
    logstash_kafka_read_time_indexer => "%{[dynamic_field]}"
  }
}
```
### event_owner

- Value type is string
- There is no default value for this setting.

Provide the event owner; this represents the owner of the log. Field values can be static or dynamic:

```
filter {
  kafka_time_machine {
    event_owner => "static_field"
  }
}
```

```
filter {
  kafka_time_machine {
    event_owner => "%{[dynamic_field]}"
  }
}
```
### event_time_ms

- Value type is string
- There is no default value for this setting.

Provide the EPOCH time in milliseconds at which this event is being processed. This time will be appended to the generated InfluxDB Line Protocol metric. Field values can be static or dynamic:

```
filter {
  kafka_time_machine {
    event_time_ms => "1624394191000"
  }
}
```

```
filter {
  kafka_time_machine {
    event_time_ms => "%{[dynamic_field]}"
  }
}
```
### elasticsearch_cluster

- Value type is string
- There is no default value for this setting.

Provide an identifier for the Elasticsearch cluster the log was sent to. Field values can be static or dynamic:

```
filter {
  kafka_time_machine {
    elasticsearch_cluster => "static_field"
  }
}
```

```
filter {
  kafka_time_machine {
    elasticsearch_cluster => "%{[dynamic_field]}"
  }
}
```
### elasticsearch_cluster_index

- Value type is string
- There is no default value for this setting.

Provide an identifier for the Elasticsearch index the log will be indexed into. Field values can be static or dynamic:

```
filter {
  kafka_time_machine {
    elasticsearch_cluster_index => "static_field"
  }
}
```

```
filter {
  kafka_time_machine {
    elasticsearch_cluster_index => "%{[dynamic_field]}"
  }
}
```