Class: Cosmos::Operator
Direct Known Subclasses
Constant Summary collapse
- CYCLE_TIME =
cycle time to check for new microservices
15.0
- @@instance =
nil
Instance Attribute Summary collapse
-
#cycle_time ⇒ Object
readonly
Returns the value of attribute cycle_time.
-
#processes ⇒ Object
readonly
Returns the value of attribute processes.
Class Method Summary collapse
Instance Method Summary collapse
-
#initialize ⇒ Operator
constructor
A new instance of Operator.
- #remove_old ⇒ Object
- #respawn_changed ⇒ Object
- #respawn_dead ⇒ Object
- #run ⇒ Object
- #shutdown_processes(processes) ⇒ Object
- #start_new ⇒ Object
- #stop ⇒ Object
- #update ⇒ Object
Constructor Details
#initialize ⇒ Operator
Returns a new instance of Operator.
136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 |
# File 'lib/cosmos/operators/operator.rb', line 136 def initialize Logger.level = Logger::INFO # TODO: This is pretty generic. Can we pass in more information to help identify the operator? Logger.microservice_name = 'MicroserviceOperator' Logger.tag = "operator.log" OperatorProcess.setup() @cycle_time = (ENV['OPERATOR_CYCLE_TIME'] and ENV['OPERATOR_CYCLE_TIME'].to_f) || CYCLE_TIME # time in seconds @ruby_process_name = ENV['COSMOS_RUBY'] if RUBY_ENGINE != 'ruby' @ruby_process_name ||= 'jruby' else @ruby_process_name ||= 'ruby' end @processes = {} @new_processes = {} @changed_processes = {} @removed_processes = {} @mutex = Mutex.new @shutdown = false @shutdown_complete = false end |
Instance Attribute Details
#cycle_time ⇒ Object (readonly)
Returns the value of attribute cycle_time.
130 131 132 |
# File 'lib/cosmos/operators/operator.rb', line 130 def cycle_time @cycle_time end |
#processes ⇒ Object (readonly)
Returns the value of attribute processes.
130 131 132 |
# File 'lib/cosmos/operators/operator.rb', line 130 def processes @processes end |
Class Method Details
.instance ⇒ Object
274 275 276 |
# File 'lib/cosmos/operators/operator.rb', line 274 def self.instance @@instance end |
.processes ⇒ Object
270 271 272 |
# File 'lib/cosmos/operators/operator.rb', line 270 def self.processes @@instance.processes end |
.run ⇒ Object
265 266 267 268 |
# File 'lib/cosmos/operators/operator.rb', line 265 def self.run @@instance = self.new @@instance.run end |
Instance Method Details
#remove_old ⇒ Object
189 190 191 192 193 194 195 196 197 |
# File 'lib/cosmos/operators/operator.rb', line 189 def remove_old @mutex.synchronize do if @removed_processes.length > 0 Logger.info("Shutting down #{@removed_processes.length} removed microservices...") shutdown_processes(@removed_processes) @removed_processes = {} end end end |
#respawn_changed ⇒ Object
176 177 178 179 180 181 182 183 184 185 186 187 |
# File 'lib/cosmos/operators/operator.rb', line 176 def respawn_changed @mutex.synchronize do if @changed_processes.length > 0 Logger.info("Cycling #{@changed_processes.length} changed microservices...") shutdown_processes(@changed_processes) break if @shutdown @changed_processes.each { |name, p| p.start } @changed_processes = {} end end end |
#respawn_dead ⇒ Object
199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 |
# File 'lib/cosmos/operators/operator.rb', line 199 def respawn_dead @mutex.synchronize do @processes.each do |name, p| break if @shutdown unless p.alive? # Respawn process p.stdout.rewind output = p.stdout.read p.stdout.close p.stdout.unlink p.stderr.rewind err_output = p.stderr.read p.stderr.close p.stderr.unlink Logger.error("Unexpected process died... respawning! #{p.process_definition.join(' ')}\nStdout:\n#{output}\nStderr:\n#{err_output}\n", scope: p.scope) p.start end end end end |
#run ⇒ Object
227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 |
# File 'lib/cosmos/operators/operator.rb', line 227 def run # Use at_exit to shutdown cleanly at_exit do @shutdown = true @mutex.synchronize do Logger.info("Shutting down processes...") shutdown_processes(@processes) @shutdown_complete = true end end # Monitor processes and respawn if died Logger.info("#{self.class} Monitoring processes every #{@cycle_time} sec...") loop do update() remove_old() respawn_changed() start_new() respawn_dead() break if @shutdown sleep(@cycle_time) break if @shutdown end loop do break if @shutdown_complete sleep(1) end ensure Logger.info("#{self.class} shutdown complete") end |
#shutdown_processes(processes) ⇒ Object
221 222 223 224 225 |
# File 'lib/cosmos/operators/operator.rb', line 221 def shutdown_processes(processes) processes.each { |name, p| p.soft_stop } sleep(2) # TODO: This is an arbitrary sleep of 2s ... processes.each { |name, p| p.hard_stop } end |
#start_new ⇒ Object
165 166 167 168 169 170 171 172 173 174 |
# File 'lib/cosmos/operators/operator.rb', line 165 def start_new @mutex.synchronize do if @new_processes.length > 0 # Start all the processes Logger.info("#{self.class} starting each new process...") @new_processes.each { |name, p| p.start } @new_processes = {} end end end |
#stop ⇒ Object
261 262 263 |
# File 'lib/cosmos/operators/operator.rb', line 261 def stop @shutdown = true end |
#update ⇒ Object
161 162 163 |
# File 'lib/cosmos/operators/operator.rb', line 161 def update raise "Implement in subclass" end |