Class: Cosmos::Store

Inherits:
Object show all
Defined in:
lib/cosmos/utilities/store_autoload.rb

Constant Summary collapse

@@instance =

Variable that holds the singleton instance

nil
@@instance_mutex =

Mutex used to ensure that only one instance is created

Mutex.new

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(pool_size = 10) ⇒ Store

Returns a new instance of Store.



75
76
77
78
79
80
81
82
# File 'lib/cosmos/utilities/store_autoload.rb', line 75

# Configure the store from the COSMOS_REDIS_* environment variables and
# create the connection pool used by every other method.
#
# @param pool_size [Integer] size handed to ConnectionPool.new
def initialize(pool_size = 10)
  Redis.exists_returns_integer = true

  # Connection settings come from the environment
  hostname = ENV['COSMOS_REDIS_HOSTNAME']
  port = ENV['COSMOS_REDIS_PORT']
  @redis_url = "redis://#{hostname}:#{port}"
  @redis_username = ENV['COSMOS_REDIS_USERNAME']
  @redis_key = ENV['COSMOS_REDIS_PASSWORD']

  # Last message id handed out per topic (maintained by read_topics and
  # update_topic_offsets)
  @topic_offsets = {}

  # Each pooled connection is produced by build_redis
  @redis_pool = ConnectionPool.new(size: pool_size) { build_redis }
end

Dynamic Method Handling

This class handles dynamic methods through the method_missing method

#method_missing(message, *args, **kwargs, &block) ⇒ Object

Delegate all unknown methods to redis through the @redis_pool



60
61
62
# File 'lib/cosmos/utilities/store_autoload.rb', line 60

# Delegate all unknown methods to redis through the @redis_pool.
#
# Keyword arguments are captured and forwarded explicitly. With only *args,
# a call such as store.xadd(topic, hash, maxlen: 0) would hand the keywords to
# the redis client as a trailing positional Hash on Ruby 3, which no longer
# converts such a Hash back into keywords.
#
# @param message [Symbol] name of the method that was not found on the store
# @param args [Array] positional arguments to forward
# @param kwargs [Hash] keyword arguments to forward
# @param block [Proc] optional block to forward
# @return [Object] whatever the redis client method returns
def method_missing(message, *args, **kwargs, &block)
  @redis_pool.with { |redis| redis.public_send(message, *args, **kwargs, &block) }
end

Instance Attribute Details

#redis_pool ⇒ Object (readonly)

Returns the value of attribute redis_pool.



40
41
42
# File 'lib/cosmos/utilities/store_autoload.rb', line 40

# Read-only accessor for the connection pool created in #initialize.
#
# @return [ConnectionPool] the pool every redis call is checked out from
def redis_pool
  return @redis_pool
end

#redis_url ⇒ Object (readonly)

Returns the value of attribute redis_url.



39
40
41
# File 'lib/cosmos/utilities/store_autoload.rb', line 39

# Read-only accessor for the connection URL assembled in #initialize.
#
# @return [String] URL of the form "redis://<hostname>:<port>"
def redis_url
  return @redis_url
end

Class Method Details

.get_last_offset(topic) ⇒ Object



159
160
161
# File 'lib/cosmos/utilities/store_autoload.rb', line 159

# Class-level shortcut: forwards to #get_last_offset on the singleton instance.
#
# @param topic [String] the stream / topic
# @return [Object] whatever the instance method returns
def self.get_last_offset(topic)
  instance.get_last_offset(topic)
end

.get_newest_message(topic) ⇒ Object



148
149
150
# File 'lib/cosmos/utilities/store_autoload.rb', line 148

# Class-level shortcut: forwards to #get_newest_message on the singleton instance.
#
# @param topic [String] the stream / topic
# @return [Object] whatever the instance method returns
def self.get_newest_message(topic)
  instance.get_newest_message(topic)
end

.get_oldest_message(topic) ⇒ Object



137
138
139
# File 'lib/cosmos/utilities/store_autoload.rb', line 137

# Class-level shortcut: forwards to #get_oldest_message on the singleton instance.
#
# @param topic [String] the stream / topic
# @return [Object] whatever the instance method returns
def self.get_oldest_message(topic)
  instance.get_oldest_message(topic)
end

.initialize_streams(topics) ⇒ Object

Stream APIs



124
125
126
# File 'lib/cosmos/utilities/store_autoload.rb', line 124

# Class-level shortcut: forwards to #initialize_streams on the singleton instance.
#
# @param topics [Array<String>] the streams / topics to create
# @return [Object] whatever the instance method returns
def self.initialize_streams(topics)
  instance.initialize_streams(topics)
end

.instance(pool_size = 100) ⇒ Object

Get the singleton instance



43
44
45
46
47
48
49
50
51
# File 'lib/cosmos/utilities/store_autoload.rb', line 43

# Get the singleton instance, creating it on first use.
#
# An already created instance is returned without taking the mutex. Otherwise
# the instance is created while holding @@instance_mutex, and the check is
# repeated inside the lock so only one instance is ever built.
#
# @param pool_size [Integer] passed to Store.new; only has an effect on the
#   call that actually creates the instance
# @return [Store] the shared instance
def self.instance(pool_size = 100)
  # Logger.level = Logger::DEBUG
  @@instance || @@instance_mutex.synchronize { @@instance ||= new(pool_size) }
end

.method_missing(message, *args, **kwargs, &block) ⇒ Object

Delegate all unknown class methods to the singleton instance



55
56
57
# File 'lib/cosmos/utilities/store_autoload.rb', line 55

# Delegate all unknown class methods to the singleton instance.
#
# Keyword arguments are captured and forwarded explicitly. With only *args,
# a call such as Store.xadd(topic, hash, maxlen: 0) would reach the instance
# with the keywords as a trailing positional Hash on Ruby 3, which no longer
# converts such a Hash back into keywords.
#
# @param message [Symbol] name of the class method that was not found
# @param args [Array] positional arguments to forward
# @param kwargs [Hash] keyword arguments to forward
# @param block [Proc] optional block to forward
# @return [Object] whatever the instance method returns
def self.method_missing(message, *args, **kwargs, &block)
  instance.public_send(message, *args, **kwargs, &block)
end

.read_topic_last(topic) ⇒ Object



174
175
176
# File 'lib/cosmos/utilities/store_autoload.rb', line 174

# Class-level shortcut: forwards to #read_topic_last on the singleton instance.
#
# @param topic [String] the stream / topic
# @return [Object] whatever the instance method returns
def self.read_topic_last(topic)
  instance.read_topic_last(topic)
end

.read_topics(topics, offsets = nil, timeout_ms = 1000, &block) ⇒ Object



224
225
226
# File 'lib/cosmos/utilities/store_autoload.rb', line 224

# Class-level shortcut: forwards to #read_topics on the singleton instance,
# passing the caller's block along.
#
# @param topics [Array<String>] the streams / topics to read
# @param offsets [Array<String>, nil] ids to read after, one per topic
# @param timeout_ms [Integer] forwarded unchanged to the instance method
# @return [Object] whatever the instance method returns
def self.read_topics(topics, offsets = nil, timeout_ms = 1000, &block)
  instance.read_topics(topics, offsets, timeout_ms, &block)
end

.trim_topic(topic, minid, approximate = true, limit: 0) ⇒ Integer

Trims older entries of the redis stream if needed. > www.rubydoc.info/github/redis/redis-rb/Redis:xtrim

Examples:

Without options

Cosmos::Store.trim_topic('MANGO__TOPIC', 1000)

With options

Cosmos::Store.trim_topic('MANGO__TOPIC', 1000, true, limit: 0)

Parameters:

  • topic (String)

    the stream key

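  • minid (Integer)

    minimum entry id to keep; entries with ids lower than this are trimmed

  • approximate (Boolean) (defaults to: true)

    whether to add the `~` (approximate trimming) modifier or not

  • limit (Integer) (defaults to: 0)

    maximum number of entries to evict in one call; 0 means no limit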

Returns:

  • (Integer)

    the number of entries actually deleted



305
306
307
# File 'lib/cosmos/utilities/store_autoload.rb', line 305

# Class-level shortcut: forwards to #trim_topic on the singleton instance.
#
# @param topic [String] the stream key
# @param minid [Integer] forwarded unchanged to the instance method
# @param approximate [Boolean] forwarded unchanged to the instance method
# @param limit [Integer] forwarded unchanged to the instance method
# @return [Integer] whatever the instance method returns
def self.trim_topic(topic, minid, approximate = true, limit: 0)
  instance.trim_topic(topic, minid, approximate, limit: limit)
end

.update_topic_offsets(topics) ⇒ Object

TODO: Currently unused

def decrement_id(id)
  time, sequence = id.split('-')
  if sequence == '0'
    "#{time.to_i - 1}-18446744073709551615"
  else
    "#{time}-#{sequence.to_i - 1}"
  end
end



202
203
204
# File 'lib/cosmos/utilities/store_autoload.rb', line 202

# Class-level shortcut: forwards to #update_topic_offsets on the singleton instance.
#
# @param topics [Array<String>] the streams / topics
# @return [Object] whatever the instance method returns
def self.update_topic_offsets(topics)
  instance.update_topic_offsets(topics)
end

.write_topic(topic, msg_hash, id = '*', maxlen = nil, approximate = true) ⇒ String

Add new entry to the redis stream. > www.rubydoc.info/github/redis/redis-rb/Redis:xadd

Examples:

Without options

Cosmos::Store.write_topic('MANGO__TOPIC', {'message' => 'something'})

With options

Cosmos::Store.write_topic('MANGO__TOPIC', {'message' => 'something'}, '0-0', 1000, false)

Parameters:

  • topic (String)

    the stream / topic

  • msg_hash (Hash)

    one or multiple field-value pairs

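  • id (String) (defaults to: '*')

    the entry id; '*' (also used when nil is given) means auto generation

  • maxlen (Integer) (defaults to: nil)

    max length of entries

  • approximate (Boolean) (defaults to: true)

    whether to add the `~` modifier of maxlen or not

Returns:

  • (String)

    the entry id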



263
264
265
# File 'lib/cosmos/utilities/store_autoload.rb', line 263

# Class-level shortcut: forwards to #write_topic on the singleton instance.
# All arguments are positional and are passed through in the same order.
#
# @param topic [String] the stream / topic
# @param msg_hash [Hash] one or multiple field-value pairs
# @param id [String] forwarded unchanged to the instance method
# @param maxlen [Integer, nil] forwarded unchanged to the instance method
# @param approximate [Boolean] forwarded unchanged to the instance method
# @return [String] whatever the instance method returns
def self.write_topic(topic, msg_hash, id = '*', maxlen = nil, approximate = true)
  instance.write_topic(topic, msg_hash, id, maxlen, approximate)
end

Instance Method Details

#build_redis ⇒ Object



85
86
87
# File 'lib/cosmos/utilities/store_autoload.rb', line 85

# Create a new redis client from the URL and credentials stored on the
# instance. Used as the factory block of the connection pool.
#
# @return [Redis] a newly constructed client
def build_redis
  Redis.new(
    url: @redis_url,
    username: @redis_username,
    password: @redis_key
  )
end

#get_cmd_item(target_name, packet_name, param_name, type: :WITH_UNITS, scope: $cosmos_scope) ⇒ Object



90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
# File 'lib/cosmos/utilities/store_autoload.rb', line 90

# Look up one item of the most recent decommutated command packet.
#
# The newest message of the "<scope>__DECOMCMD__{<target>}__<packet>" topic is
# read. 'RECEIVED_COUNT' is answered from the message's 'received_count'
# field; every other item is taken from the message's 'json_data' JSON.
# For those, the richest stored value allowed by +type+ wins:
#   :WITH_UNITS -> "<item>__U", then "<item>__F", then "<item>__C", then raw
#   :FORMATTED  -> "<item>__F", then "<item>__C", then raw
#   :CONVERTED  -> "<item>__C", then raw
#   anything else -> raw
#
# @param target_name [String] target name
# @param packet_name [String] packet name
# @param param_name [String] item name
# @param type [Symbol] requested value type (see above)
# @param scope [String] scope prefix of the topic name
# @return [Object, nil] the item value, or nil when the topic has no message
#   or the item is not present
def get_cmd_item(target_name, packet_name, param_name, type: :WITH_UNITS, scope: $cosmos_scope)
  msg_id, msg_hash = read_topic_last("#{scope}__DECOMCMD__{#{target_name}}__#{packet_name}")
  return nil unless msg_id

  # TODO: The reserved time items (RECEIVED_TIMESECONDS, PACKET_TIMESECONDS,
  # RECEIVED_TIMEFORMATTED, PACKET_TIMEFORMATTED) now exist directly on command
  # packets. Decide whether they should still be calculated from
  # msg_hash['time'] or read like any other item.
  return msg_hash['received_count'].to_i if param_name == 'RECEIVED_COUNT'

  items = JSON.parse(msg_hash['json_data'])

  # Candidate suffixes ordered from the most complex down to the basic raw value
  suffixes =
    case type
    when :WITH_UNITS then %w[U F C]
    when :FORMATTED then %w[F C]
    when :CONVERTED then %w[C]
    else []
    end

  suffixes.each do |suffix|
    value = items["#{param_name}__#{suffix}"]
    return value if value
  end

  items[param_name]
end

#get_last_offset(topic) ⇒ Object



163
164
165
166
167
168
169
170
171
172
# File 'lib/cosmos/utilities/store_autoload.rb', line 163

# Return the id of the newest entry in the topic.
#
# @param topic [String] the stream / topic
# @return [String] id of the last entry, or "0-0" when xrevrange yields no
#   entry (or no id) for the topic
def get_last_offset(topic)
  @redis_pool.with do |redis|
    newest = redis.xrevrange(topic, count: 1)
    last_id = newest && newest[0] && newest[0][0]
    last_id || "0-0"
  end
end

#get_newest_message(topic) ⇒ Object



152
153
154
155
156
157
# File 'lib/cosmos/utilities/store_autoload.rb', line 152

# Fetch the newest entry of the topic.
#
# @param topic [String] the stream / topic
# @return [Array, nil] first element of xrevrange(topic, count: 1), i.e. the
#   newest entry, or nil when nothing was returned
def get_newest_message(topic)
  @redis_pool.with { |redis| redis.xrevrange(topic, count: 1)[0] }
end

#get_oldest_message(topic) ⇒ Object



141
142
143
144
145
146
# File 'lib/cosmos/utilities/store_autoload.rb', line 141

# Fetch the oldest entry of the topic.
#
# @param topic [String] the stream / topic
# @return [Array, nil] first element of xrange(topic, count: 1), or nil when
#   nothing was returned
def get_oldest_message(topic)
  @redis_pool.with { |redis| redis.xrange(topic, count: 1)[0] }
end

#initialize_streams(topics) ⇒ Object



128
129
130
131
132
133
134
135
# File 'lib/cosmos/utilities/store_autoload.rb', line 128

# Create each of the given streams by adding a placeholder entry with
# maxlen 0, which leaves the stream in place without keeping the entry.
#
# @param topics [Array<String>] the streams / topics to create
# @return [Array<String>] the topics that were passed in
def initialize_streams(topics)
  @redis_pool.with do |redis|
    # Create empty stream with maxlen 0
    topics.each { |topic| redis.xadd(topic, { a: 'b' }, maxlen: 0) }
  end
end

#read_topic_last(topic) ⇒ Object



178
179
180
181
182
183
184
185
186
187
188
189
190
# File 'lib/cosmos/utilities/store_autoload.rb', line 178

# Read the last (newest) entry of the topic.
#
# @param topic [String] the stream / topic
# @return [Array, nil] the newest entry, or nil when the topic yields nothing
def read_topic_last(topic)
  @redis_pool.with do |redis|
    # xrevrange defaults to the full range ('+' down to '-'), walking from the
    # highest id to the lowest, so limiting the count to 1 yields the last
    # element. See https://redis.io/commands/xrevrange.
    newest = redis.xrevrange(topic, count: 1)
    return nil unless newest && newest.length > 0

    return newest[0]
  end
end

#read_topics(topics, offsets = nil, timeout_ms = 1000) ⇒ Object



228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
# File 'lib/cosmos/utilities/store_autoload.rb', line 228

# Read new messages from several topics with a single xread call.
#
# When no offsets are given they are taken from #update_topic_offsets. Each
# message returned updates the remembered offset of its topic and, if a block
# was given, is yielded as (topic, msg_id, msg_hash, redis).
#
# @param topics [Array<String>] the streams / topics to read
# @param offsets [Array<String>, nil] ids to read after, one per topic
# @param timeout_ms [Integer] passed to xread as its block: option
# @yield [topic, msg_id, msg_hash, redis] once per message, in xread order
# @return [Object] the unmodified xread result
def read_topics(topics, offsets = nil, timeout_ms = 1000)
  @redis_pool.with do |redis|
    offsets ||= update_topic_offsets(topics)
    result = redis.xread(topics, offsets, block: timeout_ms)
    # A nil or empty result simply produces no iterations
    (result || []).each do |topic, messages|
      messages.each do |msg_id, msg_hash|
        @topic_offsets[topic] = msg_id
        yield topic, msg_id, msg_hash, redis if block_given?
      end
    end
    result
  end
end

#trim_topic(topic, minid, approximate = true, limit: 0) ⇒ Integer

Trims older entries of the redis stream if needed. > www.rubydoc.info/github/redis/redis-rb/Redis:xtrim

Examples:

Without options

store.trim_topic('MANGO__TOPIC', 1000)

With options

store.trim_topic('MANGO__TOPIC', 1000, true, limit: 0)

Parameters:

  • topic (String)

    the stream key

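  • minid (Integer)

    minimum entry id to keep; entries with ids lower than this are trimmed

  • approximate (Boolean) (defaults to: true)

    whether to add the `~` (approximate trimming) modifier or not

  • limit (Integer) (defaults to: 0)

    maximum number of entries to evict in one call; 0 means no limit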

Returns:

  • (Integer)

    the number of entries actually deleted



322
323
324
325
326
# File 'lib/cosmos/utilities/store_autoload.rb', line 322

# Trim the topic by calling xtrim_minid on a pooled connection.
#
# @param topic [String] the stream key
# @param minid [Integer] passed through to xtrim_minid
# @param approximate [Boolean] passed through as the approximate: option
# @param limit [Integer] passed through as the limit: option
# @return [Integer] the value returned by xtrim_minid
def trim_topic(topic, minid, approximate = true, limit: 0)
  @redis_pool.with do |redis|
    redis.xtrim_minid(topic, minid, approximate: approximate, limit: limit)
  end
end

#update_topic_offsets(topics) ⇒ Object



206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
# File 'lib/cosmos/utilities/store_autoload.rb', line 206

# Build the list of offsets to hand to xread, one per topic and in the same
# order as +topics+.
#
# Normally this is just the remembered offset of the topic, which lets xread
# return everything past that point. A topic without a remembered offset is
# being read for the first time: its current last id is looked up and
# remembered so that only messages from now on are delivered.
#
# @param topics [Array<String>] the streams / topics
# @return [Array<String>] one offset per topic
def update_topic_offsets(topics)
  topics.map { |topic| @topic_offsets[topic] ||= get_last_offset(topic) }
end

#write_topic(topic, msg_hash, id = '*', maxlen = nil, approximate = true) ⇒ String

Add new entry to the redis stream. > www.rubydoc.info/github/redis/redis-rb/Redis:xadd

Examples:

Without options

store.write_topic('MANGO__TOPIC', {'message' => 'something'})

With options

store.write_topic('MANGO__TOPIC', {'message' => 'something'}, '0-0', 1000, true)

Parameters:

  • topic (String)

    the stream / topic

  • msg_hash (Hash)

    one or multiple field-value pairs

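  • id (String) (defaults to: '*')

    the entry id; '*' (also used when nil is given) means auto generation

  • maxlen (Integer) (defaults to: nil)

    max length of entries

  • approximate (Boolean) (defaults to: true)

    whether to add the `~` modifier of maxlen or not

Returns:

  • (String)

    the entry id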



284
285
286
287
288
289
290
# File 'lib/cosmos/utilities/store_autoload.rb', line 284

# Add a new entry to the topic by calling xadd on a pooled connection.
#
# @param topic [String] the stream / topic
# @param msg_hash [Hash] one or multiple field-value pairs
# @param id [String, nil] the entry id; nil is replaced by '*'
# @param maxlen [Integer, nil] passed through as the maxlen: option
# @param approximate [Boolean] passed through as the approximate: option
# @return [String] the value returned by xadd
def write_topic(topic, msg_hash, id = '*', maxlen = nil, approximate = true)
  entry_id = id.nil? ? '*' : id
  @redis_pool.with do |redis|
    redis.xadd(topic, msg_hash, id: entry_id, maxlen: maxlen, approximate: approximate)
  end
end