Class: AWS::Flow::WorkflowWorker
- Inherits:
-
GenericWorker
- Object
- GenericWorker
- AWS::Flow::WorkflowWorker
- Defined in:
- lib/aws/decider/worker.rb
Overview
This worker class is intended for use by the workflow implementation. It is configured with a task list and a workflow implementation. The worker class polls for decision tasks in the specified task list. When a decision task is received, it creates an instance of the workflow implementation and calls the @ execute() decorated method to process the task.
Instance Attribute Summary collapse
-
#workflow_type ⇒ Object
The workflow type for this workflow worker.
Instance Method Summary collapse
- #add_implementation(workflow_class) ⇒ Object
-
#add_workflow_implementation(workflow_class) ⇒ Object
private
Called by #add_implementation.
-
#initialize(service, domain, task_list, *args, &block) ⇒ WorkflowWorker
constructor
Creates a new WorkflowWorker instance.
-
#register ⇒ Object
Registers this workflow with Amazon SWF.
-
#run_once(should_register = false, poller = nil) ⇒ Object
Starts the workflow and runs it once, with an optional WorkflowTaskPoller.
- #set_workflow_implementation_types(workflow_implementation_types) ⇒ Object
-
#start(should_register = true) ⇒ Object
Starts the workflow with a WorkflowTaskPoller.
Methods inherited from GenericWorker
#camel_case_to_snake_case, #resolve_default_task_list
Constructor Details
#initialize(service, domain, task_list, *args, &block) ⇒ WorkflowWorker
Creates a new WorkflowWorker instance.
130 131 132 133 134 135 136 137 138 139 140 |
# File 'lib/aws/decider/worker.rb', line 130 def initialize(service, domain, task_list, *args, &block) @workflow_definition_map = {} = [] = Utilities::(WorkerOptions, block) @logger = .logger if @logger ||= Utilities::LogFactory.make_logger(self) .logger ||= @logger if super(service, domain, task_list, *args) end |
Instance Attribute Details
#workflow_type ⇒ Object
The workflow type for this workflow worker.
114 115 116 |
# File 'lib/aws/decider/worker.rb', line 114 def workflow_type @workflow_type end |
Instance Method Details
#add_implementation(workflow_class) ⇒ Object
146 147 148 |
# File 'lib/aws/decider/worker.rb', line 146 def add_implementation(workflow_class) add_workflow_implementation(workflow_class) end |
#add_workflow_implementation(workflow_class) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Called by #add_implementation.
152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 |
# File 'lib/aws/decider/worker.rb', line 152 def add_workflow_implementation(workflow_class) workflow_class.workflows.delete_if do |workflow_type| workflow_type.version.nil? || workflow_type.name.nil? end @workflow_definition_map.merge!( WorkflowDefinitionFactory.generate_definition_map(workflow_class) ) workflow_class.workflows.each do |workflow_type| # TODO should probably do something like # GenericWorkflowWorker#registerWorkflowTypes = workflow_type. workflow_hash = .( [ :default_task_start_to_close_timeout, :default_execution_start_to_close_timeout, :default_child_policy, :default_task_priority ], { :domain => @domain.name, :name => workflow_type.name, :version => workflow_type.version } ) if .default_task_list workflow_hash.merge!( :default_task_list => {:name => resolve_default_task_list(.default_task_list)} ) end << workflow_hash end end |
#register ⇒ Object
Registers this workflow with Amazon SWF.
189 190 191 192 193 194 195 196 197 198 199 200 |
# File 'lib/aws/decider/worker.rb', line 189 def register .delete_if {|| [:version].nil?} .each do || begin @service.register_workflow_type() rescue AWS::SimpleWorkflow::Errors::TypeAlreadyExistsFault => e @logger.warn "#{e.class} while trying to register workflow #{e.message} with options #{workflow_type_options}" # Purposefully eaten up, the alternative is to check first, and who # wants to do two trips when one will do? end end end |
#run_once(should_register = false, poller = nil) ⇒ Object
Starts the workflow and runs it once, with an optional AWS::Flow::WorkflowTaskPoller.
241 242 243 244 245 246 247 248 249 250 251 252 253 254 |
# File 'lib/aws/decider/worker.rb', line 241 def run_once(should_register = false, poller = nil) register if should_register poller = WorkflowTaskPoller.new( @service, @domain, DecisionTaskHandler.new(@workflow_definition_map, ), @task_list, ) if poller.nil? Kernel.exit if @shutting_down poller.poll_and_process_single_task end |
#set_workflow_implementation_types(workflow_implementation_types) ⇒ Object
142 143 144 |
# File 'lib/aws/decider/worker.rb', line 142 def set_workflow_implementation_types(workflow_implementation_types) workflow_implementation_types.each {|type| add_workflow_implementation(type)} end |
#start(should_register = true) ⇒ Object
Starts the workflow with a AWS::Flow::WorkflowTaskPoller.
210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 |
# File 'lib/aws/decider/worker.rb', line 210 def start(should_register = true) # TODO check to make sure that the correct properties are set # TODO Register the domain if not already registered # TODO register types to poll # TODO Set up throttler # TODO Set up a timeout on the throttler correctly, # TODO Make this a generic poller, go to the right kind correctly poller = WorkflowTaskPoller.new( @service, @domain, DecisionTaskHandler.new(@workflow_definition_map, ), @task_list, ) register if should_register @logger.debug "Starting an infinite loop to poll and process workflow tasks." loop do run_once(false, poller) end end |