Class: Ruote::Worker

Inherits:
Object
Defined in:
lib/ruote/worker.rb

Overview

Workers fetch ‘msgs’ and ‘schedules’ from the storage and process them.

Read more at ruote.rubyforge.org/configuration.html

Defined Under Namespace

Classes: Info

Constant Summary

EXP_ACTIONS =
%w[ reply cancel fail receive dispatched pause resume ]

Note: ‘apply’ is included in ‘launch’; ‘receive’ is a ParticipantExpression alias for ‘reply’.

PROC_ACTIONS =
%w[ cancel kill pause resume ].collect { |a| a + '_process' }
DISP_ACTIONS =
%w[ dispatch dispatch_cancel dispatch_pause dispatch_resume ]
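To make the three lists concrete, here is a small sketch that evaluates the same expressions outside of ruote. The `families` hash and the `family_of` lambda are hypothetical names introduced for illustration only; how the worker actually routes an action is not shown in this page.

```ruby
# The same literals as in the constant summary, held in a plain hash
# (hypothetical structure, not part of ruote).
families = {
  :exp  => %w[ reply cancel fail receive dispatched pause resume ],
  :proc => %w[ cancel kill pause resume ].collect { |a| a + '_process' },
  :disp => %w[ dispatch dispatch_cancel dispatch_pause dispatch_resume ]
}

# The collect call appends '_process' to each base name.
proc_actions = families[:proc]
# => ["cancel_process", "kill_process", "pause_process", "resume_process"]

# Hypothetical lookup: which list, if any, contains a given action name.
family_of = lambda do |action|
  pair = families.find { |_, list| list.include?(action) }
  pair && pair.first
end

pause_family         = family_of.call('pause')          # => :exp
pause_process_family = family_of.call('pause_process')  # => :proc
launch_family        = family_of.call('launch')         # => nil
```

The three lists do not overlap, so a name such as 'pause' and its process-level counterpart 'pause_process' stay distinguishable.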

Constructor Details

#initialize(name, storage = nil) ⇒ Worker

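Given a storage, creates a new instance of a Worker. When only one argument is passed, it is taken as the storage and the name defaults to 'worker'. The storage argument may also be any object that responds to #storage (and #context), in which case that object's storage and context are reused instead of building a new Ruote::Context.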



# File 'lib/ruote/worker.rb', line 54

def initialize(name, storage=nil)

  if storage.nil?
    storage = name
    name = nil
  end

  @name = name || 'worker'

  if storage.respond_to?(:storage)
    @storage = storage.storage
    @context = storage.context
  else
    @storage = storage
    @context = Ruote::Context.new(storage)
  end

  service_name = @name
  service_name << '_worker' unless service_name.match(/worker$/)

  @context.add_service(service_name, self)

  @last_time = Time.at(0.0).utc # 1970...

  @state = 'running'
  @run_thread = nil
  @state_mutex = Mutex.new

  @msgs = []

  @sleep_time = @context['restless_worker'] ? nil : 0.000

  @info = @context['worker_info_enabled'] == false ? nil : Info.new(self)
end
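The argument handling at the top of the constructor can be tried in isolation. The sketch below copies only that logic into a hypothetical helper, `worker_names`, which is not part of ruote; the storage is replaced by a plain symbol. It also shows a side effect that follows from the listing: because `service_name` refers to the same String object as the name, the `<<` call changes the name in place.

```ruby
# Hypothetical helper (not a ruote method) reproducing the argument
# shuffling and service-name derivation from the constructor listing.
def worker_names(name, storage = nil)
  if storage.nil?          # single-argument form: the argument is the storage
    storage = name
    name = nil
  end

  name = name || String.new('worker')   # unfrozen default, as a stand-in

  service_name = name                    # same object, not a copy
  service_name << '_worker' unless service_name.match(/worker$/)

  [ name, service_name, storage ]
end

# Only a storage given: the name defaults to 'worker', no suffix is added.
default_name, default_service, default_storage = worker_names(:some_storage)

# Name and storage given: the suffix is appended in place.
caller_string = String.new('alpha')
name, service_name, _storage = worker_names(caller_string, :some_storage)
# name, service_name and caller_string are now all "alpha_worker"
```

If the original name must be preserved, duplicating the string before appending (`service_name = name.dup`) would avoid the in-place change; whether that matters for ruote itself is not stated in this page.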

Instance Attribute Details

#context ⇒ Object (readonly)

Returns the value of attribute context.



# File 'lib/ruote/worker.rb', line 47

def context
  @context
end

#name ⇒ Object (readonly)

Returns the value of attribute name.



# File 'lib/ruote/worker.rb', line 44

def name
  @name
end

#run_thread ⇒ Object (readonly)

Returns the value of attribute run_thread.



# File 'lib/ruote/worker.rb', line 50

def run_thread
  @run_thread
end

#state ⇒ Object (readonly)

Returns the value of attribute state.



# File 'lib/ruote/worker.rb', line 49

def state
  @state
end

#storage ⇒ Object (readonly)

Returns the value of attribute storage.



# File 'lib/ruote/worker.rb', line 46

def storage
  @storage
end

Instance Method Details

#inactive? ⇒ Boolean

Returns true if the engine system is inactive, i.e. if all the process instances are terminated or are stuck in an error.

NOTE: for now, if one branch of a process is in error while another is still running, this method considers the whole process instance inactive (and it returns true if all the processes are considered inactive).

Returns:

  • (Boolean)


# File 'lib/ruote/worker.rb', line 134

def inactive?

  # the cheaper tests first

  return false if @msgs.size > 0
  return false unless @context.storage.empty?('schedules')
  return false unless @context.storage.empty?('msgs')

  wfids = @context.storage.get_many('expressions').collect { |exp|
    exp['fei']['wfid']
  }

  error_wfids = @context.storage.get_many('errors').collect { |err|
    err['fei']['wfid']
  }

  (wfids - error_wfids == [])
end
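The final comparison in the listing is an array difference, which explains the behaviour described in the NOTE. The sketch below reproduces just that step with hand-written hashes; the helper name `all_in_error?` and the sample documents are hypothetical, and only the `['fei']['wfid']` path comes from the listing.

```ruby
# Hypothetical helper mirroring the last three statements of #inactive?.
def all_in_error?(expressions, errors)
  wfids = expressions.collect { |exp| exp['fei']['wfid'] }
  error_wfids = errors.collect { |err| err['fei']['wfid'] }
  (wfids - error_wfids) == []
end

# Made-up documents: two branches of wf-1 and one expression of wf-2.
expressions = [
  { 'fei' => { 'wfid' => 'wf-1' } },
  { 'fei' => { 'wfid' => 'wf-1' } },
  { 'fei' => { 'wfid' => 'wf-2' } }
]

one_error   = [ { 'fei' => { 'wfid' => 'wf-1' } } ]
both_errors = one_error + [ { 'fei' => { 'wfid' => 'wf-2' } } ]

still_active = all_in_error?(expressions, one_error)    # => false, wf-2 remains
all_stuck    = all_in_error?(expressions, both_errors)  # => true
nothing_left = all_in_error?([], [])                    # => true
```

Array difference removes every occurrence of a matching wfid, so a single error in wf-1 masks both of its branches, including one that may still be running.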

#join ⇒ Object

Joins the run thread of this worker (if there is no such thread, this method will return immediately, without any effect).



# File 'lib/ruote/worker.rb', line 112

def join

  @run_thread.join rescue nil
end
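The `rescue nil` modifier is what makes the call a no-op when there is no run thread. A minimal sketch, using a hypothetical helper `quiet_join` that has the same body as the method above, shows the three cases that can arise with plain Ruby threads:

```ruby
# Hypothetical helper with the same body as Worker#join.
def quiet_join(thread)
  thread.join rescue nil
end

# No thread at all: nil does not respond to #join, the NoMethodError is rescued.
no_thread = quiet_join(nil)

# A thread that finishes normally: Thread#join returns the thread itself.
finished = Thread.new { :done }
joined = quiet_join(finished)

# A thread that raises: Thread#join re-raises the error in the caller,
# and the rescue modifier swallows it.
failing = Thread.new do
  Thread.current.report_on_exception = false  # keep stderr quiet in this demo
  raise 'boom'
end
swallowed = quiet_join(failing)
```

The third case is worth keeping in mind: an error that ends the run thread is silently discarded by this pattern rather than surfacing in the joining thread.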

#run ⇒ Object

Runs the worker in the current thread. See #run_in_thread for running in a dedicated thread.



# File 'lib/ruote/worker.rb', line 92

def run

  step while @state != 'stopped'
end

#run_in_thread ⇒ Object

Triggers the run method of the worker in a dedicated thread.



# File 'lib/ruote/worker.rb', line 99

def run_in_thread

  #Thread.abort_on_exception = true

  @state = 'running'

  @run_thread = Thread.new { run }
  @run_thread['ruote_worker'] = self
end

#shutdown ⇒ Object

Shuts down this worker (makes sure it won’t fetch further messages and schedules).



# File 'lib/ruote/worker.rb', line 120

def shutdown

  @state_mutex.synchronize { @state = 'stopped' }

  join
end
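Taken together, #run, #run_in_thread and #shutdown form a small lifecycle: a loop that steps until the state is 'stopped', a thread wrapping that loop, and a shutdown that flips the state and waits for the thread. The sketch below imitates that shape with a hypothetical `TinyWorker` class; it is not Ruote::Worker, its `step` only counts and sleeps, and the thread-local key 'owner' is an illustrative name.

```ruby
# TinyWorker is a hypothetical stand-in that copies the control flow of the
# listings above; it does not touch any storage.
class TinyWorker
  attr_reader :state, :run_thread, :steps

  def initialize
    @state = 'running'
    @state_mutex = Mutex.new
    @run_thread = nil
    @steps = 0
  end

  # Blocks the calling thread until the state becomes 'stopped'.
  def run
    step while @state != 'stopped'
  end

  # Runs the loop in a dedicated thread and tags the thread with its owner.
  def run_in_thread
    @state = 'running'
    @run_thread = Thread.new { run }
    @run_thread['owner'] = self
  end

  # Flips the state under the mutex, then waits for the loop to exit.
  def shutdown
    @state_mutex.synchronize { @state = 'stopped' }
    @run_thread.join rescue nil
  end

  private

  # Placeholder for real work: count the iteration and yield briefly.
  def step
    @steps += 1
    sleep 0.001
  end
end

worker = TinyWorker.new
worker.run_in_thread
sleep 0.001 until worker.steps > 0       # wait for at least one iteration
owner = worker.run_thread['owner']       # thread-local set by run_in_thread
worker.shutdown                          # returns once the loop has exited
```

Because shutdown joins the run thread, the caller can rely on the loop having fully stopped by the time the call returns.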