Class: Workling::Invokers::ThreadedPoller

Inherits:
Base
  • Object
show all
Defined in:
lib/workling/invokers/threaded_poller.rb

Instance Attribute Summary

Attributes inherited from Base

#reset_time, #sleep_time

Instance Method Summary collapse

Methods inherited from Base

#logger, #run

Constructor Details

#initialize(routing, client_class) ⇒ ThreadedPoller

Returns a new instance of ThreadedPoller.

[View source]

12
13
14
15
16
17
18
19
20
# File 'lib/workling/invokers/threaded_poller.rb', line 12

# Builds a threaded poller.
#
# Both arguments are handed unchanged to the parent constructor. The
# class-level sleep_time and reset_time are then assigned from
# Workling.config, using 2 and 30 respectively when the setting is
# missing or falsy.
def initialize(routing, client_class)
  super(routing, client_class)

  # Mutex guarding the database connection check done by listener threads.
  @mutex = Mutex.new
  # Thread group that tracks the listener threads.
  @workers = ThreadGroup.new

  ThreadedPoller.sleep_time = (Workling.config[:sleep_time] || 2)
  ThreadedPoller.reset_time = (Workling.config[:reset_time] || 30)
end

Instance Method Details

#clazz_listen(clazz) ⇒ Object

Listen for one worker class

[View source]

63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
# File 'lib/workling/invokers/threaded_poller.rb', line 63

# Listen loop for a single worker class.
#
# Reads optional per-listener settings from Workling.config[:listeners]
# (keyed by the class name as a string): :sleep_time overrides the class-wide
# sleep time and :priority is applied to the current thread. It then opens
# one client connection and keeps calling #dispatch! until the current
# thread's :shutdown flag is set or the database is found to be unavailable.
#
# clazz - the worker class whose queues this listener serves.
def clazz_listen(clazz)
  logger.debug("Listener thread #{clazz.name} started")

  # Read thread configuration if available
  listeners = Workling.config[:listeners] if Workling.config.has_key?(:listeners)
  if listeners && listeners.has_key?(clazz.to_s)
    config = listeners[clazz.to_s].symbolize_keys
    thread_sleep_time = config[:sleep_time]
    Thread.current.priority = config[:priority] if config.has_key?(:priority)
  end

  thread_sleep_time ||= self.class.sleep_time

  # Setup connection to client (one per thread)
  connection = @client_class.new
  connection.connect
  logger.info("** Starting client #{ connection.class } for #{clazz.name} queue")

  # Start dispatching those messages
  until Thread.current[:shutdown]
    begin
      # Thanks for this Brent!
      #
      #     ...Just a heads up, due to how rails' MySQL adapter handles the
      #     call 'ActiveRecord::Base.connection.active?', you'll need to wrap
      #     the code that checks for a connection in a mutex.
      #
      #     ....I noticed this while working with a multi-core machine that
      #     was spawning multiple workling threads. Some of my workling
      #     threads would hit serious issues at this block of code without
      #     the mutex.
      #
      if defined?(ActiveRecord::Base)
        # The verdict is computed inside the mutex but acted on outside it:
        # a `break` inside the synchronize block would only terminate the
        # synchronize call, not this loop.
        database_up = @mutex.synchronize do
          db = ActiveRecord::Base.connection
          # Keep MySQL connection alive. Liveness is re-checked after the
          # reconnect instead of trusting reconnect!'s return value, which
          # presumably differs between adapters (TODO confirm for the
          # adapter in use).
          db.active? || (db.reconnect!; db.active?)
        end

        unless database_up
          logger.fatal("Failed - Database not available!")
          break
        end
      end

      # Dispatch and process the messages; back off only when idle.
      n = dispatch!(connection, clazz)
      if n > 0
        logger.debug("Listener thread #{clazz.name} processed #{n} queue items")
      else
        sleep(thread_sleep_time)
      end

    # If there is a connection error, hang for a bit to give the backend a
    # chance to fire up again and reset the connection.
    rescue Workling::WorklingConnectionError
      logger.warn("Listener thread #{clazz.name} failed to connect. Resetting connection.")
      sleep(self.class.reset_time)
      connection.reset
    end
  end

  logger.debug("Listener thread #{clazz.name} ended")
end

#dispatch!(connection, clazz) ⇒ Object

Dispatcher for one worker class. Logs and re-raises Workling::WorklingError if a queue cannot be processed. Returns the number of worker methods called.

[View source]

127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
# File 'lib/workling/invokers/threaded_poller.rb', line 127

# Polls every queue routed to the given worker class once and hands each
# retrieved message to the matching worker method.
#
# connection - client used to fetch messages; must respond to #retrieve.
# clazz      - worker class whose queues are polled.
#
# Returns the number of messages that were dispatched.
# Raises Workling::WorklingError (after logging it) when retrieving or
# dispatching a message raises that error.
def dispatch!(connection, clazz)
  dispatched = 0

  @routing.queue_names_routing_class(clazz).each do |queue|
    begin
      result = connection.retrieve(queue)
      next unless result # nothing waiting on this queue

      dispatched += 1
      handler = @routing[queue]
      method_name = @routing.method_name(queue)
      logger.debug("Calling #{handler.class}\##{method_name}(#{result.inspect})")
      handler.dispatch_to_worker_method(method_name, result)
    rescue Workling::WorklingError => e
      logger.error("FAILED to connect with queue #{queue}: #{e}")
      raise
    end
  end

  dispatched
end

#listen ⇒ Object

[View source]

22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# File 'lib/workling/invokers/threaded_poller.rb', line 22

# Starts one listener thread per discovered worker class, blocks until the
# listed threads have finished, and finally asks ActiveRecord (when it is
# loaded) to verify its active connections.
def listen
  # Create a thread for each worker and track it in the thread group.
  Workling::Discovery.discovered_workers.each do |worker_class|
    logger.debug("Discovered listener #{worker_class}")
    listener = Thread.new(worker_class) { |klass| clazz_listen(klass) }
    @workers.add(listener)
  end

  # Wait for all workers to complete
  @workers.list.each(&:join)

  logger.debug("Reaped listener threads. ")

  # Clean up all the connections.
  ActiveRecord::Base.verify_active_connections! if defined?(ActiveRecord::Base)

  logger.debug("Cleaned up connection: out!")
end

#started? ⇒ Boolean

Check if all Worker threads have been started.

Returns:

  • (Boolean)
[View source]

43
44
45
46
# File 'lib/workling/invokers/threaded_poller.rb', line 43

# Tells whether the number of running worker threads equals the number of
# discovered worker classes. The current thread count is logged first.
def started?
  logger.debug("checking if started... list size is #{ worker_threads }")
  expected = Workling::Discovery.discovered_workers.size
  expected == worker_threads
end

#stop ⇒ Object

Gracefully stop processing

[View source]

54
55
56
57
58
59
60
# File 'lib/workling/invokers/threaded_poller.rb', line 54

# Requests a graceful stop: waits (polling once per second) until #started?
# reports true, then sets the :shutdown flag on every thread in the worker
# group. It does not wait for the threads to finish.
def stop
  logger.info("stopping threaded poller...")

  # give it a chance to start up before shutting down.
  until started?
    sleep(1)
  end

  logger.info("Giving Listener Threads a chance to shut down. This may take a while... ")
  @workers.list.each do |thread|
    thread[:shutdown] = true
  end
  logger.info("Listener threads were shut down.  ")
end

#worker_threads ⇒ Object

Number of worker threads currently running.

[View source]

49
50
51
# File 'lib/workling/invokers/threaded_poller.rb', line 49

# Counts the threads currently listed in the worker thread group.
def worker_threads
  listeners = @workers.list
  listeners.length
end