Class: Trailblazer::Macro::Each

Inherits:
Strategy
  • Object
show all
Defined in:
lib/trailblazer/macro/each.rb

Defined Under Namespace

Modules: Transitive

Constant Summary collapse

ITERATION_OUTPUT_PIPE =

This is basically Out() => all mutable variables

Activity::DSL::Linear::VariableMapping::DSL.pipe_for_composable_output()
ITERATION_INPUT_PIPE =

and this In() => everything

Activity::DSL::Linear::VariableMapping::DSL.pipe_for_composable_input()

Class Method Summary collapse

Methods inherited from Strategy

block_activity

Class Method Details

.call(ctx, flow_options, runner:, **circuit_options) ⇒ Object

DISCUSS: do we need start_task?



13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
# File 'lib/trailblazer/macro/each.rb', line 13

# Invokes {block_activity} once per element of +ctx[:dataset]+ and returns the
# last signal together with the accumulated ctx.
#
# The signal, ctx and flow_options produced by one iteration are what the next
# iteration starts with. Iteration stops early as soon as the semantic of the
# returned signal is listed in the configured +:failing_semantic+.
#
# @param ctx [#fetch, #merge] must provide +:dataset+ (read via +fetch+)
# @param flow_options [Object] passed to the runner and replaced by whatever the runner returns
# @param runner [#call] called with {block_activity} for each element
# @param circuit_options [Hash] all remaining keywords, forwarded to the runner
# @return [Array] +[signal, [ctx, flow_options]]+
def self.call((ctx, flow_options), runner:, **circuit_options) # DISCUSS: do we need {start_task}?
  collection       = ctx.fetch(:dataset)
  last_signal      = @state.get(:success_signal) # what we return when nothing gets iterated
  item_key         = @state.get(:item_key)
  failing_semantic = @state.get(:failing_semantic)
  host_activity    = @state.get(:activity)

  # {collect} would read nicer, but we couldn't {break} without losing the last iteration's result.
  collection.each_with_index do |member, position|
    # Throw-away ctx for this iteration only: the running ctx plus the current
    # element (stored under {item_key}, which defaults to {:item}) and its position.
    scoped_ctx = ctx.merge(item_key => member, index: position)

    # TODO: test aliasing
    input_wrap, = ITERATION_INPUT_PIPE.call(
      {aggregate: {}, original_ctx: scoped_ctx},
      [[ctx, flow_options], circuit_options]
    )
    runner_ctx = input_wrap[:input_ctx]

    # Going through {runner} makes the invocations of {block_activity} look as if they
    # were run consecutively, like steps. No container activity is created here, the
    # runner simply calls {block_activity}, which brings its own {wrap_static}.
    last_signal, (returned_ctx, flow_options) = runner.call(
      block_activity,
      [runner_ctx, flow_options],
      runner: runner,
      **circuit_options,
      activity: host_activity
    )

    # {returned_ctx} already has Each(..., In => Out =>) applied, so without any
    # configuration it is empty.
    # DISCUSS: this is what usually happens in Out().
    # Fold the mutable parts back into the running ctx.
    output_wrap, = ITERATION_OUTPUT_PIPE.call(
      {returned_ctx: returned_ctx, aggregate: {}, original_ctx: ctx},
      []
    )
    ctx = output_wrap[:aggregate]

    # Stop iterating once {block_activity} ended on a failing semantic.
    # TODO: use generic check from older macro
    semantic = last_signal.to_h[:semantic]
    break if failing_semantic.include?(semantic)
  end

  [last_signal, [ctx, flow_options]]
end

.compute_runtime_id(ctx, trace_node:, activity:, compile_id:) ⇒ Object

Gets included in Debugger’s Normalizer. Results in IDs like `invoke_block_activity.1`.



69
70
71
72
73
74
75
76
77
78
79
80
81
# File 'lib/trailblazer/macro/each.rb', line 69

# Computes the runtime ID of a traced task; gets included in Debugger's Normalizer.
#
# Only when the host activity's config has +:each+ set to exactly +true+ is the
# iteration index appended to +compile_id+ and the result written to
# +ctx[:runtime_id]+. In every other case +compile_id+ is returned and +ctx+ is
# left alone.
#
# @param ctx [#[]=] receives +:runtime_id+ for tasks run inside an iteration
# @param trace_node [#snapshot_before] its before-snapshot data must hold +:ctx_variable_changeset+
# @param activity [#to_h] the host activity
# @param compile_id [Object] ID to return as-is, or to prefix the index with
# @return [Object] +compile_id+, or the string +"<compile_id>.<index>"+
def self.compute_runtime_id(ctx, trace_node:, activity:, compile_id:, **)
  host_config = activity.to_h[:config] # {activity} is the host activity
  return compile_id unless host_config[:each] == true

  # FIXME: BETTER API, we need access to stack now
  #   (was: trace_node.snapshot_before.data[:ctx_snapshot].fetch(:index))
  changeset   = trace_node.snapshot_before.data[:ctx_variable_changeset]
  index_entry = changeset.find { |name, *| name == :index }
  index       = index_entry[2] # changeset entries are [name, version, value]

  ctx[:runtime_id] = "#{compile_id}.#{index}"
end

.to_h ⇒ Object

call



57
58
59
60
61
# File 'lib/trailblazer/macro/each.rb', line 57

# Returns the +to_h+ of the activity stored under +:activity+ in +@state+,
# merged with an +:activity+ entry that points at that activity itself.
#
# @return [Object] result of +merge+ on the activity's +to_h+
def self.to_h
  host = @state.get(:activity)

  # FIXME: the extra {:activity} entry is needed for a proper {find_path} introspect lookup.
  introspection = host.to_h
  introspection.merge(activity: host)
end