Module: DaemonicThreads::Prototype

Defined in:
lib/ruby-daemonic-threads/prototype.rb

Constant Summary collapse

RESTART_ON =

TODO: inheritable

IPSocket::SOCKET_EXEPTIONS + [DaemonicThreads::MustTerminatedState]

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#loggerObject (readonly)

Returns the value of attribute logger.



78
79
80
# File 'lib/ruby-daemonic-threads/prototype.rb', line 78

def logger
  @logger
end

Instance Method Details

#execute_thread(thread_name, thread_title, *args) ⇒ Object



143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
# File 'lib/ruby-daemonic-threads/prototype.rb', line 143

def execute_thread(thread_name, thread_title, *args)
 
  panic_on_exception(thread_title) do
    Thread.current[:title] = thread_title
    Thread.current[:started_at] = Time.now
    
    if block_given?
      yield(*args)
    elsif respond_to?(thread_name)
      __send__(thread_name, *args)
    else
      raise("Thread block was not given or method `#{thread_name}' not found. Don't know what to do.")
    end
  end
  
ensure
  panic_on_exception("#{thread_title} -- Release ActiveRecord connection to pool") { ActiveRecord::Base.clear_active_connections! }
end

#initialize(name, runner, parent = nil) ⇒ Object



53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
# File 'lib/ruby-daemonic-threads/prototype.rb', line 53

def initialize(name, runner, parent = nil)
  @name = name
  @runner = runner
  @config = runner.config
  @process = runner.process
  @logger = Rails.logger
  @parent = parent
  
  @queues = {}
  
  @config["queues"].each do |queue_handler, queue_name|
    @queues[queue_handler.to_sym] = @process.queues[queue_name.to_sym]
  end if @config["queues"]
    
  @queues.each do |queue_handler, queue|
    instance_variable_set("@#{queue_handler}", queue)
  end
  
  @threads = ThreadGroup.new
  @daemons = []
  @creatures_mutex = Mutex.new
  @stop_condition = ConditionVariable.new
  @must_terminate = false
end

#joinObject



80
81
82
83
84
85
86
87
88
89
90
# File 'lib/ruby-daemonic-threads/prototype.rb', line 80

def join
  @creatures_mutex.synchronize do
    @stop_condition.wait(@creatures_mutex) unless @must_terminate
  end
  
  deinitialize_http if respond_to?(:deinitialize_http)
  
  @daemons.each {|daemon| daemon.join }
  
  @threads.list.each {|thread| thread.join }
end

#log(severity, message = nil) ⇒ Object



194
195
196
# File 'lib/ruby-daemonic-threads/prototype.rb', line 194

def log severity, message = nil
  @logger.__send__(severity) {"#{self.class}##{caller.first.match(/`(.*)'/)[1]} -- #{block_given? ? yield : message}"}
end

#must_terminate?Boolean

Returns:

  • (Boolean)


189
190
191
# File 'lib/ruby-daemonic-threads/prototype.rb', line 189

def must_terminate?
  @creatures_mutex.synchronize { @must_terminate }
end

#panic_on_exception(title = nil, handler = nil) ⇒ Object



163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
# File 'lib/ruby-daemonic-threads/prototype.rb', line 163

def panic_on_exception(title = nil, handler = nil)
  yield
rescue *(RESTART_ON) => exception
  begin
    exception.log!(@logger, :warn, title, (@process.controller.env == "production" ? :inspect : :inspect_with_backtrace))
    handler.call(exception) if handler
    @runner.restart_daemon
  rescue Exception => handler_exception
    begin
      handler_exception.log!(@logger, :fatal, title)
    ensure
      @process.controller.stop
    end
  end
rescue Exception => exception
  begin
    exception.log!(@logger, :fatal, title)
    handler.call(exception) if handler
  rescue Exception => handler_exception
    handler_exception.log!(@logger, :fatal, title)
  ensure
    @process.controller.stop
  end
end

#perform_initialize_daemon(*args) ⇒ Object



109
110
111
112
# File 'lib/ruby-daemonic-threads/prototype.rb', line 109

def perform_initialize_daemon(*args)
  initialize_http if respond_to?(:initialize_http)
  initialize_daemon(*args) if respond_to? :initialize_daemon
end

#spawn_daemon(name, klass, *args) ⇒ Object

Можно запускать из initialize_daemon или из любого треда



116
117
118
119
120
121
122
123
124
125
126
127
# File 'lib/ruby-daemonic-threads/prototype.rb', line 116

def spawn_daemon name, klass, *args
  @creatures_mutex.synchronize do
    raise(DaemonicThreads::MustTerminatedState, "Unable to spawn new daemons after stop() is called") if @must_terminate
    
    # Мы не ловим никаких exceptions, потому что они поймаются или panic_on_exception (тред или http-запрос) или runner-ом (initialize, initialize_daemon).
    # Полагаться на себя тот должен, кто spawn_daemon вызвал из треда, запущенного без помощи spawn_thread, а значит без должной обработки ошибок.
     
    @daemons.push(daemon = klass.new(name, @runner, self))
    daemon.perform_initialize_daemon(*args)
    return daemon
  end
end

#spawn_thread(thread_name, *args, &block) ⇒ Object

Можно запускать из initialize_daemon или из любого треда



131
132
133
134
135
136
137
138
139
140
141
# File 'lib/ruby-daemonic-threads/prototype.rb', line 131

def spawn_thread(thread_name, *args, &block)
  @creatures_mutex.synchronize do
    raise(DaemonicThreads::MustTerminatedState, "Unable to spawn new threads after stop() is called") if @must_terminate
    
    thread_title = "#{self.class}#thread:`#{thread_name}' @name:`#{@name}'"
    
    @threads.add(thread = Thread.new { execute_thread(thread_name, thread_title, *args, &block) } )
    
    return thread
  end
end

#stopObject



93
94
95
96
97
98
99
100
101
102
103
104
105
106
# File 'lib/ruby-daemonic-threads/prototype.rb', line 93

def stop
  @creatures_mutex.synchronize do
    @must_terminate = true
    @stop_condition.signal
  end

  @daemons.each {|daemon| daemon.stop }
  
  @threads.list.each do |thread|
    @queues.each do |queue_handler, queue|
      queue.release_blocked thread
    end
  end unless @queues.empty?
end