Class: ScraperUtils::Scheduler::OperationRegistry

Inherits:
Object
  • Object
show all
Defined in:
lib/scraper_utils/scheduler/operation_registry.rb

Overview

Registry of all active OperationWorkers registered to be processed

Instance Method Summary collapse

Constructor Details

#initialize ⇒ OperationRegistry

Returns a new instance of OperationRegistry.



12
13
14
15
# File 'lib/scraper_utils/scheduler/operation_registry.rb', line 12

# Creates an empty registry.
#
# @param response_queue [Object, nil] handed to every OperationWorker that
#   #register creates; defaults to nil so existing zero-argument callers keep
#   working. NOTE(review): presumably the queue worker threads push their
#   responses onto — confirm against OperationWorker.
def initialize(response_queue = nil)
  # Operations keyed by authority (a Symbol, see #register)
  @operations = {}
  # The same operations keyed by their fiber's object_id, for Fiber.current lookups
  @fiber_ids = {}
  # Without this assignment #register always passed nil to OperationWorker.new
  @response_queue = response_queue
end

Instance Method Details

#can_resume ⇒ Array<OperationWorker>

Finds the operations that can be resumed, sorted by resume_at (the result may include operations whose resume_at is still in the future).

Returns:

  • (Array<OperationWorker>)

    Operations that are alive and have a response to use with resume



71
72
73
74
75
76
# File 'lib/scraper_utils/scheduler/operation_registry.rb', line 71

# Lists the registered operations whose #can_resume? is truthy, ordered by
# their #resume_at value (which may lie in the future).
#
# @return [Array<OperationWorker>] resumable operations, earliest resume_at first
def can_resume
  resumable = @operations.each_value.select(&:can_resume?)
  resumable.sort_by(&:resume_at)
end

#cleanup_zombies ⇒ Object

Shuts down and removes dead operations that did not deregister themselves, so we don't loop forever.



79
80
81
82
83
84
85
86
87
88
# File 'lib/scraper_utils/scheduler/operation_registry.rb', line 79

# Shuts down and unregisters every operation whose #alive? is falsy.
# Such operations should have called #deregister themselves, so each one
# left behind is logged as a warning before it is removed.
#
# @return [Array] the operations that were removed
def cleanup_zombies
  # reject runs to completion first, so every alive? check happens before
  # any zombie is shut down or deleted from the hashes
  @operations.each_value.reject(&:alive?).each do |zombie|
    LogUtils.log "WARNING: removing dead operation for #{zombie.authority} - it should have cleaned up after itself!"
    zombie.shutdown
    @operations.delete(zombie.authority)
    @fiber_ids.delete(zombie.fiber.object_id)
  end
end

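#current_authority ⇒ Object

Returns the authority of the operation registered for the current fiber, or nil if no operation is registered for it.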



35
36
37
# File 'lib/scraper_utils/scheduler/operation_registry.rb', line 35

# Authority of the operation that belongs to the fiber calling this method.
#
# @return [Symbol, nil] the authority, or nil when the current fiber has no
#   registered operation
def current_authority
  worker = find(Fiber.current.object_id)
  return nil if worker.nil?

  worker.authority
end

#deregister ⇒ Object

Removes the current fiber's operation from the registry; it must be called from within that fiber.



25
26
27
28
29
30
31
32
33
# File 'lib/scraper_utils/scheduler/operation_registry.rb', line 25

# Removes the operation that belongs to the calling fiber. The lookup uses
# #find's default key (the current fiber's object_id), so this must run
# inside the operation's own fiber. Does nothing if no operation is found.
def deregister
  worker = find
  if worker
    # close the worker first, then drop both index entries that point at it
    worker.close
    @operations.delete(worker.authority)
    @fiber_ids.delete(worker.fiber.object_id)
  end
end

#empty? ⇒ Boolean

Returns true if there are no registered operations

Returns:

  • (Boolean)


59
60
61
# File 'lib/scraper_utils/scheduler/operation_registry.rb', line 59

# @return [Boolean] true when no operation is currently registered
def empty?
  @operations.size.zero?
end

#find(key = nil) ⇒ OperationWorker?

Find OperationWorker

Parameters:

  • key (Integer, String, Symbol, nil) (defaults to: nil)

    A fiber's object_id or an authority (defaults to the current fiber's object_id)

Returns:

  • (OperationWorker, nil)

    The matching operation, or nil if none is found for key



42
43
44
45
46
47
48
49
# File 'lib/scraper_utils/scheduler/operation_registry.rb', line 42

# Finds a registered OperationWorker.
#
# @param key [Integer, String, Symbol, nil] a fiber's object_id, or an
#   authority given as a String or Symbol; defaults to the current fiber's
#   object_id
# @return [OperationWorker, nil] the matching worker, or nil when nothing is
#   registered under +key+ or +key+ is of an unsupported type
def find(key = nil)
  key ||= Fiber.current.object_id
  case key
  when Symbol, String
    # #register stores authorities as Symbols, so normalise String keys too
    @operations[key.to_sym]
  when Integer
    @fiber_ids[key]
  end
end

#process_thread_response(response) ⇒ Object

Saves the thread response into the matching operation (looked up by the response's authority) so that it can continue; does nothing if no such operation is registered.



91
92
93
94
# File 'lib/scraper_utils/scheduler/operation_registry.rb', line 91

# Hands a thread response to the operation registered for its authority.
#
# @param response [Object] must respond to #authority
# @return [Object, nil] whatever save_thread_response returns, or nil when
#   no operation is registered for the response's authority
def process_thread_response(response)
  target = find(response.authority)
  return nil if target.nil?

  target.save_thread_response(response)
end

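#register(fiber, authority) ⇒ Object

Creates an OperationWorker for fiber and registers it both under authority (converted to a Symbol) and under the fiber's object_id.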



17
18
19
20
21
22
# File 'lib/scraper_utils/scheduler/operation_registry.rb', line 17

# Wraps +fiber+ in a new OperationWorker and indexes it twice: by authority
# (as a Symbol) and by the worker fiber's object_id.
#
# @param fiber [Fiber] the fiber performing the operation
# @param authority [String, Symbol] the authority the fiber works on
# @return [OperationWorker] the newly registered worker
def register(fiber, authority)
  authority_key = authority.to_sym
  OperationWorker.new(fiber, authority_key, @response_queue).tap do |worker|
    @operations[authority_key] = worker
    @fiber_ids[worker.fiber.object_id] = worker
  end
end

#shutdown ⇒ Object

Removes operations



52
53
54
55
56
# File 'lib/scraper_utils/scheduler/operation_registry.rb', line 52

# Shuts down every registered operation and removes them all from the
# registry.
#
# The original body called an undefined `operations` method and raised
# NameError; this reads the @operations ivar directly.
def shutdown
  # iterate a snapshot so nothing a worker's shutdown does can disturb the loop
  @operations.values.each(&:shutdown)
  @operations.clear
  @fiber_ids.clear
end

#size ⇒ Integer

Returns number of registered operations



64
65
66
# File 'lib/scraper_utils/scheduler/operation_registry.rb', line 64

# @return [Integer] how many operations are currently registered
def size
  @operations.length
end