Class: PgVersions::Connection::Subscription

Inherits:
Object
  • Object
show all
Defined in:
lib/pg_versions/pg_versions.rb

Instance Method Summary collapse

Constructor Details

#initialize(inner, batch_delay) ⇒ Subscription



499
500
501
502
503
504
505
506
# File 'lib/pg_versions/pg_versions.rb', line 499

def initialize(inner, batch_delay)
  @inner = inner
  @batch_delay = batch_delay
  @notifications = Queue.new
  @already_known_versions = Hash.new { |h,k| h[k] = [] }
  @channels = Hash.new(0)
  @first_wait = true
end

Instance Method Details

#bump(*channels, notify: false) ⇒ Object



541
542
543
544
545
546
# File 'lib/pg_versions/pg_versions.rb', line 541

def bump(*channels, notify: false)
  channels = @channels.keys  if channels.size == 0
  versions = @inner.bump(channels)
  update_already_known_versions(versions)  if not notify
  versions
end

#dropObject



587
588
589
590
591
# File 'lib/pg_versions/pg_versions.rb', line 587

def drop
  @notifications << nil
  @inner.unsubscribe(self, @channels.keys)  if @channels.keys.size > 0
  #TODO: what to do if this object gets used after drop?
end

#notify(versions) ⇒ Object



582
583
584
# File 'lib/pg_versions/pg_versions.rb', line 582

def notify(versions)
  @notifications << versions
end

#read(*channels, notify: false) ⇒ Object



533
534
535
536
537
538
# File 'lib/pg_versions/pg_versions.rb', line 533

def read(*channels, notify: false)
  channels = @channels.keys  if channels.size == 0
  versions = @inner.read(channels)
  update_already_known_versions(versions)  if not notify
  versions
end

#subscribe(channels, known: {}) ⇒ Object



509
510
511
512
513
514
515
516
517
518
# File 'lib/pg_versions/pg_versions.rb', line 509

def subscribe(channels, known: {})
  update_already_known_versions(known)
  channels = [channels].flatten
  channels.select! { |channel|
    (@channels[channel] += 1) == 1
  }
  if channels.size > 0
    @inner.subscribe(self, channels)
  end
end

#unsubscribe(*channels) ⇒ Object



521
522
523
524
525
526
527
528
529
530
# File 'lib/pg_versions/pg_versions.rb', line 521

def unsubscribe(*channels)
  channels = [channels].flatten
  channels.select! { |channel|
    @channels[channel] -= 1
    raise "Trying to unsubscribe from channel (%p) more times than it was subscribed to"%[channel]  if @channels[channel] < 0
    @channels.delete(channel)  if @channels[channel] == 0
    not @channels.has_key?(channel)
  }
  @inner.unsubscribe(self, channels)
end

#update_already_known_versions(new_already_known_versions) ⇒ Object



594
595
596
597
598
# File 'lib/pg_versions/pg_versions.rb', line 594

def update_already_known_versions(new_already_known_versions)
  new_already_known_versions.each { |channel, version|
    @already_known_versions[channel] = version  if (version <=> @already_known_versions[channel]) == 1
  }
end

#wait(new_already_known_versions = {}, batch_delay: nil) ⇒ Object

TODO: make this resume-able after forced exception



550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
# File 'lib/pg_versions/pg_versions.rb', line 550

def wait(new_already_known_versions = {}, batch_delay: nil)
  batch_delay = @batch_delay  if batch_delay.nil?
  update_already_known_versions(new_already_known_versions)
  loop {
    events = [@notifications.shift]
    sleep batch_delay  if batch_delay and !@first_wait
    events << @notifications.shift  while not @notifications.empty?
    changed_versions = {}
    events.each { |versions|
      return nil  if not versions #termination
      versions.each { |channel, version|
        if (@already_known_versions[channel] <=> version) == -1
          @already_known_versions[channel] = version
          changed_versions[channel] = version
        end
      }
    }
    if changed_versions.size > 0
      @first_wait = false
      return Notification.new(changed_versions, @already_known_versions.dup)
    end
  }
end