Class: Cosmos::Operator

Inherits:
Object show all
Defined in:
lib/cosmos/operators/operator.rb

Direct Known Subclasses

MicroserviceOperator

Constant Summary collapse

CYCLE_TIME =

cycle time to check for new microservices

15.0
@@instance =
nil

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initializeOperator

Returns a new instance of Operator.



136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
# File 'lib/cosmos/operators/operator.rb', line 136

def initialize
  Logger.level = Logger::INFO
  # TODO: This is pretty generic. Can we pass in more information to help identify the operator?
  Logger.microservice_name = 'MicroserviceOperator'
  Logger.tag = "operator.log"

  OperatorProcess.setup()
  @cycle_time = (ENV['OPERATOR_CYCLE_TIME'] and ENV['OPERATOR_CYCLE_TIME'].to_f) || CYCLE_TIME # time in seconds

  @ruby_process_name = ENV['COSMOS_RUBY']
  if RUBY_ENGINE != 'ruby'
    @ruby_process_name ||= 'jruby'
  else
    @ruby_process_name ||= 'ruby'
  end

  @processes = {}
  @new_processes = {}
  @changed_processes = {}
  @removed_processes = {}
  @mutex = Mutex.new
  @shutdown = false
  @shutdown_complete = false
end

Instance Attribute Details

#cycle_timeObject (readonly)

Returns the value of attribute cycle_time.



130
131
132
# File 'lib/cosmos/operators/operator.rb', line 130

def cycle_time
  @cycle_time
end

#processesObject (readonly)

Returns the value of attribute processes.



130
131
132
# File 'lib/cosmos/operators/operator.rb', line 130

def processes
  @processes
end

Class Method Details

.instanceObject



274
275
276
# File 'lib/cosmos/operators/operator.rb', line 274

def self.instance
  @@instance
end

.processesObject



270
271
272
# File 'lib/cosmos/operators/operator.rb', line 270

def self.processes
  @@instance.processes
end

.runObject



265
266
267
268
# File 'lib/cosmos/operators/operator.rb', line 265

def self.run
  @@instance = self.new
  @@instance.run
end

Instance Method Details

#remove_oldObject



189
190
191
192
193
194
195
196
197
# File 'lib/cosmos/operators/operator.rb', line 189

def remove_old
  @mutex.synchronize do
    if @removed_processes.length > 0
      Logger.info("Shutting down #{@removed_processes.length} removed microservices...")
      shutdown_processes(@removed_processes)
      @removed_processes = {}
    end
  end
end

#respawn_changedObject



176
177
178
179
180
181
182
183
184
185
186
187
# File 'lib/cosmos/operators/operator.rb', line 176

def respawn_changed
  @mutex.synchronize do
    if @changed_processes.length > 0
      Logger.info("Cycling #{@changed_processes.length} changed microservices...")
      shutdown_processes(@changed_processes)
      break if @shutdown

      @changed_processes.each { |name, p| p.start }
      @changed_processes = {}
    end
  end
end

#respawn_deadObject



199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
# File 'lib/cosmos/operators/operator.rb', line 199

def respawn_dead
  @mutex.synchronize do
    @processes.each do |name, p|
      break if @shutdown

      unless p.alive?
        # Respawn process
        p.stdout.rewind
        output = p.stdout.read
        p.stdout.close
        p.stdout.unlink
        p.stderr.rewind
        err_output = p.stderr.read
        p.stderr.close
        p.stderr.unlink
        Logger.error("Unexpected process died... respawning! #{p.process_definition.join(' ')}\nStdout:\n#{output}\nStderr:\n#{err_output}\n", scope: p.scope)
        p.start
      end
    end
  end
end

#runObject



227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
# File 'lib/cosmos/operators/operator.rb', line 227

def run
  # Use at_exit to shutdown cleanly
  at_exit do
    @shutdown = true
    @mutex.synchronize do
      Logger.info("Shutting down processes...")
      shutdown_processes(@processes)
      @shutdown_complete = true
    end
  end

  # Monitor processes and respawn if died
  Logger.info("#{self.class} Monitoring processes every #{@cycle_time} sec...")
  loop do
    update()
    remove_old()
    respawn_changed()
    start_new()
    respawn_dead()
    break if @shutdown

    sleep(@cycle_time)
    break if @shutdown
  end

  loop do
    break if @shutdown_complete

    sleep(1)
  end
ensure
  Logger.info("#{self.class} shutdown complete")
end

#shutdown_processes(processes) ⇒ Object



221
222
223
224
225
# File 'lib/cosmos/operators/operator.rb', line 221

def shutdown_processes(processes)
  processes.each { |name, p| p.soft_stop }
  sleep(2) # TODO: This is an arbitrary sleep of 2s ...
  processes.each { |name, p| p.hard_stop }
end

#start_newObject



165
166
167
168
169
170
171
172
173
174
# File 'lib/cosmos/operators/operator.rb', line 165

def start_new
  @mutex.synchronize do
    if @new_processes.length > 0
      # Start all the processes
      Logger.info("#{self.class} starting each new process...")
      @new_processes.each { |name, p| p.start }
      @new_processes = {}
    end
  end
end

#stopObject



261
262
263
# File 'lib/cosmos/operators/operator.rb', line 261

def stop
  @shutdown = true
end

#updateObject



161
162
163
# File 'lib/cosmos/operators/operator.rb', line 161

def update
  raise "Implement in subclass"
end