Method: Workling::Invokers::ThreadedPoller#clazz_listen

Defined in:
lib/workling/invokers/threaded_poller.rb

#clazz_listen(clazz) ⇒ Object

Listen for one worker class



63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
# File 'lib/workling/invokers/threaded_poller.rb', line 63

# Listen for one worker class.
#
# Runs the polling loop for +clazz+ on the current thread: optional per-listener
# settings are read from Workling.config[:listeners][clazz.to_s], a dedicated
# client connection is opened, and dispatch! is called repeatedly until
# Thread.current[:shutdown] is set or the database is found to be unavailable.
#
# @param clazz [Class] worker class to listen for; +clazz.name+ is used in log
#   messages and +clazz.to_s+ is the lookup key for the listener configuration
# @return [Object] the result of the final logger.debug call; not meaningful
def clazz_listen(clazz)
  logger.debug("Listener thread #{clazz.name} started")

  # Read thread configuration if available. :sleep_time overrides the class-wide
  # default below; :priority is applied to the current thread.
  thread_sleep_time = nil
  if Workling.config.key?(:listeners) && Workling.config[:listeners].key?(clazz.to_s)
    config = Workling.config[:listeners][clazz.to_s].symbolize_keys
    thread_sleep_time = config[:sleep_time]
    Thread.current.priority = config[:priority] if config.key?(:priority)
  end
  thread_sleep_time ||= self.class.sleep_time

  # Setup connection to client (one per thread)
  connection = @client_class.new
  connection.connect
  logger.info("** Starting client #{connection.class} for #{clazz.name} queue")

  # Start dispatching those messages
  until Thread.current[:shutdown]
    begin
      if defined?(ActiveRecord::Base)
        # The connection check is wrapped in a mutex: it was reported (thanks,
        # Brent!) that calling ActiveRecord::Base.connection.active? from several
        # workling threads at once caused serious issues with Rails' MySQL
        # adapter on multi-core machines.
        #
        # The block only computes a flag. A `break` placed inside the block would
        # merely leave `synchronize`, not this loop, so the decision to stop is
        # taken after the lock has been released.
        database_alive = @mutex.synchronize do
          db = ActiveRecord::Base.connection
          # Keep MySQL connection alive. A falsy reconnect! result is re-checked
          # with active? -- NOTE(review): reconnect!'s return value does not
          # appear to be a documented success flag; confirm per adapter.
          db.active? || db.reconnect! || db.active?
        end

        unless database_alive
          logger.fatal("Failed - Database not available!")
          break
        end
      end

      # Dispatch and process the messages; n is treated as the number of queue
      # items handled in this pass. Sleep only when nothing was processed.
      n = dispatch!(connection, clazz)
      if n > 0
        logger.debug("Listener thread #{clazz.name} processed #{n} queue items")
      else
        sleep(thread_sleep_time)
      end

    # If there is a connection error (e.g. memcache went away), hang for a bit to
    # give it a chance to fire up again, then reset the connection.
    rescue Workling::WorklingConnectionError
      logger.warn("Listener thread #{clazz.name} failed to connect. Resetting connection.")
      sleep(self.class.reset_time)
      connection.reset
    end
  end

  logger.debug("Listener thread #{clazz.name} ended")
end