Logstash Plugin: logstash-filter-kafka_time_machine

This is a filter plugin for Logstash.

Description

This filter plugin generates new events in the Logstash pipeline. The new events are built from fields of the original event, which are extracted and passed to the filter as settings. They carry metrics for log events that have traversed multiple Kafka and Logstash tiers on their way to aggregation.

The typical flow for log events:


Service Log ---> kafka_shipper <--- logstash_shipper ---> | ott_network_link | ---> kafka_indexer <--- logstash_indexer ---> elastic_search

The filter uses metadata inserted into the log event on both the logstash_shipper and logstash_indexer nodes to track the dwell time of log events through this pipeline.

Kafka Time Machine Result

When kafka_time_machine executes, it returns an InfluxDB Line Protocol formatted metric, e.g.:

ktm,datacenter=kafka_datacenter_shipper-test,es_cluster=some_cluster_name,es_cluster_index=some_cluster_index_name,lag_type=total,owner=ktm_test@cisco.com lag_ms=300i,payload_size_bytes=40i 1634662795000000000

The plugin also emits a metric if an error is encountered, e.g.:

ktm_error,datacenter=kafka_datacenter_shipper-test,es_cluster=some_cluster_name,es_cluster_index=some_cluster_index_name,owner=ktm_test@cisco.com,source=shipper count=1i 1634662795000000000
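Both lines follow the same Line Protocol layout: the measurement name, comma-joined tags, a space, the fields (integers carry an `i` suffix), a space, then the timestamp. A minimal Python sketch of that layout is below; `to_line_protocol` is a hypothetical helper, not part of the plugin, and it assumes all fields are integers and that tag values need no escaping of spaces, commas or equals signs.

```python
from typing import Dict


def to_line_protocol(
    measurement: str, tags: Dict[str, str], fields: Dict[str, int], timestamp_ns: int
) -> str:
    """Hypothetical helper: build one InfluxDB Line Protocol line.

    Assumes every field is an integer (as in the ktm examples) and that
    tag values contain no characters that would need escaping.
    """
    # measurement,tag1=v1,tag2=v2 (tags keep the dict's insertion order)
    head = ",".join([measurement] + [f"{k}={v}" for k, v in tags.items()])
    # Integer field values carry an "i" suffix, e.g. lag_ms=300i.
    body = ",".join(f"{k}={int(v)}i" for k, v in fields.items())
    return f"{head} {body} {timestamp_ns}"


if __name__ == "__main__":
    common = {
        "datacenter": "kafka_datacenter_shipper-test",
        "es_cluster": "some_cluster_name",
        "es_cluster_index": "some_cluster_index_name",
    }
    ts = 1634662795000000000
    print(to_line_protocol(
        "ktm", {**common, "lag_type": "total", "owner": "ktm_test@cisco.com"},
        {"lag_ms": 300, "payload_size_bytes": 40}, ts))
    print(to_line_protocol(
        "ktm_error", {**common, "owner": "ktm_test@cisco.com", "source": "shipper"},
        {"count": 1}, ts))
```

Run as a script, it prints the two example lines above.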

So that a Logstash output {} block can route this metric, the new events are tagged with a [@metadata][ktm_tags][ktm_metric] field, e.g.:

{
    "ktm_metric" => "ktm,datacenter=kafka_datacenter_shipper-test,lag_type=total,es_cluster=some_cluster_name,es_cluster_index=some_cluster_index_name,owner=ktm_test@cisco.com lag_ms=300i,payload_size_bytes=40i 1634662795000000000",
    "@timestamp" => 2021-10-20T23:46:24.704Z,
     "@metadata" => {
        "ktm_tags" => {
            "ktm_metric" => "true"
        }
    },
      "@version" => "1"
}

Metric Event Breakdown

kafka_time_machine can insert one or more new events into the pipeline. Each ktm_metric created is one of:

  • ktm
  • ktm_error

In the case of ktm the metric breakdown is:

Line Protocol Element  Line Protocol Type  Description
---------------------  ------------------  -----------------------------------------
datacenter             tag                 Echo of kafka_datacenter_shipper
es_cluster             tag                 Echo of elasticsearch_cluster
es_cluster_index       tag                 Echo of elasticsearch_cluster_index
lag_type               tag                 Calculated lag type
owner                  tag                 Echo of event_owner
lag_ms                 field               Calculated lag in milliseconds
payload_size_bytes     field               Calculated size of payload field in bytes
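The table does not define exactly how payload_size_bytes is measured. Assuming it is the length of the payload string after UTF-8 encoding (an assumption, not confirmed by this README), the difference from a plain character count matters for non-ASCII logs. The helper below is hypothetical:

```python
def payload_size_bytes(payload: str) -> int:
    """Assumed definition: number of bytes in the UTF-8 encoding of the payload."""
    # len(payload) alone would count characters; encoding first counts bytes,
    # so multibyte characters such as "é" (2 bytes) are weighted correctly.
    return len(payload.encode("utf-8"))
```

For ASCII-only payloads the byte and character counts match; they diverge as soon as the payload contains multibyte characters.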

Meaning of lag_type:

  • total: Lag calculated includes dwell time on both the shipper and the indexer.
  • indexer: Lag calculated is dwell time for the indexer only. Insufficient data was provided for the shipper to compute total lag.
  • shipper: Lag calculated is dwell time for the shipper only. Insufficient data was provided for the indexer to compute total lag.
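A minimal Python sketch of how the three lag types could be derived from the four timestamp settings. The formulas are an assumption, not taken from the plugin source: each tier's dwell time is its Logstash read time minus its Kafka append time, and total lag is the indexer read time minus the shipper append time. Empty strings are treated as missing data, in line with the empty-string handling described under the configuration options; the topic and consumer-group checks are left out, and the function names are hypothetical.

```python
from typing import Optional, Tuple


def _epoch_ms(value: str) -> Optional[int]:
    """Parse an epoch-milliseconds setting; empty or non-numeric means missing."""
    value = value.strip()
    return int(value) if value.isdigit() else None


def compute_lag(
    kafka_append_time_shipper: str,
    logstash_kafka_read_time_shipper: str,
    kafka_append_time_indexer: str,
    logstash_kafka_read_time_indexer: str,
) -> Optional[Tuple[str, int]]:
    """Hypothetical helper returning (lag_type, lag_ms), or None if no lag can be computed."""
    s_append = _epoch_ms(kafka_append_time_shipper)
    s_read = _epoch_ms(logstash_kafka_read_time_shipper)
    i_append = _epoch_ms(kafka_append_time_indexer)
    i_read = _epoch_ms(logstash_kafka_read_time_indexer)

    shipper_ok = s_append is not None and s_read is not None
    indexer_ok = i_append is not None and i_read is not None

    if shipper_ok and indexer_ok:
        # Assumed: total spans from the shipper-side Kafka append to the indexer-side read.
        return "total", i_read - s_append
    if indexer_ok:
        return "indexer", i_read - i_append
    if shipper_ok:
        return "shipper", s_read - s_append
    # Neither tier has complete data: only an error metric would be emitted.
    return None


if __name__ == "__main__":
    print(compute_lag("1000", "1100", "1200", "1300"))  # ('total', 300)
    print(compute_lag("", "", "1200", "1300"))  # ('indexer', 100)
```

Checking total first means a complete event always reports end-to-end lag, falling back to a single tier only when the other tier's timestamps are missing.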

In the case of ktm_error the metric breakdown is:

Line Protocol Element  Line Protocol Type  Description
---------------------  ------------------  ------------------------------------
datacenter             tag                 Echo of kafka_datacenter_shipper
es_cluster             tag                 Echo of elasticsearch_cluster
es_cluster_index       tag                 Echo of elasticsearch_cluster_index
source                 tag                 Source of the error metric
owner                  tag                 Echo of event_owner
count                  field               Count to track error; not cumulative

Meaning of source:

  • indexer: Insufficient data provided for indexer to compute total lag.
  • shipper: Insufficient data provided for shipper to compute total lag.
  • insufficient_data: Insufficient data provided for both indexer and shipper to compute total lag.
  • unknown: Unknown error encountered
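The source values above amount to a small decision over whether each tier supplied complete data. The Python sketch below is a hypothetical illustration, not the plugin's code: it assumes shipper_ok/indexer_ok flags like those a lag computation would derive, returns no source when both tiers are complete, and maps any exception raised while inspecting the event to unknown.

```python
from typing import Callable, Optional, Tuple


def error_source(shipper_ok: bool, indexer_ok: bool) -> Optional[str]:
    """Return the ktm_error source tag, or None when total lag can be computed."""
    if shipper_ok and indexer_ok:
        return None
    if not shipper_ok and not indexer_ok:
        return "insufficient_data"
    # Exactly one tier is incomplete; the source names that tier.
    return "shipper" if not shipper_ok else "indexer"


def error_source_for(check: Callable[[], Tuple[bool, bool]]) -> Optional[str]:
    """Run a (hypothetical) data check; unexpected failures become 'unknown'."""
    try:
        shipper_ok, indexer_ok = check()
    except Exception:
        return "unknown"
    return error_source(shipper_ok, indexer_ok)
```

Under this reading, an event with only indexer data would yield a ktm metric with lag_type=indexer alongside a ktm_error metric with source=shipper.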

Metric Event Timestamp

When kafka_time_machine generates the InfluxDB Line Protocol metric, it must also set the metric's timestamp. So that the caller of the filter controls this value, the event_time_ms setting is used as the metric timestamp.

For example, if event_time_ms is provided as 1634662795000 (milliseconds), the resulting metric carries the nanosecond timestamp 1634662795000000000:

ktm,datacenter=kafka_datacenter_shipper-test,es_cluster=some_cluster_name,es_cluster_index=some_cluster_index_name,lag_type=total,owner=ktm_test@cisco.com lag_ms=300i,payload_size_bytes=40i 1634662795000000000
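The conversion in this example is a plain scale from milliseconds to nanoseconds, the default precision of Line Protocol timestamps. A minimal Python sketch, with a hypothetical helper name:

```python
def metric_timestamp_ns(event_time_ms: str) -> int:
    """Convert the event_time_ms setting (epoch milliseconds, passed as a string)
    into the nanosecond epoch timestamp that ends each Line Protocol line."""
    # 1 ms = 1,000,000 ns; integer arithmetic avoids float rounding on 19-digit values.
    return int(event_time_ms) * 1_000_000
```

Keeping the arithmetic in integers matters here: a 19-digit nanosecond value exceeds the precision of a 64-bit float.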

Kafka Time Machine Configuration Options

This plugin requires the following settings:

Setting                           Input Type  Required
--------------------------------  ----------  --------
kafka_datacenter_shipper          string      Yes
kafka_topic_shipper               string      Yes
kafka_consumer_group_shipper      string      Yes
kafka_append_time_shipper         string      Yes
logstash_kafka_read_time_shipper  string      Yes
kafka_topic_indexer               string      Yes
kafka_consumer_group_indexer      string      Yes
kafka_append_time_indexer         string      Yes
logstash_kafka_read_time_indexer  string      Yes
event_owner                       string      Yes
event_time_ms                     string      Yes
elasticsearch_cluster             string      Yes
elasticsearch_cluster_index       string      Yes

Why are all settings required?

This was a design decision based on the use case. Tracking a Kafka "lag by time" metric without knowing the topic and consumer group would be essentially useless. By using the Kafka input's decorate_events feature, we know we will always have the required fields.

While these settings are required, they can be passed as empty strings, and the plugin handles these cases. For example, if kafka_consumer_group_shipper is an empty string, only indexer results are returned.

kafka_datacenter_shipper

  • Value type is string
  • There is no default value for this setting.

Provide the datacenter the log event originated from, i.e. the datacenter kafka_shipper is in. Field values can be static or dynamic:

filter {
  kafka_time_machine {
    kafka_datacenter_shipper => "static_field"
  }
}
filter {
  kafka_time_machine {
    kafka_datacenter_shipper => "%{[dynamic_field]}"
  }
}
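Each setting accepts either a literal or a Logstash field reference such as %{[dynamic_field]}, which is resolved against the event. The Python sketch below is a simplified, hypothetical stand-in for that interpolation, written only to show the lookup rules: nested references walk the event one bracketed key at a time, and a reference to a missing field is left in place as literal text, as Logstash also does, so the filter would receive the unresolved string. Date formats such as %{+YYYY.MM.dd} are not handled.

```python
import re
from typing import Any, Dict, List

# Matches %{field} or %{[nested][field]} references.
_REFERENCE = re.compile(r"%\{([^}]+)\}")


def _field_path(ref: str) -> List[str]:
    """'[a][b]' -> ['a', 'b']; a bare 'a' -> ['a']."""
    return re.findall(r"\[([^\]]+)\]", ref) or [ref]


def sprintf(template: str, event: Dict[str, Any]) -> str:
    """Simplified stand-in for Logstash field interpolation (no %{+date} formats)."""

    def substitute(match: "re.Match[str]") -> str:
        node: Any = event
        for key in _field_path(match.group(1)):
            if not isinstance(node, dict) or key not in node:
                # Missing field: keep the reference text unchanged.
                return match.group(0)
            node = node[key]
        return str(node)

    return _REFERENCE.sub(substitute, template)
```

A static value such as "static_field" contains no reference and passes through unchanged, which is why both forms can be used for every setting.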

kafka_topic_shipper

  • Value type is string
  • There is no default value for this setting.

Provide the Kafka topic the log event was read from on the shipper. Field values can be static or dynamic:

filter {
  kafka_time_machine {
    kafka_topic_shipper => "static_field"
  }
}
filter {
  kafka_time_machine {
    kafka_topic_shipper => "%{[dynamic_field]}"
  }
}

kafka_consumer_group_shipper

  • Value type is string
  • There is no default value for this setting.

Provide the Kafka consumer group the log event was read from on the shipper. Field values can be static or dynamic:

filter {
  kafka_time_machine {
    kafka_consumer_group_shipper => "static_field"
  }
}
filter {
  kafka_time_machine {
    kafka_consumer_group_shipper => "%{[dynamic_field]}"
  }
}

kafka_append_time_shipper

  • Value type is string
  • There is no default value for this setting.

Provide the epoch time, in milliseconds, at which the log event was appended to kafka_shipper. Field values can be static or dynamic:

filter {
  kafka_time_machine {
    kafka_append_time_shipper => "1624394191000"
  }
}
filter {
  kafka_time_machine {
    kafka_append_time_shipper => "%{[dynamic_field]}"
  }
}

logstash_kafka_read_time_shipper

  • Value type is string
  • There is no default value for this setting.

Provide the epoch time, in milliseconds, at which the log event was read from kafka_shipper. Field values can be static or dynamic:

filter {
  kafka_time_machine {
    logstash_kafka_read_time_shipper => "1624394191000"
  }
}
filter {
  kafka_time_machine {
    logstash_kafka_read_time_shipper => "%{[dynamic_field]}"
  }
}

kafka_topic_indexer

  • Value type is string
  • There is no default value for this setting.

Provide the Kafka topic the log event was read from on the indexer. Field values can be static or dynamic:

filter {
  kafka_time_machine {
    kafka_topic_indexer => "static_field"
  }
}
filter {
  kafka_time_machine {
    kafka_topic_indexer => "%{[dynamic_field]}"
  }
}

kafka_consumer_group_indexer

  • Value type is string
  • There is no default value for this setting.

Provide the Kafka consumer group the log event was read from on the indexer. Field values can be static or dynamic:

filter {
  kafka_time_machine {
    kafka_consumer_group_indexer => "static_field"
  }
}
filter {
  kafka_time_machine {
    kafka_consumer_group_indexer => "%{[dynamic_field]}"
  }
}

kafka_append_time_indexer

  • Value type is string
  • There is no default value for this setting.

Provide the epoch time, in milliseconds, at which the log event was appended to kafka_indexer. Field values can be static or dynamic:

filter {
  kafka_time_machine {
    kafka_append_time_indexer => "1624394191000"
  }
}
filter {
  kafka_time_machine {
    kafka_append_time_indexer => "%{[dynamic_field]}"
  }
}

logstash_kafka_read_time_indexer

  • Value type is string
  • There is no default value for this setting.

Provide the epoch time, in milliseconds, at which the log event was read from kafka_indexer. Field values can be static or dynamic:

filter {
  kafka_time_machine {
    logstash_kafka_read_time_indexer => "1624394191000"
  }
}
filter {
  kafka_time_machine {
    logstash_kafka_read_time_indexer => "%{[dynamic_field]}"
  }
}

event_owner

  • Value type is string
  • There is no default value for this setting.

Provide the event owner, i.e. the owner of the log. Field values can be static or dynamic:

filter {
  kafka_time_machine {
    event_owner => "static_field"
  }
}
filter {
  kafka_time_machine {
    event_owner => "%{[dynamic_field]}"
  }
}

event_time_ms

  • Value type is string
  • There is no default value for this setting.

Provide the epoch time, in milliseconds, at which this event is being processed. This time is used as the timestamp of the generated InfluxDB Line Protocol metric. Field values can be static or dynamic:

filter {
  kafka_time_machine {
    event_time_ms => "1624394191000"
  }
}
filter {
  kafka_time_machine {
    event_time_ms => "%{[dynamic_field]}"
  }
}

elasticsearch_cluster

  • Value type is string
  • There is no default value for this setting.

Provide an identifier for the Elasticsearch cluster the log was sent to. Field values can be static or dynamic:

filter {
  kafka_time_machine {
    elasticsearch_cluster => "static_field"
  }
}
filter {
  kafka_time_machine {
    elasticsearch_cluster => "%{[dynamic_field]}"
  }
}

elasticsearch_cluster_index

  • Value type is string
  • There is no default value for this setting.

Provide an identifier for the Elasticsearch index the log will be indexed in. Field values can be static or dynamic:

filter {
  kafka_time_machine {
    elasticsearch_cluster_index => "static_field"
  }
}
filter {
  kafka_time_machine {
    elasticsearch_cluster_index => "%{[dynamic_field]}"
  }
}