Class: Burstflow::Workflow
- Inherits:
-
ActiveRecord::Base
- Object
- ActiveRecord::Base
- Burstflow::Workflow
show all
- Includes:
- Callbacks, Configuration
- Defined in:
- lib/burstflow/workflow.rb
Defined Under Namespace
Modules: Callbacks, Configuration
Classes: Builder, InternalError
Constant Summary
collapse
- INITIAL =
'initial'.freeze
- RUNNING =
'running'.freeze
- FINISHED =
'finished'.freeze
- FAILED =
'failed'.freeze
- SUSPENDED =
'suspended'.freeze
- STATUSES =
[INITIAL, RUNNING, FINISHED, FAILED, SUSPENDED].freeze
Instance Attribute Summary collapse
Class Method Summary
collapse
Instance Method Summary
collapse
Instance Attribute Details
#cache ⇒ Object
Returns the value of attribute cache.
27
28
29
|
# File 'lib/burstflow/workflow.rb', line 27
# Reader for the per-instance @cache object.
#
# Other methods in this class index it with symbol keys and `||=` to memoize
# derived job lookups, and #reload replaces it with an empty Hash.
#
# @return [Object] the current value of @cache (nil until something assigns it)
def cache
@cache
end
|
#manager ⇒ Object
Returns the value of attribute manager.
27
28
29
|
# File 'lib/burstflow/workflow.rb', line 27
# Reader for the @manager collaborator.
#
# #start! and #resume! delegate to it via `start_workflow!` and
# `resume_workflow!`; how it gets assigned is not visible here.
#
# @return [Object] the current value of @manager
def manager
@manager
end
|
Class Method Details
.build(*args) ⇒ Object
57
58
59
60
61
62
|
# File 'lib/burstflow/workflow.rb', line 57
# Instantiates a workflow and fills its `flow` with the job configuration
# produced by Burstflow::Workflow::Builder.
#
# The builder receives the new workflow, the given arguments and the
# class-level `configuration` as its block.
#
# @param args [Array] forwarded verbatim to the builder
# @return [Object] the newly built (not explicitly saved here) workflow
def self.build(*args)
  workflow = new
  jobs_config = Burstflow::Workflow::Builder.new(workflow, *args, &configuration).as_json
  workflow.flow = { 'jobs_config' => jobs_config }
  workflow
end
|
Instance Method Details
#add_error(job_orexception) ⇒ Object
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
|
# File 'lib/burstflow/workflow.rb', line 103
# Appends a failure record to `failures`.
#
# Every record carries `:created_at` (epoch seconds). For an exception the
# record also stores its message, class name, the first ten backtrace lines
# and its cause; for anything else only the object's `id` is stored under
# `:job`.
#
# @param job_orexception [Exception, #id] the exception raised, or a job-like
#   object responding to `id`
# @return [Object] whatever `failures.push` returns
def add_error(job_orexception)
  context = { created_at: Time.now.to_i }

  if job_orexception.is_a?(::Exception)
    context[:message] = job_orexception.message
    context[:klass] = job_orexception.class.to_s
    # An exception that was instantiated but never raised has a nil
    # backtrace; safe navigation keeps that case from raising NoMethodError.
    context[:backtrace] = job_orexception.backtrace&.first(10)
    context[:cause] = job_orexception.cause
  else
    context[:job] = job_orexception.id
  end

  failures.push(context)
end
|
#attributes ⇒ Object
47
48
49
50
51
52
53
54
55
|
# File 'lib/burstflow/workflow.rb', line 47
# Builds a symbol-keyed summary of this workflow.
#
# @return [Hash] with the keys :id, :jobs_config, :type (the class name as a
#   String), :status and :failures, in that order
def attributes
  summary = {}
  summary[:id] = id
  summary[:jobs_config] = jobs_config
  summary[:type] = self.class.to_s
  summary[:status] = status
  summary[:failures] = failures
  summary
end
|
#complete! ⇒ Object
133
134
135
136
137
138
139
140
141
|
# File 'lib/burstflow/workflow.rb', line 133
# Moves the workflow into its terminal-or-paused state.
#
# Recorded errors take precedence (#failed!), then suspended jobs
# (#suspended!); otherwise the workflow is marked via #finished!.
#
# @return [Object] the result of whichever transition method was invoked
def complete!
  return failed! if has_errors?
  return suspended! if has_suspended_jobs?

  finished!
end
|
#failed! ⇒ Object
168
169
170
171
172
173
174
175
176
177
|
# File 'lib/burstflow/workflow.rb', line 168
# Marks the workflow as FAILED and saves it, inside the :failure callbacks.
#
# Only a running or suspended workflow may fail. The state checks execute
# inside the callback block, so they run after any callbacks that fire
# before the block.
#
# @raise [InternalError] if the workflow is already failed, already finished,
#   or neither running nor suspended
# @return [Object] whatever `run_callbacks` returns
def failed!
  run_callbacks :failure do
    reason =
      if failed?
        "Can't fail: workflow already failed"
      elsif finished?
        "Can't fail: workflow already finished"
      elsif !(running? || suspended?)
        "Can't fail: workflow in not runnig"
      end
    raise InternalError.new(self, reason) if reason

    self.status = FAILED
    save!
  end
end
|
#finished! ⇒ Object
179
180
181
182
183
184
185
186
187
188
|
# File 'lib/burstflow/workflow.rb', line 179
# Marks the workflow as FINISHED and saves it, inside the :finish callbacks.
#
# Only a running workflow may finish. The state checks execute inside the
# callback block.
#
# @raise [InternalError] if the workflow is already finished, already failed,
#   or not running
# @return [Object] whatever `run_callbacks` returns
def finished!
  run_callbacks :finish do
    reason =
      if finished?
        "Can't finish: workflow already finished"
      elsif failed?
        "Can't finish: workflow already failed"
      elsif !running?
        "Can't finish: workflow in not runnig"
      end
    raise InternalError.new(self, reason) if reason

    self.status = FINISHED
    save!
  end
end
|
#finished_at ⇒ Object
155
156
157
|
# File 'lib/burstflow/workflow.rb', line 155
# Completion time of the workflow, taken from #last_job.
#
# @return [Object, nil] the last job's `finished_at`, or nil when #last_job
#   is nil
def finished_at
  final_job = last_job
  return if final_job.nil?

  final_job.finished_at
end
|
#first_job ⇒ Object
143
144
145
|
# File 'lib/burstflow/workflow.rb', line 143
# Picks the job with the smallest `started_at` from `all_jobs`.
#
# A job without a `started_at` is ranked as if it started at the current
# epoch second.
# NOTE(review): `all_jobs` is not defined in the visible part of this class —
# confirm it exists (the public enumerator here is #jobs).
#
# @return [Object, nil] the earliest-started job, or nil when there are none
def first_job
  all_jobs.min_by do |candidate|
    candidate.started_at || Time.now.to_i
  end
end
|
#has_errors? ⇒ Boolean
119
120
121
|
# File 'lib/burstflow/workflow.rb', line 119
# Tells whether any failure has been recorded on this workflow.
#
# @return [Boolean] the result of `failures.any?`
def has_errors?
failures.any?
end
|
#has_scheduled_jobs? ⇒ Boolean
123
124
125
126
127
|
# File 'lib/burstflow/workflow.rb', line 123
# Tells whether some job is scheduled, or is still initial and not enqueued.
#
# The answer is stored under `cache[:has_scheduled_jobs]`. Because `||=` is
# used, only a true answer sticks; a false answer is recomputed on each call.
#
# @return [Boolean]
def has_scheduled_jobs?
  cache[:has_scheduled_jobs] ||= jobs.any? do |candidate|
    next true if candidate.scheduled?

    candidate.initial? && !candidate.enqueued?
  end
end
|
#has_suspended_jobs? ⇒ Boolean
129
130
131
|
# File 'lib/burstflow/workflow.rb', line 129
# Tells whether at least one job reports `suspended?`.
#
# The answer is stored under `cache[:has_suspended_jobs]`. Because `||=` is
# used, only a true answer sticks; a false answer is recomputed on each call.
#
# @return [Boolean]
def has_suspended_jobs?
  cache[:has_suspended_jobs] ||= jobs.any? { |candidate| candidate.suspended? }
end
|
#initial_jobs ⇒ Object
99
100
101
|
# File 'lib/burstflow/workflow.rb', line 99
# Jobs that report `initial?`, memoized under `cache[:initial_jobs]`.
#
# @return [Array] the selected jobs (the same Array object on later calls
#   until the cache is replaced)
def initial_jobs
  cache[:initial_jobs] ||= jobs.select { |candidate| candidate.initial? }
end
|
#job(id) ⇒ Object
91
92
93
|
# File 'lib/burstflow/workflow.rb', line 91
# Materializes the job stored under +id+ via Burstflow::Job.from_hash.
#
# @param id [Object] key into `jobs_config`
# @return [Object] whatever `Burstflow::Job.from_hash(self, hash)` returns
def job(id)
  config = job_hash(id)
  Burstflow::Job.from_hash(self, config)
end
|
#job_hash(id) ⇒ Object
87
88
89
|
# File 'lib/burstflow/workflow.rb', line 87
# Returns a deep copy of the configuration stored for +id+, so callers can
# mutate it without touching `jobs_config`.
#
# @param id [Object] key into `jobs_config`
# @return [Object] `jobs_config[id].deep_dup`
def job_hash(id)
  stored = jobs_config[id]
  stored.deep_dup
end
|
#jobs ⇒ Object
79
80
81
82
83
84
85
|
# File 'lib/burstflow/workflow.rb', line 79
# Enumerates every configured job.
#
# Jobs are built one at a time through #job while the enumerator is
# consumed; the key list is read from `jobs_config` when iteration starts.
#
# @return [Enumerator] yielding one job per key of `jobs_config`
def jobs
  Enumerator.new do |yielder|
    jobs_config.keys.each { |job_id| yielder.yield job(job_id) }
  end
end
|
#last_job ⇒ Object
147
148
149
|
# File 'lib/burstflow/workflow.rb', line 147
# Picks the job with the greatest `finished_at` from `all_jobs`, but only
# once the workflow reports `finished?`.
#
# A job without a `finished_at` is ranked as 0.
# NOTE(review): `all_jobs` is not defined in the visible part of this class —
# confirm it exists.
#
# @return [Object, nil] the latest-finished job; nil when the workflow is not
#   finished or has no jobs
def last_job
  return unless finished?

  all_jobs.max_by { |candidate| candidate.finished_at || 0 }
end
|
#reload ⇒ Object
64
65
66
67
|
# File 'lib/burstflow/workflow.rb', line 64
# Resets the memoization cache to an empty Hash, then defers to the
# superclass implementation with the same arguments.
#
# @return [Object] whatever the superclass `reload` returns
def reload(*)
# Drop memoized job lookups before the parent implementation runs.
self.cache = {}
# Bare `super` forwards the anonymous splat arguments unchanged.
super
end
|
#resume!(job_id, data) ⇒ Object
74
75
76
77
|
# File 'lib/burstflow/workflow.rb', line 74
# Asks the manager to resume the workflow for the given job.
#
# @param job_id [Object] forwarded to `manager.resume_workflow!`
# @param data [Object] forwarded to `manager.resume_workflow!`
# @return [self] to allow chaining
def resume!(job_id, data)
  tap { manager.resume_workflow!(job_id, data) }
end
|
#resumed! ⇒ Object
201
202
203
204
205
206
207
208
209
210
211
|
# File 'lib/burstflow/workflow.rb', line 201
# Moves a suspended workflow back to RUNNING and saves it, inside the
# :resume callbacks.
#
# The state checks execute inside the callback block.
#
# @raise [InternalError] if the workflow is already running, finished or
#   failed, or is not suspended
# @return [Object] whatever `run_callbacks` returns
def resumed!
  run_callbacks :resume do
    reason =
      if running?
        "Can't resume: workflow already running"
      elsif finished?
        "Can't resume: workflow already finished"
      elsif failed?
        "Can't resume: workflow already failed"
      elsif !suspended?
        "Can't resume: workflow in not suspended"
      end
    raise InternalError.new(self, reason) if reason

    self.status = RUNNING
    save!
  end
end
|
#runnig! ⇒ Object
159
160
161
162
163
164
165
166
|
# File 'lib/burstflow/workflow.rb', line 159
# Marks the workflow as RUNNING and saves it.
#
# Unlike the other transitions in this class, this one does not wrap its
# work in `run_callbacks`. The misspelt method name is part of the public
# interface.
#
# @raise [InternalError] if the workflow is already running or suspended,
#   already failed, or already finished
# @return [Object] whatever `save!` returns
def runnig!
  reason =
    if running? || suspended?
      "Can't start: workflow already running"
    elsif failed?
      "Can't start: workflow already failed"
    elsif finished?
      "Can't start: workflow already finished"
    end
  raise InternalError.new(self, reason) if reason

  self.status = RUNNING
  save!
end
|
#set_job(job) ⇒ Object
95
96
97
|
# File 'lib/burstflow/workflow.rb', line 95
# Writes the job's JSON form into `jobs_config` under the job's id,
# replacing any entry already stored there. Nothing is saved here.
#
# @param job [#id, #as_json] the job to store
# @return [Object] the value of `job.as_json`
def set_job(job)
  config = jobs_config
  config[job.id] = job.as_json
end
|
#start! ⇒ Object
69
70
71
72
|
# File 'lib/burstflow/workflow.rb', line 69
# Asks the manager to start this workflow.
#
# @return [self] to allow chaining
def start!
  tap { manager.start_workflow! }
end
|
#started_at ⇒ Object
151
152
153
|
# File 'lib/burstflow/workflow.rb', line 151
# Start time of the workflow, taken from #first_job.
#
# @return [Object, nil] the first job's `started_at`, or nil when #first_job
#   is nil
def started_at
  earliest_job = first_job
  return if earliest_job.nil?

  earliest_job.started_at
end
|
#suspended! ⇒ Object
190
191
192
193
194
195
196
197
198
199
|
# File 'lib/burstflow/workflow.rb', line 190
# Marks the workflow as SUSPENDED and saves it, inside the :suspend
# callbacks.
#
# Only a running workflow may be suspended. The state checks execute inside
# the callback block.
#
# @raise [InternalError] if the workflow is already finished, already failed,
#   or not running
# @return [Object] whatever `run_callbacks` returns
def suspended!
  run_callbacks :suspend do
    reason =
      if finished?
        "Can't suspend: workflow already finished"
      elsif failed?
        "Can't suspend: workflow already failed"
      elsif !running?
        "Can't suspend: workflow in not runnig"
      end
    raise InternalError.new(self, reason) if reason

    self.status = SUSPENDED
    save!
  end
end
|