Class: Dataflow::Executor

Inherits:
Object
  • Object
show all
Defined in:
lib/dataflow/executor.rb

Class Method Summary collapse

Class Method Details

.await_execution_completion(node, completion_queue, expected_completion_count) ⇒ Object



79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
# File 'lib/dataflow/executor.rb', line 79

def await_execution_completion(node, completion_queue, expected_completion_count)
  completed_message_indexes = []
  unblock = Queue.new

  consumer = completion_queue.subscribe do |_delivery_info, _properties, payload|
    data = JSON.parse(payload)
    unblock.enq(data['error']) if data['error'].present?

    # Support adding the data to the compute's data_node is the
    # remote process returns anything.
    node.data_node&.add(records: data['data']) if data['data'].present?

    completed_message_indexes << data['msg_id']
    if completed_message_indexes.count == expected_completion_count
      unblock.enq(false)
    end
  end

  error_data = unblock.deq
  consumer.cancel

  error_data
end

.execute(node) ⇒ Object



9
10
11
12
13
14
15
16
17
18
19
20
# File 'lib/dataflow/executor.rb', line 9

def execute(node)
  case node.execution_model
  when :remote
    execute_remote_computation(node: node, is_batch_execution: false)
  when :remote_batch
    execute_remote_computation(node: node, is_batch_execution: true)
  when :local
    node.execute_local_computation
  else
    raise ArgumentError, "Unknown execution model #{execution_model}"
  end
end

.execute_remote_computation(node:, is_batch_execution:) ⇒ Object



22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
# File 'lib/dataflow/executor.rb', line 22

def execute_remote_computation(node:, is_batch_execution:)
  execution_uuid = node.execution_uuid
  raise ArgumentError, "Expected execution uuid to be set on '#{node.name}' (##{node._id})" unless execution_uuid.present?

  logger.log("Started processing '#{node.name}'")
  conn, channel, completion_queue = open_communication_channel
  logger.log("Opened a completion queue for '#{node.name}': #{completion_queue.name}")

  messages = send_execution_messages(channel, node, is_batch_execution, completion_queue.name)
  error_data = await_execution_completion(node, completion_queue, messages.count)
  logger.log("Finished processing '#{node.name}'")

  raise Errors::RemoteExecutionError.new(error_data['message'], error_data['backtrace']) if error_data
ensure
  conn&.close
end

.loggerObject



103
104
105
# File 'lib/dataflow/executor.rb', line 103

def logger
  @logger ||= Dataflow::Logger.new(prefix: 'Executor')
end

.make_execution_params(node, is_batch_execution, completion_queue_name) ⇒ Object



60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
# File 'lib/dataflow/executor.rb', line 60

def make_execution_params(node, is_batch_execution, completion_queue_name)
  execution_params = if is_batch_execution
                       node.make_batch_params
                     else
                       [{}]
                     end

  execution_params.each_with_index.map do |params, idx|
    {
      msg_id: idx,
      node_id: node._id.to_s,
      is_batch: is_batch_execution,
      params: params,
      execution_uuid: node.execution_uuid.to_s,
      completion_queue_name: completion_queue_name
    }
  end
end

.open_communication_channelObject



39
40
41
42
43
44
45
46
47
# File 'lib/dataflow/executor.rb', line 39

def open_communication_channel
  conn = Bunny.new(ENV['MOJACO_RABBITMQ_URI'])
  conn.start

  ch = conn.create_channel
  completion_queue = ch.queue('', exclusive: true)

  [conn, ch, completion_queue]
end

.send_execution_messages(channel, node, is_batch_execution, completion_queue_name) ⇒ Object



49
50
51
52
53
54
55
56
57
58
# File 'lib/dataflow/executor.rb', line 49

def send_execution_messages(channel, node, is_batch_execution, completion_queue_name)
  execution_params = make_execution_params(node, is_batch_execution, completion_queue_name)

  execution_queue = channel.queue(node.execution_queue)
  execution_params.each do |exec_params|
    execution_queue.publish(exec_params.to_json)
  end

  execution_params
end