Module: Kaede::Scheduler
Constant Summary collapse
- POISON =
Object.new
- DBUS_STOP_TIMEOUT =
5
Instance Method Summary collapse
- #dbus_export_programs(service) ⇒ Object
- #epoll_loop(epoll) ⇒ Object
- #fire_stop ⇒ Object
- #prepare_epoll ⇒ Object
- #prepare_events ⇒ Object
- #prepare_timerfds ⇒ Object
- #setup(db) ⇒ Object
- #spawn_recorder(pid) ⇒ Object
- #start ⇒ Object
- #start_dbus ⇒ Object
- #start_dbus_loop(bus) ⇒ Object
- #start_epoll ⇒ Object
- #start_recorder_waiter ⇒ Object
- #stop_dbus ⇒ Object
Instance Method Details
#dbus_export_programs(service) ⇒ Object
122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 |
# File 'lib/kaede/scheduler.rb', line 122
# Exports one DBus::Program object on +service+ for every entry in @timerfds.
#
# Program records are fetched in a single @db.get_programs call keyed by the
# pid stored next to each timer. Each exported object is built from that
# record and from Time.now plus the second value returned by the timer's
# #gettime (presumably the time remaining until expiration -- confirm against
# the sleepy_penguin documentation).
def dbus_export_programs(service)
  pids = @timerfds.values.map { |_tfd, pid| pid }
  programs = @db.get_programs(pids)

  @timerfds.each_value do |tfd, pid|
    _, value = tfd.gettime
    exported = DBus::Program.new(programs[pid], Time.now + value)
    service.export(exported)

    # ruby-dbus doesn't emit properties when Introspect is requested.
    # Kaede manually creates Introspect XML so that `gdbus introspect` outputs properties.
    node = service.get_node(exported.path)
    node.define_singleton_method(:to_xml) { exported.to_xml }
  end
end
#epoll_loop(epoll) ⇒ Object
90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 |
# File 'lib/kaede/scheduler.rb', line 90
# Waits on +epoll+ forever and dispatches on whichever IO became ready.
#
# * a SleepyPenguin::TimerFD: its entry is removed from @timerfds, a recorder
#   thread is spawned for the associated pid and pushed onto @recorder_queue.
# * @reload_event: throws :reload.
# * @stop_event: renames the process title and throws :stop.
# * anything else: aborts the process.
#
# The loop has no break, so control leaves this method only through one of
# the throws above or through abort.
def epoll_loop(epoll)
  loop do
    epoll.wait do |_events, io|
      case io
      when SleepyPenguin::TimerFD
        # Read the expiration counter before handling the timer
        # (presumably to drain the descriptor -- confirm).
        io.expirations
        _, pid = @timerfds.delete(io.fileno)
        @recorder_queue.enq(spawn_recorder(pid))
      when @reload_event
        io.value
        throw :reload
      when @stop_event
        io.value
        # Mark this process as the outgoing scheduler in the process list.
        $0 = "kaede-scheduler (old #{Time.now.strftime('%F %X')})"
        throw :stop
      else
        abort "Unknown IO: #{io.inspect}"
      end
    end
  end
end
#fire_stop ⇒ Object
32 33 34 |
# File 'lib/kaede/scheduler.rb', line 32
# Requests a stop by incrementing @stop_event by one; #epoll_loop reacts to
# that event by throwing :stop.
def fire_stop
  @stop_event.incr(1)
end
#prepare_epoll ⇒ Object
66 67 68 69 70 71 72 73 74 75 |
# File 'lib/kaede/scheduler.rb', line 66
# Rebuilds @timerfds via #prepare_timerfds, then returns a new
# SleepyPenguin::Epoll that watches the reload event, the stop event and
# every timer descriptor for :IN readiness.
def prepare_epoll
  prepare_timerfds

  epoll = SleepyPenguin::Epoll.new
  [@reload_event, @stop_event].each { |event| epoll.add(event, [:IN]) }
  @timerfds.each_value { |tfd, _pid| epoll.add(tfd, [:IN]) }
  epoll
end
#prepare_events ⇒ Object
27 28 29 30 |
# File 'lib/kaede/scheduler.rb', line 27
# Creates the two eventfds used to control the scheduler: @reload_event and
# @stop_event. Both start at 0 and are opened with the :SEMAPHORE flag.
# Returns the stop event (the value of the last assignment).
def prepare_events
  @reload_event = SleepyPenguin::EventFD.new(0, :SEMAPHORE)
  @stop_event = SleepyPenguin::EventFD.new(0, :SEMAPHORE)
end
#prepare_timerfds ⇒ Object
56 57 58 59 60 61 62 63 64 |
# File 'lib/kaede/scheduler.rb', line 56
# Resets @timerfds and fills it with one :REALTIME timerfd per job returned
# by @db.get_jobs.
#
# Each timer is armed with :ABSTIME, an interval of 0 and the job's
# :enqueued_at converted to an integer. The table maps the timer's fileno to
# the pair [timer, job[:pid]].
#
# Returns the rebuilt @timerfds hash.
def prepare_timerfds
  @timerfds = {}

  @db.get_jobs.each do |job|
    timer = SleepyPenguin::TimerFD.new(:REALTIME)
    timer.settime(:ABSTIME, 0, job[:enqueued_at].to_i)
    @timerfds[timer.fileno] = [timer, job[:pid]]
  end

  @timerfds
end
#setup(db) ⇒ Object
14 15 16 17 18 19 20 21 22 23 |
# File 'lib/kaede/scheduler.rb', line 14
# One-time initialisation of the scheduler.
#
# Stores +db+, creates the control events, switches stdout/stderr to
# unbuffered output, starts the thread that joins finished recorders, sets
# the process title and prints the current process id.
def setup(db)
  @db = db
  prepare_events

  # Unbuffered output so log lines appear immediately.
  [$stdout, $stderr].each { |stream| stream.sync = true }

  @recorder_queue = Queue.new
  @recorder_waiter = start_recorder_waiter

  $0 = 'kaede-scheduler'
  puts "Start #{Process.pid}"
end
#spawn_recorder(pid) ⇒ Object
179 180 181 182 183 184 185 186 187 188 189 190 191 192 |
# File 'lib/kaede/scheduler.rb', line 179
# Starts and returns a thread that records the program identified by +pid+
# and then marks it finished in @db.
#
# Any exception raised inside the thread (including one raised by the lazy
# require) is reported on stderr with its backtrace instead of propagating.
def spawn_recorder(pid)
  Thread.start do
    begin
      # Loaded lazily, only when a recording actually starts.
      require 'kaede/recorder'
      recorder = Recorder.new(Notifier.new)
      recorder.record(@db, pid)
      @db.mark_finished(pid)
    rescue Exception => e
      $stderr.puts "Failed job for #{pid}: #{e.class}: #{e.message}"
      e.backtrace.each { |frame| $stderr.puts " #{frame}" }
    end
  end
end
#start ⇒ Object
46 47 48 49 50 51 52 53 54 |
# File 'lib/kaede/scheduler.rb', line 46
# Main entry point: runs #start_epoll repeatedly until :stop is thrown, then
# shuts the recorder waiter down by enqueueing POISON and joining its thread.
def start
  # Each return from start_epoll corresponds to a reload; :stop ends the loop.
  catch(:stop) { loop { start_epoll } }

  @recorder_queue.enq(POISON)
  @recorder_waiter.join
end
#start_dbus ⇒ Object
113 114 115 116 117 118 119 120 |
# File 'lib/kaede/scheduler.rb', line 113
# Publishes the scheduler on the D-Bus system bus.
#
# Requests the DBus::DESTINATION service name, exports one object per
# scheduled program plus a DBus::Scheduler object wired to the reload and
# stop events, then starts the D-Bus main loop thread and stores it in
# @dbus_thread.
def start_dbus
  system_bus = ::DBus.system_bus
  kaede_service = system_bus.request_service(DBus::DESTINATION)

  dbus_export_programs(kaede_service)
  scheduler_object = DBus::Scheduler.new(@reload_event, @stop_event)
  kaede_service.export(scheduler_object)

  @dbus_thread = start_dbus_loop(system_bus)
end
#start_dbus_loop(bus) ⇒ Object
141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 |
# File 'lib/kaede/scheduler.rb', line 141
# Creates @dbus_main, attaches +bus+ to it and returns a thread running its
# loop.
#
# When the loop raises ::DBus::Connection::NameRequestError the error is
# printed and the loop is restarted after one second, at most 10 times.
# After the last allowed retry the error is printed once more and the thread
# ends.
def start_dbus_loop(bus)
  @dbus_main = DBus::Main.new
  @dbus_main << bus

  Thread.start do
    retry_limit = 10
    attempts = 0
    begin
      @dbus_main.loop
    rescue ::DBus::Connection::NameRequestError => e
      puts "#{e.class}: #{e.message}"
      if attempts < retry_limit
        attempts += 1
        sleep 1
        retry
      end
    end
  end
end
#start_epoll ⇒ Object
77 78 79 80 81 82 83 84 85 86 87 88 |
# File 'lib/kaede/scheduler.rb', line 77
# Runs one scheduling cycle: builds the epoll set, reports how many schedules
# were loaded, starts the D-Bus service and blocks in #epoll_loop until
# :reload is thrown. A thrown :stop is not caught here and propagates to the
# caller.
#
# On every exit path the epoll descriptor is closed (if it was created) and
# the D-Bus service is stopped.
def start_epoll
  epoll = prepare_epoll
  puts "Loaded #{@timerfds.size} schedules"
  start_dbus
  catch(:reload) do
    epoll_loop(epoll)
  end
ensure
  # prepare_epoll may raise before `epoll` is assigned. Without this guard the
  # cleanup itself raises NoMethodError on nil, which hides the original
  # exception and skips stop_dbus.
  epoll.close if epoll
  stop_dbus
end
#start_recorder_waiter ⇒ Object
36 37 38 39 40 41 42 43 44 |
# File 'lib/kaede/scheduler.rb', line 36
# Starts and returns a thread that takes recorder threads off
# @recorder_queue and joins them one at a time, in queue order. The thread
# finishes when it dequeues the POISON sentinel.
def start_recorder_waiter
  Thread.start do
    loop do
      queued = @recorder_queue.deq
      # Identity check: only the POISON object itself ends the loop.
      break if queued.equal?(POISON)

      queued.join
    end
  end
end
#stop_dbus ⇒ Object
161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 |
# File 'lib/kaede/scheduler.rb', line 161
# Shuts down the D-Bus main loop started by #start_dbus_loop; does nothing
# when @dbus_main is not set.
#
# Asks the main loop to quit and waits up to DBUS_STOP_TIMEOUT seconds for
# its thread, killing the thread if the wait times out. An exception raised
# while waiting is logged to stderr with its backtrace rather than
# propagated. Finally both references are cleared and the DBus::DESTINATION
# name is released on the system bus.
def stop_dbus
  return unless @dbus_main

  @dbus_main.quit
  begin
    # Thread#join returns nil on timeout; only then is the thread killed.
    @dbus_thread.kill unless @dbus_thread.join(DBUS_STOP_TIMEOUT)
  rescue Exception => e
    $stderr.puts "Exception on DBus thread: #{e.class}: #{e.message}"
    e.backtrace.each { |frame| $stderr.puts " #{frame}" }
  end

  @dbus_main = nil
  @dbus_thread = nil
  ::DBus.system_bus.proxy.ReleaseName(DBus::DESTINATION)
end