Class: Cosmos::LimitsEventTopic

Inherits:
Topic
Defined in:
lib/cosmos/topics/limits_event_topic.rb

Overview

LimitsEventTopic manages not only the <SCOPE>__cosmos_limits_events topic but also the ancillary key-value stores. The LIMITS_CHANGE event updates the <SCOPE>__current_limits key. The LIMITS_SETTINGS event adds a previously unknown set to the <SCOPE>__limits_sets key with the value 'false'. The LIMITS_SET event updates the <SCOPE>__limits_sets key so that only the requested set is 'true'. While this isn’t a clean separation of topics (streams) and models (key-value), it helps maintain consistency because the topic and model are linked.
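As a rough illustration of the naming scheme described above, the following sketch builds the three key names for a scope. The helper `limits_keys` and its symbol labels are hypothetical and not part of Cosmos; only the key name patterns come from the source listings on this page.

```ruby
# Hypothetical helper (not part of Cosmos): lists the keys that
# LimitsEventTopic reads and writes for a given scope.
def limits_keys(scope)
  {
    events_topic: "#{scope}__cosmos_limits_events", # stream of limits events
    current_limits: "#{scope}__current_limits",     # hash: item => limits state
    limits_sets: "#{scope}__limits_sets"            # hash: set name => 'true' / 'false'
  }
end

keys = limits_keys('DEFAULT')
puts keys[:events_topic]
puts keys[:current_limits]
puts keys[:limits_sets]
```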

Class Method Summary

Methods inherited from Topic

clear_topics, initialize_streams, read_topics, topics

Class Method Details

.current_set(scope:) ⇒ Object



# File 'lib/cosmos/topics/limits_event_topic.rb', line 94

def self.current_set(scope:)
  sets(scope: scope).key('true') || "DEFAULT"
end
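The lookup above relies on `Hash#key`, which returns the first key whose value matches, or `nil` when there is none. A minimal sketch on a plain Hash, assuming the stored hash has the shape documented for `.sets`; the helper name `active_set` and the set name 'TVAC' are illustrative only:

```ruby
# Hypothetical helper mirroring the lookup in current_set, but operating on a
# plain Hash instead of the Store.
def active_set(sets)
  # Hash#key returns the first key with the given value, or nil if none match
  sets.key('true') || "DEFAULT"
end

puts active_set({ 'DEFAULT' => 'false', 'TVAC' => 'true' })
puts active_set({}) # no set is marked 'true', so the fallback is used
```

The fallback means a scope with no stored sets, or with no set marked 'true', reports "DEFAULT" as the current set.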

.delete(_target_name, _packet_name, scope:) ⇒ Object



# File 'lib/cosmos/topics/limits_event_topic.rb', line 98

def self.delete(_target_name, _packet_name, scope:)
  limits = Store.hgetall("#{scope}__current_limits")
  limits.each do |item, _limits_state|
    Store.hdel("#{scope}__current_limits", item)
  end
end

.out_of_limits(scope:) ⇒ Object



# File 'lib/cosmos/topics/limits_event_topic.rb', line 74

def self.out_of_limits(scope:)
  out_of_limits = []
  limits = Store.hgetall("#{scope}__current_limits")
  limits.each do |item, limits_state|
    if %w(RED RED_HIGH RED_LOW YELLOW YELLOW_HIGH YELLOW_LOW).include?(limits_state)
      target_name, packet_name, item_name = item.split('__')
      out_of_limits << [target_name, packet_name, item_name, limits_state]
    end
  end
  out_of_limits
end
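To show the shape of the returned array, here is a sketch of the same filtering applied to a plain Hash rather than the Store. The helper name `filter_out_of_limits` and the sample target, packet, and item names are assumptions made for illustration.

```ruby
# Hypothetical helper: the same filter as out_of_limits, applied to a Hash
# shaped like the <SCOPE>__current_limits key.
def filter_out_of_limits(limits)
  states = %w(RED RED_HIGH RED_LOW YELLOW YELLOW_HIGH YELLOW_LOW)
  limits.each_with_object([]) do |(item, limits_state), result|
    next unless states.include?(limits_state)

    # Fields are stored as TARGET__PACKET__ITEM, joined by double underscores
    target_name, packet_name, item_name = item.split('__')
    result << [target_name, packet_name, item_name, limits_state]
  end
end

sample = {
  'INST__HEALTH_STATUS__TEMP1' => 'RED_HIGH',
  'INST__HEALTH_STATUS__TEMP2' => 'GREEN',
  'INST__ADCS__POSX' => 'YELLOW_LOW'
}
filter_out_of_limits(sample).each { |entry| p entry }
```

Because the field is split on double underscores, single underscores inside a name (as in HEALTH_STATUS) are preserved, while a name that itself contained a double underscore would be split incorrectly.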

.read(offset = nil, count: 100, scope:) ⇒ Object



# File 'lib/cosmos/topics/limits_event_topic.rb', line 58

def self.read(offset = nil, count: 100, scope:)
  topic = "#{scope}__cosmos_limits_events"
  if offset
    result = Store.xread(topic, offset, count: count)
    if result.empty?
      [] # We want to return an empty array rather than an empty hash
    else
      # result is a hash with the topic key followed by an array of results
      # This returns just the array of arrays [[offset, hash], [offset, hash], ...]
      result[topic]
    end
  else
    Store.xrevrange(topic, count: 1)
  end
end
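The two branches above suggest a polling pattern: call once without an offset to get the newest entry, then pass the last seen offset to pick up later entries. The sketch below imitates those return shapes with an in-memory stand-in. `FakeLimitsStream`, the offsets, and the field hashes are all hypothetical, and the real behavior depends on the Store and Redis stream semantics.

```ruby
# Hypothetical in-memory stand-in for the limits events stream (not a Cosmos class).
class FakeLimitsStream
  def initialize
    @entries = [] # each entry is [offset, fields], oldest first
  end

  def add(offset, fields)
    @entries << [offset, fields]
  end

  # Imitates the return shapes of read: with no offset, an array holding only
  # the newest entry; with an offset, up to count entries written after it.
  def read(offset = nil, count: 100)
    return @entries.last(1) if offset.nil?

    index = @entries.index { |id, _fields| id == offset }
    return [] if index.nil? # simplification: unknown offsets yield nothing

    @entries.drop(index + 1).first(count)
  end
end

stream = FakeLimitsStream.new
stream.add('1-0', { 'type' => 'LIMITS_CHANGE' })
stream.add('2-0', { 'type' => 'LIMITS_SET' })

latest = stream.read        # newest entry only
offset = latest.last[0]     # remember the last offset seen
stream.add('3-0', { 'type' => 'LIMITS_CHANGE' })
newer = stream.read(offset) # entries written after that offset
newer.each { |id, fields| puts "#{id}: #{fields['type']}" }
```

In both cases the caller receives an array of [offset, hash] pairs, which is why the original method converts an empty result into an empty array.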

.sets(scope:) ⇒ Hash{String => String}

Returns all the limits sets as keys, each with the value ‘true’ or ‘false’, where only the active set is ‘true’.

Returns:

  • (Hash{String => String})

    Set name mapped to 'true' if enabled, else 'false'



# File 'lib/cosmos/topics/limits_event_topic.rb', line 90

def self.sets(scope:)
  Store.hgetall("#{scope}__limits_sets")
end

.write(event, scope:) ⇒ Object



# File 'lib/cosmos/topics/limits_event_topic.rb', line 29

def self.write(event, scope:)
  case event[:type]
  when :LIMITS_CHANGE
    # The current_limits hash keeps only the current limits state of items
    # It is used by the API to determine the overall limits state
    field = "#{event[:target_name]}__#{event[:packet_name]}__#{event[:item_name]}"
    Store.hset("#{scope}__current_limits", field, event[:new_limits_state])

  when :LIMITS_SETTINGS
    # Limits updated in limits_api.rb to avoid circular reference to TargetModel
    unless sets(scope: scope).has_key?(event[:limits_set])
      Store.hset("#{scope}__limits_sets", event[:limits_set], 'false')
    end

  when :LIMITS_SET
    sets = sets(scope: scope)
    raise "Set '#{event[:set]}' does not exist!" unless sets.key?(event[:set])

    # Set all existing sets to "false"
    sets = sets.transform_values! { |_value| "false" }
    sets[event[:set]] = "true" # Enable the requested set
    Store.hmset("#{scope}__limits_sets", *sets)
  else
    raise "Invalid limits event type '#{event[:type]}'"
  end

  Store.write_topic("#{scope}__cosmos_limits_events", event, '*', 1000)
end
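The LIMITS_SET branch above can be pictured as a pure transformation of the sets hash. The following is a minimal sketch on a plain Hash, without the Store or the event topic; the helper name `activate_set` and the set name 'TVAC' are hypothetical.

```ruby
# Hypothetical helper: returns a copy of the sets hash in which only the
# requested set is marked "true", mirroring the LIMITS_SET branch of write.
def activate_set(sets, name)
  raise "Set '#{name}' does not exist!" unless sets.key?(name)

  updated = sets.transform_values { "false" } # disable every existing set
  updated[name] = "true"                      # enable the requested set
  updated
end

p activate_set({ 'DEFAULT' => 'true', 'TVAC' => 'false' }, 'TVAC')

begin
  activate_set({ 'DEFAULT' => 'true' }, 'MISSING')
rescue RuntimeError => e
  puts e.message
end
```

Unlike the original, this sketch uses the non-mutating `transform_values`, so the input hash is left untouched; that is a choice made for the example, not a claim about how Cosmos should behave.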