Class: SWF::DecisionTaskHandler

Inherits:
Object
  • Object
show all
Extended by:
TaskHandler
Defined in:
lib/swf/decision_task_handler.rb

Overview

Subclasses must call .register(name, version) and define #handle(runner, task).

Constant Summary collapse

@@handler_classes_by_name_version =
{}

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from TaskHandler

handle

Constructor Details

#initialize(runner, decision_task) ⇒ DecisionTaskHandler

Returns a new instance of DecisionTaskHandler.



34
35
36
37
# File 'lib/swf/decision_task_handler.rb', line 34

# Stores the collaborators this handler works with.
#
# @param runner kept in @runner and exposed through #runner
# @param decision_task kept in @decision_task and exposed through #decision_task
def initialize(runner, decision_task)
  @runner = runner
  @decision_task = decision_task
end

Instance Attribute Details

#decision_task ⇒ Object (readonly)

Returns the value of attribute decision_task.



32
33
34
# File 'lib/swf/decision_task_handler.rb', line 32

# Reader for the decision task stored by #initialize.
#
# @return the current value of @decision_task
def decision_task
  @decision_task
end

#runner ⇒ Object (readonly)

Returns the value of attribute runner.



32
33
34
# File 'lib/swf/decision_task_handler.rb', line 32

# Reader for the runner stored by #initialize.
#
# @return the current value of @runner
def runner
  @runner
end

Class Method Details

.configuration_help_message ⇒ Object



27
28
29
30
# File 'lib/swf/decision_task_handler.rb', line 27

# Builds the help text shown when no handler matches a workflow type.
#
# @return [String] two lines: a fixed explanation, then the inspected
#   registry of handler classes keyed by [name, version]
def self.configuration_help_message
  known_handlers = @@handler_classes_by_name_version.inspect
  [
    "Each decision task handler running on this task list in this domain must know how to handle this workflow_type's name and version.",
    "I only know: #{known_handlers}"
  ].join("\n")
end

.fail!(task, args = {}) ⇒ Object



18
19
20
# File 'lib/swf/decision_task_handler.rb', line 18

# Delegates to task.fail_workflow_execution, passing args through unchanged.
#
# @param task an object responding to #fail_workflow_execution
# @param args [Hash] forwarded as the single argument; defaults to an empty hash
# @return whatever task.fail_workflow_execution returns
def self.fail!(task, args={})
  task.fail_workflow_execution(args)
end

.find_handler_class(task) ⇒ Object



22
23
24
25
# File 'lib/swf/decision_task_handler.rb', line 22

# Looks up the handler class registered for the task's workflow type.
#
# .register stores its keys as [name.to_s, version.to_s], so the lookup key is
# normalized the same way. Without this, a workflow type whose name or version
# is not already a String (for example a Symbol or an Integer) could never
# match a registered handler. For String values #to_s is a no-op.
#
# @param task an object whose #workflow_type responds to #name and #version
# @return the registered handler class, or nil when nothing was registered
#   for that name/version pair
def self.find_handler_class(task)
  type = task.workflow_type
  @@handler_classes_by_name_version[[type.name.to_s, type.version.to_s]]
end

.register(name, version) ⇒ Object

Statically registers self (the subclass) as the handler for the workflow_type with the given name and version.



14
15
16
# File 'lib/swf/decision_task_handler.rb', line 14

# Registers the receiving class as the handler for one workflow type.
#
# @param name workflow type name; converted with #to_s before being stored
# @param version workflow type version; converted with #to_s before being stored
# @return the class that was registered (self)
def self.register(name, version)
  registry_key = [name.to_s, version.to_s]
  @@handler_classes_by_name_version[registry_key] = self
end

Instance Method Details

#_new_events(&block) ⇒ Object



53
54
55
56
57
# File 'lib/swf/decision_task_handler.rb', line 53

# Yields, in order, every event from #events whose #new? is truthy.
#
# The explicit block parameter is part of the existing signature; events are
# handed to the caller with yield.
#
# @yieldparam event an event for which event.new? returned a truthy value
# @return whatever events.each returns
def _new_events(&block)
  events.each do |event|
    next unless event.new?

    yield(event)
  end
end

#call_handle ⇒ Object



39
40
41
# File 'lib/swf/decision_task_handler.rb', line 39

# Invokes #handle with no arguments and returns its result.
def call_handle
  handle
end

#event_input(event) ⇒ Object



99
100
101
# File 'lib/swf/decision_task_handler.rb', line 99

# Decodes the JSON payload carried by an event.
#
# @param event an object whose attributes.input is handed to JSON.parse
# @return the value produced by JSON.parse
def event_input(event)
  raw_input = event.attributes.input
  JSON.parse(raw_input)
end

#events ⇒ Object



43
44
45
46
47
# File 'lib/swf/decision_task_handler.rb', line 43

# Returns the decision task's events as an Array, converting them only once
# and caching the result in @events.
#
# The events are turned into an array up front to avoid token timeout issues;
# see https://forums.aws.amazon.com/thread.jspa?threadID=98925
def events
  @events = decision_task.events.to_a unless @events
  @events
end

#new_events ⇒ Object



49
50
51
# File 'lib/swf/decision_task_handler.rb', line 49

# Returns an Enumerator over the events that #_new_events yields.
def new_events
  to_enum(:_new_events)
end

#slot_time ⇒ Object

A slot time of 51.2 microseconds is far too short. Even at 1 second the collision count still grows, but it doesn’t seem to reach 10, so we’ll leave it at that for now.



63
64
65
66
# File 'lib/swf/decision_task_handler.rb', line 63

# Base delay unit for the backoff in #tags, where the value passed to sleep
# is slot_time multiplied by a random slot count.
def slot_time
  # 5.12e-5 (51.2 microseconds) would be the wished-for value, but it is too short in practice
  1
end

#tags ⇒ Object

Exponential backoff handles rate-limiting exceptions when querying the tags of a workflow execution.



70
71
72
73
74
75
76
77
78
79
80
81
82
# File 'lib/swf/decision_task_handler.rb', line 70

# Returns the tags of this decision task's workflow execution, cached in
# runner.tag_lists under the workflow execution as key.
#
# When the lookup raises a StandardError it is retried after a randomized
# exponential backoff: the failure count grows by one per error (capped at
# 10) and the sleep is slot_time times a random integer in
# 0..(2**failures - 1).
#
# NOTE(review): the number of retries is unbounded and every StandardError is
# retried, so a permanent failure keeps this method looping — confirm that is
# intended.
def tags
  cache = runner.tag_lists
  cache_key = decision_task.workflow_execution
  cache[cache_key] ||= begin
    failures = 0
    begin
      decision_task.workflow_execution.tags
    rescue StandardError
      # Cap the exponent so the widest backoff window stops growing.
      failures = [failures + 1, 10].min
      widest_slot = (2**failures) - 1
      sleep(slot_time * rand(0..widest_slot))
      retry
    end
  end
end

#workflow_input ⇒ Object



95
96
97
# File 'lib/swf/decision_task_handler.rb', line 95

# Returns the parsed JSON input of the workflow started event, cached in
# @workflow_input after the first successful parse.
#
# @return the value produced by JSON.parse
def workflow_input
  return @workflow_input if @workflow_input

  raw_input = workflow_started_event.attributes.input
  @workflow_input = JSON.parse(raw_input)
end

#workflow_started_event ⇒ Object



85
86
87
88
89
# File 'lib/swf/decision_task_handler.rb', line 85

# Returns the first event in #events whose event_type is
# 'WorkflowExecutionStarted', cached in @workflow_started_event.
#
# @raise [MissingWorkflowStartedEvent] when no such event is present
def workflow_started_event
  @workflow_started_event ||= begin
    started = events.find { |event| event.event_type == 'WorkflowExecutionStarted' }
    raise MissingWorkflowStartedEvent, "Missing WorkflowExecutionStarted event in #{runner}" unless started

    started
  end
end

#workflow_task_list ⇒ Object



91
92
93
# File 'lib/swf/decision_task_handler.rb', line 91

# Returns the task_list attribute of the workflow started event, cached in
# @workflow_task_list once a truthy value has been read.
def workflow_task_list
  return @workflow_task_list if @workflow_task_list

  @workflow_task_list = workflow_started_event.attributes.task_list
end