Class: Cosmos::Store
Constant Summary
- @@instance = nil
  Variable that holds the singleton instance
- @@instance_mutex = Mutex.new
  Mutex used to ensure that only one instance is created
Instance Attribute Summary
- #redis_pool ⇒ Object (readonly)
  Returns the value of attribute redis_pool.
- #redis_url ⇒ Object (readonly)
  Returns the value of attribute redis_url.
Class Method Summary
- .get_last_offset(topic) ⇒ Object
- .get_newest_message(topic) ⇒ Object
- .get_oldest_message(topic) ⇒ Object
- .initialize_streams(topics) ⇒ Object
  Stream APIs.
- .instance(pool_size = 100) ⇒ Object
  Get the singleton instance.
- .method_missing(message, *args, **kwargs, &block) ⇒ Object
  Delegate all unknown class methods to the instance.
- .read_topic_last(topic) ⇒ Object
- .read_topics(topics, offsets = nil, timeout_ms = 1000, &block) ⇒ Object
- .trim_topic(topic, minid, approximate = true, limit: 0) ⇒ Integer
  Trims older entries of the Redis stream if needed.
- .update_topic_offsets(topics) ⇒ Object
  TODO: Currently unused. The docstring quotes a decrement_id(id) helper; see Class Method Details.
- .write_topic(topic, msg_hash, id = '*', maxlen = nil, approximate = true) ⇒ String
  Add new entry to the Redis stream.
Instance Method Summary
- #build_redis ⇒ Object
- #get_cmd_item(target_name, packet_name, param_name, type: :WITH_UNITS, scope: $cosmos_scope) ⇒ Object
- #get_last_offset(topic) ⇒ Object
- #get_newest_message(topic) ⇒ Object
- #get_oldest_message(topic) ⇒ Object
- #initialize(pool_size = 10) ⇒ Store (constructor)
  A new instance of Store.
- #initialize_streams(topics) ⇒ Object
- #method_missing(message, *args, **kwargs, &block) ⇒ Object
  Delegate all unknown methods to redis through the @redis_pool.
- #read_topic_last(topic) ⇒ Object
- #read_topics(topics, offsets = nil, timeout_ms = 1000) ⇒ Object
- #trim_topic(topic, minid, approximate = true, limit: 0) ⇒ Integer
  Trims older entries of the Redis stream if needed.
- #update_topic_offsets(topics) ⇒ Object
- #write_topic(topic, msg_hash, id = '*', maxlen = nil, approximate = true) ⇒ String
  Add new entry to the Redis stream.
Constructor Details
#initialize(pool_size = 10) ⇒ Store
Returns a new instance of Store.
# File 'lib/cosmos/utilities/store_autoload.rb', line 75
def initialize(pool_size = 10)
  Redis.exists_returns_integer = true
  @redis_username = ENV['COSMOS_REDIS_USERNAME']
  @redis_key = ENV['COSMOS_REDIS_PASSWORD']
  @redis_url = "redis://#{ENV['COSMOS_REDIS_HOSTNAME']}:#{ENV['COSMOS_REDIS_PORT']}"
  @redis_pool = ConnectionPool.new(size: pool_size) { build_redis() }
  @topic_offsets = {}
end
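The constructor builds @redis_url by interpolating two environment variables. The sketch below mirrors that interpolation with a plain Hash standing in for ENV. The helper names redis_url_from and strict_redis_url_from and the sample host and port are assumptions for illustration, not part of Cosmos::Store.

```ruby
# Hypothetical helper mirroring how the constructor assembles @redis_url
def redis_url_from(env)
  "redis://#{env['COSMOS_REDIS_HOSTNAME']}:#{env['COSMOS_REDIS_PORT']}"
end

# Stricter variant (not in the original source): fail fast on a missing variable
def strict_redis_url_from(env)
  "redis://#{env.fetch('COSMOS_REDIS_HOSTNAME')}:#{env.fetch('COSMOS_REDIS_PORT')}"
end

# Sample values only; a real deployment sets these in the process environment
env = { 'COSMOS_REDIS_HOSTNAME' => 'localhost', 'COSMOS_REDIS_PORT' => '6379' }

puts redis_url_from(env) # redis://localhost:6379
puts redis_url_from({})  # redis://:  (nil interpolates as an empty string)
```

With plain interpolation, an unset variable silently yields a malformed URL, so checking the variables before the first call to Store.instance may be worthwhile.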
Dynamic Method Handling
This class handles dynamic methods through the method_missing method
#method_missing(message, *args, **kwargs, &block) ⇒ Object
Delegate all unknown methods to redis through the @redis_pool
# File 'lib/cosmos/utilities/store_autoload.rb', line 60
def method_missing(message, *args, &block)
  @redis_pool.with { |redis| redis.public_send(message, *args, &block) }
end
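A minimal sketch of this delegation pattern, runnable without Redis. FakeRedis, FakePool, and DelegatingStore are hypothetical stand-ins: FakePool only imitates the with method of a connection pool, and respond_to_missing? is an addition that the source shown here does not include.

```ruby
# Hypothetical in-memory client standing in for a Redis connection
class FakeRedis
  def initialize
    @data = {}
  end

  def set(key, value)
    @data[key] = value
    'OK'
  end

  def get(key)
    @data[key]
  end
end

# Hypothetical pool holding a single connection
class FakePool
  def initialize
    @conn = yield
  end

  # Yield the pooled connection and return the block's result
  def with
    yield @conn
  end
end

class DelegatingStore
  def initialize
    @redis_pool = FakePool.new { FakeRedis.new }
  end

  # Forward any method this class does not define to a pooled connection
  def method_missing(message, *args, &block)
    @redis_pool.with { |redis| redis.public_send(message, *args, &block) }
  end

  # Keep respond_to? consistent with the delegation (assumption, not in the source)
  def respond_to_missing?(message, include_private = false)
    @redis_pool.with { |redis| redis.respond_to?(message) } || super
  end
end

store = DelegatingStore.new
store.set('key', 'value')
puts store.get('key') # value
```

Because public_send is used, only public methods of the underlying client are reachable, and a misspelled method name still raises NoMethodError from the client.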
Instance Attribute Details
#redis_pool ⇒ Object (readonly)
Returns the value of attribute redis_pool.
# File 'lib/cosmos/utilities/store_autoload.rb', line 40
def redis_pool
  @redis_pool
end
#redis_url ⇒ Object (readonly)
Returns the value of attribute redis_url.
# File 'lib/cosmos/utilities/store_autoload.rb', line 39
def redis_url
  @redis_url
end
Class Method Details
.get_last_offset(topic) ⇒ Object
# File 'lib/cosmos/utilities/store_autoload.rb', line 159
def self.get_last_offset(topic)
  self.instance.get_last_offset(topic)
end
.get_newest_message(topic) ⇒ Object
# File 'lib/cosmos/utilities/store_autoload.rb', line 148
def self.get_newest_message(topic)
  self.instance.get_newest_message(topic)
end
.get_oldest_message(topic) ⇒ Object
# File 'lib/cosmos/utilities/store_autoload.rb', line 137
def self.get_oldest_message(topic)
  self.instance.get_oldest_message(topic)
end
.initialize_streams(topics) ⇒ Object
Stream APIs
# File 'lib/cosmos/utilities/store_autoload.rb', line 124
def self.initialize_streams(topics)
  self.instance.initialize_streams(topics)
end
.instance(pool_size = 100) ⇒ Object
Get the singleton instance
# File 'lib/cosmos/utilities/store_autoload.rb', line 43
def self.instance(pool_size = 100)
  # Logger.level = Logger::DEBUG
  return @@instance if @@instance

  @@instance_mutex.synchronize do
    @@instance ||= self.new(pool_size)
    return @@instance
  end
end
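The accessor above follows a check, lock, check-again pattern. The sketch below reproduces it in a hypothetical SingletonDemo class with no Redis dependency, to show that concurrent callers share one object and that only the first call's pool_size takes effect.

```ruby
# Hypothetical class that mirrors the singleton accessor shown above
class SingletonDemo
  @@instance = nil
  @@instance_mutex = Mutex.new

  attr_reader :pool_size

  def self.instance(pool_size = 100)
    # Fast path: skip the lock once the instance exists
    return @@instance if @@instance

    # Slow path: only one thread may create the instance
    @@instance_mutex.synchronize do
      @@instance ||= new(pool_size)
    end
  end

  def initialize(pool_size = 10)
    @pool_size = pool_size
  end
end

# Several threads racing for the instance all receive the same object
threads = Array.new(8) { Thread.new { SingletonDemo.instance } }
instances = threads.map(&:value)

puts instances.uniq.length              # 1
puts SingletonDemo.instance(5).pool_size # 100, the later argument is ignored
```

A caller that needs a non-default pool size therefore has to make the first call to instance itself, before any other code touches the store.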
.method_missing(message, *args, **kwargs, &block) ⇒ Object
Delegate all unknown class methods to the instance
# File 'lib/cosmos/utilities/store_autoload.rb', line 55
def self.method_missing(message, *args, &block)
  self.instance.public_send(message, *args, &block)
end
.read_topic_last(topic) ⇒ Object
# File 'lib/cosmos/utilities/store_autoload.rb', line 174
def self.read_topic_last(topic)
  self.instance.read_topic_last(topic)
end
.read_topics(topics, offsets = nil, timeout_ms = 1000, &block) ⇒ Object
# File 'lib/cosmos/utilities/store_autoload.rb', line 224
def self.read_topics(topics, offsets = nil, timeout_ms = 1000, &block)
  self.instance.read_topics(topics, offsets, timeout_ms, &block)
end
.trim_topic(topic, minid, approximate = true, limit: 0) ⇒ Integer
Trims older entries of the Redis stream if needed. See www.rubydoc.info/github/redis/redis-rb/Redis:xtrim
# File 'lib/cosmos/utilities/store_autoload.rb', line 305
def self.trim_topic(topic, minid, approximate = true, limit: 0)
  self.instance.trim_topic(topic, minid, approximate, limit: limit)
end
.update_topic_offsets(topics) ⇒ Object
TODO: Currently unused
def decrement_id(id)
  time, sequence = id.split('-')
  if sequence == '0'
    "#{time.to_i - 1}-18446744073709551615"
  else
    "#{time}-#{sequence.to_i - 1}"
  end
end
# File 'lib/cosmos/utilities/store_autoload.rb', line 202
def self.update_topic_offsets(topics)
  self.instance.update_topic_offsets(topics)
end
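The unused decrement_id helper quoted in the TODO computes the stream ID immediately before a given one. A runnable version of that helper is sketched below. The MAX_SEQUENCE constant name is an assumption added for readability; its value is the largest 64-bit sequence number, as in the quoted code.

```ruby
# Largest sequence number in a Redis stream ID (2**64 - 1)
MAX_SEQUENCE = 18_446_744_073_709_551_615

# Return the stream ID immediately preceding the given "<time>-<sequence>" ID
def decrement_id(id)
  time, sequence = id.split('-')
  if sequence == '0'
    # Sequence underflow: step back one millisecond and use the maximum sequence
    "#{time.to_i - 1}-#{MAX_SEQUENCE}"
  else
    "#{time}-#{sequence.to_i - 1}"
  end
end

puts decrement_id('1000-5') # 1000-4
puts decrement_id('1000-0') # 999-18446744073709551615
```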
.write_topic(topic, msg_hash, id = '*', maxlen = nil, approximate = true) ⇒ String
Add new entry to the Redis stream. See www.rubydoc.info/github/redis/redis-rb/Redis:xadd
# File 'lib/cosmos/utilities/store_autoload.rb', line 263
def self.write_topic(topic, msg_hash, id = '*', maxlen = nil, approximate = true)
  self.instance.write_topic(topic, msg_hash, id, maxlen, approximate)
end
Instance Method Details
#build_redis ⇒ Object
# File 'lib/cosmos/utilities/store_autoload.rb', line 85
def build_redis
  return Redis.new(url: @redis_url, username: @redis_username, password: @redis_key)
end
#get_cmd_item(target_name, packet_name, param_name, type: :WITH_UNITS, scope: $cosmos_scope) ⇒ Object
# File 'lib/cosmos/utilities/store_autoload.rb', line 90
def get_cmd_item(target_name, packet_name, param_name, type: :WITH_UNITS, scope: $cosmos_scope)
  msg_id, msg_hash = read_topic_last("#{scope}__DECOMCMD__{#{target_name}}__#{packet_name}")
  if msg_id
    # TODO: We now have these reserved items directly on command packets
    # Do we still calculate from msg_hash['time'] or use the times directly?
    #
    # if param_name == 'RECEIVED_TIMESECONDS' || param_name == 'PACKET_TIMESECONDS'
    #   Time.from_nsec_from_epoch(msg_hash['time'].to_i).to_f
    # elsif param_name == 'RECEIVED_TIMEFORMATTED' || param_name == 'PACKET_TIMEFORMATTED'
    #   Time.from_nsec_from_epoch(msg_hash['time'].to_i).formatted
    if param_name == 'RECEIVED_COUNT'
      msg_hash['received_count'].to_i
    else
      json = msg_hash['json_data']
      hash = JSON.parse(json)
      # Start from the most complex down to the basic raw value
      value = hash["#{param_name}__U"]
      return value if value && type == :WITH_UNITS

      value = hash["#{param_name}__F"]
      return value if value && (type == :WITH_UNITS || type == :FORMATTED)

      value = hash["#{param_name}__C"]
      return value if value && (type == :WITH_UNITS || type == :FORMATTED || type == :CONVERTED)

      return hash[param_name]
    end
  end
end
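The value lookup in get_cmd_item falls back from the richest representation to the raw value. The sketch below isolates that fallback in a hypothetical lookup_item helper. The TEMP and MODE items and their values are made-up sample data, and :RAW is used here only as an example of a type that matches none of the suffix checks.

```ruby
require 'json'

# Hypothetical helper isolating the suffix fallback used by get_cmd_item:
# __U (with units), then __F (formatted), then __C (converted), then raw
def lookup_item(hash, param_name, type: :WITH_UNITS)
  value = hash["#{param_name}__U"]
  return value if value && type == :WITH_UNITS

  value = hash["#{param_name}__F"]
  return value if value && %i[WITH_UNITS FORMATTED].include?(type)

  value = hash["#{param_name}__C"]
  return value if value && %i[WITH_UNITS FORMATTED CONVERTED].include?(type)

  hash[param_name]
end

# Sample decommutated data (made up for this example)
json = '{"TEMP": 1234, "TEMP__C": 12.34, "TEMP__F": "12.3", "TEMP__U": "12.3 C", "MODE": 1, "MODE__C": "SAFE"}'
hash = JSON.parse(json)

puts lookup_item(hash, 'TEMP')                   # 12.3 C
puts lookup_item(hash, 'TEMP', type: :FORMATTED) # 12.3
puts lookup_item(hash, 'TEMP', type: :CONVERTED) # 12.34
puts lookup_item(hash, 'TEMP', type: :RAW)       # 1234
puts lookup_item(hash, 'MODE')                   # SAFE
```

The last call shows the fallback at work: MODE has no __U or __F entry, so a :WITH_UNITS request returns the converted value.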
#get_last_offset(topic) ⇒ Object
# File 'lib/cosmos/utilities/store_autoload.rb', line 163
def get_last_offset(topic)
  @redis_pool.with do |redis|
    result = redis.xrevrange(topic, count: 1)
    if result and result[0] and result[0][0]
      result[0][0]
    else
      "0-0"
    end
  end
end
#get_newest_message(topic) ⇒ Object
# File 'lib/cosmos/utilities/store_autoload.rb', line 152
def get_newest_message(topic)
  @redis_pool.with do |redis|
    result = redis.xrevrange(topic, count: 1)
    return result[0]
  end
end
#get_oldest_message(topic) ⇒ Object
# File 'lib/cosmos/utilities/store_autoload.rb', line 141
def get_oldest_message(topic)
  @redis_pool.with do |redis|
    result = redis.xrange(topic, count: 1)
    return result[0]
  end
end
#initialize_streams(topics) ⇒ Object
# File 'lib/cosmos/utilities/store_autoload.rb', line 128
def initialize_streams(topics)
  @redis_pool.with do |redis|
    topics.each do |topic|
      # Create empty stream with maxlen 0
      redis.xadd(topic, { a: 'b' }, maxlen: 0)
    end
  end
end
#read_topic_last(topic) ⇒ Object
# File 'lib/cosmos/utilities/store_autoload.rb', line 178
def read_topic_last(topic)
  @redis_pool.with do |redis|
    # Default in xrevrange is range end '+', start '-' which means get all
    # elements from higher ID to lower ID and since we're limiting to 1
    # we get the last element. See https://redis.io/commands/xrevrange.
    result = redis.xrevrange(topic, count: 1)
    if result and result.length > 0
      return result[0]
    else
      return nil
    end
  end
end
#read_topics(topics, offsets = nil, timeout_ms = 1000) ⇒ Object
# File 'lib/cosmos/utilities/store_autoload.rb', line 228
def read_topics(topics, offsets = nil, timeout_ms = 1000)
  # Logger.debug "read_topics: #{topics}, #{offsets} pool:#{@redis_pool}"
  @redis_pool.with do |redis|
    offsets = update_topic_offsets(topics) unless offsets
    result = redis.xread(topics, offsets, block: timeout_ms)
    if result and result.length > 0
      result.each do |topic, messages|
        messages.each do |msg_id, msg_hash|
          @topic_offsets[topic] = msg_id
          yield topic, msg_id, msg_hash, redis if block_given?
        end
      end
    end
    # Logger.debug "result:#{result}" if result and result.length > 0
    return result
  end
end
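The loop inside read_topics walks every message of every topic, remembers the newest ID per topic, and yields each message to the caller. The sketch below replays that loop over a hand-built result. The result shape (topic name mapped to a list of ID and field-hash pairs), the topic name DEMO__TOPIC, and the each_message helper are assumptions for illustration; no Redis connection is involved.

```ruby
# Assumed xread-style result: topic name => list of [message ID, field hash]
result = {
  'DEMO__TOPIC' => [
    ['1700000000000-0', { 'received_count' => '1' }],
    ['1700000000000-1', { 'received_count' => '2' }]
  ]
}

# Hypothetical helper replaying the read_topics loop over such a result
def each_message(result, topic_offsets)
  result.each do |topic, messages|
    messages.each do |msg_id, msg_hash|
      # Remember the newest ID so the next read resumes after it
      topic_offsets[topic] = msg_id
      yield topic, msg_id, msg_hash if block_given?
    end
  end
  result
end

topic_offsets = {}
counts = []
each_message(result, topic_offsets) do |_topic, _msg_id, msg_hash|
  counts << msg_hash['received_count'].to_i
end

puts counts.inspect                # [1, 2]
puts topic_offsets['DEMO__TOPIC']  # 1700000000000-1
```

Because the offset is updated before each yield, an exception raised inside the caller's block leaves the offset pointing at the message being processed, which would then not be delivered again on the next read.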
#trim_topic(topic, minid, approximate = true, limit: 0) ⇒ Integer
Trims older entries of the Redis stream if needed. See www.rubydoc.info/github/redis/redis-rb/Redis:xtrim
# File 'lib/cosmos/utilities/store_autoload.rb', line 322
def trim_topic(topic, minid, approximate = true, limit: 0)
  @redis_pool.with do |redis|
    return redis.xtrim_minid(topic, minid, approximate: approximate, limit: limit)
  end
end
#update_topic_offsets(topics) ⇒ Object
# File 'lib/cosmos/utilities/store_autoload.rb', line 206
def update_topic_offsets(topics)
  offsets = []
  topics.each do |topic|
    # Normally we will just be grabbing the topic offset
    # this allows xread to get everything past this point
    last_id = @topic_offsets[topic]
    if last_id
      offsets << last_id
    else
      # If there is no topic offset this is the first call.
      # Get the last offset ID so we'll start getting everything from now on
      offsets << get_last_offset(topic)
      @topic_offsets[topic] = offsets[-1]
    end
  end
  return offsets
end
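The offset bookkeeping above can be exercised without Redis. In the sketch below, OffsetTracker is a hypothetical class that takes a lambda in place of get_last_offset; the topic names A and B and the IDs are sample data. It shows that the stream is consulted only on the first call for a topic, and that later calls reuse the remembered ID.

```ruby
# Hypothetical tracker mirroring update_topic_offsets without a Redis connection
class OffsetTracker
  # last_offset_lookup stands in for get_last_offset(topic)
  def initialize(last_offset_lookup)
    @topic_offsets = {}
    @last_offset_lookup = last_offset_lookup
  end

  # Return one offset per topic, looking up the stream only on first use
  def update_topic_offsets(topics)
    topics.map do |topic|
      @topic_offsets[topic] ||= @last_offset_lookup.call(topic)
    end
  end

  # Record the ID of a message that has been read, as read_topics does
  def record(topic, msg_id)
    @topic_offsets[topic] = msg_id
  end
end

# Sample "newest ID" table; topic B is empty, so it falls back to "0-0"
latest = { 'A' => '5-0' }
tracker = OffsetTracker.new(->(topic) { latest.fetch(topic, '0-0') })

first = tracker.update_topic_offsets(%w[A B])
tracker.record('A', '7-0') # a message was read from A
latest['A'] = '9-0'        # newer data arrives, but the tracker keeps its own offset
second = tracker.update_topic_offsets(%w[A B])

puts first.inspect  # ["5-0", "0-0"]
puts second.inspect # ["7-0", "0-0"]
```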
#write_topic(topic, msg_hash, id = '*', maxlen = nil, approximate = true) ⇒ String
Add new entry to the Redis stream. See www.rubydoc.info/github/redis/redis-rb/Redis:xadd
# File 'lib/cosmos/utilities/store_autoload.rb', line 284
def write_topic(topic, msg_hash, id = '*', maxlen = nil, approximate = true)
  id = '*' if id.nil?
  # Logger.debug "write_topic topic:#{topic} id:#{id} hash:#{msg_hash}"
  @redis_pool.with do |redis|
    return redis.xadd(topic, msg_hash, id: id, maxlen: maxlen, approximate: approximate)
  end
end