Class: Trailblazer::Macro::Each
- Defined in:
- lib/trailblazer/macro/each.rb
Defined Under Namespace
Modules: Transitive
Constant Summary
- ITERATION_OUTPUT_PIPE =
Roughly equivalent to Out() => all mutable variables: merges the variables written during an iteration back into the original ctx.
Activity::DSL::Linear::VariableMapping::DSL.pipe_for_composable_output()
- ITERATION_INPUT_PIPE =
Roughly equivalent to In() => everything: passes the entire ctx into each iteration.
Activity::DSL::Linear::VariableMapping::DSL.pipe_for_composable_input()
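The effect of the two pipes can be approximated with plain hashes. This is only a sketch under assumptions: `ScopedCtx` is a hypothetical stand-in, not a Trailblazer class, and it assumes that "mutable variables" means the variables written while an iteration runs.

```ruby
# Hypothetical scoped ctx: reads fall through to the outer hash,
# writes are kept in a separate mutable hash.
class ScopedCtx
  attr_reader :mutable

  def initialize(outer)
    @outer   = outer
    @mutable = {}
  end

  def [](key)
    @mutable.fetch(key) { @outer[key] }
  end

  def []=(key, value)
    @mutable[key] = value
  end
end

outer = { dataset: [1, 2], total: 0 }.freeze

# "In() => everything": every outer variable is readable inside the iteration.
inner = ScopedCtx.new(outer)
inner[:total] = inner[:total] + 1 # the write lands in the mutable part only

# "Out() => all mutable variables": only the written variables are merged back.
merged = outer.merge(inner.mutable)
```

In this sketch `outer` stays untouched and `merged` carries the updated `:total`, which is the behavior the two constants are described as providing.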
Class Method Summary
- .call(ctx, flow_options, runner:, **circuit_options) ⇒ Object
DISCUSS: do we need start_task?
- .compute_runtime_id(ctx, trace_node:, activity:, compile_id:) ⇒ Object
Gets included in Debugger’s Normalizer.
- .to_h ⇒ Object
call.
Methods inherited from Strategy
Class Method Details
.call(ctx, flow_options, runner:, **circuit_options) ⇒ Object
DISCUSS: do we need start_task?
    # File 'lib/trailblazer/macro/each.rb', line 13

    def self.call((ctx, ), runner:, **) # DISCUSS: do we need {start_task}?
      dataset          = ctx.fetch(:dataset)
      signal           = @state.get(:success_signal)
      item_key         = @state.get(:item_key)
      failing_semantic = @state.get(:failing_semantic)
      activity         = @state.get(:activity)

      # I'd like to use {collect} but we can't {break} without losing the last iteration's result.
      dataset.each_with_index do |element, index|
        # This new {inner_ctx} will be disposed of after invoking the item activity.
        inner_ctx = ctx.merge(
          item_key => element, # defaults to {:item}
          :index   => index,
        )

        # TODO: test aliasing
        wrap_ctx, _ = ITERATION_INPUT_PIPE.({aggregate: {}, original_ctx: inner_ctx}, [[ctx, ], ])
        inner_ctx   = wrap_ctx[:input_ctx]

        # using this runner will make it look as if block_activity is being run consecutively within {Each.iterate} as if they were steps
        # Use TaskWrap::Runner to run the each block. This doesn't create the container_activity
        # and literally simply invokes {block_activity.call}, which will set its own {wrap_static}.
        signal, (returned_ctx, ) = runner.(
          block_activity,
          [inner_ctx, ],
          runner:   runner,
          **,
          activity: activity,
        )

        # {returned_ctx} at this point has Each(..., In => Out =>) applied!
        # Without configuration, this means {returned_ctx} is empty.
        # DISCUSS: this is what usually happens in Out().
        # merge all mutable parts into the original_ctx.
        wrap_ctx, _ = ITERATION_OUTPUT_PIPE.({returned_ctx: returned_ctx, aggregate: {}, original_ctx: ctx}, [])
        ctx         = wrap_ctx[:aggregate]

        # Break the loop if {block_activity} emits failure signal
        break if failing_semantic.include?(signal.to_h[:semantic]) # TODO: use generic check from older macro
      end

      return signal, [ctx, ]
    end
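The control flow of `.call` can be illustrated without Trailblazer. The following is a simplified sketch, not the library's implementation: `each_sketch` and `Signal` are hypothetical names, plain hashes stand in for the ctx, and a Ruby block stands in for the item activity, the runner and the input/output pipes.

```ruby
# Hypothetical stand-in for a signal; Struct#to_h yields {semantic: ...},
# mirroring the signal.to_h[:semantic] lookup in the source.
Signal = Struct.new(:semantic)

def each_sketch(ctx, dataset:, item_key: :item, failing_semantic: [:failure], &block)
  signal = Signal.new(:success) # returned unchanged if the dataset is empty

  dataset.each_with_index do |element, index|
    # Per-iteration ctx; it is discarded once the block has run.
    inner_ctx = ctx.merge(item_key => element, index: index)

    # The block returns a signal and the variables to merge back.
    signal, returned = block.call(inner_ctx)
    ctx = ctx.merge(returned)

    # Stop at the first failing iteration, keeping its signal and ctx.
    break if failing_semantic.include?(signal.to_h[:semantic])
  end

  [signal, ctx]
end

signal, ctx = each_sketch({ seen: [] }, dataset: ["a", "b", "c"]) do |inner|
  semantic = inner[:item] == "b" ? :failure : :success
  [Signal.new(semantic), { seen: inner[:seen] + [inner[:index]] }]
end
```

Here the loop stops at the second element: `signal.semantic` is `:failure`, `ctx[:seen]` is `[0, 1]`, and neither `:item` nor `:index` leaks into the resulting ctx. Using `each_with_index` with `break` rather than `collect` keeps the last iteration's signal and ctx available after the loop, which is the concern raised in the source comment.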
.compute_runtime_id(ctx, trace_node:, activity:, compile_id:) ⇒ Object
Gets included in Debugger’s Normalizer. Results in IDs like invoke_block_activity.1.
    # File 'lib/trailblazer/macro/each.rb', line 69

    def self.compute_runtime_id(ctx, trace_node:, activity:, compile_id:, **)
      # activity is the host activity
      return compile_id unless activity.to_h[:config][:each] == true

      # Developer::Trace::Snapshot::Ctx.ctx_snapshot_for(trace_node.snapshot_before, .data
      # FIXME: BETTER API, we need access to stack now
      # index = trace_node.snapshot_before.data[:ctx_snapshot].fetch(:index)
      index = trace_node.snapshot_before.data[:ctx_variable_changeset].find { |name, version, value| name == :index }[2]

      ctx[:runtime_id] = "#{compile_id}.#{index}"
    end
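The ID derivation can be tried in isolation. This is a minimal sketch under assumptions: `runtime_id_sketch` is a hypothetical helper, and the changeset is modelled as an array of `[name, version, value]` triples, as suggested by the block arguments in the source. The sample values are made up.

```ruby
# Hypothetical changeset: one [name, version, value] triple per ctx variable.
changeset = [
  [:item,  1, "first element"],
  [:index, 1, 2],
]

def runtime_id_sketch(compile_id, changeset, each:)
  # Tasks outside an Each() host keep their compile-time ID.
  return compile_id unless each

  # Pick the :index triple and read its value (third slot).
  index = changeset.find { |name, _version, _value| name == :index }[2]

  "#{compile_id}.#{index}"
end

inside  = runtime_id_sketch("invoke_block_activity", changeset, each: true)
outside = runtime_id_sketch("invoke_block_activity", changeset, each: false)
# inside  is "invoke_block_activity.2"
# outside is "invoke_block_activity"
```

As in the original method, a changeset without an `:index` entry makes `find` return `nil`, so the `[2]` lookup would raise a `NoMethodError`.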
.to_h ⇒ Object
call
    # File 'lib/trailblazer/macro/each.rb', line 57

    def self.to_h
      container_activity = @state.get(:activity) # FIXME: this is needed for a proper {find_path} introspect lookup.
      container_activity.to_h.merge(activity: container_activity)
    end