Class: Burstflow::Workflow

Inherits:
ActiveRecord::Base
  • Object
show all
Includes:
Callbacks, Configuration
Defined in:
lib/burstflow/workflow.rb

Defined Under Namespace

Modules: Callbacks, Configuration Classes: Builder, InternalError

Constant Summary collapse

INITIAL =
'initial'.freeze
RUNNING =
'running'.freeze
FINISHED =
'finished'.freeze
FAILED =
'failed'.freeze
SUSPENDED =
'suspended'.freeze
STATUSES =
[INITIAL, RUNNING, FINISHED, FAILED, SUSPENDED].freeze

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#cacheObject

Returns the value of attribute cache.



27
28
29
# File 'lib/burstflow/workflow.rb', line 27

def cache
  @cache
end

#managerObject

Returns the value of attribute manager.



27
28
29
# File 'lib/burstflow/workflow.rb', line 27

def manager
  @manager
end

Class Method Details

.build(*args) ⇒ Object



57
58
59
60
61
62
# File 'lib/burstflow/workflow.rb', line 57

def self.build(*args)
  new.tap do |wf|
    builder = Burstflow::Workflow::Builder.new(wf, *args, &configuration)
    wf.flow = { 'jobs_config' => builder.as_json }
  end
end

Instance Method Details

#add_error(job_orexception) ⇒ Object



103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
# File 'lib/burstflow/workflow.rb', line 103

def add_error(job_orexception)
  context = {
    created_at: Time.now.to_i
  }
  if job_orexception.is_a?(::Exception)
    context[:message]     = job_orexception.message
    context[:klass]       = job_orexception.class.to_s
    context[:backtrace]   = job_orexception.backtrace.first(10)
    context[:cause]       = job_orexception.cause
  else
    context[:job] = job_orexception.id
  end

  failures.push(context)
end

#attributesObject



47
48
49
50
51
52
53
54
55
# File 'lib/burstflow/workflow.rb', line 47

def attributes
  {
    id: self.id,
    jobs_config: self.jobs_config,
    type: self.class.to_s,
    status: status,
    failures: failures
  }
end

#complete!Object



133
134
135
136
137
138
139
140
141
# File 'lib/burstflow/workflow.rb', line 133

def complete!
  if has_errors?
    failed!
  elsif has_suspended_jobs?
    suspended!
  else
    finished!
  end
end

#failed!Object



168
169
170
171
172
173
174
175
176
177
# File 'lib/burstflow/workflow.rb', line 168

def failed!
  run_callbacks :failure do
    raise InternalError.new(self, "Can't fail: workflow already failed") if failed?
    raise InternalError.new(self, "Can't fail: workflow already finished") if finished?
    raise InternalError.new(self, "Can't fail: workflow in not runnig") unless running? || suspended?

    self.status = FAILED
    save!
  end
end

#finished!Object



179
180
181
182
183
184
185
186
187
188
# File 'lib/burstflow/workflow.rb', line 179

def finished!
  run_callbacks :finish do
    raise InternalError.new(self, "Can't finish: workflow already finished") if finished?
    raise InternalError.new(self, "Can't finish: workflow already failed") if failed?
    raise InternalError.new(self, "Can't finish: workflow in not runnig") unless running?

    self.status = FINISHED
    save!
  end
end

#finished_atObject



155
156
157
# File 'lib/burstflow/workflow.rb', line 155

def finished_at
  last_job&.finished_at
end

#first_jobObject



143
144
145
# File 'lib/burstflow/workflow.rb', line 143

def first_job
  all_jobs.min_by{|n| n.started_at || Time.now.to_i }
end

#has_errors?Boolean

Returns:

  • (Boolean)


119
120
121
# File 'lib/burstflow/workflow.rb', line 119

def has_errors?
  failures.any?
end

#has_scheduled_jobs?Boolean

Returns:

  • (Boolean)


123
124
125
126
127
# File 'lib/burstflow/workflow.rb', line 123

def has_scheduled_jobs?
  cache[:has_scheduled_jobs] ||= jobs.any? do |job|
    job.scheduled? || (job.initial? && !job.enqueued?)
  end
end

#has_suspended_jobs?Boolean

Returns:

  • (Boolean)


129
130
131
# File 'lib/burstflow/workflow.rb', line 129

def has_suspended_jobs?
  cache[:has_suspended_jobs] ||= jobs.any?(&:suspended?)
end

#initial_jobsObject



99
100
101
# File 'lib/burstflow/workflow.rb', line 99

def initial_jobs
  cache[:initial_jobs] ||= jobs.select(&:initial?)
end

#job(id) ⇒ Object



91
92
93
# File 'lib/burstflow/workflow.rb', line 91

def job(id)
  Burstflow::Job.from_hash(self, job_hash(id))
end

#job_hash(id) ⇒ Object



87
88
89
# File 'lib/burstflow/workflow.rb', line 87

def job_hash(id)
  jobs_config[id].deep_dup
end

#jobsObject



79
80
81
82
83
84
85
# File 'lib/burstflow/workflow.rb', line 79

def jobs
  Enumerator.new do |y|
    jobs_config.keys.each do |id|
      y << job(id)
    end
  end
end

#last_jobObject



147
148
149
# File 'lib/burstflow/workflow.rb', line 147

def last_job
  all_jobs.max_by{|n| n.finished_at || 0 } if finished?
end

#reloadObject



64
65
66
67
# File 'lib/burstflow/workflow.rb', line 64

def reload(*)
  self.cache = {}
  super
end

#resume!(job_id, data) ⇒ Object



74
75
76
77
# File 'lib/burstflow/workflow.rb', line 74

def resume!(job_id, data)
  manager.resume_workflow!(job_id, data)
  self
end

#resumed!Object



201
202
203
204
205
206
207
208
209
210
211
# File 'lib/burstflow/workflow.rb', line 201

def resumed!
  run_callbacks :resume do
    raise InternalError.new(self, "Can't resume: workflow already running") if running?
    raise InternalError.new(self, "Can't resume: workflow already finished") if finished?
    raise InternalError.new(self, "Can't resume: workflow already failed") if failed?
    raise InternalError.new(self, "Can't resume: workflow in not suspended") unless suspended?

    self.status = RUNNING
    save!
  end
end

#runnig!Object

Raises:



159
160
161
162
163
164
165
166
# File 'lib/burstflow/workflow.rb', line 159

def runnig!
  raise InternalError.new(self, "Can't start: workflow already running") if running? || suspended?
  raise InternalError.new(self, "Can't start: workflow already failed") if failed?
  raise InternalError.new(self, "Can't start: workflow already finished") if finished?

  self.status = RUNNING
  save!
end

#set_job(job) ⇒ Object



95
96
97
# File 'lib/burstflow/workflow.rb', line 95

def set_job(job)
  jobs_config[job.id] = job.as_json
end

#start!Object



69
70
71
72
# File 'lib/burstflow/workflow.rb', line 69

def start!
  manager.start_workflow!
  self
end

#started_atObject



151
152
153
# File 'lib/burstflow/workflow.rb', line 151

def started_at
  first_job&.started_at
end

#suspended!Object



190
191
192
193
194
195
196
197
198
199
# File 'lib/burstflow/workflow.rb', line 190

def suspended!
  run_callbacks :suspend do
    raise InternalError.new(self, "Can't suspend: workflow already finished") if finished?
    raise InternalError.new(self, "Can't suspend: workflow already failed") if failed?
    raise InternalError.new(self, "Can't suspend: workflow in not runnig") unless running?

    self.status = SUSPENDED
    save!
  end
end