Class: ScraperUtils::Scheduler::OperationWorker
- Inherits: Object
  - Object
  - ScraperUtils::Scheduler::OperationWorker
- Defined in: lib/scraper_utils/scheduler/operation_worker.rb
Overview
Handles the processing of a registered operation and associated fiber and thread state
Defined Under Namespace
Classes: NotReadyError
Instance Attribute Summary
- #authority ⇒ Symbol (readonly): The authority name associated with this fiber.
- #fiber ⇒ Fiber (readonly): The fiber.
- #request_queue ⇒ Thread::Queue (readonly): The request queue for the thread.
- #response ⇒ ThreadResponse?: The response to be passed on the next resume.
- #resume_at ⇒ Time: The time until which the fiber is delayed, i.e. when it is ready to resume.
- #thread ⇒ Thread (readonly): The thread used.
- #waiting_for_response ⇒ Boolean (readonly): Whether the worker is waiting for a response.
Class Method Summary
Instance Method Summary
- #alive? ⇒ Boolean: Fiber has not finished running.
- #can_resume? ⇒ Boolean: Worker has the necessary state to be resumed.
- #close ⇒ Object: Closes the worker's resources. Called by the worker fiber just before it exits.
- #initialize(fiber, authority, response_queue) ⇒ OperationWorker (constructor): Initialize a new Worker Fiber and Thread, called from the main Fiber.
- #resume ⇒ ThreadRequest?: Resume an operation fiber and queue request if there is any from main fiber.
- #save_thread_response(response) ⇒ Object: Save thread response from main or worker fiber.
- #shutdown ⇒ Object: Shuts down the worker. Called from the main fiber.
- #submit_request(request) ⇒ Object: Queues a thread request from the worker fiber for execution on the worker's thread, or executes it locally if parallel processing is disabled.
Constructor Details
#initialize(fiber, authority, response_queue) ⇒ OperationWorker
Initialize a new Worker Fiber and Thread, called from the main Fiber.
The Thread executes ThreadRequest objects from the request_queue and pushes responses to the global response_queue. The Thread and request_queue are only created when a response_queue is supplied.
    # File 'lib/scraper_utils/scheduler/operation_worker.rb', line 85
    def initialize(fiber, authority, response_queue)
      raise(ArgumentError, "Fiber and Authority must be provided") unless fiber && authority
      validate_fiber(main: true)

      @fiber = fiber
      @authority = authority
      @response_queue = response_queue
      @fiber.instance_variable_set(:@operation_worker, self)
      if response_queue
        @request_queue = Thread::Queue.new
        @thread = Thread.new do
          Thread.current[:current_authority] = authority
          while (request = @request_queue&.pop)
            @response_queue.push request.execute
          end
        end
      end
      @resume_at = self.class.next_resume_at
      @waiting_for_response = false
      # First resume response is ignored
      @response = true
    end
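The thread loop in the constructor can be tried in isolation. The following is a minimal sketch rather than the library's code: `EchoRequest` is a hypothetical stand-in for ThreadRequest that only implements `execute`, and the two queues are created locally instead of being owned by a worker and the scheduler.

```ruby
# Hypothetical stand-in for ThreadRequest: anything that responds to #execute.
EchoRequest = Struct.new(:value) do
  def execute
    "response to #{value}"
  end
end

request_queue = Thread::Queue.new   # per-worker queue of pending requests
response_queue = Thread::Queue.new  # shared queue that collects responses

worker_thread = Thread.new do
  # Queue#pop returns nil once the queue is closed and drained, ending the loop.
  while (request = request_queue.pop)
    response_queue.push request.execute
  end
end

request_queue.push EchoRequest.new(1)
request_queue.push EchoRequest.new(2)
request_queue.close
worker_thread.join

responses = []
responses << response_queue.pop until response_queue.empty?
```

Because a single thread drains the request queue, responses arrive in the order the requests were pushed.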
Instance Attribute Details
#authority ⇒ Symbol (readonly)
Returns the authority name associated with this fiber.
    # File 'lib/scraper_utils/scheduler/operation_worker.rb', line 17
    def authority
      @authority
    end
#fiber ⇒ Fiber (readonly)
Returns the fiber.
    # File 'lib/scraper_utils/scheduler/operation_worker.rb', line 14
    def fiber
      @fiber
    end
#request_queue ⇒ Thread::Queue (readonly)
Returns the request queue for the thread.
    # File 'lib/scraper_utils/scheduler/operation_worker.rb', line 32
    def request_queue
      @request_queue
    end
#response ⇒ ThreadResponse?
Returns the response to be passed on the next resume.
    # File 'lib/scraper_utils/scheduler/operation_worker.rb', line 23
    def response
      @response
    end
#resume_at ⇒ Time
Returns the time until which the fiber is delayed, i.e. when it is ready to resume.
    # File 'lib/scraper_utils/scheduler/operation_worker.rb', line 20
    def resume_at
      @resume_at
    end
#thread ⇒ Thread (readonly)
Returns the thread used.
    # File 'lib/scraper_utils/scheduler/operation_worker.rb', line 29
    def thread
      @thread
    end
#waiting_for_response ⇒ Boolean (readonly)
Returns whether the worker is waiting for a response.
    # File 'lib/scraper_utils/scheduler/operation_worker.rb', line 26
    def waiting_for_response
      @waiting_for_response
    end
Class Method Details
.next_resume_at ⇒ Object
    # File 'lib/scraper_utils/scheduler/operation_worker.rb', line 34
    def self.next_resume_at
      @next_resume_at = [@next_resume_at, Time.now - 0.001].compact.max + 0.001
    end
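This method has no description, but the expression suggests that each call returns a time at least one millisecond after the previously issued one, so workers created in quick succession receive distinct, ordered resume times. The sketch below copies the expression into a hypothetical `ResumeClock` module to show that behaviour; the module name is an assumption for illustration only.

```ruby
# Hypothetical module that reproduces the expression from next_resume_at.
module ResumeClock
  def self.next_resume_at
    # max(previously issued time, now - 1ms) + 1ms:
    # each result is at least 1ms after the one issued before it.
    @next_resume_at = [@next_resume_at, Time.now - 0.001].compact.max + 0.001
  end
end

times = Array.new(5) { ResumeClock.next_resume_at }
gaps = times.each_cons(2).map { |earlier, later| later - earlier }
strictly_increasing = times.each_cons(2).all? { |earlier, later| later > earlier }
```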
Instance Method Details
#alive? ⇒ Boolean
Fiber has not finished running
    # File 'lib/scraper_utils/scheduler/operation_worker.rb', line 39
    def alive?
      fiber.alive?
    end
#can_resume? ⇒ Boolean
Worker has the necessary state to be resumed
    # File 'lib/scraper_utils/scheduler/operation_worker.rb', line 44
    def can_resume?
      !@response.nil? && !@resume_at.nil? && alive?
    end
#close ⇒ Object
Closes the worker's resources. Called by the worker fiber just before it exits.
    # File 'lib/scraper_utils/scheduler/operation_worker.rb', line 62
    def close
      validate_fiber(main: false)
      # Signal thread to finish processing, then wait for it
      @request_queue&.close
      @thread&.join(60)
      # drop references for GC
      @request_queue = nil
      @thread = nil
      # make can_resume? false
      clear_resume_state
    end
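Closing the queue is what lets the thread finish: once a closed Thread::Queue is drained, `pop` returns nil and the `while` loop ends. A small standalone sketch of that close-then-join pattern, using only core Ruby (the variable names are illustrative):

```ruby
queue = Thread::Queue.new
processed = []

thread = Thread.new do
  # Loop ends when pop returns nil, i.e. the queue is closed and empty.
  while (item = queue.pop)
    processed << item
  end
end

queue.push :a
queue.close                 # items already queued are still delivered
joined = thread.join(60)    # returns the thread, or nil if the timeout expires

# Pushing to a closed queue raises ClosedQueueError.
push_failed = begin
  queue.push :b
  false
rescue ClosedQueueError
  true
end
```

The 60 second limit on `join` bounds how long the closing fiber can be blocked by a request that is still executing.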
#resume ⇒ ThreadRequest?
Resume an operation fiber and queue request if there is any from main fiber
    # File 'lib/scraper_utils/scheduler/operation_worker.rb', line 111
    def resume
      raise ClosedQueueError unless alive?
      raise NotReadyError, "Cannot resume #{authority} without response!" unless @response

      validate_fiber(main: true)
      request = @fiber.resume(@response)
      # submit the next request for processing
      submit_request(request) if request
      request
    end
#save_thread_response(response) ⇒ Object
Save thread response from main or worker fiber
    # File 'lib/scraper_utils/scheduler/operation_worker.rb', line 49
    def save_thread_response(response)
      raise "#{authority} Wasn't waiting for response! Got: #{response.inspect}" unless @waiting_for_response
      @response = response
      @waiting_for_response = false
      @resume_at = [response&.delay_till, Time.now].compact.max
      if DebugUtils.basic?
        log "Received #{response&.class&.name || 'nil response'} from thread for fiber #{authority} in #{response&.time_taken&.round(3)}s"
      end
      response
    end
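The `resume_at` calculation takes the later of the response's `delay_till` and the current time, with `compact` discarding a nil `delay_till` or a nil response. A sketch of just that expression, assuming a hypothetical `FakeResponse` struct and a `resume_time_for` helper that accepts the current time as a parameter so the result is deterministic:

```ruby
# Hypothetical stand-in for a thread response; only delay_till is modelled.
FakeResponse = Struct.new(:delay_till)

# Hypothetical helper wrapping the expression used for @resume_at.
def resume_time_for(response, now: Time.now)
  # A nil response or nil delay_till falls back to "now";
  # a delay_till in the past is clamped to "now".
  [response&.delay_till, now].compact.max
end

now = Time.now
later = resume_time_for(FakeResponse.new(now + 5), now: now)
clamped = resume_time_for(FakeResponse.new(now - 5), now: now)
no_delay = resume_time_for(FakeResponse.new(nil), now: now)
no_response = resume_time_for(nil, now: now)
```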
#shutdown ⇒ Object
Shuts down the worker. Called from the main fiber.
    # File 'lib/scraper_utils/scheduler/operation_worker.rb', line 123
    def shutdown
      validate_fiber(main: true)
      clear_resume_state
      if @fiber&.alive?
        # Trigger fiber to raise an error and thus call deregister
        @fiber.resume(nil)
      end
    end
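Resuming with nil works as a termination signal because the waiting fiber treats a falsy response as a request to stop and raises. The sketch below illustrates that handshake with a plain Fiber; the `rescue` block stands in for whatever cleanup the real fiber performs (the source mentions deregistering), and is an assumption for illustration.

```ruby
events = []

fiber = Fiber.new do
  begin
    response = Fiber.yield true   # hand control back and wait for a response
    raise "Terminated as requested" unless response
    events << :processed_response
  rescue RuntimeError => e
    # Stand-in for the cleanup the real worker fiber performs.
    events << "cleanup after: #{e.message}"
  end
  nil
end

fiber.resume                        # run until the first Fiber.yield
fiber.resume(nil) if fiber.alive?   # nil response makes the fiber raise and finish
```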
#submit_request(request) ⇒ Object
Queues a thread request from the worker fiber for execution on the worker's thread, or executes it locally if parallel processing is disabled.
Process flow if parallel processing is enabled:
1. This method:
   a. pushes the request onto the local @request_queue
   b. calls Fiber.yield(true) so the Scheduler can run other fibers
2. Meanwhile, this fiber's thread:
   a. pops the request off the queue
   b. processes the request
   c. pushes the response to the global response queue
3. Meanwhile, the Scheduler on the main fiber:
   a. pops responses from the response queue as they arrive
   b. calls {#save_thread_response} on the associated worker to save each response
   c. calls {#resume} on the worker when it is its turn (based on resume_at) and it can resume (has @response)
If parallel processing is not enabled, the processing occurs in the worker's fiber.
    # File 'lib/scraper_utils/scheduler/operation_worker.rb', line 155
    def submit_request(request)
      raise NotReadyError, "Cannot make a second request before the first has responded!" if @waiting_for_response
      raise ArgumentError, "Must be passed a valid ThreadRequest! Got: #{request.inspect}" unless request.is_a? ThreadRequest

      validate_fiber(main: false)
      @response = nil
      @waiting_for_response = true
      if @request_queue
        @request_queue&.push request
        response = Fiber.yield true
        raise "Terminated fiber for #{authority} as requested" unless response
      else
        response = save_thread_response request.execute
      end
      response
    end
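The parallel process flow described for this method can be sketched end to end with two fibers, two threads and one shared response queue. This is a simplified illustration, not the Scheduler's implementation: `SquareRequest` is a hypothetical request type, responses are routed by a name carried in the response itself, and resume ordering by resume_at is left out.

```ruby
# Hypothetical request type; execute returns [worker_name, result] so the
# main loop can route each response back to the fiber that asked for it.
SquareRequest = Struct.new(:worker_name, :number) do
  def execute
    [worker_name, number * number]
  end
end

response_queue = Thread::Queue.new   # shared by all worker threads
results = []
workers = {}

%i[alpha beta].each do |name|
  request_queue = Thread::Queue.new
  thread = Thread.new do
    # Worker thread: pop a request, process it, push the response.
    while (request = request_queue.pop)
      response_queue.push request.execute
    end
  end
  workers[name] = Fiber.new do
    [2, 3].each do |n|
      request_queue.push SquareRequest.new(name, n)  # queue the request
      response = Fiber.yield true                    # let other fibers run
      results << response
    end
    request_queue.close   # let the thread finish, then wait for it
    thread.join
    nil                   # a finished fiber reports no pending request
  end
end

# Main fiber: start every worker, then hand responses back as they arrive.
pending = 0
workers.each_value { |fiber| pending += 1 if fiber.resume }
while pending.positive?
  name, value = response_queue.pop
  pending -= 1
  pending += 1 if workers[name].resume([name, value])
end
```

While one fiber waits on its thread, the main loop is free to resume whichever other fiber has a response ready, which is where the concurrency comes from.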