Class: DTK::State::Component::Attribute::Influxdb

Inherits:
DTK::State::Component::Attribute show all
Defined in:
lib/state/component/providers/influxdb.rb,
lib/state/component/providers/influxdb/client.rb,
lib/state/component/providers/influxdb/measurement.rb,
lib/state/component/providers/influxdb/semantictype.rb,
lib/state/component/providers/influxdb/measurement/errors.rb,
lib/state/component/providers/influxdb/measurement/events.rb,
lib/state/component/providers/influxdb/measurement/states.rb,
lib/state/component/providers/influxdb/measurement/attribute_measurement.rb

Defined Under Namespace

Classes: Client, Measurement, SemanticType

Instance Attribute Summary collapse

Attributes inherited from DTK::State::Component::Attribute

#dynamic, #encrypted, #function, #name, #parent, #required, #type, #value

Instance Method Summary collapse

Methods inherited from DTK::State::Component::Attribute

create_from_kube_array, create_from_kube_hash, get, #to_hash

Constructor Details

#initialize(measurement_name) ⇒ Influxdb

Returns a new instance of Influxdb.



11
12
13
14
# File 'lib/state/component/providers/influxdb.rb', line 11

# Creates the InfluxDB-backed attribute provider.
#
# Instantiates a new Influxdb::Client and asks it for the measurement helper
# matching +measurement_name+; both are kept in instance variables.
#
# @param measurement_name [Object] identifier passed to
#   Influxdb::Client#measurement_helper (presumably a String or Symbol naming
#   the measurement — confirm against the client implementation)
def initialize(measurement_name)
  influx_client = Influxdb::Client.new
  @client = influx_client
  @measurement = influx_client.measurement_helper(measurement_name)
end

Instance Attribute Details

#client ⇒ Object (readonly)

Returns the value of attribute client.



9
10
11
# File 'lib/state/component/providers/influxdb.rb', line 9

# Reader for the client held in @client.
#
# @return [Object] the value of @client (assigned an Influxdb::Client in #initialize)
def client
  @client
end

#measurement ⇒ Object (readonly)

Returns the value of attribute measurement.



9
10
11
# File 'lib/state/component/providers/influxdb.rb', line 9

# Reader for the measurement helper held in @measurement.
#
# @return [Object] the value of @measurement (assigned in #initialize from
#   the client's measurement_helper call)
def measurement
  @measurement
end

Instance Method Details

#get(namespace, component_name, assembly_name, attribute_name, opts = {}) ⇒ Object



16
17
18
19
20
21
22
# File 'lib/state/component/providers/influxdb.rb', line 16

# Fetches the last stored point for an attribute.
#
# The lookup tags come from measurement.get_required_tags. When
# opts[:provider] is "correlation", the tags returned by
# measurement.get_correlator_type(opts[:entrypoint]) are merged into them
# in place before the lookup.
#
# @param namespace [Object] forwarded to get_required_tags
# @param component_name [Object] forwarded to get_required_tags
# @param assembly_name [Object] forwarded to get_required_tags
# @param attribute_name [Object] forwarded to get_required_tags
# @param opts [Hash] optional :provider and :entrypoint keys
# @return [Object] whatever measurement.get_last_point returns
# @raise [RuntimeError] wrapping the message of any StandardError raised
#   while building the tags or querying
def get(namespace, component_name, assembly_name, attribute_name, opts = {})
  tags = measurement.get_required_tags(namespace, component_name, assembly_name, attribute_name)
  if opts[:provider] == "correlation"
    correlator_tags = measurement.get_correlator_type(opts[:entrypoint])
    tags.merge!(correlator_tags)
  end
  measurement.get_last_point(tags)
rescue StandardError => failure
  raise "Error happened while getting attribute from InfluxDB.\nError: #{failure}"
end

#get_event(event_id, pod_name, pod_namespace, component_name, attribute_name, task_id) ⇒ Object



41
42
43
44
45
46
# File 'lib/state/component/providers/influxdb.rb', line 41

# Fetches the last stored point for an event.
#
# @param event_id [Object] forwarded to get_required_tags
# @param pod_name [Object] forwarded to get_required_tags
# @param pod_namespace [Object] forwarded to get_required_tags
# @param component_name [Object] forwarded to get_required_tags
# @param attribute_name [Object] forwarded to get_required_tags
# @param task_id [Object] forwarded to get_required_tags
# @return [Object] whatever measurement.get_last_point returns
# @raise [RuntimeError] wrapping the message of any StandardError raised
#   while building the tags or querying
def get_event(event_id, pod_name, pod_namespace, component_name, attribute_name, task_id)
  required_tags = measurement.get_required_tags(event_id, pod_name, pod_namespace, component_name, attribute_name, task_id)
  # The lookup result is the method's return value; no intermediate local is needed.
  measurement.get_last_point(required_tags)
rescue => e
  raise "Error happened while getting event from InfluxDB.\nError: #{e}"
end

#get_state(type, name, namespace, object_state, component_name, attribute_name, task_id) ⇒ Object



57
58
59
60
61
62
# File 'lib/state/component/providers/influxdb.rb', line 57

# Fetches the last stored point for an object state.
#
# @param type [Object] forwarded to get_required_tags
# @param name [Object] forwarded to get_required_tags
# @param namespace [Object] forwarded to get_required_tags
# @param object_state [Object] forwarded to get_required_tags
# @param component_name [Object] forwarded to get_required_tags
# @param attribute_name [Object] forwarded to get_required_tags
# @param task_id [Object] forwarded to get_required_tags
# @return [Object] whatever measurement.get_last_point returns
# @raise [RuntimeError] wrapping the message of any StandardError raised
#   while building the tags or querying
def get_state(type, name, namespace, object_state, component_name, attribute_name, task_id)
  lookup_tags = measurement.get_required_tags(
    type, name, namespace, object_state, component_name, attribute_name, task_id
  )
  measurement.get_last_point(lookup_tags)
rescue StandardError => failure
  raise "Error happened while getting state from InfluxDB.\nError: #{failure}"
end

#write(namespace, component_name, assembly_name, attribute_name, value, opts = {}, timestamp = nil) ⇒ Object



24
25
26
27
28
29
30
# File 'lib/state/component/providers/influxdb.rb', line 24

# Writes an attribute value through the measurement helper.
#
# The tags come from measurement.get_required_tags. When opts[:provider] is
# "correlation", the tags returned by
# measurement.get_correlator_type(opts[:entrypoint]) are merged into them
# in place. The value is converted with #to_s before being written.
#
# @param namespace [Object] forwarded to get_required_tags
# @param component_name [Object] forwarded to get_required_tags
# @param assembly_name [Object] forwarded to get_required_tags
# @param attribute_name [Object] forwarded to get_required_tags
# @param value [#to_s] value to store
# @param opts [Hash] optional :provider and :entrypoint keys
# @param timestamp [Object, nil] passed through to measurement.write unchanged
# @return [Object] whatever measurement.write returns
# @raise [RuntimeError] wrapping the message of any StandardError raised
#   while building the tags or writing
def write(namespace, component_name, assembly_name, attribute_name, value, opts = {}, timestamp = nil)
  required_tags = measurement.get_required_tags(namespace, component_name, assembly_name, attribute_name)
  required_tags.merge! measurement.get_correlator_type(opts[:entrypoint]) if opts[:provider] == "correlation"
  measurement.write(value.to_s, required_tags, timestamp)
rescue => e
  # "\n" separates the summary from the cause, as in the sibling methods;
  # the previous "\E" escape produced a literal "E" and no line break.
  raise "Error happened while writing attribute into InfluxDB.\nError: #{e}"
end

#write_event(event_id, pod_name, pod_namespace, event_source, event_message, component_name, attribute_name, task_id, timestamp) ⇒ Object



32
33
34
35
36
37
38
39
# File 'lib/state/component/providers/influxdb.rb', line 32

# Writes an event through the measurement helper.
#
# The stored value is the #to_s of a hash holding event_source and
# event_message. Timestamps later than the current time are rejected.
#
# @param event_id [Object] forwarded to get_required_tags
# @param pod_name [Object] forwarded to get_required_tags
# @param pod_namespace [Object] forwarded to get_required_tags
# @param event_source [Object] stored under :event_source in the written value
# @param event_message [Object] stored under :event_message in the written value
# @param component_name [Object] forwarded to get_required_tags
# @param attribute_name [Object] forwarded to get_required_tags
# @param task_id [Object] forwarded to get_required_tags
# @param timestamp [Object] must support `>` against a Time; passed through
#   to measurement.write
# @return [Object] whatever measurement.write returns
# @raise [RuntimeError] when the timestamp is in the future, or wrapping the
#   message of any StandardError raised while building the tags or writing
def write_event(event_id, pod_name, pod_namespace, event_source, event_message, component_name, attribute_name, task_id, timestamp)
  raise "Bad timestamp input, write operation wont be completed" if timestamp > Time.new
  value_to_write = { event_source: event_source, event_message: event_message }
  required_tags = measurement.get_required_tags(event_id, pod_name, pod_namespace, component_name, attribute_name, task_id)
  measurement.write(value_to_write.to_s, required_tags, timestamp)
rescue => e
  # The rescued exception must be bound to the same name the message
  # interpolates; a mismatch here turns every failure into a NameError.
  raise "Error happened while writing event into InfluxDB.\nError: #{e}"
end

#write_state(type, name, namespace, object_state, spec, status, component_name, attribute_name, task_id, timestamp) ⇒ Object



48
49
50
51
52
53
54
55
# File 'lib/state/component/providers/influxdb.rb', line 48

# Writes an object state through the measurement helper.
#
# The stored value is the #to_s of a hash holding spec and status.
# Timestamps later than the current time are rejected.
#
# @param type [Object] forwarded to get_required_tags
# @param name [Object] forwarded to get_required_tags
# @param namespace [Object] forwarded to get_required_tags
# @param object_state [Object] forwarded to get_required_tags
# @param spec [Object] stored under :spec in the written value
# @param status [Object] stored under :status in the written value
# @param component_name [Object] forwarded to get_required_tags
# @param attribute_name [Object] forwarded to get_required_tags
# @param task_id [Object] forwarded to get_required_tags
# @param timestamp [Object] must support `>` against a Time; passed through
#   to measurement.write
# @return [Object] whatever measurement.write returns
# @raise [RuntimeError] when the timestamp is in the future, or wrapping the
#   message of any StandardError raised while building the tags or writing
def write_state(type, name, namespace, object_state, spec, status, component_name, attribute_name, task_id, timestamp)
  if timestamp > Time.now
    raise "Bad timestamp input, write operation to InfluxDB wont be completed"
  end

  payload = { spec: spec, status: status }.to_s
  state_tags = measurement.get_required_tags(
    type, name, namespace, object_state, component_name, attribute_name, task_id
  )
  measurement.write(payload, state_tags, timestamp)
rescue StandardError => failure
  raise "Error happened while writing state into InfluxDB.\nError: #{failure}"
end