Class: Opener::Daemons::Worker

Inherits:
Oni::Worker
  • Object
show all
Includes:
NewRelic::Agent::Instrumentation::ControllerInstrumentation, NewRelic::Agent::MethodTracer
Defined in:
lib/opener/daemons/worker.rb

Overview

Downlods a KAF document, passes it to a component and submits the output to a callback URL or a default queue. Each Worker instance runs in an isolated thread

Constant Summary collapse

INLINE_IO =
!!ENV['INLINE_IO']

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(config) ⇒ Worker

Returns a new instance of Worker.

Parameters:



31
32
33
34
35
36
37
38
# File 'lib/opener/daemons/worker.rb', line 31

def initialize(config)
  @config           = config
  @downloader       = Downloader.new
  @uploader         = Uploader.new
  @callback_handler = CallbackHandler.new
  @input            = nil
  @output           = nil
end

Instance Attribute Details

#callback_handlerOpener::CallbackHandler (readonly)

Returns:

  • (Opener::CallbackHandler)


20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
# File 'lib/opener/daemons/worker.rb', line 20

class Worker < Oni::Worker
  attr_reader :config, :uploader, :downloader, :callback_handler

  INLINE_IO = !!ENV['INLINE_IO']

  include NewRelic::Agent::Instrumentation::ControllerInstrumentation
  include NewRelic::Agent::MethodTracer

  ##
  # @param [Opener::Daemons::Configuration] config
  #
  def initialize(config)
    @config           = config
    @downloader       = Downloader.new
    @uploader         = Uploader.new
    @callback_handler = CallbackHandler.new
    @input            = nil
    @output           = nil
  end

  ##
  # Processes a document.
  #
  # @raise [Oni::WrappedError]
  #
  def process
    add_transaction_attributes

    begin
      process_input
      run_component
      process_output
      submit_callbacks

    # Unsupported languages are handled in a different manner as they can
    # occur quite often. In these cases we _do_ want the data to be sent
    # to the final callback URL (skipping whatever comes before it) so it
    # can act upon it.
    rescue Core::UnsupportedLanguageError
      handle_unsupported_language
    end
  end

  ##
  #
  def process_input
    if config.input
      @input = Zlib.gunzip Base64.decode64 config.input
      @input.force_encoding 'UTF-8'
    else
      @input = downloader.download config.input_url
    end
  end

  ##
  # @return [String]
  #
  def run_component
    @output = config.component_instance.run @input, config.['custom_config']
  end

  ##
  # @param [String] output
  # @return [Aws::S3::Object]
  #
  def process_output
    if INLINE_IO
      @next_input = Base64.encode64 Zlib.gzip @output
    else
      @object = uploader.upload config.identifier, @output, config.
    end
  end

  ##
  # Sends the object's URL to the next callback URL.
  #
  # @param [Aws::S3::Object] object
  #
  def submit_callbacks
    urls     = config.callbacks.dup
    next_url = urls.shift

    callback_handler.post next_url, next_input_params.merge(
      identifier: config.identifier,
      callbacks:  urls,
      metadata:   config.,
    )

    Core::Syslog.info("Submitted response to #{next_url}", config.)
  end

  ##
  # Sends the unsupported input to the last callback URL.
  #
  def handle_unsupported_language
    last_url = config.callbacks.last

    callback_handler.post last_url, input_params.merge(
      identifier: config.identifier,
      metadata:   config.,
    )

    Core::Syslog.info(
      "Submitted input with an unsupported language to #{last_url}",
      config.
    )
  end

  private

  ##
  # Preserve input for last callback
  #
  def input_params
    if config.input_url
      {input_url: config.input_url}
    else
      {input:     config.input}
    end
  end

  ##
  # Use generated output as new input
  #
  def next_input_params
    if INLINE_IO
      {input:     @next_input}
    else
      {input_url: @object.public_url.to_s}
    end
  end

  def add_transaction_attributes
    Transaction.current.add_parameters(
      input_url:  config.input_url,
      identifier: config.identifier,
      callbacks:  config.callbacks,
      metadata:   config.,
    )
  end

  if Daemons.newrelic?
    add_transaction_tracer :process, category: :task

    add_method_tracer :run_component
    add_method_tracer :process_output
    add_method_tracer :submit_callbacks
  end
end

#configOpener::Daemons::Configuration (readonly)



20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
# File 'lib/opener/daemons/worker.rb', line 20

class Worker < Oni::Worker
  attr_reader :config, :uploader, :downloader, :callback_handler

  INLINE_IO = !!ENV['INLINE_IO']

  include NewRelic::Agent::Instrumentation::ControllerInstrumentation
  include NewRelic::Agent::MethodTracer

  ##
  # @param [Opener::Daemons::Configuration] config
  #
  def initialize(config)
    @config           = config
    @downloader       = Downloader.new
    @uploader         = Uploader.new
    @callback_handler = CallbackHandler.new
    @input            = nil
    @output           = nil
  end

  ##
  # Processes a document.
  #
  # @raise [Oni::WrappedError]
  #
  def process
    add_transaction_attributes

    begin
      process_input
      run_component
      process_output
      submit_callbacks

    # Unsupported languages are handled in a different manner as they can
    # occur quite often. In these cases we _do_ want the data to be sent
    # to the final callback URL (skipping whatever comes before it) so it
    # can act upon it.
    rescue Core::UnsupportedLanguageError
      handle_unsupported_language
    end
  end

  ##
  #
  def process_input
    if config.input
      @input = Zlib.gunzip Base64.decode64 config.input
      @input.force_encoding 'UTF-8'
    else
      @input = downloader.download config.input_url
    end
  end

  ##
  # @return [String]
  #
  def run_component
    @output = config.component_instance.run @input, config.['custom_config']
  end

  ##
  # @param [String] output
  # @return [Aws::S3::Object]
  #
  def process_output
    if INLINE_IO
      @next_input = Base64.encode64 Zlib.gzip @output
    else
      @object = uploader.upload config.identifier, @output, config.
    end
  end

  ##
  # Sends the object's URL to the next callback URL.
  #
  # @param [Aws::S3::Object] object
  #
  def submit_callbacks
    urls     = config.callbacks.dup
    next_url = urls.shift

    callback_handler.post next_url, next_input_params.merge(
      identifier: config.identifier,
      callbacks:  urls,
      metadata:   config.,
    )

    Core::Syslog.info("Submitted response to #{next_url}", config.)
  end

  ##
  # Sends the unsupported input to the last callback URL.
  #
  def handle_unsupported_language
    last_url = config.callbacks.last

    callback_handler.post last_url, input_params.merge(
      identifier: config.identifier,
      metadata:   config.,
    )

    Core::Syslog.info(
      "Submitted input with an unsupported language to #{last_url}",
      config.
    )
  end

  private

  ##
  # Preserve input for last callback
  #
  def input_params
    if config.input_url
      {input_url: config.input_url}
    else
      {input:     config.input}
    end
  end

  ##
  # Use generated output as new input
  #
  def next_input_params
    if INLINE_IO
      {input:     @next_input}
    else
      {input_url: @object.public_url.to_s}
    end
  end

  def add_transaction_attributes
    Transaction.current.add_parameters(
      input_url:  config.input_url,
      identifier: config.identifier,
      callbacks:  config.callbacks,
      metadata:   config.,
    )
  end

  if Daemons.newrelic?
    add_transaction_tracer :process, category: :task

    add_method_tracer :run_component
    add_method_tracer :process_output
    add_method_tracer :submit_callbacks
  end
end

#downloaderOpener::Daemons::Downloader (readonly)



20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
# File 'lib/opener/daemons/worker.rb', line 20

class Worker < Oni::Worker
  attr_reader :config, :uploader, :downloader, :callback_handler

  INLINE_IO = !!ENV['INLINE_IO']

  include NewRelic::Agent::Instrumentation::ControllerInstrumentation
  include NewRelic::Agent::MethodTracer

  ##
  # @param [Opener::Daemons::Configuration] config
  #
  def initialize(config)
    @config           = config
    @downloader       = Downloader.new
    @uploader         = Uploader.new
    @callback_handler = CallbackHandler.new
    @input            = nil
    @output           = nil
  end

  ##
  # Processes a document.
  #
  # @raise [Oni::WrappedError]
  #
  def process
    add_transaction_attributes

    begin
      process_input
      run_component
      process_output
      submit_callbacks

    # Unsupported languages are handled in a different manner as they can
    # occur quite often. In these cases we _do_ want the data to be sent
    # to the final callback URL (skipping whatever comes before it) so it
    # can act upon it.
    rescue Core::UnsupportedLanguageError
      handle_unsupported_language
    end
  end

  ##
  #
  def process_input
    if config.input
      @input = Zlib.gunzip Base64.decode64 config.input
      @input.force_encoding 'UTF-8'
    else
      @input = downloader.download config.input_url
    end
  end

  ##
  # @return [String]
  #
  def run_component
    @output = config.component_instance.run @input, config.['custom_config']
  end

  ##
  # @param [String] output
  # @return [Aws::S3::Object]
  #
  def process_output
    if INLINE_IO
      @next_input = Base64.encode64 Zlib.gzip @output
    else
      @object = uploader.upload config.identifier, @output, config.
    end
  end

  ##
  # Sends the object's URL to the next callback URL.
  #
  # @param [Aws::S3::Object] object
  #
  def submit_callbacks
    urls     = config.callbacks.dup
    next_url = urls.shift

    callback_handler.post next_url, next_input_params.merge(
      identifier: config.identifier,
      callbacks:  urls,
      metadata:   config.,
    )

    Core::Syslog.info("Submitted response to #{next_url}", config.)
  end

  ##
  # Sends the unsupported input to the last callback URL.
  #
  def handle_unsupported_language
    last_url = config.callbacks.last

    callback_handler.post last_url, input_params.merge(
      identifier: config.identifier,
      metadata:   config.,
    )

    Core::Syslog.info(
      "Submitted input with an unsupported language to #{last_url}",
      config.
    )
  end

  private

  ##
  # Preserve input for last callback
  #
  def input_params
    if config.input_url
      {input_url: config.input_url}
    else
      {input:     config.input}
    end
  end

  ##
  # Use generated output as new input
  #
  def next_input_params
    if INLINE_IO
      {input:     @next_input}
    else
      {input_url: @object.public_url.to_s}
    end
  end

  def add_transaction_attributes
    Transaction.current.add_parameters(
      input_url:  config.input_url,
      identifier: config.identifier,
      callbacks:  config.callbacks,
      metadata:   config.,
    )
  end

  if Daemons.newrelic?
    add_transaction_tracer :process, category: :task

    add_method_tracer :run_component
    add_method_tracer :process_output
    add_method_tracer :submit_callbacks
  end
end

#uploaderOpener::Daemons::Uploader (readonly)



20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
# File 'lib/opener/daemons/worker.rb', line 20

class Worker < Oni::Worker
  attr_reader :config, :uploader, :downloader, :callback_handler

  INLINE_IO = !!ENV['INLINE_IO']

  include NewRelic::Agent::Instrumentation::ControllerInstrumentation
  include NewRelic::Agent::MethodTracer

  ##
  # @param [Opener::Daemons::Configuration] config
  #
  def initialize(config)
    @config           = config
    @downloader       = Downloader.new
    @uploader         = Uploader.new
    @callback_handler = CallbackHandler.new
    @input            = nil
    @output           = nil
  end

  ##
  # Processes a document.
  #
  # @raise [Oni::WrappedError]
  #
  def process
    add_transaction_attributes

    begin
      process_input
      run_component
      process_output
      submit_callbacks

    # Unsupported languages are handled in a different manner as they can
    # occur quite often. In these cases we _do_ want the data to be sent
    # to the final callback URL (skipping whatever comes before it) so it
    # can act upon it.
    rescue Core::UnsupportedLanguageError
      handle_unsupported_language
    end
  end

  ##
  #
  def process_input
    if config.input
      @input = Zlib.gunzip Base64.decode64 config.input
      @input.force_encoding 'UTF-8'
    else
      @input = downloader.download config.input_url
    end
  end

  ##
  # @return [String]
  #
  def run_component
    @output = config.component_instance.run @input, config.['custom_config']
  end

  ##
  # @param [String] output
  # @return [Aws::S3::Object]
  #
  def process_output
    if INLINE_IO
      @next_input = Base64.encode64 Zlib.gzip @output
    else
      @object = uploader.upload config.identifier, @output, config.
    end
  end

  ##
  # Sends the object's URL to the next callback URL.
  #
  # @param [Aws::S3::Object] object
  #
  def submit_callbacks
    urls     = config.callbacks.dup
    next_url = urls.shift

    callback_handler.post next_url, next_input_params.merge(
      identifier: config.identifier,
      callbacks:  urls,
      metadata:   config.,
    )

    Core::Syslog.info("Submitted response to #{next_url}", config.)
  end

  ##
  # Sends the unsupported input to the last callback URL.
  #
  def handle_unsupported_language
    last_url = config.callbacks.last

    callback_handler.post last_url, input_params.merge(
      identifier: config.identifier,
      metadata:   config.,
    )

    Core::Syslog.info(
      "Submitted input with an unsupported language to #{last_url}",
      config.
    )
  end

  private

  ##
  # Preserve input for last callback
  #
  def input_params
    if config.input_url
      {input_url: config.input_url}
    else
      {input:     config.input}
    end
  end

  ##
  # Use generated output as new input
  #
  def next_input_params
    if INLINE_IO
      {input:     @next_input}
    else
      {input_url: @object.public_url.to_s}
    end
  end

  def add_transaction_attributes
    Transaction.current.add_parameters(
      input_url:  config.input_url,
      identifier: config.identifier,
      callbacks:  config.callbacks,
      metadata:   config.,
    )
  end

  if Daemons.newrelic?
    add_transaction_tracer :process, category: :task

    add_method_tracer :run_component
    add_method_tracer :process_output
    add_method_tracer :submit_callbacks
  end
end

Instance Method Details

#handle_unsupported_languageObject

Sends the unsupported input to the last callback URL.



114
115
116
117
118
119
120
121
122
123
124
125
126
# File 'lib/opener/daemons/worker.rb', line 114

def handle_unsupported_language
  last_url = config.callbacks.last

  callback_handler.post last_url, input_params.merge(
    identifier: config.identifier,
    metadata:   config.,
  )

  Core::Syslog.info(
    "Submitted input with an unsupported language to #{last_url}",
    config.
  )
end

#processObject

Processes a document.

Raises:

  • (Oni::WrappedError)


45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# File 'lib/opener/daemons/worker.rb', line 45

def process
  add_transaction_attributes

  begin
    process_input
    run_component
    process_output
    submit_callbacks

  # Unsupported languages are handled in a different manner as they can
  # occur quite often. In these cases we _do_ want the data to be sent
  # to the final callback URL (skipping whatever comes before it) so it
  # can act upon it.
  rescue Core::UnsupportedLanguageError
    handle_unsupported_language
  end
end

#process_inputObject



65
66
67
68
69
70
71
72
# File 'lib/opener/daemons/worker.rb', line 65

def process_input
  if config.input
    @input = Zlib.gunzip Base64.decode64 config.input
    @input.force_encoding 'UTF-8'
  else
    @input = downloader.download config.input_url
  end
end

#process_outputAws::S3::Object

Parameters:

  • output (String)

Returns:

  • (Aws::S3::Object)


85
86
87
88
89
90
91
# File 'lib/opener/daemons/worker.rb', line 85

def process_output
  if INLINE_IO
    @next_input = Base64.encode64 Zlib.gzip @output
  else
    @object = uploader.upload config.identifier, @output, config.
  end
end

#run_componentString

Returns:

  • (String)


77
78
79
# File 'lib/opener/daemons/worker.rb', line 77

def run_component
  @output = config.component_instance.run @input, config.['custom_config']
end

#submit_callbacksObject

Sends the object’s URL to the next callback URL.

Parameters:

  • object (Aws::S3::Object)


98
99
100
101
102
103
104
105
106
107
108
109
# File 'lib/opener/daemons/worker.rb', line 98

def submit_callbacks
  urls     = config.callbacks.dup
  next_url = urls.shift

  callback_handler.post next_url, next_input_params.merge(
    identifier: config.identifier,
    callbacks:  urls,
    metadata:   config.,
  )

  Core::Syslog.info("Submitted response to #{next_url}", config.)
end