Class: ScraperUtils::Scheduler::OperationWorker

Inherits:
Object
Defined in:
lib/scraper_utils/scheduler/operation_worker.rb

Overview

Handles the processing of a registered operation and its associated fiber and thread state.

Defined Under Namespace

Classes: NotReadyError

Constructor Details

#initialize(fiber, authority, response_queue) ⇒ OperationWorker

Sets up a worker for the given fiber and, when a response queue is supplied, starts its thread. Called from the main fiber.

The thread executes ThreadRequest objects from the request_queue and pushes their responses to the global response_queue.

Parameters:

  • fiber (Fiber)

    Fiber to process authority block

  • authority (Symbol)

    Authority label

  • response_queue (Thread::Queue, nil)

    Queue for thread responses; when nil, no worker thread is started

Raises:

  • (ArgumentError)

    If fiber or authority is missing


# File 'lib/scraper_utils/scheduler/operation_worker.rb', line 85

def initialize(fiber, authority, response_queue)
  raise(ArgumentError, "Fiber and Authority must be provided") unless fiber && authority
  validate_fiber(main: true)

  @fiber = fiber
  @authority = authority
  @response_queue = response_queue
  @fiber.instance_variable_set(:@operation_worker, self)
  if response_queue
    @request_queue = Thread::Queue.new
    @thread = Thread.new do
      Thread.current[:current_authority] = authority
      while (request = @request_queue&.pop)
        @response_queue.push request.execute
      end
    end
  end
  @resume_at = self.class.next_resume_at
  @waiting_for_response = false
  # First resume response is ignored
  @response = true
end
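
The thread loop in the constructor can be tried in isolation. The following is a minimal sketch, not the library's code: DemoRequest is a hypothetical stand-in for ThreadRequest (anything that responds to #execute), and the queue and authority names are assumptions chosen for illustration.

```ruby
# Hypothetical stand-in for ThreadRequest: anything responding to #execute.
DemoRequest = Struct.new(:value) do
  def execute
    value * 2
  end
end

response_queue = Thread::Queue.new # shared queue read by the scheduler
request_queue  = Thread::Queue.new # private queue owned by one worker

worker_thread = Thread.new do
  Thread.current[:current_authority] = :demo_authority
  # Queue#pop blocks until a request arrives and returns nil once the
  # queue has been closed and emptied, which ends the loop.
  while (request = request_queue.pop)
    response_queue.push(request.execute)
  end
end

request_queue.push(DemoRequest.new(21))
first_response = response_queue.pop # blocks until the thread has responded

request_queue.close   # signal the thread to finish
worker_thread.join(5) # wait for it, with a timeout
```

Because each worker owns its request queue while all workers share one response queue, the scheduler only has to wait on a single queue regardless of how many workers exist.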

Instance Attribute Details

#authority ⇒ Symbol (readonly)

Returns the authority name associated with this fiber.

Returns:

  • (Symbol)

    The authority name associated with this fiber



# File 'lib/scraper_utils/scheduler/operation_worker.rb', line 17

def authority
  @authority
end

#fiber ⇒ Fiber (readonly)

Returns the fiber.

Returns:

  • (Fiber)

    The fiber



# File 'lib/scraper_utils/scheduler/operation_worker.rb', line 14

def fiber
  @fiber
end

#request_queue ⇒ Thread::Queue (readonly)

Returns the request queue for the thread.

Returns:

  • (Thread::Queue)

    The request queue for the thread



# File 'lib/scraper_utils/scheduler/operation_worker.rb', line 32

def request_queue
  @request_queue
end

#response ⇒ ThreadResponse?

Returns the response to be passed on the next resume.

Returns:

  • (ThreadResponse, nil)

    The response to be passed on the next resume



# File 'lib/scraper_utils/scheduler/operation_worker.rb', line 23

def response
  @response
end

#resume_at ⇒ Time

Returns the time until which the fiber is delayed, that is, the earliest time it is ready to resume.

Returns:

  • (Time)

    The earliest time at which the fiber is ready to resume



# File 'lib/scraper_utils/scheduler/operation_worker.rb', line 20

def resume_at
  @resume_at
end

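#thread ⇒ Thread (readonly)

Returns the thread that executes this worker's requests.

Returns:

  • (Thread)

    The thread that executes this worker's requests; unset when no response queue was given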



# File 'lib/scraper_utils/scheduler/operation_worker.rb', line 29

def thread
  @thread
end

#waiting_for_response ⇒ Boolean (readonly)

Returns whether the worker is waiting for a response.

Returns:

  • (Boolean)

    Whether the worker is waiting for a response



# File 'lib/scraper_utils/scheduler/operation_worker.rb', line 26

def waiting_for_response
  @waiting_for_response
end

Class Method Details

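.next_resume_at ⇒ Object

Returns the next initial resume time. Each call returns a time at least 1 ms after the previous result and never earlier than the current time, so successive workers receive strictly increasing resume times.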



# File 'lib/scraper_utils/scheduler/operation_worker.rb', line 34

def self.next_resume_at
  @next_resume_at = [@next_resume_at, Time.now - 0.001].compact.max + 0.001
end
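
The effect of this expression is easiest to see by calling it several times in a row. The sketch below copies the same expression into a hypothetical ResumeClock module (the module name is an assumption for illustration only) and checks that the results are strictly increasing.

```ruby
# Hypothetical module mirroring the class-level counter shown above.
module ResumeClock
  def self.next_resume_at
    # First call: @next_resume_at is nil, so compact drops it and the
    # result is roughly Time.now. Later calls: at least 1 ms after the
    # previous result, and never earlier than the current time.
    @next_resume_at = [@next_resume_at, Time.now - 0.001].compact.max + 0.001
  end
end

times = Array.new(5) { ResumeClock.next_resume_at }
strictly_increasing = times.each_cons(2).all? { |earlier, later| later > earlier }
```

Giving every new worker a distinct, increasing timestamp means that sorting workers by resume_at yields a stable order even when several are created within the same millisecond.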

Instance Method Details

#alive? ⇒ Boolean

Whether the fiber has not yet finished running.

Returns:

  • (Boolean)


# File 'lib/scraper_utils/scheduler/operation_worker.rb', line 39

def alive?
  fiber.alive?
end

#can_resume? ⇒ Boolean

Whether the worker has the state needed to be resumed: a saved response, a resume time, and a live fiber.

Returns:

  • (Boolean)


# File 'lib/scraper_utils/scheduler/operation_worker.rb', line 44

def can_resume?
  !@response.nil? && !@resume_at.nil? && alive?
end

#close ⇒ Object

Closes the worker's resources. Called by the worker fiber just before it exits.



# File 'lib/scraper_utils/scheduler/operation_worker.rb', line 62

def close
  validate_fiber(main: false)
  # Signal thread to finish processing, then wait for it
  @request_queue&.close
  @thread&.join(60)
  # drop references for GC
  @request_queue = nil
  @thread = nil
  # make can_resume? false
  clear_resume_state
end
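
The close-then-join sequence relies on standard Thread::Queue behaviour: closing a queue does not discard pending items, and a blocked pop returns nil once the queue is closed and empty. A minimal sketch of that behaviour, using made-up item values rather than real ThreadRequest objects:

```ruby
queue = Thread::Queue.new
processed = []

consumer = Thread.new do
  # pop returns nil once the queue is closed and drained, ending the loop
  while (item = queue.pop)
    processed << item
  end
end

queue.push(:a)
queue.push(:b)
queue.close                # no more pushes accepted; pending items remain
joined = consumer.join(60) # returns the thread, or nil if the timeout expires

push_rejected =
  begin
    queue.push(:c)
    false
  rescue ClosedQueueError
    true
  end
```

The timeout on join bounds how long the closing fiber waits if the thread is stuck on a slow request.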

#resume ⇒ ThreadRequest?

Resumes the operation fiber from the main fiber and queues the request it returns, if any.

Returns:

  • (ThreadRequest, nil)

    request returned by resume or nil if finished

Raises:

  • (ClosedQueueError)

    If the fiber is no longer alive

  • (NotReadyError)

    If there is no response to pass to the fiber


# File 'lib/scraper_utils/scheduler/operation_worker.rb', line 111

def resume
  raise ClosedQueueError unless alive?
  raise NotReadyError, "Cannot resume #{authority} without response!" unless @response
  validate_fiber(main: true)

  request = @fiber.resume(@response)
  # submit the next request for processing
  submit_request(request) if request
  request
end

#save_thread_response(response) ⇒ Object

Saves a thread response and records when the fiber may next be resumed. May be called from the main fiber or the worker fiber.



# File 'lib/scraper_utils/scheduler/operation_worker.rb', line 49

def save_thread_response(response)
  raise "#{authority} Wasn't waiting for response! Got: #{response.inspect}" unless @waiting_for_response
  @response = response
  @waiting_for_response = false
  @resume_at = [response&.delay_till, Time.now].compact.max
  if DebugUtils.basic?
    log "Received #{response&.class&.name || 'nil response'} from thread for fiber #{authority} in #{response&.time_taken&.round(3)}s"
  end
  response
end
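
The resume_at calculation picks the later of the response's delay_till and the current time, and tolerates a missing response or a missing delay. A small sketch of that rule, assuming a hypothetical DemoResponse struct and helper name in place of the real response class:

```ruby
# Hypothetical stand-in for a thread response that may carry a delay.
DemoResponse = Struct.new(:delay_till)

# Same rule as above: the later of delay_till and now; nil values are ignored.
def resume_time_for(response, now: Time.now)
  [response&.delay_till, now].compact.max
end

now = Time.now
delayed  = resume_time_for(DemoResponse.new(now + 5), now: now) # future delay wins
overdue  = resume_time_for(DemoResponse.new(now - 5), now: now) # past delay is ignored
no_delay = resume_time_for(DemoResponse.new(nil), now: now)     # no delay requested
missing  = resume_time_for(nil, now: now)                       # no response at all
```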

#shutdown ⇒ Object

Shuts down the worker. Called from the main fiber.



# File 'lib/scraper_utils/scheduler/operation_worker.rb', line 123

def shutdown
  validate_fiber(main: true)

  clear_resume_state
  if @fiber&.alive?
    # Trigger fiber to raise an error and thus call deregister
    @fiber.resume(nil)
  end
end
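
Resuming with nil works as a termination signal because the worker fiber raises when Fiber.yield returns a falsy value. The sketch below shows that mechanism on its own; the ensure block stands in for whatever cleanup the real fiber performs, which is an assumption here, and the error is rescued only so the example can inspect it.

```ruby
require "fiber" # needed for Fiber#alive? on older Rubies

cleaned_up = false

fiber = Fiber.new do
  begin
    response = Fiber.yield(true) # wait for a response from the main fiber
    raise "Terminated fiber as requested" unless response
  ensure
    cleaned_up = true            # stand-in for the fiber's own cleanup
  end
end

fiber.resume(true) # run the fiber up to its first yield

error =
  begin
    fiber.resume(nil) # nil response makes the fiber raise
    nil
  rescue RuntimeError => e
    e # an unhandled error inside a fiber propagates to the resumer
  end
```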

#submit_request(request) ⇒ Object

Queues a thread request for execution. Called from the worker fiber. If parallel processing is disabled, the request is executed locally instead.

Process flow if parallel processing is enabled:

  1. This method:

     a. pushes the request onto the local @request_queue
     b. calls Fiber.yield(true) so the Scheduler can run other fibers

  2. Meanwhile, this fiber's thread:

     a. pops the request off the queue
     b. processes the request
     c. pushes the response to the global response queue

  3. Meanwhile, the Scheduler on the main fiber:

     a. pops responses from the response queue as they arrive
     b. calls #save_thread_response on the associated worker to save each response
     c. calls #resume on the worker when it is its turn (based on resume_at) and it can_resume? (has @response)

If parallel processing is not enabled, the processing occurs in the worker's fiber.

Parameters:

  • request (ThreadRequest)

    The request to be processed in thread

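Raises:

  • (NotReadyError)

    If a previous request is still waiting for its response

  • (ArgumentError)

    If request is not a ThreadRequest
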

# File 'lib/scraper_utils/scheduler/operation_worker.rb', line 155

def submit_request(request)
  raise NotReadyError, "Cannot make a second request before the first has responded!" if @waiting_for_response
  raise ArgumentError, "Must be passed a valid ThreadRequest! Got: #{request.inspect}" unless request.is_a? ThreadRequest
  validate_fiber(main: false)

  @response = nil
  @waiting_for_response = true
  if @request_queue
    @request_queue&.push request
    response = Fiber.yield true
    raise "Terminated fiber for #{authority} as requested" unless response
  else
    response = save_thread_response request.execute
  end
  response
end
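
The parallel process flow described for this method can be sketched end to end with one fiber, one thread, and two queues. This is an illustrative reduction under stated assumptions, not the Scheduler's implementation: FlowRequest is a hypothetical request type, and the loop at the bottom plays the Scheduler's role for a single worker, without resume_at ordering.

```ruby
require "fiber" # needed for Fiber#alive? on older Rubies

# Hypothetical request type; the real code expects a ThreadRequest.
FlowRequest = Struct.new(:value) do
  def execute
    value + 1
  end
end

request_queue  = Thread::Queue.new
response_queue = Thread::Queue.new

# Step 2: the worker's thread pops, processes, and pushes the response.
thread = Thread.new do
  while (request = request_queue.pop)
    response_queue.push(request.execute)
  end
end

results = []
fiber = Fiber.new do
  [1, 10].each do |n|
    request_queue.push(FlowRequest.new(n)) # step 1a
    reply = Fiber.yield(true)              # step 1b: let the main fiber run
    raise "Terminated fiber as requested" unless reply
    results << reply
  end
  :finished
end

# Step 3: the main fiber collects responses and resumes the worker fiber.
outcome = fiber.resume(true) # the first resume value is ignored by the block
while fiber.alive?
  thread_response = response_queue.pop    # 3a: wait for the thread
  outcome = fiber.resume(thread_response) # 3c: pass the response back in
end

request_queue.close
thread.join(5)
```

While the fiber is suspended at Fiber.yield, the main fiber is free to resume other workers, which is what lets requests for several authorities overlap.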