Class: AWS::Flow::ActivityWorker
- Inherits: GenericWorker
  - Object
  - GenericWorker
  - AWS::Flow::ActivityWorker
- Defined in: lib/aws/decider/worker.rb
Overview
Used to implement an activity worker. You can use the `ActivityWorker` class to conveniently poll a task list for activity tasks.
You configure the activity worker with activity implementation objects. This worker class then polls for activity tasks in the specified task list. When an activity task is received, it looks up the appropriate implementation that you provided, and calls the activity method to process the task. Unlike the WorkflowWorker, which creates a new instance for every decision task, the `ActivityWorker` simply uses the object you provided.
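The lookup-and-call behavior described above can be sketched in plain Ruby without the AWS Flow Framework. `GreeterActivities`, `ToyActivityWorker`, and `handle_task` are hypothetical names invented for this illustration, not part of the library. The sketch only mirrors the idea that the single implementation object you supply is reused for every task.

```ruby
# Hypothetical activity implementation; not part of AWS::Flow.
class GreeterActivities
  attr_reader :calls

  def initialize
    @calls = 0
  end

  def say_hello(name)
    @calls += 1
    "Hello, #{name}!"
  end
end

# Toy stand-in for an activity worker (an assumption, not the library's code).
class ToyActivityWorker
  def initialize
    @implementations = {} # activity name => [object, method name]
  end

  # Registers every public method defined directly on the object's class.
  def add_implementation(instance)
    instance.class.public_instance_methods(false).each do |method_name|
      @implementations[method_name.to_s] = [instance, method_name]
    end
  end

  # Looks up the implementation for a task and calls it on the SAME object.
  def handle_task(activity_name, *input)
    instance, method_name = @implementations.fetch(activity_name)
    instance.public_send(method_name, *input)
  end
end

greeter = GreeterActivities.new
worker = ToyActivityWorker.new
worker.add_implementation(greeter)

first = worker.handle_task("say_hello", "Ada")
second = worker.handle_task("say_hello", "Grace")
```

Because the same `greeter` object handles both tasks, its call counter keeps increasing across tasks. This is the practical consequence of the worker not creating a fresh instance per task.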
Instance Method Summary
- #add_activities_implementation(class_or_instance) ⇒ Object
  Adds an activity implementation to this `ActivityWorker`.
- #add_implementation(class_or_instance) ⇒ Object
  Adds an activity implementation to this `ActivityWorker`.
- #initialize(service, domain, task_list, *args, &block) ⇒ ActivityWorker (constructor)
  Creates a new `ActivityWorker` instance.
- #register ⇒ Object
  Registers the activity type.
- #run_once(should_register = true, poller = nil) ⇒ Object
  Polls for and processes a single activity task, optionally using the ActivityTaskPoller you supply.
- #start(should_register = true) ⇒ Object
  Starts the `ActivityWorker`, which polls for and processes activity tasks in an infinite loop.
Methods inherited from GenericWorker
#camel_case_to_snake_case, #resolve_default_task_list
Constructor Details
#initialize(service, domain, task_list, *args, &block) ⇒ ActivityWorker
Creates a new `ActivityWorker` instance.
# File 'lib/aws/decider/worker.rb', line 285

def initialize(service, domain, task_list, *args, &block)
  @activity_definition_map = {}
  = []
  = Utilities::(WorkerOptions, block)
  @logger = .logger if
  @logger ||= Utilities::LogFactory.make_logger(self)
  .logger ||= @logger if
  max_workers = .execution_workers if
  max_workers = 20 if (max_workers.nil? || max_workers.zero?)
  @executor = ForkingExecutor.new(
    :max_workers => max_workers,
    :logger => @logger
  )
  @shutdown_first_time_function = lambda do
    @executor.shutdown Float::INFINITY
    Kernel.exit
  end
  super(service, domain, task_list, *args)
end
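The constructor falls back to 20 execution workers when the configured value is missing or zero. A minimal sketch of that check in isolation follows. `DEFAULT_MAX_WORKERS` and `effective_max_workers` are hypothetical names used only for illustration.

```ruby
DEFAULT_MAX_WORKERS = 20 # the fallback value used in the constructor listing

# Hypothetical helper mirroring the nil-or-zero check in the constructor.
def effective_max_workers(requested)
  return DEFAULT_MAX_WORKERS if requested.nil? || requested.zero?
  requested
end

unset_value = effective_max_workers(nil) # falls back to the default
zero_value  = effective_max_workers(0)   # also falls back to the default
given_value = effective_max_workers(5)   # kept as given
```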
Instance Method Details
#add_activities_implementation(class_or_instance) ⇒ Object
Adds an activity implementation to this `ActivityWorker`.
# File 'lib/aws/decider/worker.rb', line 354

def add_activities_implementation(class_or_instance)
  klass = (class_or_instance.class == Class) ? class_or_instance : class_or_instance.class
  instance = (class_or_instance.class == Class) ? class_or_instance.new : class_or_instance
  klass.activities.each do |activity_type|
    # TODO this should assign to an activityImplementation, so that we can
    # call execute on it later
    @activity_definition_map[activity_type] = ActivityDefinition.new(
      instance,
      activity_type.name.split(".").last,
      nil,
      activity_type.,
      activity_type..data_converter
    )
    = activity_type.
    option_hash = {
      :domain => @domain.name,
      :name => activity_type.name.to_s,
      :version => activity_type.version
    }
    option_hash.merge!(.)
    if .default_task_list
      option_hash.merge!(
        :default_task_list => {:name => resolve_default_task_list(.default_task_list)}
      )
    end
    << option_hash
  end
end
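The method accepts either a class or an already-built object, and it passes the last dot-separated segment of the activity type name to `ActivityDefinition.new`. That segment appears to serve as the method name. The following sketch isolates those two steps. `resolve_implementation`, `activity_method_name`, and `SampleActivities` are hypothetical names, and the `"SampleActivities.say_hello"` naming pattern is an assumption made for the example.

```ruby
# Hypothetical helper: returns [klass, instance] for either a class or an
# object, using the same `.class == Class` check as the listing.
def resolve_implementation(class_or_instance)
  if class_or_instance.class == Class
    [class_or_instance, class_or_instance.new]
  else
    [class_or_instance.class, class_or_instance]
  end
end

# Takes the last dot-separated segment of an activity type name.
def activity_method_name(activity_type_name)
  activity_type_name.split(".").last
end

class SampleActivities; end # hypothetical implementation class

# Passing a class: a new instance is created.
klass_a, instance_a = resolve_implementation(SampleActivities)

# Passing an object: that exact object is kept.
existing = SampleActivities.new
klass_b, instance_b = resolve_implementation(existing)

method_name = activity_method_name("SampleActivities.say_hello")
```

Passing an instance is useful when the implementation needs constructor arguments or shared state, since the class form calls `new` with no arguments.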
#add_implementation(class_or_instance) ⇒ Object
Adds an activity implementation to this `ActivityWorker`.
# File 'lib/aws/decider/worker.rb', line 312

def add_implementation(class_or_instance)
  add_activities_implementation(class_or_instance)
end
#register ⇒ Object
Registers the activity type.
# File 'lib/aws/decider/worker.rb', line 318

def register
  .each do ||
    begin
      @service.register_activity_type()
    rescue AWS::SimpleWorkflow::Errors::TypeAlreadyExistsFault => e
      @logger.warn "#{e.class} while trying to register activity #{e.message} with options #{activity_type_options}"
      previous_registration = @service.describe_activity_type(
        :domain => @domain.name,
        :activity_type => {
          :name => [:name],
          :version => [:version]
        }
      )
      = .select { |key, val| key =~ /default/}
      previous_keys = previous_registration["configuration"].keys.map {|x| camel_case_to_snake_case(x).to_sym}
      previous_registration = Hash[previous_keys.zip(previous_registration["configuration"].values)]
      if previous_registration[:default_task_list]
        previous_registration[:default_task_list][:name] = previous_registration[:default_task_list].delete("name")
      end
      registration_difference = .sort.to_a - previous_registration.sort.to_a
      unless registration_difference.empty?
        raise "There is a difference between the types you have registered previously and the types you are currently registering, but you haven't changed the version. These new changes will not be picked up. In particular, these options are different #{Hash[registration_difference]}"
      end
      # Purposefully eaten up, the alternative is to check first, and who
      # wants to do two trips when one will do?
    end
  end
end
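When the type already exists, the method compares the `default` options being registered against the previously registered configuration and raises if they differ. A minimal sketch of that comparison follows, using plain hashes. The `camel_case_to_snake_case` body here is a hypothetical stand-in for the inherited helper, `registration_difference` is an illustrative function name, and the option values are made-up sample data. The nested `default_task_list` handling is omitted for brevity.

```ruby
# Hypothetical stand-in for the inherited camel_case_to_snake_case helper.
def camel_case_to_snake_case(name)
  name.gsub(/([a-z\d])([A-Z])/, '\1_\2').downcase
end

# Returns the default_* options that differ from the previous registration.
def registration_difference(new_options, previous_configuration)
  defaults = new_options.select { |key, _value| key.to_s =~ /default/ }
  previous = previous_configuration.each_with_object({}) do |(key, value), acc|
    acc[camel_case_to_snake_case(key).to_sym] = value
  end
  # Array difference on sorted [key, value] pairs keeps only changed entries.
  Hash[defaults.sort.to_a - previous.sort.to_a]
end

# Sample data (assumed values, for illustration only).
new_options = {
  :domain => "my-domain",
  :name => "SampleActivities.say_hello",
  :version => "1.0",
  :default_task_start_to_close_timeout => "60",
  :default_task_heartbeat_timeout => "30"
}
previous_configuration = {
  "defaultTaskStartToCloseTimeout" => "30",
  "defaultTaskHeartbeatTimeout" => "30"
}

changed = registration_difference(new_options, previous_configuration)
unchanged = registration_difference(
  new_options.merge(:default_task_start_to_close_timeout => "30"),
  previous_configuration
)
```

A non-empty result corresponds to the case where the listing raises: the defaults changed but the version did not, so the new values would not take effect.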
#run_once(should_register = true, poller = nil) ⇒ Object
Polls for and processes a single activity task. Optionally registers the activity types first, and uses the AWS::Flow::ActivityTaskPoller you supply; if none is given, a new one is created.
# File 'lib/aws/decider/worker.rb', line 423

def run_once(should_register = true, poller = nil)
  register if should_register
  poller = ActivityTaskPoller.new(
    @service,
    @domain,
    @task_list,
    @activity_definition_map,
    @executor,
  ) if poller.nil?
  Kernel.exit if @shutting_down
  poller.poll_and_process_single_task(.use_forking)
end
#start(should_register = true) ⇒ Object
Starts the `ActivityWorker`: optionally registers the activity types, then polls for and processes activity tasks in an infinite loop.
# File 'lib/aws/decider/worker.rb', line 394

def start(should_register = true)
  register if should_register
  poller = ActivityTaskPoller.new(
    @service,
    @domain,
    @task_list,
    @activity_definition_map,
    @executor,
  )
  @logger.debug "Starting an infinite loop to poll and process activity tasks."
  loop do
    run_once(false, poller)
  end
end
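The relationship between `start` and `run_once` (one poller created up front, then reused for every iteration) can be sketched with stand-in classes. `FakePoller` and `ToyWorker` are hypothetical and do not talk to Amazon SWF. The toy `run_once` drops the `should_register` argument, and the toy `start` stops when the task list is empty so that the example terminates, whereas the real loop runs indefinitely.

```ruby
# Hypothetical poller serving a fixed list of tasks (stand-in for ActivityTaskPoller).
class FakePoller
  attr_reader :processed

  def initialize(tasks)
    @tasks = tasks.dup
    @processed = []
  end

  # Processes one task if available; returns false when none remain.
  def poll_and_process_single_task
    task = @tasks.shift
    return false if task.nil?
    @processed << task
    true
  end
end

# Toy worker: start builds one poller and hands it to run_once each time.
class ToyWorker
  def initialize(tasks)
    @tasks = tasks
  end

  # Uses the supplied poller, or builds a new one when none is given.
  def run_once(poller = nil)
    poller = FakePoller.new(@tasks) if poller.nil?
    poller.poll_and_process_single_task
  end

  # Unlike the real start, this loop ends once the tasks are exhausted.
  def start
    poller = FakePoller.new(@tasks)
    loop do
      break unless run_once(poller)
    end
    poller
  end
end

poller = ToyWorker.new(["task-1", "task-2", "task-3"]).start
single = ToyWorker.new(["only-task"]).run_once
```

Calling `run_once` directly is handy when you want to drive the polling loop yourself, for example to process one task in a test.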