Class: Pigeon::Engine

Inherits:
Object
  • Object
show all
Extended by:
OptionAccessor
Defined in:
lib/pigeon/engine.rb

Defined Under Namespace

Classes: ConfigurationError, RuntimeError

Constant Summary collapse

CHAINS =

Constants ============================================================

%w[
  after_initialize
  before_start
  after_start
  before_stop
  after_stop
  before_resume
  after_resume
  before_standby
  after_standby
  before_shutdown
  after_shutdown
].collect(&:to_sym).freeze

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from OptionAccessor

option_accessor, option_reader, option_writer

Constructor Details

#initialize(options = nil) ⇒ Engine

Instance Methods =====================================================



283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
# File 'lib/pigeon/engine.rb', line 283

# Builds a new engine instance.
#
# A unique id is assigned first; everything else is set up inside the
# :initialize chain so that chain callbacks wrap the rest of construction.
#
# options - Hash of engine options, or nil to use an empty Hash.
def initialize(options = nil)
  @id = Pigeon::Support.unique_id

  wrap_chain(:initialize) do
    @options = options || { }

    # Guards creation of the per-task locks used by #task_lock.
    @task_lock, @task_locks = Mutex.new, { }

    # Guards the registry used by #register_task and friends.
    @task_register_lock, @registered_tasks = Mutex.new, { }

    self.logger ||= self.engine_logger

    if (self.debug?)
      self.logger.level = Pigeon::Logger::DEBUG
    end

    # Dispatchers are created on demand in #dispatch, keyed by queue name.
    @dispatcher = { }

    @state = :initialized
  end
end

Instance Attribute Details

#id ⇒ Object (readonly)

Returns the value of attribute id.



51
52
53
# File 'lib/pigeon/engine.rb', line 51

# Reader for the engine's identifier, which #initialize assigns from
# Pigeon::Support.unique_id.
def id
  @id
end

#state ⇒ Object (readonly)

Returns the value of attribute state.



52
53
54
# File 'lib/pigeon/engine.rb', line 52

# Reader for the engine's current state. Elsewhere in this class the state
# is set to :initialized, :running, :standby or :terminated.
def state
  @state
end

Class Method Details

.chain_procs(chain_name) ⇒ Object



460
461
462
463
464
# File 'lib/pigeon/engine.rb', line 460

# Looks up whatever has been stored for the named chain on the receiver.
#
# The chain lives in an instance variable named "@_<chain_name>_chain".
#
# Returns the value of that variable when it is defined, or false when it
# has never been defined.
def chain_procs(chain_name)
  ivar_name = :"@_#{chain_name}_chain"

  return false unless (instance_variable_defined?(ivar_name))

  instance_variable_get(ivar_name)
end

.clear_engines! ⇒ Object



271
272
273
# File 'lib/pigeon/engine.rb', line 271

# Resets the list of registered engines to a new empty Array, discarding
# any engines added through register_engine. Returns the new empty Array.
def self.clear_engines!
  @engines = [ ]
end

.default_engine ⇒ Object

Returns a handle to the engine currently running, or nil if no engine is currently active.



251
252
253
254
255
# File 'lib/pigeon/engine.rb', line 251

# Returns the first engine in the registry, or nil when the registry is
# empty. The registry Array is created on first use.
def self.default_engine
  (@engines ||= [ ]).first
end

.engine_logger ⇒ Object

Returns a default logger for the engine.



230
231
232
233
234
235
236
237
# File 'lib/pigeon/engine.rb', line 230

# Returns the memoized default logger for the engine.
#
# On first use the log file (engine_log_name resolved against log_dir) is
# opened for appending with sync enabled and wrapped in a Pigeon::Logger
# configured with log_rotation. The file handle is intentionally left open
# because the logger keeps writing to it.
def self.engine_logger
  return @engine_logger if (@engine_logger)

  log_path = File.expand_path(self.engine_log_name, self.log_dir)

  log_file = File.open(log_path, 'a')
  log_file.sync = true

  @engine_logger = Pigeon::Logger.new(log_file, self.log_rotation)
end

.execute_in_main_thread(&block) ⇒ Object

Schedules a block for execution on the main EventMachine thread. This is a wrapper around the EventMachine.next_tick method.



277
278
279
# File 'lib/pigeon/engine.rb', line 277

# Hands the given block to EventMachine.next_tick and returns its result.
#
# NOTE(review): the instance-level #execute_in_main_thread uses
# EventMachine.schedule instead of next_tick — confirm the difference is
# intentional.
def self.execute_in_main_thread(&block)
  EventMachine.next_tick(&block)
end

.launch(options = nil) ⇒ Object

Launches the engine with the specified options



125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
# File 'lib/pigeon/engine.rb', line 125

# Launches an engine inside EventMachine.run.
#
# Within the reactor block this creates the engine with the given options,
# installs an INT signal handler that calls engine.terminate, registers the
# engine with Pigeon::Engine, yields it to the optional block and finally
# calls engine.run. After EventMachine.run returns the engine is
# unregistered again.
#
# options - passed straight through to new (may be nil).
#
# Returns the result of Pigeon::Engine.unregister_engine.
def self.launch(options = nil)
  # Declared outside the reactor block so it is still visible afterwards.
  engine = nil
  
  EventMachine.run do
    engine = new(options)
    
    # #terminate stops the event loop, which presumably lets
    # EventMachine.run return — TODO confirm against EventMachine docs.
    Signal.trap('INT') do
      engine.terminate
    end

    Pigeon::Engine.register_engine(engine)

    # Gives the caller a chance to configure the engine before it runs.
    yield(engine) if (block_given?)

    engine.run
  end

  Pigeon::Engine.unregister_engine(engine)
end

.log_dir ⇒ Object

Returns the full path to the directory used to store logs.



120
121
122
# File 'lib/pigeon/engine.rb', line 120

# Returns the memoized result of passing try_log_dirs to
# Pigeon::Support.find_writable_directory.
#
# NOTE(review): the memo is stored in @log_file_path even though the value
# is a directory rather than a file path.
def self.log_dir
  @log_file_path ||= Pigeon::Support.find_writable_directory(self.try_log_dirs)
end

.name ⇒ Object

Returns the human-readable name of this engine. Defaults to the name of the engine class, but can be replaced to customize a subclass.



74
75
76
# File 'lib/pigeon/engine.rb', line 74

# Returns the memoized human-readable engine name: the receiver's to_s with
# every "::" namespace separator replaced by a single space.
def self.name
  @name ||= to_s.gsub('::', ' ')
end

.pid_file ⇒ Object



145
146
147
# File 'lib/pigeon/engine.rb', line 145

# Returns the memoized Pigeon::Pidfile built from pid_file_path. Because
# pid_file_path can raise ConfigurationError, so can the first call here.
def self.pid_file
  @pid_file ||= Pigeon::Pidfile.new(self.pid_file_path)
end

.pid_file_name ⇒ Object

Returns the name of the PID file to use. The full path to the file is specified elsewhere.



103
104
105
# File 'lib/pigeon/engine.rb', line 103

# Returns the memoized PID file name: the engine name lower-cased, with
# spaces turned into dashes, plus a ".pid" suffix.
def self.pid_file_name
  @pid_file_name ||= "#{self.name.downcase.tr(' ', '-')}.pid"
end

.pid_file_path ⇒ Object

Returns the full path to the PID file that should be used to track the running status of this engine.



109
110
111
112
113
114
115
116
117
# File 'lib/pigeon/engine.rb', line 109

# Returns the memoized full path of the PID file.
#
# The directory is chosen by Pigeon::Support.find_writable_directory from
# try_pid_dirs, and pid_file_name is resolved against it.
#
# Raises ConfigurationError when no directory is found.
def self.pid_file_path
  @pid_file_path ||= begin
    directory = Pigeon::Support.find_writable_directory(self.try_pid_dirs)

    unless (directory)
      raise ConfigurationError, "Could not find a writable directory for the PID file in: #{self.try_pid_dirs.join(' ')}"
    end

    File.expand_path(self.pid_file_name, directory)
  end
end

.process_name ⇒ Object

Returns the custom process name for this engine or nil if not assigned.



79
80
81
# File 'lib/pigeon/engine.rb', line 79

# Returns the assigned process name. The `||= nil` makes sure the instance
# variable is defined, so nil is returned when nothing has been assigned.
def self.process_name
  @process_name ||= nil
end

.process_name=(value) ⇒ Object

Assigns the process name. This will be applied only when the engine is started.



85
86
87
# File 'lib/pigeon/engine.rb', line 85

# Stores the given value as the process name returned by process_name.
# Nothing else happens at assignment time.
def self.process_name=(value)
  @process_name = value
end

.query_logger ⇒ Object

Returns a default logger for queries.



240
241
242
243
244
245
246
247
# File 'lib/pigeon/engine.rb', line 240

# Returns the memoized default logger for queries.
#
# On first use the log file (query_log_name resolved against log_dir) is
# opened for appending with sync enabled and wrapped in a Pigeon::Logger
# configured with log_rotation. The file handle is intentionally left open
# because the logger keeps writing to it.
def self.query_logger
  return @query_logger if (@query_logger)

  log_path = File.expand_path(self.query_log_name, self.log_dir)

  log_file = File.open(log_path, 'a')
  log_file.sync = true

  @query_logger = Pigeon::Logger.new(log_file, self.log_rotation)
end

.register_engine(engine) ⇒ Object

Registers the engine as running. The first engine running will show up as the default engine.



259
260
261
262
# File 'lib/pigeon/engine.rb', line 259

# Appends the engine to the registry, creating the registry Array on first
# use. Returns the registry Array.
def self.register_engine(engine)
  (@engines ||= [ ]) << engine
end

.restart ⇒ Object



209
210
211
212
213
214
215
# File 'lib/pigeon/engine.rb', line 209

# Stops the engine and then starts it again from inside stop's block.
#
# Yields the PID reported by start followed by the PID reported by stop to
# the optional block.
def self.restart
  stop do |previous_pid|
    start { |new_pid| yield(new_pid, previous_pid) if (block_given?) }
  end
end

.run {|$$| ... } ⇒ Object

Yields:

  • ($$)


173
174
175
176
177
# File 'lib/pigeon/engine.rb', line 173

# Runs the engine in the current process.
#
# Yields the current process id to the optional block, then calls launch
# with foreground: true and returns its result.
def self.run
  yield(Process.pid) if (block_given?)

  launch(foreground: true)
end

.running? ⇒ Boolean

Returns:

  • (Boolean)


217
218
219
# File 'lib/pigeon/engine.rb', line 217

# Reports whether the PID file refers to a running process.
#
# pid_file.running yields a PID-like value or nil (see status and stop), so
# the result is coerced here to honour the documented Boolean contract.
#
# Returns true or false.
def self.running?
  !!pid_file.running
end

.start(options = nil) {|pid, true| ... } ⇒ Object

Yields:

  • (pid, true)


149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
# File 'lib/pigeon/engine.rb', line 149

# Starts the engine as a daemon unless the PID file says it is already
# running.
#
# options - optional Hash merged over the default { logger: ... } options
#           handed to launch inside the daemonized block.
#
# Yields (pid, false) when an engine was already running, or (pid, true)
# after a new one has been daemonized and its PID file created.
#
# Returns the PID in both cases.
def self.start(options = nil)
  if (self.pid_file.running?)
    yield(self.pid_file.pid, false) if (block_given?)

    return self.pid_file.pid
  end

  daemon_logger = self.engine_logger

  child_pid = Pigeon::Support.daemonize(daemon_logger) do
    launch({ logger: daemon_logger }.merge(options || { }))
  end

  pid_file.create!(child_pid)

  yield(child_pid, true) if (block_given?)

  child_pid
end

.status {|pid| ... } ⇒ Object

Yields:

  • (pid)


221
222
223
224
225
226
227
# File 'lib/pigeon/engine.rb', line 221

# Reads the running PID from the PID file.
#
# Yields that value (a PID or nil) to the optional block and returns it.
def self.status
  pid_file.running.tap do |current_pid|
    yield(current_pid) if (block_given?)
  end
end

.stop {|pid| ... } ⇒ Object

Yields:

  • (pid)


179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
# File 'lib/pigeon/engine.rb', line 179

# Stops the running engine process, if there is one.
#
# Sends INT to the PID reported by the PID file, then polls with signal 0
# (calling Pigeon::Support.nap(0.1) between checks) until the process no
# longer exists, and finally removes the PID file.
#
# If the recorded process had already exited when INT was sent, the stale
# PID file is still removed, but nil is yielded and returned because
# nothing was actually stopped.
#
# Yields the stopped PID (or nil) to the optional block.
#
# Returns the stopped PID as an Integer, or nil.
def self.stop
  pid = self.pid_file.running

  if (pid)
    begin
      Process.kill('INT', pid)
    rescue Errno::ESRCH
      # No such process: nothing was actually stopped.
      pid = nil
    end

    # Only wait when the signal was delivered. Without this guard the loop
    # would call Process.kill(0, nil), which raises a TypeError and skips
    # removal of the stale PID file.
    if (pid)
      begin
        while (Process.kill(0, pid))
          Pigeon::Support.nap(0.1)
        end
      rescue Errno::ESRCH
        # No such process, already terminated
      end
    end

    pid_file.remove!
  end

  pid = pid.to_i if (pid)

  yield(pid) if (block_given?)

  pid
end

.unregister_engine(engine) ⇒ Object

Removes the engine from the list of running engines.



265
266
267
268
269
# File 'lib/pigeon/engine.rb', line 265

# Deletes the engine from the registry.
#
# Returns the deleted engine, or nil when it was not registered or the
# registry has never been created.
def self.unregister_engine(engine)
  @engines.delete(engine) if (defined?(@engines))
end

.user ⇒ Object

Returns the user this process should run as, or nil if no particular user is required. This will be applied after the engine has been started and the after_start call has been triggered.



92
93
94
# File 'lib/pigeon/engine.rb', line 92

# Returns the assigned user. The `||= nil` makes sure the instance variable
# is defined, so nil is returned when nothing has been assigned. The
# instance-level #run only switches user when this value is truthy.
def self.user
  @user ||= nil
end

.user=(value) ⇒ Object

Assigns the user this process should run as, given a username.



97
98
99
# File 'lib/pigeon/engine.rb', line 97

# Stores the given value as the user returned by user. Nothing else happens
# at assignment time.
def self.user=(value)
  @user = value
end

Instance Method Details

#defer(&block) ⇒ Object

Used to defer a block of work for near-immediate execution. Is a wrapper around EventMachine.defer and does not perform as well as using the alternate dispatch method.



371
372
373
# File 'lib/pigeon/engine.rb', line 371

# Passes the block straight to EventMachine.defer and returns its result.
def defer(&block)
  EventMachine.defer(&block)
end

#dispatch(name = :default, &block) ⇒ Object

Used to dispatch a block for immediate processing on a background thread. An optional queue name can be used to sequence tasks properly. The main queue has a large number of threads, while the named queues default to only one so they can be processed sequentially.



397
398
399
400
401
402
403
404
405
# File 'lib/pigeon/engine.rb', line 397

# Dispatches a block for processing.
#
# When the engine is not threaded the block is simply handed to
# EventMachine.next_tick. Otherwise it is performed on the
# Pigeon::Dispatcher for the named queue, which is created on first use:
# the :default queue is built with nil and every other queue with 1.
#
# name - queue name, defaults to :default.
def dispatch(name = :default, &block)
  return EventMachine.next_tick(&block) unless (self.threaded?)

  worker_limit = (name == :default) ? nil : 1
  queue = (@dispatcher[name] ||= Pigeon::Dispatcher.new(worker_limit))

  queue.perform(&block)
end

#execute_in_main_thread(&block) ⇒ Object

Schedules a block for execution on the main EventMachine thread. This is a wrapper around the EventMachine.schedule method.



377
378
379
# File 'lib/pigeon/engine.rb', line 377

# Passes the block straight to EventMachine.schedule and returns its result.
def execute_in_main_thread(&block)
  EventMachine.schedule(&block)
end

#host ⇒ Object

Returns the hostname of the system this engine is running on.



305
306
307
# File 'lib/pigeon/engine.rb', line 305

# Returns the result of Socket.gethostname. The value is looked up on every
# call rather than cached.
def host
  Socket.gethostname
end

#periodically(interval, &block) ⇒ Object

Periodically calls a block. No check is performed to see if the block is already executing.



364
365
366
# File 'lib/pigeon/engine.rb', line 364

# Creates and returns an EventMachine::PeriodicTimer for the given interval
# and block.
#
# interval - passed straight to the timer (presumably seconds — confirm
#            against the EventMachine documentation).
def periodically(interval, &block)
  EventMachine::PeriodicTimer.new(interval, &block)
end

#periodically_trigger_task(task_name = nil, interval = 1, &block) ⇒ Object

Used to periodically execute a task or block. When giving a task name, a method by that name is called, otherwise a block must be supplied. An interval can be specified in seconds, or will default to 1.



328
329
330
331
332
# File 'lib/pigeon/engine.rb', line 328

# Combines #periodically and #trigger_task: on every timer tick the named
# task (or the given block) is triggered.
#
# task_name - name of the method to trigger, or nil when a block is given.
# interval  - interval handed to #periodically, defaults to 1.
def periodically_trigger_task(task_name = nil, interval = 1, &block)
  periodically(interval) { trigger_task(task_name, &block) }
end

#register_task(task) ⇒ Object

Registers a task with the engine. The given task will then be included in the list returned by registered_tasks.



469
470
471
472
473
# File 'lib/pigeon/engine.rb', line 469

# Adds the task to the registry while holding the registry lock. The task
# is stored under itself as the key. Returns the task.
def register_task(task)
  @task_register_lock.synchronize { @registered_tasks[task] = task }
end

#registered_tasks ⇒ Object

Returns a list of tasks that have been registered with the engine.



483
484
485
486
487
# File 'lib/pigeon/engine.rb', line 483

# Returns an Array of the registered tasks, read while holding the registry
# lock. The Array is a fresh copy of the registry's values.
def registered_tasks
  @task_register_lock.synchronize { @registered_tasks.values }
end

#resume! ⇒ Object



407
408
409
410
411
412
413
414
415
416
417
418
# File 'lib/pigeon/engine.rb', line 407

# Moves the engine into the :running state inside the :resume chain.
#
# Nothing happens, and nil is returned, when the engine is already running
# or has been terminated. Resuming a terminated engine is an invalid
# operation that should arguably produce an error, but is currently
# ignored.
def resume!
  return if ([ :running, :terminated ].include?(@state))

  wrap_chain(:resume) { @state = :running }
end

#run ⇒ Object

Handles the run phase of the engine, triggers the before_start and after_start events accordingly.



311
312
313
314
315
316
317
318
319
320
321
322
323
# File 'lib/pigeon/engine.rb', line 311

# Handles the run phase of the engine.
#
# The process name is assigned first, then everything else happens inside
# the :start chain: STDOUT is switched to synchronous mode, a message is
# logged, the effective user is switched if the class has a user
# configured, and the state becomes :running.
def run
  assign_process_name!

  wrap_chain(:start) do
    STDOUT.sync = true

    logger.info("Engine \##{id} Running")
  
    # Only attempted when a user has been configured on the class; this
    # happens after the log message above has been written.
    switch_to_effective_user! if (self.class.user)

    @state = :running
  end
end

#shutdown! ⇒ Object



433
434
435
436
437
438
439
440
441
442
# File 'lib/pigeon/engine.rb', line 433

# Terminates the engine inside the :shutdown chain.
#
# Nothing happens, and nil is returned, when the engine has already been
# terminated.
def shutdown!
  return if (@state == :terminated)

  wrap_chain(:shutdown) { self.terminate }
end

#standby! ⇒ Object



420
421
422
423
424
425
426
427
428
429
430
431
# File 'lib/pigeon/engine.rb', line 420

# Moves the engine into the :standby state inside the :standby chain.
#
# Nothing happens, and nil is returned, when the engine is already in
# standby or has been terminated. Putting a terminated engine on standby is
# an invalid operation that should arguably produce an error, but is
# currently ignored.
def standby!
  return if ([ :standby, :terminated ].include?(@state))

  wrap_chain(:standby) { @state = :standby }
end

#task_lock(task_name) ⇒ Object

This is a somewhat naive locking mechanism that may break down when two requests are fired off within a nearly identical period. For now, this achieves a general purpose solution that should work under most circumstances. Refactor later to improve.



346
347
348
349
350
351
352
353
354
355
356
# File 'lib/pigeon/engine.rb', line 346

# Runs the block only if no other call for the same task name is currently
# in progress; otherwise returns immediately without running it.
#
# Each task name gets its own Mutex, created on demand while holding
# @task_lock. The per-task Mutex is acquired with try_lock so that checking
# and taking the lock is a single step: a separate locked? check followed
# by synchronize lets a second caller slip in between and then block and
# run, instead of being skipped.
#
# task_name - any object usable as a Hash key that identifies the task.
#
# Returns the block's value, or nil when the call was skipped or no block
# was given.
def task_lock(task_name)
  # Capture the Mutex while holding @task_lock so @task_locks is never read
  # outside of it.
  lock = @task_lock.synchronize do
    @task_locks[task_name] ||= Mutex.new
  end

  # try_lock returns false when the lock is held, including by the current
  # thread, so re-entrant calls are skipped too.
  return unless (lock.try_lock)

  begin
    yield if (block_given?)
  ensure
    lock.unlock
  end
end

#terminate ⇒ Object

Shuts down the engine. Will also trigger the before_stop and after_stop events.



383
384
385
386
387
388
389
390
391
# File 'lib/pigeon/engine.rb', line 383

# Shuts the engine down inside the :stop chain: the EventMachine event loop
# is stopped if the reactor is running, and the state becomes :terminated.
def terminate
  wrap_chain(:stop) do
    EventMachine.stop_event_loop if (EventMachine.reactor_running?)

    @state = :terminated
  end
end

#timer(interval, &block) ⇒ Object



358
359
360
# File 'lib/pigeon/engine.rb', line 358

# Creates and returns an EventMachine::Timer for the given interval and
# block.
#
# interval - passed straight to the timer (presumably seconds — confirm
#            against the EventMachine documentation).
def timer(interval, &block)
  EventMachine::Timer.new(interval, &block)
end

#trigger_task(task_name = nil, &block) ⇒ Object

This acts as a lock to prevent overlapping calls to the same method. While the first call is in progress, all subsequent calls will be ignored.



336
337
338
339
340
# File 'lib/pigeon/engine.rb', line 336

# Runs a task under #task_lock so that overlapping triggers of the same
# task are handled by that lock.
#
# The lock is keyed on the task name, or on the block itself when no name
# is given. A supplied block always takes precedence; otherwise the method
# called task_name is invoked via send.
def trigger_task(task_name = nil, &block)
  lock_key = task_name || block

  task_lock(lock_key) do
    if (block_given?)
      yield
    else
      send(task_name)
    end
  end
end

#unregister_task(task) ⇒ Object

Removes a task from the list of tasks registered with this engine.



476
477
478
479
480
# File 'lib/pigeon/engine.rb', line 476

# Deletes the task from the registry while holding the registry lock.
# Returns the removed task, or nil when it was not registered.
def unregister_task(task)
  @task_register_lock.synchronize { @registered_tasks.delete(task) }
end