Class: Opener::Daemons::Worker
- Inherits:
-
Oni::Worker
- Object
- Oni::Worker
- Opener::Daemons::Worker
- Includes:
- NewRelic::Agent::Instrumentation::ControllerInstrumentation, NewRelic::Agent::MethodTracer
- Defined in:
- lib/opener/daemons/worker.rb
Overview
Downloads a KAF document, passes it to a component and submits the output to a callback URL or a default queue. Each Worker instance runs in an isolated thread.
Constant Summary collapse
- INLINE_IO =
!!ENV['INLINE_IO']
Instance Attribute Summary collapse
- #callback_handler ⇒ Opener::CallbackHandler readonly
- #config ⇒ Opener::Daemons::Configuration readonly
- #downloader ⇒ Opener::Daemons::Downloader readonly
- #uploader ⇒ Opener::Daemons::Uploader readonly
Instance Method Summary collapse
-
#handle_unsupported_language ⇒ Object
Sends the unsupported input to the last callback URL.
-
#initialize(config) ⇒ Worker
constructor
A new instance of Worker.
-
#process ⇒ Object
Processes a document.
- #process_input ⇒ Object
- #process_output ⇒ Aws::S3::Object
- #run_component ⇒ String
-
#submit_callbacks ⇒ Object
Sends the object’s URL to the next callback URL.
Constructor Details
#initialize(config) ⇒ Worker
Returns a new instance of Worker.
31 32 33 34 35 36 37 38 |
# File 'lib/opener/daemons/worker.rb', line 31 def initialize(config) @config = config @downloader = Downloader.new @uploader = Uploader.new @callback_handler = CallbackHandler.new @input = nil @output = nil end |
Instance Attribute Details
#callback_handler ⇒ Opener::CallbackHandler (readonly)
20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 |
# File 'lib/opener/daemons/worker.rb', line 20 class Worker < Oni::Worker attr_reader :config, :uploader, :downloader, :callback_handler INLINE_IO = !!ENV['INLINE_IO'] include NewRelic::Agent::Instrumentation::ControllerInstrumentation include NewRelic::Agent::MethodTracer ## # @param [Opener::Daemons::Configuration] config # def initialize(config) @config = config @downloader = Downloader.new @uploader = Uploader.new @callback_handler = CallbackHandler.new @input = nil @output = nil end ## # Processes a document. # # @raise [Oni::WrappedError] # def process add_transaction_attributes begin process_input run_component process_output submit_callbacks # Unsupported languages are handled in a different manner as they can # occur quite often. In these cases we _do_ want the data to be sent # to the final callback URL (skipping whatever comes before it) so it # can act upon it. rescue Core::UnsupportedLanguageError handle_unsupported_language end end ## # def process_input if config.input @input = Zlib.gunzip Base64.decode64 config.input @input.force_encoding 'UTF-8' else @input = downloader.download config.input_url end end ## # @return [String] # def run_component @output = config.component_instance.run @input, config.['custom_config'] end ## # @param [String] output # @return [Aws::S3::Object] # def process_output if INLINE_IO @next_input = Base64.encode64 Zlib.gzip @output else @object = uploader.upload config.identifier, @output, config. end end ## # Sends the object's URL to the next callback URL. # # @param [Aws::S3::Object] object # def submit_callbacks urls = config.callbacks.dup next_url = urls.shift callback_handler.post next_url, next_input_params.merge( identifier: config.identifier, callbacks: urls, metadata: config., ) Core::Syslog.info("Submitted response to #{next_url}", config.) end ## # Sends the unsupported input to the last callback URL. 
# def handle_unsupported_language last_url = config.callbacks.last callback_handler.post last_url, input_params.merge( identifier: config.identifier, metadata: config., ) Core::Syslog.info( "Submitted input with an unsupported language to #{last_url}", config. ) end private ## # Preserve input for last callback # def input_params if config.input_url {input_url: config.input_url} else {input: config.input} end end ## # Use generated output as new input # def next_input_params if INLINE_IO {input: @next_input} else {input_url: @object.public_url.to_s} end end def add_transaction_attributes Transaction.current.add_parameters( input_url: config.input_url, identifier: config.identifier, callbacks: config.callbacks, metadata: config., ) end if Daemons.newrelic? add_transaction_tracer :process, category: :task add_method_tracer :run_component add_method_tracer :process_output add_method_tracer :submit_callbacks end end |
#config ⇒ Opener::Daemons::Configuration (readonly)
20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 |
# File 'lib/opener/daemons/worker.rb', line 20 class Worker < Oni::Worker attr_reader :config, :uploader, :downloader, :callback_handler INLINE_IO = !!ENV['INLINE_IO'] include NewRelic::Agent::Instrumentation::ControllerInstrumentation include NewRelic::Agent::MethodTracer ## # @param [Opener::Daemons::Configuration] config # def initialize(config) @config = config @downloader = Downloader.new @uploader = Uploader.new @callback_handler = CallbackHandler.new @input = nil @output = nil end ## # Processes a document. # # @raise [Oni::WrappedError] # def process add_transaction_attributes begin process_input run_component process_output submit_callbacks # Unsupported languages are handled in a different manner as they can # occur quite often. In these cases we _do_ want the data to be sent # to the final callback URL (skipping whatever comes before it) so it # can act upon it. rescue Core::UnsupportedLanguageError handle_unsupported_language end end ## # def process_input if config.input @input = Zlib.gunzip Base64.decode64 config.input @input.force_encoding 'UTF-8' else @input = downloader.download config.input_url end end ## # @return [String] # def run_component @output = config.component_instance.run @input, config.['custom_config'] end ## # @param [String] output # @return [Aws::S3::Object] # def process_output if INLINE_IO @next_input = Base64.encode64 Zlib.gzip @output else @object = uploader.upload config.identifier, @output, config. end end ## # Sends the object's URL to the next callback URL. # # @param [Aws::S3::Object] object # def submit_callbacks urls = config.callbacks.dup next_url = urls.shift callback_handler.post next_url, next_input_params.merge( identifier: config.identifier, callbacks: urls, metadata: config., ) Core::Syslog.info("Submitted response to #{next_url}", config.) end ## # Sends the unsupported input to the last callback URL. 
# def handle_unsupported_language last_url = config.callbacks.last callback_handler.post last_url, input_params.merge( identifier: config.identifier, metadata: config., ) Core::Syslog.info( "Submitted input with an unsupported language to #{last_url}", config. ) end private ## # Preserve input for last callback # def input_params if config.input_url {input_url: config.input_url} else {input: config.input} end end ## # Use generated output as new input # def next_input_params if INLINE_IO {input: @next_input} else {input_url: @object.public_url.to_s} end end def add_transaction_attributes Transaction.current.add_parameters( input_url: config.input_url, identifier: config.identifier, callbacks: config.callbacks, metadata: config., ) end if Daemons.newrelic? add_transaction_tracer :process, category: :task add_method_tracer :run_component add_method_tracer :process_output add_method_tracer :submit_callbacks end end |
#downloader ⇒ Opener::Daemons::Downloader (readonly)
20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 |
# File 'lib/opener/daemons/worker.rb', line 20 class Worker < Oni::Worker attr_reader :config, :uploader, :downloader, :callback_handler INLINE_IO = !!ENV['INLINE_IO'] include NewRelic::Agent::Instrumentation::ControllerInstrumentation include NewRelic::Agent::MethodTracer ## # @param [Opener::Daemons::Configuration] config # def initialize(config) @config = config @downloader = Downloader.new @uploader = Uploader.new @callback_handler = CallbackHandler.new @input = nil @output = nil end ## # Processes a document. # # @raise [Oni::WrappedError] # def process add_transaction_attributes begin process_input run_component process_output submit_callbacks # Unsupported languages are handled in a different manner as they can # occur quite often. In these cases we _do_ want the data to be sent # to the final callback URL (skipping whatever comes before it) so it # can act upon it. rescue Core::UnsupportedLanguageError handle_unsupported_language end end ## # def process_input if config.input @input = Zlib.gunzip Base64.decode64 config.input @input.force_encoding 'UTF-8' else @input = downloader.download config.input_url end end ## # @return [String] # def run_component @output = config.component_instance.run @input, config.['custom_config'] end ## # @param [String] output # @return [Aws::S3::Object] # def process_output if INLINE_IO @next_input = Base64.encode64 Zlib.gzip @output else @object = uploader.upload config.identifier, @output, config. end end ## # Sends the object's URL to the next callback URL. # # @param [Aws::S3::Object] object # def submit_callbacks urls = config.callbacks.dup next_url = urls.shift callback_handler.post next_url, next_input_params.merge( identifier: config.identifier, callbacks: urls, metadata: config., ) Core::Syslog.info("Submitted response to #{next_url}", config.) end ## # Sends the unsupported input to the last callback URL. 
# def handle_unsupported_language last_url = config.callbacks.last callback_handler.post last_url, input_params.merge( identifier: config.identifier, metadata: config., ) Core::Syslog.info( "Submitted input with an unsupported language to #{last_url}", config. ) end private ## # Preserve input for last callback # def input_params if config.input_url {input_url: config.input_url} else {input: config.input} end end ## # Use generated output as new input # def next_input_params if INLINE_IO {input: @next_input} else {input_url: @object.public_url.to_s} end end def add_transaction_attributes Transaction.current.add_parameters( input_url: config.input_url, identifier: config.identifier, callbacks: config.callbacks, metadata: config., ) end if Daemons.newrelic? add_transaction_tracer :process, category: :task add_method_tracer :run_component add_method_tracer :process_output add_method_tracer :submit_callbacks end end |
#uploader ⇒ Opener::Daemons::Uploader (readonly)
20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 |
# File 'lib/opener/daemons/worker.rb', line 20 class Worker < Oni::Worker attr_reader :config, :uploader, :downloader, :callback_handler INLINE_IO = !!ENV['INLINE_IO'] include NewRelic::Agent::Instrumentation::ControllerInstrumentation include NewRelic::Agent::MethodTracer ## # @param [Opener::Daemons::Configuration] config # def initialize(config) @config = config @downloader = Downloader.new @uploader = Uploader.new @callback_handler = CallbackHandler.new @input = nil @output = nil end ## # Processes a document. # # @raise [Oni::WrappedError] # def process add_transaction_attributes begin process_input run_component process_output submit_callbacks # Unsupported languages are handled in a different manner as they can # occur quite often. In these cases we _do_ want the data to be sent # to the final callback URL (skipping whatever comes before it) so it # can act upon it. rescue Core::UnsupportedLanguageError handle_unsupported_language end end ## # def process_input if config.input @input = Zlib.gunzip Base64.decode64 config.input @input.force_encoding 'UTF-8' else @input = downloader.download config.input_url end end ## # @return [String] # def run_component @output = config.component_instance.run @input, config.['custom_config'] end ## # @param [String] output # @return [Aws::S3::Object] # def process_output if INLINE_IO @next_input = Base64.encode64 Zlib.gzip @output else @object = uploader.upload config.identifier, @output, config. end end ## # Sends the object's URL to the next callback URL. # # @param [Aws::S3::Object] object # def submit_callbacks urls = config.callbacks.dup next_url = urls.shift callback_handler.post next_url, next_input_params.merge( identifier: config.identifier, callbacks: urls, metadata: config., ) Core::Syslog.info("Submitted response to #{next_url}", config.) end ## # Sends the unsupported input to the last callback URL. 
# def handle_unsupported_language last_url = config.callbacks.last callback_handler.post last_url, input_params.merge( identifier: config.identifier, metadata: config., ) Core::Syslog.info( "Submitted input with an unsupported language to #{last_url}", config. ) end private ## # Preserve input for last callback # def input_params if config.input_url {input_url: config.input_url} else {input: config.input} end end ## # Use generated output as new input # def next_input_params if INLINE_IO {input: @next_input} else {input_url: @object.public_url.to_s} end end def add_transaction_attributes Transaction.current.add_parameters( input_url: config.input_url, identifier: config.identifier, callbacks: config.callbacks, metadata: config., ) end if Daemons.newrelic? add_transaction_tracer :process, category: :task add_method_tracer :run_component add_method_tracer :process_output add_method_tracer :submit_callbacks end end |
Instance Method Details
#handle_unsupported_language ⇒ Object
Sends the unsupported input to the last callback URL.
114 115 116 117 118 119 120 121 122 123 124 125 126 |
# File 'lib/opener/daemons/worker.rb', line 114 def handle_unsupported_language last_url = config.callbacks.last callback_handler.post last_url, input_params.merge( identifier: config.identifier, metadata: config.metadata, ) Core::Syslog.info( "Submitted input with an unsupported language to #{last_url}", config.metadata ) end |
#process ⇒ Object
Processes a document.
45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 |
# File 'lib/opener/daemons/worker.rb', line 45 def process add_transaction_attributes begin process_input run_component process_output submit_callbacks # Unsupported languages are handled in a different manner as they can # occur quite often. In these cases we _do_ want the data to be sent # to the final callback URL (skipping whatever comes before it) so it # can act upon it. rescue Core::UnsupportedLanguageError handle_unsupported_language end end |
#process_input ⇒ Object
65 66 67 68 69 70 71 72 |
# File 'lib/opener/daemons/worker.rb', line 65 def process_input if config.input @input = Zlib.gunzip Base64.decode64 config.input @input.force_encoding 'UTF-8' else @input = downloader.download config.input_url end end |
#process_output ⇒ Aws::S3::Object
85 86 87 88 89 90 91 |
# File 'lib/opener/daemons/worker.rb', line 85 def process_output if INLINE_IO @next_input = Base64.encode64 Zlib.gzip @output else @object = uploader.upload config.identifier, @output, config.metadata end end |
#run_component ⇒ String
77 78 79 |
# File 'lib/opener/daemons/worker.rb', line 77 def run_component @output = config.component_instance.run @input, config.metadata['custom_config'] end |
#submit_callbacks ⇒ Object
Sends the object’s URL to the next callback URL.
98 99 100 101 102 103 104 105 106 107 108 109 |
# File 'lib/opener/daemons/worker.rb', line 98 def submit_callbacks urls = config.callbacks.dup next_url = urls.shift callback_handler.post next_url, next_input_params.merge( identifier: config.identifier, callbacks: urls, metadata: config.metadata, ) Core::Syslog.info("Submitted response to #{next_url}", config.metadata) end |