Module: ScraperUtils::Scheduler

Defined in:
lib/scraper_utils/scheduler.rb,
lib/scraper_utils/scheduler/constants.rb,
lib/scraper_utils/scheduler/thread_request.rb,
lib/scraper_utils/scheduler/process_request.rb,
lib/scraper_utils/scheduler/thread_response.rb,
lib/scraper_utils/scheduler/operation_worker.rb,
lib/scraper_utils/scheduler/operation_registry.rb

Overview

A utility module to coordinate the scheduling of work:

  • interleaving multiple operations (the scraping of each authority’s site) using Fibers (cooperative concurrency), so your code and the libraries you call don’t have to be thread safe

  • performing mechanize network I/O in parallel using Threads

Process flow

  1. operation_workers start with response = true; the first resume passes the args to the block, and that initial response is ignored

  2. resumes the operation_worker’s fiber with the last response when `Time.now` >= resume_at

  3. worker fiber calls Scheduler.execute_request

    1. sets resume_at based on calculated delay and waiting_for_response

    2. pushes request onto local request queue if parallel, otherwise executes request immediately in fiber and passes response to save_thread_response

    3. fiber yields true to main fiber to indicate it wants to continue after resume_at / response arrives

  4. if parallel, one thread per fiber; each thread:

    1. pops request

    2. executes request

    3. pushes response onto global response queue (includes response_time)

  5. main fiber - schedule_all loop

    1. pops any responses and calls save_thread_response on operation_worker

    2. calls resume(true) on an operation_worker’s fiber when `Time.now` >= resume_at and it is not waiting_for_response

  6. When a worker fiber finishes, it returns false to indicate it is done. When shutdown is called, resume(false) is used to indicate the worker fiber should not continue.

save_thread_response:

  • updates the running average response time and calculates next_resume_at

The worker fiber aborts processing if it is resumed with false, and returns nil when finished.
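The cooperative hand-off between the main fiber and worker fibers described above can be sketched as follows. This is a much simplified illustration (ignoring threads, delays and resume_at); the names here are illustrative, not the gem’s internals:

```ruby
require "fiber"

responses = []
workers = [:a, :b].map do |authority|
  Fiber.new do |continue|
    2.times do |i|
      break unless continue                # resume(false) means: stop work
      responses << "#{authority}-#{i}"     # stand-in for one scraping step
      continue = Fiber.yield(true)         # yield control back to the main fiber
    end
    nil # no further requests - signals this fiber is finished
  end
end

# Main loop: resume each unfinished fiber in turn until all are done
workers.delete_if { |fiber| !fiber.resume(true) } until workers.empty?
responses # => ["a-0", "b-0", "a-1", "b-1"]
```

Resuming the fibers round-robin is what interleaves the authorities: each fiber does one step of work, yields true, and is resumed later with the next response.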

Workers:

  • push process requests onto their individual request queue for their thread to process, and yield(true) to the scheduler until the response arrives

Thread safe Implementation:

  • Uses fibers for each authority with its own mechanize agent so operations don’t need to be thread safe

  • Only Mechanize requests are run in threads in parallel whilst they wait for network response

  • Uses message passing (via Queues) to avoid having to share state between threads.

  • execute_request does not return until the response has been received from the thread, so the fiber’s mechanize agent (which is shared with the thread) is never used in multiple threads at once

  • Only one execute request per authority fiber can be in the thread request queue at any one time
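The message-passing design above can be sketched with Thread::Queue. This is a simplified stand-in: the real gem wraps requests and responses in ProcessRequest/ThreadResponse objects, and the hash keys here are illustrative:

```ruby
request_queue  = Thread::Queue.new  # one per worker in the real scheduler
response_queue = Thread::Queue.new  # shared global response queue

worker = Thread.new do
  while (request = request_queue.pop)  # pop request (nil once queue is closed)
    started = Time.now
    result  = request[:args].sum       # stand-in for the real network call
    response_queue.push(request.merge(result: result,
                                      response_time: Time.now - started))
  end
end

request_queue.push({ authority: :example, args: [1, 2, 3] })
response = response_queue.pop
request_queue.close # lets the worker loop finish
worker.join
response[:result] # => 6
```

Because all communication goes through the two queues, neither side touches the other’s state, which is why the scraping code itself never needs to be thread safe.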

Defined Under Namespace

Modules: Constants Classes: OperationRegistry, OperationWorker, ProcessRequest, ThreadRequest, ThreadResponse

Class Attribute Summary

Fiber API

Class Method Summary

Class Attribute Details

.exceptions ⇒ Hash{Symbol => Exception} (readonly)

Returns exceptions by authority.

Returns:

  • (Hash{Symbol => Exception})

    exceptions by authority



# File 'lib/scraper_utils/scheduler.rb', line 75

def exceptions
  @exceptions
end

.max_workers ⇒ Integer

Controls the maximum number of concurrent workers using fibers and threads

Returns:

  • (Integer)

    max concurrent workers using fibers and threads, defaults to MORPH_MAX_WORKERS env variable or 50



# File 'lib/scraper_utils/scheduler.rb', line 72

def max_workers
  @max_workers
end

.run_timeout ⇒ Integer

Returns the run_operations timeout. On timeout a message is output and the ruby program exits with exit code 124.

Returns:

  • (Integer)

    Overall process timeout in seconds (default MORPH_RUN_TIMEOUT ENV value or 6 hours)



# File 'lib/scraper_utils/scheduler.rb', line 81

def run_timeout
  @run_timeout
end

.threaded ⇒ Boolean (Also known as: threaded?)

Note:

Defaults to true unless the MORPH_DISABLE_THREADS ENV variable is set

Controls if network I/O requests will be processed in parallel using threads

Returns:

  • (Boolean)

    true if processing network I/O in parallel using threads, otherwise false



# File 'lib/scraper_utils/scheduler.rb', line 64

def threaded
  @threaded
end

Class Method Details

.current_authority ⇒ Symbol?

Gets the authority associated with the current fiber or thread

Returns:

  • (Symbol, nil)

    the authority name or nil if not in a fiber



# File 'lib/scraper_utils/scheduler.rb', line 211

def self.current_authority
  current_operation&.authority
end

.execute_request(client, method_name, args) ⇒ Object

Executes a Mechanize network request in parallel using the fiber’s thread. This allows multiple network I/O requests to wait for responses in parallel whilst responses that have already arrived are processed by their fibers.

Examples:

Replace this code in your scraper

page = agent.get(url_period(url, period, webguest))

With this code

page = ScraperUtils::Scheduler.execute_request(agent, :get, [url_period(url, period, webguest)])

Parameters:

  • client (MechanizeClient)

    client to be used to process request

  • method_name (Symbol)

    method to be called on client

  • args (Array)

    Arguments to be used with method call

Returns:

  • (Object)

    response from method call on client



# File 'lib/scraper_utils/scheduler.rb', line 194

def self.execute_request(client, method_name, args)
  operation = current_operation
  # execute immediately if not in a worker fiber
  return client.send(method_name, *args) unless operation

  request = Scheduler::ProcessRequest.new(operation.authority, client, method_name, args)
  log "Submitting request #{request.inspect}" if DebugUtils.basic?
  response = operation.submit_request(request)
  unless response.is_a?(ThreadResponse)
    raise "Expected ThreadResponse, got: #{response.inspect}"
  end
  response.result!
end
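The fallback path above (executing directly when not inside a worker fiber) can be illustrated with a stand-in client. FakeClient and this trimmed-down method are illustrative only, not the gem’s code:

```ruby
# Stand-in for a mechanize agent
class FakeClient
  def get(url)
    "response for #{url}"
  end
end

# Trimmed-down sketch of the fallback: outside a worker fiber the call
# is sent straight to the client, with the args Array splatted in.
def execute_request_sketch(client, method_name, args, operation = nil)
  return client.send(method_name, *args) unless operation

  # otherwise a ProcessRequest would be submitted to the worker (omitted)
end

execute_request_sketch(FakeClient.new, :get, ["https://example.com"])
# => "response for https://example.com"
```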

.register_operation(authority) { ... } ⇒ Object

Registers a block to scrape for a specific authority

Block yields(:delay) when operation.resume_at is in the future, and returns :finished when finished

Parameters:

  • authority (Symbol)

    the name of the authority being processed

Yields:

  • to the block containing the scraping operation to be run in the fiber



# File 'lib/scraper_utils/scheduler.rb', line 116

def self.register_operation(authority, &block)
  fiber = Fiber.new do |continue|
    begin
      raise "Terminated fiber for #{authority} before block run" unless continue

      block.call
    rescue StandardError => e
      # Store exception against the authority
      exceptions[authority] = e
    ensure
      # Clean up when done regardless of success/failure
      operation_registry&.deregister
    end
    # no further requests
    nil
  end

  operation = operation_registry&.register(fiber, authority)

  if DebugUtils.basic?
    LogUtils.log "Registered #{authority} operation with fiber: #{fiber.object_id} for interleaving"
  end
  if operation_registry&.size >= @max_workers
    LogUtils.log "Running batch of #{operation_registry&.size} operations immediately"
    run_operations
  end
  # return operation for ease of testing
  operation
end

.reset!Object

Resets the scheduler state. Use before retrying failed authorities.



# File 'lib/scraper_utils/scheduler.rb', line 92

def self.reset!
  @operation_registry&.shutdown
  @operation_registry = nil
  @response_queue.close if @response_queue
  @threaded = ENV["MORPH_DISABLE_THREADS"].to_s.empty?
  @max_workers = [1, ENV.fetch('MORPH_MAX_WORKERS', Constants::DEFAULT_MAX_WORKERS).to_i].max
  @exceptions = {}
  @totals = Hash.new { 0 }
  @initial_resume_at = Time.now
  @response_queue = Thread::Queue.new if self.threaded?
  @operation_registry = OperationRegistry.new
  @reset = true
  @run_timeout = ENV.fetch('MORPH_RUN_TIMEOUT', Constants::DEFAULT_TIMEOUT).to_i
  nil
end

.run_operationsHash

Run all registered operations until completion

Returns:

  • (Hash)

    Exceptions that occurred during execution



# File 'lib/scraper_utils/scheduler.rb', line 149

def self.run_operations
  monitor_run_time = Thread.new do
    sleep run_timeout
    desc = "#{(run_timeout / 3600.0).round(1)} hours"
    desc = "#{(run_timeout / 60.0).round(1)} minutes" if run_timeout < 100 * 60
    desc = "#{run_timeout} seconds" if run_timeout < 100
    LogUtils.log "ERROR: Script exceeded maximum allowed runtime of #{desc}!\n" \
                   "Forcibly terminating process!"
    Process.exit!(124)
  end
  count = operation_registry&.size

  # Main scheduling loop - process till there is nothing left to do
  until @operation_registry.empty?
    save_thread_responses
    resume_next_operation
  end

  report_summary(count)

  exceptions
ensure
  # Kill the monitoring thread if we finish normally
  monitor_run_time.kill if monitor_run_time.alive?
  monitor_run_time.join(2)
end