Class: DTK::State::WorkflowInstance
- Inherits:
-
Object
- Object
- DTK::State::WorkflowInstance
- Includes:
- Utils::CrdHelper
- Defined in:
- lib/state/workflow_instance.rb,
lib/state/workflow_instance/attribute_type_info.rb
Defined Under Namespace
Classes: AttributeTypeInfo
Constant Summary collapse
- WORKFLOW_INSTANCE_CRD_VERSION =
ENV["WORKFLOW_INSTANCE_CRD_VERSION"]
Instance Attribute Summary collapse
-
#assembly ⇒ Object
readonly
Returns the value of attribute assembly.
-
#attribute_type_info ⇒ Object
readonly
Returns the value of attribute attribute_type_info.
-
#attributes ⇒ Object
readonly
Returns the value of attribute attributes.
-
#name ⇒ Object
readonly
Returns the value of attribute name.
-
#namespace ⇒ Object
readonly
Returns the value of attribute namespace.
-
#status ⇒ Object
readonly
Returns the value of attribute status.
-
#workflow ⇒ Object
Returns the value of attribute workflow.
-
#workflow_template ⇒ Object
readonly
Returns the value of attribute workflow_template.
Class Method Summary collapse
- .find_action(id, workflow = @workflow) ⇒ Object
- .get(namespace, name, opts = {}) ⇒ Object
- .get_action_attributes(namespace, name, action_id, opts = {}) ⇒ Object
- .get_attributes(namespace, name, opts = {}) ⇒ Object
- .get_with_influx_data(namespace, workflow_instance_name, opts = {}) ⇒ Object
- .patchError!(patches, message, action_index_steps) ⇒ Object
- .update_action_level_result_attributes(namespace, name, attributes, action_id, opts = {}) ⇒ Object
- .update_action_status(namespace, name, parent_id, action_id, status, error_message = "", opts = {}) ⇒ Object
Instance Method Summary collapse
- #attribute_metadata ⇒ Object
- #attribute_values ⇒ Object
- #get_workflow_template(opts = {}) ⇒ Object
-
#initialize(namespace, name, crd_content) ⇒ WorkflowInstance
constructor
A new instance of WorkflowInstance.
- #to_hash ⇒ Object
Methods included from Utils::CrdHelper
Constructor Details
#initialize(namespace, name, crd_content) ⇒ WorkflowInstance
12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 |
# File 'lib/state/workflow_instance.rb', line 12 def initialize(namespace, name, crd_content) @name = name @namespace = namespace @api_version = crd_content.apiVersion @kind = crd_content.kind @metadata = crd_content. @references = crd_content.references @assembly = @references.assembly @workflow_template = @references.workflow @attributes = crd_content.spec.attributes || {} @status = crd_content.spec.status || {} @workflow = crd_content.spec.workflow || {} end |
Instance Attribute Details
#assembly ⇒ Object (readonly)
Returns the value of attribute assembly.
9 10 11 |
# File 'lib/state/workflow_instance.rb', line 9 def assembly @assembly end |
#attribute_type_info ⇒ Object (readonly)
Returns the value of attribute attribute_type_info.
9 10 11 |
# File 'lib/state/workflow_instance.rb', line 9 def attribute_type_info @attribute_type_info end |
#attributes ⇒ Object (readonly)
Returns the value of attribute attributes.
9 10 11 |
# File 'lib/state/workflow_instance.rb', line 9 def attributes @attributes end |
#name ⇒ Object (readonly)
Returns the value of attribute name.
9 10 11 |
# File 'lib/state/workflow_instance.rb', line 9 def name @name end |
#namespace ⇒ Object (readonly)
Returns the value of attribute namespace.
9 10 11 |
# File 'lib/state/workflow_instance.rb', line 9 def namespace @namespace end |
#status ⇒ Object (readonly)
Returns the value of attribute status.
9 10 11 |
# File 'lib/state/workflow_instance.rb', line 9 def status @status end |
#workflow ⇒ Object
Returns the value of attribute workflow.
9 10 11 |
# File 'lib/state/workflow_instance.rb', line 9 def workflow @workflow end |
#workflow_template ⇒ Object (readonly)
Returns the value of attribute workflow_template.
9 10 11 |
# File 'lib/state/workflow_instance.rb', line 9 def workflow_template @workflow_template end |
Class Method Details
.find_action(id, workflow = @workflow) ⇒ Object
134 135 136 137 138 139 140 141 142 143 144 145 146 147 |
# File 'lib/state/workflow_instance.rb', line 134 def self.find_action(id, workflow = @workflow) action = nil subtasks = workflow[:subtasks] subtasks.each do |subtask| if(subtask.id.to_s == id.to_s) action = subtask break elsif subtask[:subtasks] action = find_action(id, subtask) break unless action.nil? end end action end |
.get(namespace, name, opts = {}) ⇒ Object
29 30 31 32 33 |
# File 'lib/state/workflow_instance.rb', line 29 def self.get(namespace, name, opts = {}) opts[:apiVersion] = WORKFLOW_INSTANCE_CRD_VERSION workflow_instance = ::DTK::CrdClient.get_kubeclient(opts).get_workflow_instance(name, namespace) WorkflowInstance.new(namespace, name, workflow_instance) end |
.get_action_attributes(namespace, name, action_id, opts = {}) ⇒ Object
81 82 83 84 85 86 87 |
# File 'lib/state/workflow_instance.rb', line 81 def self.get_action_attributes(namespace, name, action_id, opts = {}) workflow_instance = get(namespace, name, opts) action = WorkflowInstance.find_action(action_id, workflow_instance.workflow) return nil unless action attributes = action[:attributes] || {} attributes.to_h end |
.get_attributes(namespace, name, opts = {}) ⇒ Object
76 77 78 79 |
# File 'lib/state/workflow_instance.rb', line 76 def self.get_attributes(namespace, name, opts = {}) workflow_instance = get(namespace, name, opts) workflow_instance.attributes.to_h end |
.get_with_influx_data(namespace, workflow_instance_name, opts = {}) ⇒ Object
35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 |
# File 'lib/state/workflow_instance.rb', line 35 def self.get_with_influx_data(namespace, workflow_instance_name, opts = {}) workflow_instance = get(namespace, workflow_instance_name, opts) return unless workflow_instance workflow_instance.workflow[:subtasks].each do |subtask| component_name, action_name = subtask[:component].split('.') assembly_name = workflow_instance.assembly[:name] executable_action = ::DTK::State::ExecutableAction.get(namespace, assembly_name, component_name, action_name, opts) attr_type_info = executable_action.attribute_type_info attr_type_info.each do |attr_info| if attr_info.temporal attribute_name = attr_info.name influxdb = ::DTK::State::Component::Attribute::Influxdb.new(:attributes) influxdb_attribute = influxdb.get(namespace, component_name, assembly_name, attribute_name, opts) if valid_attribute = influxdb_attribute.first value = valid_attribute['_value'] subtask[:attributes][attribute_name] = value end end end end workflow_instance end |
.patchError!(patches, message, action_index_steps) ⇒ Object
111 112 113 114 115 116 117 118 |
# File 'lib/state/workflow_instance.rb', line 111 def self.patchError!(patches, , action_index_steps) errorPatch = { "op" => "add", "path" => "/spec/status/steps/#{action_index_steps}/errorMsg", "value" => } patches << errorPatch end |
.update_action_level_result_attributes(namespace, name, attributes, action_id, opts = {}) ⇒ Object
89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 |
# File 'lib/state/workflow_instance.rb', line 89 def self.update_action_level_result_attributes(namespace, name, attributes, action_id, opts = {}) return "Dynamic attributes do not exist for action with id #{@action_id}, nothing to update" if attributes.nil? || attributes.empty? attributes.delete_if { |key, value| value.nil? || value.to_s.strip == '' } opts[:apiVersion] = WORKFLOW_INSTANCE_CRD_VERSION workflow_instance = ::DTK::CrdClient.get_kubeclient(opts).get_workflow_instance(name, namespace) workflow = workflow_instance[:spec][:workflow] action = WorkflowInstance.find_action(action_id, workflow) action[:attributes] = {} if !action[:attributes] attributes.each do |attr_name, attr_val| action[:attributes][attr_name.to_sym] = {} unless action[:attributes][attr_name.to_sym] unless action[:attributes][attr_name.to_sym][:hidden] if attr_val.is_a? Hash action[:attributes][attr_name.to_sym][:value] = attr_val[:value] || attr_val else action[:attributes][attr_name.to_sym][:value] = attr_val end end end ::DTK::CrdClient.get_kubeclient(opts).update_workflow_instance(workflow_instance) end |
.update_action_status(namespace, name, parent_id, action_id, status, error_message = "", opts = {}) ⇒ Object
120 121 122 123 124 125 126 127 128 129 130 131 132 |
# File 'lib/state/workflow_instance.rb', line 120 def self.update_action_status(namespace, name, parent_id, action_id, status, = "", opts = {}) opts[:apiVersion] = WORKFLOW_INSTANCE_CRD_VERSION workflow_instance = ::DTK::CrdClient.get_kubeclient(opts).get_workflow_instance(name, namespace) steps = workflow_instance[:spec][:status][:steps] action_index_steps = steps.find_index { |action| action[:id].eql? action_id } patch = [{ "op" => "replace", "path" => "/spec/status/steps/#{action_index_steps}/state", "value" => status }] patchError!(patch, , action_index_steps) unless .empty? || .nil? ::DTK::CrdClient.get_kubeclient(opts).json_patch_workflow_instance(name, patch, namespace) end |
Instance Method Details
#attribute_metadata ⇒ Object
153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 |
# File 'lib/state/workflow_instance.rb', line 153 def attributes = @attributes.to_hash attr_type_info = get_workflow_template.attribute_type_info = {} attr_type_info.each do |attr_info| attr_info_hash = attr_info.to_hash attribute_name = attr_info_hash[:name].to_sym if attribute = attributes[attribute_name] if attribute.is_a?(String) attribute = { value: attribute } end [attribute_name] = attr_info_hash.merge(attribute) end end end |
#attribute_values ⇒ Object
174 175 176 177 178 179 180 |
# File 'lib/state/workflow_instance.rb', line 174 def attribute_values attribute_with_values = {} @attributes.each_pair do |name, content| attribute_with_values.merge!(name => content[:value]) end attribute_with_values end |
#get_workflow_template(opts = {}) ⇒ Object
149 150 151 |
# File 'lib/state/workflow_instance.rb', line 149 def get_workflow_template(opts = {}) Workflow.get(@workflow_template.namespace, @workflow_template.name, opts) end |
#to_hash ⇒ Object
62 63 64 65 66 67 68 69 70 71 72 73 74 |
# File 'lib/state/workflow_instance.rb', line 62 def to_hash { apiVersion: @api_version, kind: @kind, metadata: (@metadata), references: @references.to_hash, spec: { attributes: @attributes.to_hash, status: @status.to_hash, workflow: @workflow.to_hash } } end |