Module: Kaede::Scheduler

Extended by:
Scheduler
Included in:
Scheduler
Defined in:
lib/kaede/scheduler.rb

Constant Summary collapse

POISON =
Object.new
DBUS_STOP_TIMEOUT =
5

Instance Method Summary collapse

Instance Method Details

#dbus_export_programs(service) ⇒ Object



122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
# File 'lib/kaede/scheduler.rb', line 122

def dbus_export_programs(service)
  programs = @db.get_programs(@timerfds.values.map { |_, pid| pid })
  @timerfds.each_value do |tfd, pid|
    _, value = tfd.gettime
    program = programs[pid]
    obj = DBus::Program.new(program, Time.now + value)
    service.export(obj)

    # ruby-dbus doesn't emit properties when Introspect is requested.
    # Kaede manually creates Introspect XML so that `gdbus introspect` outputs properties.
    node = service.get_node(obj.path)
    node.singleton_class.class_eval do
      define_method :to_xml do
        obj.to_xml
      end
    end
  end
end

#epoll_loop(epoll) ⇒ Object



90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
# File 'lib/kaede/scheduler.rb', line 90

def epoll_loop(epoll)
  loop do
    epoll.wait do |events, io|
      case io
      when SleepyPenguin::TimerFD
        io.expirations
        _, pid = @timerfds.delete(io.fileno)
        thread = spawn_recorder(pid)
        @recorder_queue.enq(thread)
      when @reload_event
        io.value
        throw :reload
      when @stop_event
        io.value
        $0 = "kaede-scheduler (old #{Time.now.strftime('%F %X')})"
        throw :stop
      else
        abort "Unknown IO: #{io.inspect}"
      end
    end
  end
end

#fire_stopObject



32
33
34
# File 'lib/kaede/scheduler.rb', line 32

def fire_stop
  @stop_event.incr(1)
end

#prepare_epollObject



66
67
68
69
70
71
72
73
74
75
# File 'lib/kaede/scheduler.rb', line 66

def prepare_epoll
  prepare_timerfds
  SleepyPenguin::Epoll.new.tap do |epoll|
    epoll.add(@reload_event, [:IN])
    epoll.add(@stop_event, [:IN])
    @timerfds.each_value do |tfd, _|
      epoll.add(tfd, [:IN])
    end
  end
end

#prepare_eventsObject



27
28
29
30
# File 'lib/kaede/scheduler.rb', line 27

def prepare_events
  @reload_event = SleepyPenguin::EventFD.new(0, :SEMAPHORE)
  @stop_event = SleepyPenguin::EventFD.new(0, :SEMAPHORE)
end

#prepare_timerfdsObject



56
57
58
59
60
61
62
63
64
# File 'lib/kaede/scheduler.rb', line 56

def prepare_timerfds
  @timerfds = {}
  @db.get_jobs.each do |job|
    tfd = SleepyPenguin::TimerFD.new(:REALTIME)
    tfd.settime(:ABSTIME, 0, job[:enqueued_at].to_i)
    @timerfds[tfd.fileno] = [tfd, job[:pid]]
  end
  @timerfds
end

#setup(db) ⇒ Object



14
15
16
17
18
19
20
21
22
23
# File 'lib/kaede/scheduler.rb', line 14

def setup(db)
  @db = db
  prepare_events
  $stdout.sync = true
  $stderr.sync = true
  @recorder_queue = Queue.new
  @recorder_waiter = start_recorder_waiter
  $0 = 'kaede-scheduler'
  puts "Start #{Process.pid}"
end

#spawn_recorder(pid) ⇒ Object



179
180
181
182
183
184
185
186
187
188
189
190
191
192
# File 'lib/kaede/scheduler.rb', line 179

def spawn_recorder(pid)
  Thread.start do
    begin
      require 'kaede/recorder'
      Recorder.new(Notifier.new).record(@db, pid)
      @db.mark_finished(pid)
    rescue Exception => e
      $stderr.puts "Failed job for #{pid}: #{e.class}: #{e.message}"
      e.backtrace.each do |bt|
        $stderr.puts "  #{bt}"
      end
    end
  end
end

#startObject



46
47
48
49
50
51
52
53
54
# File 'lib/kaede/scheduler.rb', line 46

def start
  catch(:stop) do
    loop do
      start_epoll
    end
  end
  @recorder_queue.enq(POISON)
  @recorder_waiter.join
end

#start_dbusObject



113
114
115
116
117
118
119
120
# File 'lib/kaede/scheduler.rb', line 113

def start_dbus
  bus = ::DBus.system_bus
  service = bus.request_service(DBus::DESTINATION)
  dbus_export_programs(service)
  service.export(DBus::Scheduler.new(@reload_event, @stop_event))

  @dbus_thread = start_dbus_loop(bus)
end

#start_dbus_loop(bus) ⇒ Object



141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
# File 'lib/kaede/scheduler.rb', line 141

def start_dbus_loop(bus)
  @dbus_main = DBus::Main.new
  @dbus_main << bus
  Thread.start do
    max_retries = 10
    retries = 0
    begin
      @dbus_main.loop
    rescue ::DBus::Connection::NameRequestError => e
      puts "#{e.class}: #{e.message}"
      if retries < max_retries
        retries += 1
        sleep 1
        retry
      end
    end
  end
end

#start_epollObject



77
78
79
80
81
82
83
84
85
86
87
88
# File 'lib/kaede/scheduler.rb', line 77

def start_epoll
  epoll = prepare_epoll
  puts "Loaded #{@timerfds.size} schedules"
  start_dbus

  catch(:reload) do
    epoll_loop(epoll)
  end
ensure
  epoll.close
  stop_dbus
end

#start_recorder_waiterObject



36
37
38
39
40
41
42
43
44
# File 'lib/kaede/scheduler.rb', line 36

def start_recorder_waiter
  Thread.start do
    loop do
      recorder_thread = @recorder_queue.deq
      break if recorder_thread.equal?(POISON)
      recorder_thread.join
    end
  end
end

#stop_dbusObject



161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
# File 'lib/kaede/scheduler.rb', line 161

def stop_dbus
  return unless @dbus_main
  @dbus_main.quit
  begin
    unless @dbus_thread.join(DBUS_STOP_TIMEOUT)
      @dbus_thread.kill
    end
  rescue Exception => e
    $stderr.puts "Exception on DBus thread: #{e.class}: #{e.message}"
    e.backtrace.each do |bt|
      $stderr.puts "  #{bt}"
    end
  end
  @dbus_main = nil
  @dbus_thread = nil
  ::DBus.system_bus.proxy.ReleaseName(DBus::DESTINATION)
end