Class: Ruote::Worker
- Inherits:
- Object (Ruote::Worker < Object)
- Defined in:
- lib/ruote/worker.rb
Overview
Workers fetch ‘msgs’ and ‘schedules’ from the storage and process them.
Read more at ruote.rubyforge.org/configuration.html
Defined Under Namespace
Classes: Info
Constant Summary
- EXP_ACTIONS =
%w[ reply cancel fail receive dispatched pause resume ]
- PROC_ACTIONS =
‘apply’ is included in ‘launch’. ‘receive’ is a ParticipantExpression alias for ‘reply’.
%w[ cancel kill pause resume ].collect { |a| a + '_process' }
- DISP_ACTIONS =
%w[ dispatch dispatch_cancel dispatch_pause dispatch_resume ]
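To make the three lists concrete, the sketch below rebuilds them outside of ruote and prints what the `collect` call in PROC_ACTIONS evaluates to. The `action_family` helper is hypothetical and only illustrates how an action name could be matched against the lists; it is not part of Ruote::Worker.

```ruby
# The three lists, copied from the constant definitions above.
EXP_ACTIONS  = %w[ reply cancel fail receive dispatched pause resume ]
PROC_ACTIONS = %w[ cancel kill pause resume ].collect { |a| a + '_process' }
DISP_ACTIONS = %w[ dispatch dispatch_cancel dispatch_pause dispatch_resume ]

p PROC_ACTIONS
# => ["cancel_process", "kill_process", "pause_process", "resume_process"]

# Hypothetical helper (an assumption, not ruote API): reports which list,
# if any, an action name belongs to.
def action_family(action)
  return :expression if EXP_ACTIONS.include?(action)
  return :process    if PROC_ACTIONS.include?(action)
  return :dispatch   if DISP_ACTIONS.include?(action)
  :other
end

p action_family('receive')        # => :expression
p action_family('kill_process')   # => :process
p action_family('launch')         # => :other
```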
Instance Attribute Summary
-
#context ⇒ Object
readonly
Returns the value of attribute context.
-
#name ⇒ Object
readonly
Returns the value of attribute name.
-
#run_thread ⇒ Object
readonly
Returns the value of attribute run_thread.
-
#state ⇒ Object
readonly
Returns the value of attribute state.
-
#storage ⇒ Object
readonly
Returns the value of attribute storage.
Instance Method Summary
-
#inactive? ⇒ Boolean
Returns true if the engine system is inactive, i.e. if all the process instances are terminated or are stuck in an error.
-
#initialize(name, storage = nil) ⇒ Worker
constructor
Given a storage, creates a new instance of a Worker.
-
#join ⇒ Object
Joins the run thread of this worker (if there is no such thread, this method will return immediately, without any effect).
-
#run ⇒ Object
Runs the worker in the current thread.
-
#run_in_thread ⇒ Object
Triggers the run method of the worker in a dedicated thread.
-
#shutdown ⇒ Object
Shuts down this worker (makes sure it won’t fetch further messages and schedules).
Constructor Details
#initialize(name, storage = nil) ⇒ Worker
Given a storage, creates a new instance of a Worker.
    # File 'lib/ruote/worker.rb', line 54

    def initialize(name, storage=nil)

      if storage.nil?
        storage = name
        name = nil
      end

      @name = name || 'worker'

      if storage.respond_to?(:storage)
        @storage = storage.storage
        @context = storage.context
      else
        @storage = storage
        @context = Ruote::Context.new(storage)
      end

      service_name = @name
      service_name << '_worker' unless service_name.match(/worker$/)

      @context.add_service(service_name, self)

      @last_time = Time.at(0.0).utc # 1970...

      @state = 'running'
      @run_thread = nil
      @state_mutex = Mutex.new

      @msgs = []

      @sleep_time = @context['restless_worker'] ? nil : 0.000

      @info = @context['worker_info_enabled'] == false ? nil : Info.new(self)
    end
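The constructor accepts either `(storage)` or `(name, storage)`. The following is a minimal sketch of that argument handling, assuming nothing beyond what the listing shows. `normalize_worker_args` and `service_name_for` are hypothetical helpers written for illustration, and the storage is a bare placeholder object. In the listing, `service_name` and `@name` refer to the same String, so the `<<` append appears to change the worker's name as well. The sketch builds a new string instead.

```ruby
# Hypothetical stand-in for the first lines of Worker#initialize:
# returns [name, storage] after the same argument shuffling.
def normalize_worker_args(name, storage = nil)
  if storage.nil?
    # Single-argument form: the only argument is the storage.
    storage = name
    name = nil
  end
  [name || 'worker', storage]
end

# Hypothetical stand-in for the service-name derivation. Unlike the listing,
# it returns a new string rather than appending to the name in place.
def service_name_for(name)
  name.match(/worker$/) ? name : "#{name}_worker"
end

storage = Object.new # placeholder, not a real ruote storage

p normalize_worker_args(storage).first           # => "worker"
p normalize_worker_args('alpha', storage).first  # => "alpha"
p service_name_for('alpha')                      # => "alpha_worker"
p service_name_for('worker')                     # => "worker"
```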
Instance Attribute Details
#context ⇒ Object (readonly)
Returns the value of attribute context.
    # File 'lib/ruote/worker.rb', line 47

    def context
      @context
    end
#name ⇒ Object (readonly)
Returns the value of attribute name.
    # File 'lib/ruote/worker.rb', line 44

    def name
      @name
    end
#run_thread ⇒ Object (readonly)
Returns the value of attribute run_thread.
    # File 'lib/ruote/worker.rb', line 50

    def run_thread
      @run_thread
    end
#state ⇒ Object (readonly)
Returns the value of attribute state.
    # File 'lib/ruote/worker.rb', line 49

    def state
      @state
    end
#storage ⇒ Object (readonly)
Returns the value of attribute storage.
    # File 'lib/ruote/worker.rb', line 46

    def storage
      @storage
    end
Instance Method Details
#inactive? ⇒ Boolean
Returns true if the engine system is inactive, i.e. if all the process instances are terminated or are stuck in an error.
NOTE: for now, if one branch of a process is in error while another is still running, this method considers the process instance inactive (and it returns true if all the processes are considered inactive).
    # File 'lib/ruote/worker.rb', line 134

    def inactive?

      # the cheaper tests first

      return false if @msgs.size > 0
      return false unless @context.storage.empty?('schedules')
      return false unless @context.storage.empty?('msgs')

      wfids = @context.storage.get_many('expressions').collect { |exp|
        exp['fei']['wfid']
      }

      error_wfids = @context.storage.get_many('errors').collect { |err|
        err['fei']['wfid']
      }

      (wfids - error_wfids == [])
    end
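The final test in the listing is an array difference on workflow ids, which is why a process with one branch in error and another still running counts as inactive. The sketch below reproduces only that comparison on in-memory data. The documents, the `wf-1`/`wf-2` ids and the `all_in_error?` helper are assumptions made for illustration, and the storage lookups and the cheaper `msgs`/`schedules` checks are left out.

```ruby
# Hypothetical helper mirroring the last lines of #inactive?: true when every
# workflow id that still has expressions also appears among the errors.
def all_in_error?(expressions, errors)
  wfids = expressions.collect { |exp| exp['fei']['wfid'] }
  error_wfids = errors.collect { |err| err['fei']['wfid'] }
  (wfids - error_wfids) == []
end

# Made-up documents: wf-1 has two branches, only one of which is in error.
expressions = [
  { 'fei' => { 'wfid' => 'wf-1', 'expid' => '0_0' } }, # still running
  { 'fei' => { 'wfid' => 'wf-1', 'expid' => '0_1' } }, # in error
  { 'fei' => { 'wfid' => 'wf-2', 'expid' => '0' } }
]
wf1_error = { 'fei' => { 'wfid' => 'wf-1', 'expid' => '0_1' } }
wf2_error = { 'fei' => { 'wfid' => 'wf-2', 'expid' => '0' } }

p all_in_error?(expressions, [wf1_error])             # => false (wf-2 is live)
p all_in_error?(expressions, [wf1_error, wf2_error])  # => true
p all_in_error?([], [])                               # => true (nothing left)
```

The second call returns true even though branch `0_0` of `wf-1` is still running, because `Array#-` removes every occurrence of `wf-1` once that id appears in the error list.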
#join ⇒ Object
Joins the run thread of this worker (if there is no such thread, this method will return immediately, without any effect).
    # File 'lib/ruote/worker.rb', line 112

    def join
      @run_thread.join rescue nil
    end
#run ⇒ Object
Runs the worker in the current thread. See #run_in_thread for running in a dedicated thread.
    # File 'lib/ruote/worker.rb', line 92

    def run
      step while @state != 'stopped'
    end
#run_in_thread ⇒ Object
Triggers the run method of the worker in a dedicated thread.
    # File 'lib/ruote/worker.rb', line 99

    def run_in_thread

      #Thread.abort_on_exception = true

      @state = 'running'

      @run_thread = Thread.new { run }
      @run_thread['ruote_worker'] = self
    end
#shutdown ⇒ Object
Shuts down this worker (makes sure it won’t fetch further messages and schedules).
    # File 'lib/ruote/worker.rb', line 120

    def shutdown

      @state_mutex.synchronize { @state = 'stopped' }

      join
    end
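Taken together, #run, #run_in_thread, #join and #shutdown form a small lifecycle: a loop that steps until the state becomes 'stopped', an optional dedicated thread, and a shutdown that flips the state and then waits for the thread. The sketch below imitates that lifecycle with a hypothetical `MiniWorker` class. It is not Ruote::Worker: its `step` only counts and sleeps, and the `'mini_worker'` thread-local key is an assumption standing in for `'ruote_worker'`.

```ruby
# Hypothetical stand-in that keeps only the state flag, the run thread and
# the mutex from the listings above.
class MiniWorker
  attr_reader :state, :run_thread, :steps

  def initialize
    @state = 'running'
    @run_thread = nil
    @state_mutex = Mutex.new
    @steps = 0
  end

  # Runs in the calling thread until the state becomes 'stopped'.
  def run
    step while @state != 'stopped'
  end

  # Runs the same loop in a dedicated thread and tags that thread.
  def run_in_thread
    @state = 'running'
    @run_thread = Thread.new { run }
    @run_thread['mini_worker'] = self
  end

  # With no run thread, nil.join raises NoMethodError, which is rescued.
  def join
    @run_thread.join rescue nil
  end

  def shutdown
    @state_mutex.synchronize { @state = 'stopped' }
    join
  end

  private

  # Placeholder for fetching and processing msgs and schedules.
  def step
    @steps += 1
    sleep 0.001
  end
end

worker = MiniWorker.new
worker.join           # no run thread yet: returns immediately
worker.run_in_thread
sleep 0.05            # let the loop take a few steps
worker.shutdown       # flips the state, then waits for the thread

puts worker.state               # prints "stopped"
puts worker.run_thread.alive?   # prints "false"
```

Because shutdown joins the run thread after changing the state, the step in progress finishes before shutdown returns.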