Module: AWS::Flow::Core

Defined in:
lib/aws/flow/fiber.rb,
lib/aws/flow/tasks.rb,
lib/aws/flow/future.rb,
lib/aws/flow/flow_utils.rb,
lib/aws/flow/simple_dfa.rb,
lib/aws/flow/async_scope.rb,
lib/aws/flow/implementation.rb,
lib/aws/flow/async_backtrace.rb,
lib/aws/flow/begin_rescue_ensure.rb
more...

Defined Under Namespace

Modules: SimpleDFA Classes: AlreadySetException, AsyncBacktrace, AsyncEventLoop, AsyncScope, BeginRescueEnsure, BeginRescueEnsureWrapper, CancellationException, DaemonBeginRescueEnsure, DaemonTask, ExternalTask, ExternalTaskCompletionHandle, FiberConditionVariable, FlowFiber, Future, IllegalStateException, NoContextException, RootAsyncScope, Task, TaskContext

Instance Method Summary collapse

Instance Method Details

#_error_handler(&block) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

[View source]

110
111
112
# File 'lib/aws/flow/implementation.rb', line 110

def _error_handler(&block)
  error_handler(&block).result
end

#daemon_task(&block) ⇒ Future

Returns The tasks result, which is a Future.

Parameters:

  • block

    The block of code to be executed when the daemon task is run.

Returns:

Raises:

  • (NoContextException)

    If the current fiber does not respond to ‘Fiber.__context__`.

[View source]

56
57
58
59
60
61
62
63
64
# File 'lib/aws/flow/implementation.rb', line 56

def daemon_task(&block)
  fiber = ::Fiber.current
  raise NoContextException unless fiber.respond_to? :__context__
  context = fiber.__context__
  t = DaemonTask.new(nil, &block)
  task_context = TaskContext.new(:parent => context.get_closest_containing_scope, :task => t)
  context << t
  t.result
end

#error_handler(&block) ⇒ Object

Creates a new error handler for asynchronous tasks.

Parameters:

Returns:

  • The result of the ‘begin` statement if there is no error; otherwise the value of the `return` statement.

Raises:

  • (NoContextException)

    If the current fiber does not respond to ‘Fiber.__context__`.

[View source]

97
98
99
100
101
102
103
104
105
106
# File 'lib/aws/flow/implementation.rb', line 97

def error_handler(&block)
  fiber = ::Fiber.current
  raise NoContextException unless fiber.respond_to? :__context__
  context = fiber.__context__
  begin_rescue_ensure = BeginRescueEnsure.new(:parent => context.get_closest_containing_scope)
  bge = BeginRescueEnsureWrapper.new(block, begin_rescue_ensure)
  context << bge
  context << begin_rescue_ensure
  begin_rescue_ensure
end

#external_task(&block) ⇒ nil

Parameters:

  • block

    The block of code to be executed when the external task is run.

Returns:

  • (nil)

Raises:

  • (NoContextException)

    If the current fiber does not respond to ‘Fiber.__context__`.

[View source]

75
76
77
78
79
80
81
82
83
# File 'lib/aws/flow/implementation.rb', line 75

def external_task(&block)
  fiber = ::Fiber.current
  raise NoContextException unless fiber.respond_to? :__context__
  context = fiber.__context__
  t = ExternalTask.new(:parent => context.get_closest_containing_scope, &block)
  task_context = TaskContext.new(:parent => context.get_closest_containing_scope, :task => t)
  context << t
  nil
end

#gate_by_version(version, method, &block) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

[View source]

23
24
25
26
27
# File 'lib/aws/flow/async_scope.rb', line 23

def gate_by_version(version, method, &block)
  if RUBY_VERSION.send(method, version)
    block.call
  end
end

#make_backtrace(parent_backtrace) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

[View source]

33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# File 'lib/aws/flow/flow_utils.rb', line 33

def make_backtrace(parent_backtrace)
  # 1 frame for the function that actually removes the stack traces.
  # 1 frame for the function that calls into the function that removes
  # frames in AsyncBacktrace.
  # 1 frame for the call into this function.
  # 1 frame for the initialize call of the BeginRescueEnsure or ExternalTask.
  # 1 frame for the new call into the BeginRescueEnsure or ExternalTask.
  # 1 frame for the AsyncScope initialize that the BeginRescueEnsure/ExternalTask has to be in.

  # "./lib/aws/rubyflow/asyncBacktrace.rb:75:in `caller'"
  # "./lib/aws/rubyflow/asyncBacktrace.rb:21:in `create'"
  # "./lib/aws/rubyflow/flow.rb:16:in `make_backtrace'"
  # "./lib/aws/rubyflow/flow.rb:103:in `initialize'"
  # "./lib/aws/rubyflow/asyncScope.rb:17:in `new'"
  # "./lib/aws/rubyflow/asyncScope.rb:17:in `initialize'"

  frames_to_skip = 7
  backtrace = AsyncBacktrace.create(parent_backtrace, frames_to_skip)
end

#task(future = nil, &block) ⇒ Future

Returns The tasks result, which is a Future.

Parameters:

  • future (Future) (defaults to: nil)

    Unused; defaults to nil.

  • block

    The block of code to be executed when the task is run.

Returns:

Raises:

  • (NoContextException)

    If the current fiber does not respond to ‘Fiber.__context__`.

[View source]

36
37
38
39
40
41
42
43
44
# File 'lib/aws/flow/implementation.rb', line 36

def task(future = nil, &block)
  fiber = ::Fiber.current
  raise NoContextException unless fiber.respond_to? :__context__
  context = fiber.__context__
  t = Task.new(nil, &block)
  task_context = TaskContext.new(:parent => context.get_closest_containing_scope, :task => t)
  context << t
  t.result
end

#wait_for_all(*futures) ⇒ Array<Future>

Blocks until all of the arguments are set.

Parameters:

  • futures (Array<Future>)

    A list of futures to wait for. The function will return only when all of them are set.

Returns:

  • (Array<Future>)

    A list of the set futures, in the order of being set.

[View source]

162
163
164
# File 'lib/aws/flow/implementation.rb', line 162

def wait_for_all(*futures)
  wait_for_function(lambda {|result, future_list| result.size == future_list.size}, futures)
end

#wait_for_any(*futures) ⇒ Array<Future>

Blocks until any of the arguments are set.

Parameters:

  • futures (Array<Future>)

    A list of futures to wait for. The function will return when at least one of these is set.

Returns:

  • (Array<Future>)

    A list of the set futures, in the order of being set.

[View source]

150
151
152
# File 'lib/aws/flow/implementation.rb', line 150

def wait_for_any(*futures)
  wait_for_function(lambda {|result, future_list| result.length >= 1 }, futures)
end

#wait_for_function(function, *futures) ⇒ Array<Future>

Waits for the passed-in function to complete, setting values for the provided futures when it does.

Parameters:

  • function

    The function to wait for.

  • futures (Array<Future>)

    A list of futures to provide values for when the function completes.

Returns:

  • (Array<Future>)

    A list of the set futures, in the order of being set.

[View source]

126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
# File 'lib/aws/flow/implementation.rb', line 126

def wait_for_function(function, *futures)
  conditional = FiberConditionVariable.new
  futures.flatten!
  return nil if futures.empty?
  result = futures.select(&:set?)
  return futures.find(&:set?)if function.call(result, futures)
  futures.each do |f|
    f.on_set do |set_one|
      result << set_one
      conditional.broadcast if function.call(result, futures)
    end
  end
  conditional.wait
  result
end