Class: DTK::State::WorkflowInstance

Inherits:
Object
  • Object
show all
Includes:
Utils::CrdHelper
Defined in:
lib/state/workflow_instance.rb,
lib/state/workflow_instance/attribute_type_info.rb

Defined Under Namespace

Classes: AttributeTypeInfo

Constant Summary collapse

# CRD API version used for workflow-instance requests; the class methods
# below copy it into opts[:apiVersion]. Read from the environment at load
# time, so it is nil when WORKFLOW_INSTANCE_CRD_VERSION is not set.
WORKFLOW_INSTANCE_CRD_VERSION =
ENV["WORKFLOW_INSTANCE_CRD_VERSION"]

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Utils::CrdHelper

#filter_metadata

Constructor Details

#initialize(namespace, name, crd_content) ⇒ WorkflowInstance



12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# File 'lib/state/workflow_instance.rb', line 12

# Builds a WorkflowInstance from an already-fetched CRD object.
#
# @param namespace [Object] namespace the instance lives in (stored as-is)
# @param name [Object] name of the instance (stored as-is)
# @param crd_content [Object] CRD payload; must respond to apiVersion, kind,
#   metadata, references (with assembly / workflow) and spec (with
#   attributes / status / workflow)
def initialize(namespace, name, crd_content)
  @name      = name
  @namespace = namespace

  @api_version = crd_content.apiVersion
  @kind        = crd_content.kind
  # Kept unfiltered here; filtering is applied when serialising in to_hash.
  @metadata    = crd_content.metadata

  @references        = crd_content.references
  @assembly          = @references.assembly
  @workflow_template = @references.workflow

  # Any spec section missing from the CRD falls back to an empty hash.
  spec        = crd_content.spec
  @attributes = spec.attributes || {}
  @status     = spec.status || {}
  @workflow   = spec.workflow || {}
end

Instance Attribute Details

#assembly ⇒ Object (readonly)

Returns the value of attribute assembly.



9
10
11
# File 'lib/state/workflow_instance.rb', line 9

# Reader for @assembly, which the constructor takes from the CRD's
# references.assembly entry.
def assembly
  @assembly
end

#attribute_type_info ⇒ Object (readonly)

Returns the value of attribute attribute_type_info.



9
10
11
# File 'lib/state/workflow_instance.rb', line 9

# Reader for @attribute_type_info.
# NOTE(review): the constructor shown for this class never assigns this
# instance variable — confirm where (or whether) it is populated.
def attribute_type_info
  @attribute_type_info
end

#attributes ⇒ Object (readonly)

Returns the value of attribute attributes.



9
10
11
# File 'lib/state/workflow_instance.rb', line 9

# Reader for @attributes: the CRD's spec.attributes, or an empty hash when
# the CRD carried none.
def attributes
  @attributes
end

#name ⇒ Object (readonly)

Returns the value of attribute name.



9
10
11
# File 'lib/state/workflow_instance.rb', line 9

# Reader for @name, the name passed to the constructor.
def name
  @name
end

#namespace ⇒ Object (readonly)

Returns the value of attribute namespace.



9
10
11
# File 'lib/state/workflow_instance.rb', line 9

# Reader for @namespace, the namespace passed to the constructor.
def namespace
  @namespace
end

#status ⇒ Object (readonly)

Returns the value of attribute status.



9
10
11
# File 'lib/state/workflow_instance.rb', line 9

# Reader for @status: the CRD's spec.status, or an empty hash when the CRD
# carried none.
def status
  @status
end

#workflow ⇒ Object

Returns the value of attribute workflow.



9
10
11
# File 'lib/state/workflow_instance.rb', line 9

# Reader for @workflow: the CRD's spec.workflow, or an empty hash when the
# CRD carried none.
def workflow
  @workflow
end

#workflow_template ⇒ Object (readonly)

Returns the value of attribute workflow_template.



9
10
11
# File 'lib/state/workflow_instance.rb', line 9

# Reader for @workflow_template, which the constructor takes from the CRD's
# references.workflow entry.
def workflow_template
  @workflow_template
end

Class Method Details

.find_action(id, workflow = @workflow) ⇒ Object



134
135
136
137
138
139
140
141
142
143
144
145
146
147
# File 'lib/state/workflow_instance.rb', line 134

# Depth-first search for the subtask whose id matches +id+.
#
# Ids are compared by their string form, so 1 and "1" match. Every field is
# read with +[]+ so the lookup works for plain Hashes as well as for objects
# that support both +[]+ and method-style access.
#
# @param id [#to_s] id of the action to look for
# @param workflow [#[], nil] node whose +:subtasks+ are searched; the default
#   is the class-level @workflow, which is nil unless something sets it
# @return [Object, nil] the matching subtask, or nil when there is no match,
#   no workflow, or no +:subtasks+ to search
def self.find_action(id, workflow = @workflow)
  subtasks = workflow && workflow[:subtasks]
  return nil unless subtasks

  wanted = id.to_s
  subtasks.each do |subtask|
    return subtask if subtask[:id].to_s == wanted
    next unless subtask[:subtasks]

    # Recurse into nested subtasks and stop at the first hit.
    nested = find_action(id, subtask)
    return nested unless nested.nil?
  end
  nil
end

.get(namespace, name, opts = {}) ⇒ Object



29
30
31
32
33
# File 'lib/state/workflow_instance.rb', line 29

# Fetches the workflow-instance CRD called +name+ in +namespace+ and wraps it
# in a WorkflowInstance.
#
# Note that +opts+ is modified in place: its :apiVersion entry is set to
# WORKFLOW_INSTANCE_CRD_VERSION before the client is requested.
#
# @param namespace [Object] namespace to look in
# @param name [Object] name of the workflow instance
# @param opts [Hash] options forwarded to ::DTK::CrdClient.get_kubeclient
# @return [WorkflowInstance]
def self.get(namespace, name, opts = {})
  opts[:apiVersion] = WORKFLOW_INSTANCE_CRD_VERSION
  kube_client = ::DTK::CrdClient.get_kubeclient(opts)
  crd_content = kube_client.get_workflow_instance(name, namespace)
  WorkflowInstance.new(namespace, name, crd_content)
end

.get_action_attributes(namespace, name, action_id, opts = {}) ⇒ Object



81
82
83
84
85
86
87
# File 'lib/state/workflow_instance.rb', line 81

# Looks up one action inside a workflow instance and returns its attributes.
#
# @param namespace [Object] namespace of the workflow instance
# @param name [Object] name of the workflow instance
# @param action_id [#to_s] id of the action to look up
# @param opts [Hash] options forwarded to .get
# @return [Hash, nil] the action's attributes converted with to_h (empty when
#   the action has none), or nil when no action matches
def self.get_action_attributes(namespace, name, action_id, opts = {})
  instance = get(namespace, name, opts)
  found = WorkflowInstance.find_action(action_id, instance.workflow)
  found ? (found[:attributes] || {}).to_h : nil
end

.get_attributes(namespace, name, opts = {}) ⇒ Object



76
77
78
79
# File 'lib/state/workflow_instance.rb', line 76

# Returns the instance-level attributes of a workflow instance.
#
# @param namespace [Object] namespace of the workflow instance
# @param name [Object] name of the workflow instance
# @param opts [Hash] options forwarded to .get
# @return [Hash] the instance's attributes converted with to_h
def self.get_attributes(namespace, name, opts = {})
  get(namespace, name, opts).attributes.to_h
end

.get_with_influx_data(namespace, workflow_instance_name, opts = {}) ⇒ Object



35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
# File 'lib/state/workflow_instance.rb', line 35

# Fetches a workflow instance and, for every top-level subtask, copies the
# latest InfluxDB value of each attribute flagged as temporal into that
# subtask's attributes.
#
# An instance whose workflow has no +:subtasks+ (the constructor defaults the
# workflow to an empty hash) is returned untouched, and a subtask that has no
# +:attributes+ container yet gets one created the first time a value is
# found for it.
#
# @param namespace [Object] namespace of the workflow instance
# @param workflow_instance_name [Object] name of the workflow instance
# @param opts [Hash] options forwarded to every lookup
# @return [WorkflowInstance, nil] the (possibly enriched) instance, or nil
#   when .get returned nothing
def self.get_with_influx_data(namespace, workflow_instance_name, opts = {})
  workflow_instance = get(namespace, workflow_instance_name, opts)
  return unless workflow_instance

  subtasks = workflow_instance.workflow[:subtasks]
  return workflow_instance if subtasks.nil? || subtasks.empty?

  # Identical for every subtask, so it is read once up front.
  assembly_name = workflow_instance.assembly[:name]

  subtasks.each do |subtask|
    # :component is expected in "<component>.<action>" form.
    component_name, action_name = subtask[:component].split('.')

    executable_action = ::DTK::State::ExecutableAction.get(namespace, assembly_name, component_name, action_name, opts)

    executable_action.attribute_type_info.each do |attr_info|
      next unless attr_info.temporal

      attribute_name = attr_info.name
      influxdb = ::DTK::State::Component::Attribute::Influxdb.new(:attributes)
      latest = influxdb.get(namespace, component_name, assembly_name, attribute_name, opts).first
      next unless latest

      subtask[:attributes] ||= {}
      subtask[:attributes][attribute_name] = latest['_value']
    end
  end

  workflow_instance
end

.patchError!(patches, message, action_index_steps) ⇒ Object



111
112
113
114
115
116
117
118
# File 'lib/state/workflow_instance.rb', line 111

# Appends to +patches+ a JSON-patch "add" operation that stores +message+ as
# the errorMsg of the status step at index +action_index_steps+.
#
# @param patches [Array] patch list, modified in place
# @param message [Object] value written to errorMsg
# @param action_index_steps [Object] index interpolated into the patch path
# @return [Array] the same +patches+ array
def self.patchError!(patches, message, action_index_steps)
  patches.push(
    {
      "op" => "add",
      "path" => "/spec/status/steps/#{action_index_steps}/errorMsg",
      "value" => message
    }
  )
end

.update_action_level_result_attributes(namespace, name, attributes, action_id, opts = {}) ⇒ Object



89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
# File 'lib/state/workflow_instance.rb', line 89

# Writes result attributes onto one action of a workflow instance and sends
# the updated instance back through the kube client.
#
# Attributes whose value is nil or blank are skipped, and attributes already
# marked :hidden on the action keep their existing value. The +attributes+
# hash passed in is not modified.
#
# NOTE(review): when no action matches +action_id+, find_action returns nil
# and this method raises NoMethodError.
#
# @param namespace [Object] namespace of the workflow instance
# @param name [Object] name of the workflow instance
# @param attributes [Hash, nil] attribute name => value, or => hash with :value
# @param action_id [#to_s] id of the action to update
# @param opts [Hash] client options; :apiVersion is overwritten in place
# @return [String, Object] an explanatory message when +attributes+ is nil or
#   empty, otherwise whatever update_workflow_instance returns
def self.update_action_level_result_attributes(namespace, name, attributes, action_id, opts = {})
  if attributes.nil? || attributes.empty?
    return "Dynamic attributes do not exist for action with id #{action_id}, nothing to update"
  end

  # Filter into a copy so the caller's hash is left untouched.
  present = attributes.reject { |_key, value| value.nil? || value.to_s.strip == '' }

  opts[:apiVersion] = WORKFLOW_INSTANCE_CRD_VERSION
  workflow_instance = ::DTK::CrdClient.get_kubeclient(opts).get_workflow_instance(name, namespace)
  workflow = workflow_instance[:spec][:workflow]

  action = WorkflowInstance.find_action(action_id, workflow)
  action[:attributes] ||= {}

  present.each do |attr_name, attr_val|
    key = attr_name.to_sym
    action[:attributes][key] ||= {}
    next if action[:attributes][key][:hidden]

    # A hash value may wrap the real value under :value; otherwise store it whole.
    action[:attributes][key][:value] = attr_val.is_a?(Hash) ? (attr_val[:value] || attr_val) : attr_val
  end

  ::DTK::CrdClient.get_kubeclient(opts).update_workflow_instance(workflow_instance)
end

.update_action_status(namespace, name, parent_id, action_id, status, error_message = "", opts = {}) ⇒ Object



120
121
122
123
124
125
126
127
128
129
130
131
132
# File 'lib/state/workflow_instance.rb', line 120

# Sets the state of one status step of a workflow instance with a JSON patch,
# and adds an errorMsg to that step when an error message is supplied.
#
# NOTE(review): the step is matched with eql?, so +action_id+ must have the
# same type as the stored id. When nothing matches, the index is nil and the
# patch path is built with an empty index segment.
#
# @param namespace [Object] namespace of the workflow instance
# @param name [Object] name of the workflow instance
# @param parent_id [Object] not used by this method
# @param action_id [Object] id of the status step to update
# @param status [Object] new value for the step's state
# @param error_message [String, nil] optional message; nil or "" adds nothing
# @param opts [Hash] client options; :apiVersion is overwritten in place
# @return [Object] whatever json_patch_workflow_instance returns
def self.update_action_status(namespace, name, parent_id, action_id, status, error_message = "", opts = {})
  opts[:apiVersion] = WORKFLOW_INSTANCE_CRD_VERSION
  workflow_instance = ::DTK::CrdClient.get_kubeclient(opts).get_workflow_instance(name, namespace)
  steps = workflow_instance[:spec][:status][:steps]
  action_index_steps = steps.find_index { |action| action[:id].eql? action_id }
  patch = [{
    "op" => "replace",
    "path" => "/spec/status/steps/#{action_index_steps}/state",
    "value" => status
  }]
  # nil must be tested before empty?, otherwise a nil message raises NoMethodError.
  patchError!(patch, error_message, action_index_steps) unless error_message.nil? || error_message.empty?
  ::DTK::CrdClient.get_kubeclient(opts).json_patch_workflow_instance(name, patch, namespace)
end

Instance Method Details

#attribute_metadata ⇒ Object



153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
# File 'lib/state/workflow_instance.rb', line 153

# Combines this instance's attribute values with the type information declared
# on its workflow template.
#
# Only attributes that are both declared in the template's attribute_type_info
# and present (non-nil, non-false) on this instance are included.
#
# @return [Hash] attribute name (Symbol) => the type-info hash merged with the
#   attribute's own hash; keys from the attribute win on conflict
def attribute_metadata
  attributes       = @attributes.to_hash
  attr_type_info   = get_workflow_template.attribute_type_info
  metadata_by_name = {}

  attr_type_info.each do |attr_info|
    attr_info_hash = attr_info.to_hash
    attribute_name = attr_info_hash[:name].to_sym

    attribute = attributes[attribute_name]
    next unless attribute

    # A bare string is shorthand for { value: "<string>" }.
    attribute = { value: attribute } if attribute.is_a?(String)

    metadata_by_name[attribute_name] = attr_info_hash.merge(attribute)
  end

  metadata_by_name
end

#attribute_values ⇒ Object



174
175
176
177
178
179
180
# File 'lib/state/workflow_instance.rb', line 174

# Flattens the instance attributes to a plain name => value mapping.
#
# @return [Hash] each attribute name mapped to the +:value+ entry of its content
def attribute_values
  {}.tap do |values_by_name|
    @attributes.each_pair do |attr_name, details|
      values_by_name[attr_name] = details[:value]
    end
  end
end

#get_workflow_template(opts = {}) ⇒ Object



149
150
151
# File 'lib/state/workflow_instance.rb', line 149

# Loads the workflow template this instance references.
#
# @param opts [Hash] options passed through to Workflow.get
# @return [Object] whatever Workflow.get returns for the referenced
#   namespace and name
def get_workflow_template(opts = {})
  template_ref = @workflow_template
  Workflow.get(template_ref.namespace, template_ref.name, opts)
end

#to_hash ⇒ Object



62
63
64
65
66
67
68
69
70
71
72
73
74
# File 'lib/state/workflow_instance.rb', line 62

# Serialises the instance back into a CRD-shaped hash.
#
# The metadata is passed through filter_metadata (mixed in from
# Utils::CrdHelper) rather than emitted raw; references and the three spec
# sections are converted with their own to_hash.
#
# @return [Hash] hash with :apiVersion, :kind, :metadata, :references and
#   :spec (:attributes, :status, :workflow)
def to_hash
  {
    apiVersion: @api_version,
    kind: @kind,
    metadata: filter_metadata(@metadata),
    references: @references.to_hash,
    spec: {
      attributes: @attributes.to_hash,
      status: @status.to_hash,
      workflow: @workflow.to_hash
    }
  }
end