Class: Dataflow::Executor
- Inherits:
-
Object
- Object
- Dataflow::Executor
- Defined in:
- lib/dataflow/executor.rb
Class Method Summary collapse
- .await_execution_completion(node, completion_queue, expected_completion_count) ⇒ Object
- .execute(node) ⇒ Object
- .execute_remote_computation(node:, is_batch_execution:) ⇒ Object
- .logger ⇒ Object
- .make_execution_params(node, is_batch_execution, completion_queue_name) ⇒ Object
- .open_communication_channel ⇒ Object
- .send_execution_messages(channel, node, is_batch_execution, completion_queue_name) ⇒ Object
Class Method Details
.await_execution_completion(node, completion_queue, expected_completion_count) ⇒ Object
79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 |
# File 'lib/dataflow/executor.rb', line 79 def await_execution_completion(node, completion_queue, expected_completion_count) = [] unblock = Queue.new consumer = completion_queue.subscribe do |_delivery_info, _properties, payload| data = JSON.parse(payload) unblock.enq(data['error']) if data['error'].present? # Support adding the data to the compute's data_node is the # remote process returns anything. node.data_node&.add(records: data['data']) if data['data'].present? << data['msg_id'] if .count == expected_completion_count unblock.enq(false) end end error_data = unblock.deq consumer.cancel error_data end |
.execute(node) ⇒ Object
9 10 11 12 13 14 15 16 17 18 19 20 |
# File 'lib/dataflow/executor.rb', line 9 def execute(node) case node.execution_model when :remote execute_remote_computation(node: node, is_batch_execution: false) when :remote_batch execute_remote_computation(node: node, is_batch_execution: true) when :local node.execute_local_computation else raise ArgumentError, "Unknown execution model #{execution_model}" end end |
.execute_remote_computation(node:, is_batch_execution:) ⇒ Object
22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 |
# File 'lib/dataflow/executor.rb', line 22 def execute_remote_computation(node:, is_batch_execution:) execution_uuid = node.execution_uuid raise ArgumentError, "Expected execution uuid to be set on '#{node.name}' (##{node._id})" unless execution_uuid.present? logger.log("Started processing '#{node.name}'") conn, channel, completion_queue = open_communication_channel logger.log("Opened a completion queue for '#{node.name}': #{completion_queue.name}") = (channel, node, is_batch_execution, completion_queue.name) error_data = await_execution_completion(node, completion_queue, .count) logger.log("Finished processing '#{node.name}'") raise Errors::RemoteExecutionError.new(error_data['message'], error_data['backtrace']) if error_data ensure conn&.close end |
.logger ⇒ Object
103 104 105 |
# File 'lib/dataflow/executor.rb', line 103 def logger @logger ||= Dataflow::Logger.new(prefix: 'Executor') end |
.make_execution_params(node, is_batch_execution, completion_queue_name) ⇒ Object
60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 |
# File 'lib/dataflow/executor.rb', line 60 def make_execution_params(node, is_batch_execution, completion_queue_name) execution_params = if is_batch_execution node.make_batch_params else [{}] end execution_params.each_with_index.map do |params, idx| { msg_id: idx, node_id: node._id.to_s, is_batch: is_batch_execution, params: params, execution_uuid: node.execution_uuid.to_s, completion_queue_name: completion_queue_name } end end |
.open_communication_channel ⇒ Object
39 40 41 42 43 44 45 46 47 |
# File 'lib/dataflow/executor.rb', line 39 def open_communication_channel conn = Bunny.new(ENV['MOJACO_RABBITMQ_URI']) conn.start ch = conn.create_channel completion_queue = ch.queue('', exclusive: true) [conn, ch, completion_queue] end |
.send_execution_messages(channel, node, is_batch_execution, completion_queue_name) ⇒ Object
49 50 51 52 53 54 55 56 57 58 |
# File 'lib/dataflow/executor.rb', line 49 def (channel, node, is_batch_execution, completion_queue_name) execution_params = make_execution_params(node, is_batch_execution, completion_queue_name) execution_queue = channel.queue(node.execution_queue) execution_params.each do |exec_params| execution_queue.publish(exec_params.to_json) end execution_params end |