Class: Dataflow::RemoteWorker

Inherits:
Object
Defined in:
lib/dataflow/remote_worker.rb

Class Method Summary

  • .execute(node, payload_data) ⇒ Object
  • .logger ⇒ Object
  • .process(data) ⇒ Object
  • .work(work_queue_name = 'dataflow.ruby') ⇒ Object

Class Method Details

.execute(node, payload_data) ⇒ Object



# File 'lib/dataflow/remote_worker.rb', line 46

def execute(node, payload_data)
  # execute in a different process, so that once it's finished
  # we can purge the memory
  Parallel.map([payload_data]) do |data|
    result = {}
    logger.log("[#{data['msg_id']}] working on '#{node.name}'...")

    begin
      if data['is_batch']
        records = node.execute_local_batch_computation(data['params'])
        # In Ruby we already have access to the node, so we add
        # the data directly here instead of returning it through
        # the queue. The default batch behavior in other languages
        # is to return the output data in the 'data' key, e.g.:
        # result['data'] = records
        node.data_node&.add(records: records)
      else
        node.execute_local_computation
      end
    rescue StandardError => e
      result = { error: { message: e.message, backtrace: e.backtrace } }
    end

    logger.log("[#{data['msg_id']}] done working on '#{node.name}'.")
    result
  end
end
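
A minimal sketch of invoking .execute directly; the payload keys 'msg_id', 'is_batch' and 'params' are the ones the method body reads, and the node id and message id below are hypothetical placeholders:

node = Dataflow::Nodes::ComputeNode.find('a-node-id') # hypothetical node id
payload = {
  'msg_id'   => 'abc-123', # placeholder message id
  'is_batch' => false      # true would run execute_local_batch_computation
}
results = Dataflow::RemoteWorker.execute(node, payload)
# Parallel.map returns an array: [{}] on success,
# or [{ error: { message: ..., backtrace: ... } }] on failure.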

.logger ⇒ Object



# File 'lib/dataflow/remote_worker.rb', line 74

def logger
  @logger ||= Dataflow::Logger.new(prefix: 'Worker')
end
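
The logger is memoized, so every call shares a single Dataflow::Logger instance prefixed with 'Worker'. A one-line usage sketch, matching how the logger is called elsewhere in this class:

Dataflow::RemoteWorker.logger.log('ready for work') # output is prefixed with 'Worker'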

.process(data) ⇒ Object



# File 'lib/dataflow/remote_worker.rb', line 31

def process(data)
  node = Dataflow::Nodes::ComputeNode.find(data['node_id'])

  unless node.execution_valid?(data['execution_uuid'])
    logger.log("[#{data['msg_id']}] work on '#{node.name}' has expired. Skipping.")
    return
  end

  results = execute(node, data)
  response = { msg_id: data['msg_id'] }
  response.merge(results[0])
rescue Mongoid::Errors::DocumentNotFound => e
  { error: { message: e.message, backtrace: e.backtrace } }
end
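
A sketch of the message shape that .process expects; the keys are taken from the method body, and the id values here are hypothetical placeholders:

message = {
  'node_id'        => 'a-node-id', # hypothetical ComputeNode id
  'execution_uuid' => 'uuid-1234', # must still be valid for the node
  'msg_id'         => 'abc-123',
  'is_batch'       => false
}
response = Dataflow::RemoteWorker.process(message)
# => { msg_id: 'abc-123' } on success, with an :error key merged in
#    when the computation raised.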

.work(work_queue_name = 'dataflow.ruby') ⇒ Object



# File 'lib/dataflow/remote_worker.rb', line 8

def work(work_queue_name = 'dataflow.ruby')
  conn = Bunny.new(ENV['MOJACO_RABBITMQ_URI'])
  conn.start

  ch = conn.create_channel
  queue = ch.queue(work_queue_name)
  ch.prefetch(1)

  logger.log("Accepting work on #{work_queue_name}...")

  queue.subscribe(block: true, manual_ack: true) do |delivery_info, _properties, payload|
    data = JSON.parse(payload)
    response = process(data)
    if response.present?
      ch.default_exchange.publish(response.to_json, routing_key: data['completion_queue_name'])
    end
    ch.ack(delivery_info.delivery_tag)
  end
ensure
  conn&.close
  logger.log('Connection closed, stopped accepting work.')
end
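
A minimal sketch of running a worker, assuming a reachable RabbitMQ broker; .work reads the broker URI from the MOJACO_RABBITMQ_URI environment variable and blocks in queue.subscribe until the process is stopped:

ENV['MOJACO_RABBITMQ_URI'] ||= 'amqp://guest:guest@localhost:5672'
Dataflow::RemoteWorker.work                 # subscribes on 'dataflow.ruby'
# Dataflow::RemoteWorker.work('my.queue')   # or a custom work queue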