Class: TimeScheduler::EventQueue

Inherits:
Object
  • Object
show all
Defined in:
lib/time_scheduler/event.rb

Constant Summary collapse

REFRESH_TIME =
3600.0

Instance Method Summary collapse

Constructor Details

#initialize(**option) ⇒ EventQueue

Returns a new instance of EventQueue.



28
29
30
31
32
33
# File 'lib/time_scheduler/event.rb', line 28

def initialize( **option )
  @event_mutex  =  ::Mutex.new
  @event_queue  =  ::Queue.new
  @event_set  =  ::SortedSet.new
  run
end

Instance Method Details

#refresh(time) ⇒ Object



50
51
52
53
54
55
56
# File 'lib/time_scheduler/event.rb', line 50

def refresh( time )
  wait_time  =  [time - Time.now, REFRESH_TIME + rand * 2 - 1].min
  Thread.start do
    ::Kernel.sleep( wait_time )
    @event_queue.push( 1 )
  end
end

#reserve(time, queue, ident) ⇒ Object



35
36
37
38
39
40
41
42
43
44
45
46
47
48
# File 'lib/time_scheduler/event.rb', line 35

def reserve( time, queue, ident )
  @event_mutex.synchronize do
    @event_set.add( EventItem.new( time, queue, ident ) )
    wait_time  =  [time - Time.now, REFRESH_TIME + rand * 2 - 1].min
    if  wait_time <= 0
      @event_queue.push( 1 )
    else
      Thread.start do
        ::Kernel.sleep( wait_time )
        @event_queue.push( 1 )
      end
    end
  end
end

#runObject



58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
# File 'lib/time_scheduler/event.rb', line 58

def run
  Thread.start do
    begin
      while  true
        @event_queue.pop
        @event_mutex.synchronize do
          event_item  =  @event_set.first

          now  =  Time.now
          if  event_item.time <= now
            @event_set.delete( event_item )
            event_item.queue.push( [now, event_item.ident] )
          else
            refresh( event_item.time )
          end
        end
      end
    rescue => e
      raise  ::TimeScheduler::Error, "#{__FILE__}: #{e.message}"
    end
  end
end