Class: Dwf::Item

Inherits:
Object
  • Object
show all
Includes:
Concerns::Checkable
Defined in:
lib/dwf/item.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Concerns::Checkable

#leaf?, #ready_to_start?, #running?, #started?, #succeeded?

Constructor Details

#initialize(options = {}) ⇒ Item

Returns a new instance of Item.



15
16
17
# File 'lib/dwf/item.rb', line 15

# Builds a new item by handing +options+ to +assign_attributes+.
#
# @param options [Hash] attribute values; defaults to an empty hash
def initialize(options = {})
  assign_attributes(options)
end

Instance Attribute Details

#callback_type ⇒ Object

Returns the value of attribute callback_type.



13
14
15
# File 'lib/dwf/item.rb', line 13

# @return [Object] the value currently held in @callback_type
def callback_type
  @callback_type
end

#enqueued_at ⇒ Object (readonly)

Returns the value of attribute enqueued_at.



10
11
12
# File 'lib/dwf/item.rb', line 10

# @return [Object] the value currently held in @enqueued_at
def enqueued_at
  @enqueued_at
end

#failed_at ⇒ Object (readonly)

Returns the value of attribute failed_at.



10
11
12
# File 'lib/dwf/item.rb', line 10

# @return [Object] the value currently held in @failed_at
def failed_at
  @failed_at
end

#finished_at ⇒ Object (readonly)

Returns the value of attribute finished_at.



10
11
12
# File 'lib/dwf/item.rb', line 10

# @return [Object] the value currently held in @finished_at
def finished_at
  @finished_at
end

#id ⇒ Object (readonly)

Returns the value of attribute id.



10
11
12
# File 'lib/dwf/item.rb', line 10

# @return [Object] the value currently held in @id
def id
  @id
end

#incoming ⇒ Object

Returns the value of attribute incoming.



13
14
15
# File 'lib/dwf/item.rb', line 13

# @return [Object] the value currently held in @incoming
def incoming
  @incoming
end

#klass ⇒ Object (readonly)

Returns the value of attribute klass.



10
11
12
# File 'lib/dwf/item.rb', line 10

# @return [Object] the value currently held in @klass
def klass
  @klass
end

#outgoing ⇒ Object

Returns the value of attribute outgoing.



13
14
15
# File 'lib/dwf/item.rb', line 13

# @return [Object] the value currently held in @outgoing
def outgoing
  @outgoing
end

#output_payload ⇒ Object (readonly)

Returns the value of attribute output_payload.



10
11
12
# File 'lib/dwf/item.rb', line 10

# @return [Object] the value currently held in @output_payload
def output_payload
  @output_payload
end

#params ⇒ Object (readonly)

Returns the value of attribute params.



10
11
12
# File 'lib/dwf/item.rb', line 10

# @return [Object] the value currently held in @params
def params
  @params
end

#payloads ⇒ Object



73
74
75
# File 'lib/dwf/item.rb', line 73

# Lazily builds the payload list: when +@payloads+ is still nil/false the
# result of +build_payloads+ is stored in it; otherwise the stored value is
# returned untouched.
#
# @return [Object] the memoized payloads
def payloads
  return @payloads if @payloads

  @payloads = build_payloads
end

#queue ⇒ Object (readonly)

Returns the value of attribute queue.



10
11
12
# File 'lib/dwf/item.rb', line 10

# @return [Object] the value currently held in @queue
def queue
  @queue
end

#started_at ⇒ Object (readonly)

Returns the value of attribute started_at.



10
11
12
# File 'lib/dwf/item.rb', line 10

# @return [Object] the value currently held in @started_at
def started_at
  @started_at
end

#workflow_id ⇒ Object (readonly)

Returns the value of attribute workflow_id.



10
11
12
# File 'lib/dwf/item.rb', line 10

# @return [Object] the value currently held in @workflow_id
def workflow_id
  @workflow_id
end

Class Method Details

.from_hash(hash) ⇒ Object



19
20
21
# File 'lib/dwf/item.rb', line 19

# Instantiates the class named by +hash[:klass]+, passing the whole hash to
# that class's constructor.
#
# @param hash [Hash] must answer +[:klass]+ with a resolvable constant name
# @return [Object] a new instance of the resolved class
#
# NOTE(review): +const_get+ resolves whatever constant name the hash carries.
# If this hash can originate from storage that others may write to, consider
# restricting the accepted classes - confirm against the callers.
def self.from_hash(hash)
  Module.const_get(hash[:klass]).new(hash)
end

Instance Method Details

#as_json ⇒ Object



153
154
155
# File 'lib/dwf/item.rb', line 153

# Serializes the item by calling +to_json+ on the result of +to_hash+.
#
# NOTE(review): despite the +as_json+ name this presumably yields a JSON
# String rather than a Hash - confirm before relying on the return type.
#
# @return [Object] whatever +to_hash.to_json+ returns
def as_json
  to_hash.to_json
end

#cb_build_in? ⇒ Boolean

Returns:

  • (Boolean)


39
40
41
# File 'lib/dwf/item.rb', line 39

# Tells whether +callback_type+ equals +Dwf::Workflow::BUILD_IN+.
#
# @return [Boolean]
def cb_build_in?
  callback_type == Dwf::Workflow::BUILD_IN
end

#current_timestamp ⇒ Object



119
120
121
# File 'lib/dwf/item.rb', line 119

# Current wall-clock time truncated to an Integer (+Time.now.to_i+, i.e.
# whole seconds since the Unix epoch).
#
# @return [Integer]
def current_timestamp
  Time.now.to_i
end

#enqueue! ⇒ Object



77
78
79
80
81
82
# File 'lib/dwf/item.rb', line 77

# Marks the item as freshly enqueued: stamps +@enqueued_at+ with
# +current_timestamp+ and resets the started/finished/failed timestamps.
#
# @return [nil]
def enqueue!
  @enqueued_at = current_timestamp
  @started_at = @finished_at = @failed_at = nil
end

#enqueue_outgoing_jobs ⇒ Object



123
124
125
126
127
128
129
130
131
132
# File 'lib/dwf/item.rb', line 123

# Enqueues the jobs that follow this one.
#
# A leaf item delegates to the workflow's own +enqueue_outgoing_jobs+.
# Otherwise every name in +outgoing+ is looked up through the client and
# started when the found node reports +ready_to_start?+.
#
# The per-job lock taken with +check_or_lock+ is released in an +ensure+
# clause, so an exception raised while finding or starting a job propagates
# to the caller without leaving the lock behind.
#
# @return [Object] the workflow's result for a leaf item, otherwise whatever
#   +outgoing.each+ returns
def enqueue_outgoing_jobs
  return workflow.enqueue_outgoing_jobs if leaf?

  outgoing.each do |job_name|
    # Acquire outside the begin block: if locking itself fails there is
    # nothing to release.
    client.check_or_lock(workflow_id, job_name)
    begin
      out = client.find_node(job_name, workflow_id)
      out.persist_and_perform_async! if out.ready_to_start?
    ensure
      client.release_lock(workflow_id, job_name)
    end
  end
end

#enqueued? ⇒ Boolean

Returns:

  • (Boolean)


107
108
109
# File 'lib/dwf/item.rb', line 107

# True once +enqueued_at+ holds a non-nil value.
#
# @return [Boolean]
def enqueued?
  !enqueued_at.nil?
end

#fail! ⇒ Object



103
104
105
# File 'lib/dwf/item.rb', line 103

# Records a failure: one +current_timestamp+ reading is written to both
# +@failed_at+ and +@finished_at+.
#
# @return [Object] the timestamp that was stored
def fail!
  stamp = current_timestamp
  @failed_at = stamp
  @finished_at = stamp
end

#failed? ⇒ Boolean

Returns:

  • (Boolean)


115
116
117
# File 'lib/dwf/item.rb', line 115

# True once +failed_at+ holds a non-nil value.
#
# @return [Boolean]
def failed?
  !failed_at.nil?
end

#finish! ⇒ Object



99
100
101
# File 'lib/dwf/item.rb', line 99

# Stamps +@finished_at+ with +current_timestamp+.
#
# @return [Object] the timestamp that was stored
def finish!
  @finished_at = current_timestamp
end

#finished? ⇒ Boolean

Returns:

  • (Boolean)


111
112
113
# File 'lib/dwf/item.rb', line 111

# True once +finished_at+ holds a non-nil value.
#
# @return [Boolean]
def finished?
  !finished_at.nil?
end

#mark_as_finished ⇒ Object



89
90
91
92
# File 'lib/dwf/item.rb', line 89

# Calls +finish!+ and then +persist!+, in that order.
#
# @return [Object] whatever +persist!+ returns
def mark_as_finished
  finish!
  persist!
end

#mark_as_started ⇒ Object



84
85
86
87
# File 'lib/dwf/item.rb', line 84

# Calls +start!+ and then +persist!+, in that order.
#
# @return [Object] whatever +persist!+ returns
def mark_as_started
  start!
  persist!
end

#name ⇒ Object



57
58
59
# File 'lib/dwf/item.rb', line 57

# Name of the item, built as "<klass>|<id>" and cached in +@name+ after the
# first call.
#
# @return [String]
def name
  return @name if @name

  @name = "#{klass}|#{id}"
end

#no_dependencies? ⇒ Boolean

Returns:

  • (Boolean)


65
66
67
# File 'lib/dwf/item.rb', line 65

# True when +incoming+ is empty.
#
# @return [Boolean]
def no_dependencies?
  incoming.empty?
end

#output(data) ⇒ Object



61
62
63
# File 'lib/dwf/item.rb', line 61

# Stores +data+ in +@output_payload+.
#
# @param data [Object] value to keep as the output payload
# @return [Object] +data+
def output(data)
  @output_payload = data
end

#parents_succeeded? ⇒ Boolean

Returns:

  • (Boolean)


69
70
71
# File 'lib/dwf/item.rb', line 69

# Checks every entry of +incoming+: each one is looked up with
# +client.find_node+ and must answer +succeeded?+ truthily. Stops at the
# first parent that did not succeed.
#
# @return [Boolean]
def parents_succeeded?
  incoming.all? do |parent_name|
    parent = client.find_node(parent_name, workflow_id)
    parent.succeeded?
  end
end

#perform ⇒ Object



37
# File 'lib/dwf/item.rb', line 37

# Empty body; returns nil.
# NOTE(review): presumably a hook meant to be overridden by concrete job
# classes - confirm against subclasses.
def perform; end

#perform_async ⇒ Object



52
53
54
55
# File 'lib/dwf/item.rb', line 52

# Schedules this item on +Dwf::Worker+. The worker is configured with the
# item's own +queue+ or, when that is nil/false, with
# +client.config.namespace+, and receives +workflow_id+ and +name+.
#
# @return [Object] whatever the worker's +perform_async+ returns
def perform_async
  target_queue = queue || client.config.namespace
  worker = Dwf::Worker.set(queue: target_queue)
  worker.perform_async(workflow_id, name)
end

#persist! ⇒ Object



157
158
159
# File 'lib/dwf/item.rb', line 157

# Hands this item to +client.persist_job+.
#
# @return [Object] whatever +persist_job+ returns
def persist!
  client.persist_job(self)
end

#persist_and_perform_async! ⇒ Object



32
33
34
35
# File 'lib/dwf/item.rb', line 32

# Calls +enqueue_and_persist!+ and then +perform_async+, in that order.
#
# @return [Object] whatever +perform_async+ returns
def persist_and_perform_async!
  enqueue_and_persist!
  perform_async
end

#reload ⇒ Object



47
48
49
50
# File 'lib/dwf/item.rb', line 47

# Re-reads this item through +client.find_job+ (keyed by +workflow_id+ and
# +name+) and feeds the found item's +to_hash+ into +assign_attributes+.
#
# @return [Object] whatever +assign_attributes+ returns
def reload
  assign_attributes(client.find_job(workflow_id, name).to_hash)
end

#start! ⇒ Object



94
95
96
97
# File 'lib/dwf/item.rb', line 94

# Stamps +@started_at+ with +current_timestamp+ and clears +@failed_at+.
#
# @return [nil]
def start!
  @started_at = current_timestamp
  @failed_at = nil
end

#start_batch! ⇒ Object



27
28
29
30
# File 'lib/dwf/item.rb', line 27

# Calls +enqueue_and_persist!+ and then starts this item through a fresh
# +Dwf::Callback+.
#
# @return [Object] whatever +Dwf::Callback#start+ returns
def start_batch!
  enqueue_and_persist!

  callback = Dwf::Callback.new
  callback.start(self)
end

#start_initial! ⇒ Object



23
24
25
# File 'lib/dwf/item.rb', line 23

# Starts the item: via +persist_and_perform_async!+ when +cb_build_in?+ is
# truthy, otherwise via +start_batch!+.
#
# @return [Object] the result of whichever start method ran
def start_initial!
  return persist_and_perform_async! if cb_build_in?

  start_batch!
end

#to_hash ⇒ Object



134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
# File 'lib/dwf/item.rb', line 134

# Snapshot of the item's attributes as a Hash with symbol keys.
#
# +klass+ is stringified with +to_s+. +payloads+ is read straight from
# +@payloads+ instead of through the +payloads+ reader, so a payload list
# that has not been built yet is emitted as nil rather than triggering
# +build_payloads+.
#
# @return [Hash]
def to_hash
  {
    id: id,
    klass: klass.to_s,
    queue: queue,
    incoming: incoming,
    outgoing: outgoing,
    finished_at: finished_at,
    enqueued_at: enqueued_at,
    started_at: started_at,
    failed_at: failed_at,
    params: params,
    workflow_id: workflow_id,
    callback_type: callback_type,
    output_payload: output_payload,
    payloads: @payloads
  }
end

#workflow ⇒ Object



43
44
45
# File 'lib/dwf/item.rb', line 43

# Workflow this item belongs to, fetched with +client.find_workflow+ using
# +workflow_id+ and cached in +@workflow+ while the cached value is truthy.
#
# @return [Object] the memoized lookup result
def workflow
  @workflow = client.find_workflow(workflow_id) unless @workflow
  @workflow
end