Module: ScraperUtils::Scheduler
Defined in:
- lib/scraper_utils/scheduler.rb
- lib/scraper_utils/scheduler/constants.rb
- lib/scraper_utils/scheduler/thread_request.rb
- lib/scraper_utils/scheduler/process_request.rb
- lib/scraper_utils/scheduler/thread_response.rb
- lib/scraper_utils/scheduler/operation_worker.rb
- lib/scraper_utils/scheduler/operation_registry.rb
Overview

A utility module to coordinate the scheduling of work by:
- interleaving multiple operations (scraping of an authority's site) using Fibers (cooperative concurrency), so your code and the libraries you call don't have to be thread safe
- performing Mechanize network I/O in parallel using Threads
Process flow

- operation_workers start with response = true, as the first resume passes args to the block and that response is ignored
- the scheduler resumes an operation_worker's fiber with the last response when `Time.now` >= resume_at
  - the worker fiber calls Scheduler.execute_request, which:
    - sets resume_at based on the calculated delay and waiting_for_response
    - pushes the request onto its local request queue if parallel, otherwise executes the request immediately in the fiber and passes the response to save_thread_response
  - the fiber yields true to the main fiber to indicate it wants to continue once resume_at passes / the response arrives
- one thread per fiber (if parallel); each thread:
  - pops a request
  - executes the request
  - pushes the response onto the global response queue (which includes response_time)
- main fiber - schedule_all loop:
  - pops any responses and calls save_thread_response on the operation_worker
  - resumes(true) the operation_worker fiber when `Time.now` >= resume_at and it is not waiting_for_response
- when a worker fiber is finished it returns false to indicate it is finished, OR when shutdown is called resume(false) is called to indicate the worker fiber should not continue

save_thread_response:
- updates the running average and calculates next_resume_at

The fiber aborts processing if the 2nd argument is true; the fiber returns nil when finished.

Workers:
- push process requests onto individual request queues for their thread to process, and yield(true) to the scheduler when enough
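The resume/yield handshake described above can be sketched with plain Ruby fibers. This is an illustration of the mechanism only, not the gem's actual implementation; the authority names and step counts are made up:

```ruby
# Illustration of the resume(true) / Fiber.yield(true) handshake: each worker
# fiber yields true while it wants to keep running and returns nil when done.
log = []
workers = %i[authority_a authority_b].map do |authority|
  Fiber.new do |continue|
    raise "Terminated fiber for #{authority} before block run" unless continue

    3.times do |step|
      log << "#{authority} step #{step}" # stands in for one scraping request
      Fiber.yield true                   # hand control back to the main fiber
    end
    nil # no further requests
  end
end

# Main scheduling loop: resume each unfinished fiber in turn,
# dropping fibers whose final return value is nil.
workers.delete_if { |fiber| fiber.resume(true).nil? } until workers.empty?

log.first(2) # => ["authority_a step 0", "authority_b step 0"]
```

Because each fiber yields after every step, the two authorities' steps interleave rather than running back to back.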
Thread safe Implementation:
- uses a fiber for each authority with its own Mechanize agent, so operations don't need to be thread safe
- only Mechanize requests are run in threads, in parallel, whilst they wait for a network response
- uses message passing (via Queues) to avoid having to share state between threads
- execute_request does not return until the response has been received from the thread, so the fiber's Mechanize agent that is shared with the thread isn't used in multiple threads at once
- only one execute_request per authority fiber can be in the thread request queue at any one time
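The message-passing arrangement can be sketched with stdlib queues. This is a simplified illustration: the request/response hashes and the `:example` authority are made up, and a lambda stands in for the Mechanize network call:

```ruby
# One request queue per worker thread; a single shared response queue.
response_queue = Thread::Queue.new
request_queue  = Thread::Queue.new

worker_thread = Thread.new do
  while (request = request_queue.pop) # pop returns nil once the queue is closed
    started = Time.now
    result = request[:block].call # stands in for the Mechanize network call
    response_queue << { authority: request[:authority],
                        result: result,
                        response_time: Time.now - started }
  end
end

# The fiber submits one request and blocks until its response arrives,
# so its Mechanize agent is never used by two threads at once.
request_queue << { authority: :example, block: -> { "page body" } }
response = response_queue.pop

request_queue.close
worker_thread.join
response[:result] # => "page body"
```

Because the submitting side blocks on `response_queue.pop`, at most one request per worker is ever in flight, which is the invariant the last two bullets describe.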
Defined Under Namespace

Modules: Constants
Classes: OperationRegistry, OperationWorker, ProcessRequest, ThreadRequest, ThreadResponse
Class Attribute Summary

- .exceptions ⇒ Hash{Symbol => Exception} (readonly)
  Exceptions by authority.
- .max_workers ⇒ Integer
  Maximum number of workers (operations) to run in parallel.
- .run_timeout ⇒ Integer
  The run_operations timeout in seconds. On timeout a message is output and the Ruby program exits with exit code 124.
- .threaded ⇒ Boolean (also: threaded?)
  Controls whether network I/O requests are processed in parallel using threads.
Fiber API

- .current_authority ⇒ Symbol?
  Gets the authority associated with the current fiber or thread.
- .execute_request(client, method_name, args) ⇒ Object
  Executes a Mechanize network request in parallel using the fiber's thread. This allows multiple network I/O requests to be waiting for a response in parallel whilst responses that have arrived can be processed by their fibers.
Class Method Summary

- .register_operation(authority) { ... } ⇒ Object
  Registers a block to scrape for a specific authority.
- .reset! ⇒ Object
  Resets the scheduler state.
- .run_operations ⇒ Hash
  Runs all registered operations until completion.
Class Attribute Details

.exceptions ⇒ Hash{Symbol => Exception} (readonly)

Returns exceptions by authority.

```ruby
# File 'lib/scraper_utils/scheduler.rb', line 75

def exceptions
  @exceptions
end
```
.max_workers ⇒ Integer

Maximum number of workers (operations) to run in parallel.

```ruby
# File 'lib/scraper_utils/scheduler.rb', line 72

def max_workers
  @max_workers
end
```
.run_timeout ⇒ Integer

The run_operations timeout in seconds. On timeout a message is output and the Ruby program exits with exit code 124.

```ruby
# File 'lib/scraper_utils/scheduler.rb', line 81

def run_timeout
  @run_timeout
end
```
.threaded ⇒ Boolean (also: threaded?)

Controls whether network I/O requests are processed in parallel using threads. Defaults to true unless the MORPH_DISABLE_THREADS ENV variable is set.

```ruby
# File 'lib/scraper_utils/scheduler.rb', line 64

def threaded
  @threaded
end
```
Class Method Details

.current_authority ⇒ Symbol?

Gets the authority associated with the current fiber or thread.

```ruby
# File 'lib/scraper_utils/scheduler.rb', line 211

def self.current_authority
  current_operation&.authority
end
```
.execute_request(client, method_name, args) ⇒ Object

Executes a Mechanize network request in parallel using the fiber's thread. This allows multiple network I/O requests to be waiting for a response in parallel whilst responses that have arrived can be processed by their fibers.

```ruby
# File 'lib/scraper_utils/scheduler.rb', line 194

def self.execute_request(client, method_name, args)
  operation = current_operation
  # execute immediately if not in a worker fiber
  return client.send(method_name, args) unless operation

  request = Scheduler::ProcessRequest.new(operation.authority, client, method_name, args)
  log "Submitting request #{request.inspect}" if DebugUtils.basic?
  response = operation.submit_request(request)
  unless response.is_a?(ThreadResponse)
    raise "Expected ThreadResponse, got: #{response.inspect}"
  end
  response.result!
end
```
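The immediate-execution fallback above reduces to a plain dynamic dispatch via `send`. A minimal standalone sketch of that dispatch with a stand-in client (`FakeClient` is hypothetical, not part of the gem):

```ruby
# FakeClient stands in for a Mechanize agent in this sketch.
class FakeClient
  def get(url)
    "fetched #{url}"
  end
end

client = FakeClient.new
method_name = :get
args = "https://example.com"

# Outside a worker fiber, execute_request reduces to this dynamic call:
result = client.send(method_name, args)
result # => "fetched https://example.com"
```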
.register_operation(authority) { ... } ⇒ Object

Registers a block to scrape for a specific authority.

Block yields(:delay) when operation.resume_at is in the future, and returns :finished when finished.

```ruby
# File 'lib/scraper_utils/scheduler.rb', line 116

def self.register_operation(authority, &block)
  fiber = Fiber.new do |continue|
    begin
      raise "Terminated fiber for #{authority} before block run" unless continue

      block.call
    rescue StandardError => e
      # Store exception against the authority
      exceptions[authority] = e
    ensure
      # Clean up when done regardless of success/failure
      operation_registry&.deregister
    end
    # no further requests
    nil
  end
  operation = operation_registry&.register(fiber, authority)
  if DebugUtils.basic?
    LogUtils.log "Registered #{authority} operation with fiber: #{fiber.object_id} for interleaving"
  end
  if operation_registry&.size >= @max_workers
    LogUtils.log "Running batch of #{operation_registry&.size} operations immediately"
    run_operations
  end
  # return operation for ease of testing
  operation
end
```
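The wrap-and-capture pattern the fiber uses can be sketched standalone. This is an illustration only; `:example_town` and the error message are made up:

```ruby
# A failing scrape block raises inside the fiber; the rescue stores the
# exception against the authority instead of crashing the whole run.
exceptions = {}
authority = :example_town

fiber = Fiber.new do |continue|
  begin
    raise "Terminated fiber for #{authority} before block run" unless continue

    raise StandardError, "scrape failed" # stands in for the scraping block
  rescue StandardError => e
    exceptions[authority] = e
  end
  nil # signals the scheduler this fiber is finished
end

fiber.resume(true)
exceptions[:example_town].message # => "scrape failed"
```

This is why run_operations can return a Hash of exceptions per authority rather than aborting on the first failure.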
.reset! ⇒ Object

Resets the scheduler state. Use before retrying failed authorities.

```ruby
# File 'lib/scraper_utils/scheduler.rb', line 92

def self.reset!
  @operation_registry&.shutdown
  @operation_registry = nil
  @response_queue.close if @response_queue
  @threaded = ENV["MORPH_DISABLE_THREADS"].to_s.empty?
  @max_workers = [1, ENV.fetch('MORPH_MAX_WORKERS', Constants::DEFAULT_MAX_WORKERS).to_i].max
  @exceptions = {}
  @totals = Hash.new { 0 }
  @initial_resume_at = Time.now
  @response_queue = Thread::Queue.new if self.threaded?
  @operation_registry = OperationRegistry.new
  @reset = true
  @run_timeout = ENV.fetch('MORPH_RUN_TIMEOUT', Constants::DEFAULT_TIMEOUT).to_i
  nil
end
```
.run_operations ⇒ Hash

Runs all registered operations until completion.

```ruby
# File 'lib/scraper_utils/scheduler.rb', line 149

def self.run_operations
  monitor_run_time = Thread.new do
    sleep run_timeout
    desc = "#{(run_timeout / 3600.0).round(1)} hours"
    desc = "#{(run_timeout / 60.0).round(1)} minutes" if run_timeout < 100 * 60
    desc = "#{run_timeout} seconds" if run_timeout < 100
    LogUtils.log "ERROR: Script exceeded maximum allowed runtime of #{desc}!\n" \
                 "Forcibly terminating process!"
    Process.exit!(124)
  end
  count = operation_registry&.size

  # Main scheduling loop - process till there is nothing left to do
  until @operation_registry.empty?
    save_thread_responses
    resume_next_operation
  end
  report_summary(count)
  exceptions
ensure
  # Kill the monitoring thread if we finish normally
  monitor_run_time.kill if monitor_run_time.alive?
  monitor_run_time.join(2)
end
```
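The timeout guard above is a watchdog-thread pattern. A minimal standalone sketch (the one-second timeout and the `:finished` result are made up for illustration):

```ruby
# Watchdog: a thread that sleeps for the timeout and force-exits the process;
# it is killed when the real work completes first.
run_timeout = 1 # seconds; tiny value just for this sketch

monitor = Thread.new do
  sleep run_timeout
  warn "ERROR: Script exceeded maximum allowed runtime!"
  Process.exit!(124) # exit code 124 conventionally signals a timeout
end

# ... the real scheduling loop would run here; this sketch finishes early ...
result = :finished

# Kill the watchdog once work completes so it never fires.
monitor.kill if monitor.alive?
monitor.join(2)
result # => :finished
```

`Process.exit!` (rather than `exit`) skips at_exit handlers so a hung scraper cannot block termination.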