Class: ScraperUtils::Scheduler::OperationRegistry
- Inherits:
-
Object
- Object
- ScraperUtils::Scheduler::OperationRegistry
- Defined in:
- lib/scraper_utils/scheduler/operation_registry.rb
Overview
Registry of all active OperationWorkers registered to be processed
Instance Method Summary collapse
-
#can_resume ⇒ Array{OperationWorker}
Find operations that can be resumed in resume_at order (may include future resume_at).
-
#cleanup_zombies ⇒ Object
Cleanup dead fibers that haven’t removed themselves so we don’t loop forever.
- #current_authority ⇒ Object
-
#deregister ⇒ Object
Remove yourself from registry, called from fiber.
-
#empty? ⇒ Boolean
Returns true if there are no registered operations.
-
#find(key = nil) ⇒ OperationWorker?
Find OperationWorker.
-
#initialize ⇒ OperationRegistry
constructor
A new instance of OperationRegistry.
-
#process_thread_response(response) ⇒ Object
Save the thread response into the thread and mark that it can continue.
- #register(fiber, authority) ⇒ Object
-
#shutdown ⇒ Object
Removes operations.
-
#size ⇒ Object
Returns number of registered operations.
Constructor Details
#initialize ⇒ OperationRegistry
Returns a new instance of OperationRegistry.
12 13 14 15 |
# File 'lib/scraper_utils/scheduler/operation_registry.rb', line 12 def initialize @operations = {} @fiber_ids = {} end |
Instance Method Details
#can_resume ⇒ Array{OperationWorker}
Find operations that can be resumed in resume_at order (may include future resume_at)
71 72 73 74 75 76 |
# File 'lib/scraper_utils/scheduler/operation_registry.rb', line 71 def can_resume @operations .values .select { |op| op.can_resume? } .sort_by(&:resume_at) end |
#cleanup_zombies ⇒ Object
Cleanup dead fibers that haven’t removed themselves so we don’t loop forever
79 80 81 82 83 84 85 86 87 88 |
# File 'lib/scraper_utils/scheduler/operation_registry.rb', line 79 def cleanup_zombies dead_operations = @operations.values.reject(&:alive?) dead_operations.each do |operation| LogUtils.log "WARNING: removing dead operation for #{operation.} - it should have cleaned up after itself!" operation.shutdown @operations.delete(operation.) @fiber_ids.delete(operation.fiber.object_id) end end |
#current_authority ⇒ Object
35 36 37 |
# File 'lib/scraper_utils/scheduler/operation_registry.rb', line 35 def find(Fiber.current.object_id)&. end |
#deregister ⇒ Object
Remove yourself from registry, called from fiber
25 26 27 28 29 30 31 32 33 |
# File 'lib/scraper_utils/scheduler/operation_registry.rb', line 25 def deregister operation = find return unless operation operation.close # Remove operation from registry since shutdown has done all it can to shut down the thread and fiber @operations.delete(operation.) @fiber_ids.delete(operation.fiber.object_id) end |
#empty? ⇒ Boolean
Returns true if there are no registered operations
59 60 61 |
# File 'lib/scraper_utils/scheduler/operation_registry.rb', line 59 def empty? @operations.empty? end |
#find(key = nil) ⇒ OperationWorker?
Find OperationWorker
42 43 44 45 46 47 48 49 |
# File 'lib/scraper_utils/scheduler/operation_registry.rb', line 42 def find(key = nil) key ||= Fiber.current.object_id if key.is_a?(Symbol) @operations[key] elsif key.is_a?(Integer) @fiber_ids[key] end end |
#process_thread_response(response) ⇒ Object
Save the thread response into the thread and mark that it can continue
91 92 93 94 |
# File 'lib/scraper_utils/scheduler/operation_registry.rb', line 91 def process_thread_response(response) operation = find(response.) operation&.save_thread_response response end |
#register(fiber, authority) ⇒ Object
17 18 19 20 21 22 |
# File 'lib/scraper_utils/scheduler/operation_registry.rb', line 17 def register(fiber, ) = .to_sym operation = OperationWorker.new(fiber, , @response_queue) @operations[] = operation @fiber_ids[operation.fiber.object_id] = operation end |
#shutdown ⇒ Object
Removes operations
52 53 54 55 56 |
# File 'lib/scraper_utils/scheduler/operation_registry.rb', line 52 def shutdown operations.each do |_key, operation| operation.shutdown end end |
#size ⇒ Object
Returns number of registered operations
64 65 66 |
# File 'lib/scraper_utils/scheduler/operation_registry.rb', line 64 def size @operations.size end |