Class: Pakyow::Realtime::Server

Inherits:
Object
  • Object
show all
Defined in:
lib/pakyow/realtime/server.rb,
lib/pakyow/realtime/server/adapters/redis.rb,
lib/pakyow/realtime/server/adapters/memory.rb

Defined Under Namespace

Modules: Adapters

Constant Summary collapse

HEARTBEAT_INTERVAL =
3

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(adapter = :memory, adapter_config, timeout_config) ⇒ Server

Returns a new instance of Server.



18
19
20
21
22
23
24
25
26
27
28
29
30
31
# File 'lib/pakyow/realtime/server.rb', line 18

# Builds a server backed by the named adapter and queues the initial
# connection.
#
# The adapter file is required on demand from
# "pakyow/realtime/server/adapters/<adapter>", and the adapter class is looked
# up under +Adapters+ using the capitalized adapter name.
#
# A +LoadError+ raised anywhere in the body is logged through +Pakyow.logger+
# and not re-raised, so instance variables assigned after the failing
# statement stay unset.
#
# @param adapter [Symbol, String] adapter name; defaults to +:memory+
# @param adapter_config [Object] handed to the adapter constructor as is
# @param timeout_config [Object] kept in +@timeout_config+; other methods of
#   this class call +disconnect+ and +initial+ on it
def initialize(adapter = :memory, adapter_config, timeout_config)
  require "pakyow/realtime/server/adapters/#{adapter}"

  adapter_class = Adapters.const_get(adapter.to_s.capitalize)
  @adapter = adapter_class.new(self, adapter_config)
  @sockets = Concurrent::Array.new
  @timeout_config = timeout_config
  @executor = Concurrent::SingleThreadExecutor.new(auto_terminate: false)

  connect
rescue LoadError => error
  Pakyow.logger.error "Failed to load data subscriber store adapter named `#{adapter}'"
  Pakyow.logger.error error.message
end

Instance Attribute Details

#adapter ⇒ Object (readonly)

Returns the value of attribute adapter.



14
15
16
# File 'lib/pakyow/realtime/server.rb', line 14

# Reader for the adapter held in +@adapter+.
#
# @return [Object, nil] the value of +@adapter+; +nil+ if it was never assigned
def adapter
  @adapter
end

Instance Method Details

#connect ⇒ Object



40
41
42
43
44
# File 'lib/pakyow/realtime/server.rb', line 40

# Queues a task on the executor that starts the heartbeat and then connects
# the adapter.
#
# Nothing runs here directly; the task is only handed to +@executor+.
#
# @return [Object] whatever +@executor <<+ returns
def connect
  task = lambda do
    start_heartbeat
    @adapter.connect
  end

  @executor << task
end

#disconnect ⇒ Object



46
47
48
49
50
# File 'lib/pakyow/realtime/server.rb', line 46

# Queues a task on the executor that stops the heartbeat and then disconnects
# the adapter.
#
# Nothing runs here directly; the task is only handed to +@executor+.
#
# @return [Object] whatever +@executor <<+ returns
def disconnect
  task = lambda do
    stop_heartbeat
    @adapter.disconnect
  end

  @executor << task
end

#find_socket(id_or_socket) {|socket| ... } ⇒ Object

Yields:

  • (socket)


113
114
115
116
117
118
119
120
121
# File 'lib/pakyow/realtime/server.rb', line 113

# Resolves +id_or_socket+ to a socket and yields it to the block.
#
# A +WebSocket+ instance is used as is; any other value is treated as an id
# and looked up through +find_socket_by_id+. The block is not called when no
# socket is found.
#
# @param id_or_socket [WebSocket, Object] a socket, or a socket id
# @yield [socket] the resolved socket
# @return [Object, nil] the block's value, or +nil+ when nothing was yielded
def find_socket(id_or_socket)
  socket = id_or_socket.is_a?(WebSocket) ? id_or_socket : find_socket_by_id(id_or_socket)
  return unless socket

  yield socket
end

#find_socket_by_id(socket_id) ⇒ Object



109
110
111
# File 'lib/pakyow/realtime/server.rb', line 109

# Looks through +@sockets+ for the first entry whose +id+ equals +socket_id+.
#
# @param socket_id [Object] the id to match against each socket's +id+
# @return [Object, nil] the first matching socket, or +nil+ if none matches
def find_socket_by_id(socket_id)
  @sockets.find do |candidate|
    candidate.id == socket_id
  end
end

#find_socket_id(id_or_socket) {|socket_id| ... } ⇒ Object

Yields:

  • (socket_id)


123
124
125
126
127
128
129
130
131
# File 'lib/pakyow/realtime/server.rb', line 123

# Resolves +id_or_socket+ to a socket id and yields it to the block.
#
# For a +WebSocket+ instance its +id+ is used; any other value is taken to be
# the id itself. The block is not called when the resulting id is +nil+ or
# +false+.
#
# @param id_or_socket [WebSocket, Object] a socket, or a socket id
# @yield [socket_id] the resolved id
# @return [Object, nil] the block's value, or +nil+ when nothing was yielded
def find_socket_id(id_or_socket)
  socket_id = id_or_socket.is_a?(WebSocket) ? id_or_socket.id : id_or_socket
  return unless socket_id

  yield socket_id
end

#shutdown ⇒ Object



33
34
35
36
37
38
# File 'lib/pakyow/realtime/server.rb', line 33

# Shuts the server down: queues the adapter disconnect, calls +shutdown+ on
# every tracked socket, then shuts the executor down and waits up to 30
# seconds for it to terminate.
#
# @return [Object] the result of +@executor.wait_for_termination(30)+
def shutdown
  disconnect

  # Walk a snapshot rather than the live collection: +socket_disconnect+
  # deletes entries from +@sockets+, and removing an element while +each+ is
  # walking the same array makes the iteration skip the element that slides
  # into the freed slot, leaving that socket without a +shutdown+ call.
  @sockets.dup.each(&:shutdown)

  @executor.shutdown
  @executor.wait_for_termination(30)
end

#socket_connect(id_or_socket) ⇒ Object



52
53
54
55
56
57
58
59
60
# File 'lib/pakyow/realtime/server.rb', line 52

# Queues a task that registers a socket with this server and the adapter.
#
# When the task runs it resolves +id_or_socket+ through +find_socket+; for a
# resolved socket it appends the socket to +@sockets+, then calls
# +@adapter.persist+ with the socket id and +@adapter.current!+ with the
# socket id and the socket's +object_id+.
#
# @param id_or_socket [WebSocket, Object] a socket, or a socket id
# @return [Object] whatever +@executor <<+ returns
def socket_connect(id_or_socket)
  register = lambda do
    find_socket(id_or_socket) do |connected|
      @sockets << connected
      @adapter.persist(connected.id)
      @adapter.current!(connected.id, connected.object_id)
    end
  end

  @executor << register
end

#socket_disconnect(id_or_socket) ⇒ Object



62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
# File 'lib/pakyow/realtime/server.rb', line 62

# Queues a task that removes a socket from this server and, if it is still
# the adapter's current instance for its id, triggers its leave handling and
# expiration.
#
# @param id_or_socket [WebSocket, Object] a socket, or a socket id
# @return [Object] whatever +@executor <<+ returns
def socket_disconnect(id_or_socket)
  cleanup = lambda do
    find_socket(id_or_socket) do |departing|
      @sockets.delete(departing)

      # A socket that is no longer the current instance for its id was most
      # likely replaced by a reconnect that arrived before this disconnect was
      # noticed. The newer socket owns the id now, so the old one must not
      # fire leave events or schedule an expiration.
      next unless @adapter.current?(departing.id, departing.object_id)

      departing.leave
      @adapter.expire(departing.id, @timeout_config.disconnect)
    end
  end

  @executor << cleanup
end

#socket_subscribe(id_or_socket, *channels) ⇒ Object



80
81
82
83
84
85
86
87
# File 'lib/pakyow/realtime/server.rb', line 80

# Queues a task that subscribes a socket id to the given channels.
#
# When the task runs it resolves +id_or_socket+ through +find_socket_id+; for
# a resolved id it calls +@adapter.socket_subscribe+ with the id and channels,
# then +@adapter.expire+ with the id and +@timeout_config.initial+.
#
# @param id_or_socket [WebSocket, Object] a socket, or a socket id
# @param channels [Array<Object>] channels forwarded to the adapter
# @return [Object] whatever +@executor <<+ returns
def socket_subscribe(id_or_socket, *channels)
  subscribe = lambda do
    find_socket_id(id_or_socket) do |resolved_id|
      @adapter.socket_subscribe(resolved_id, *channels)
      @adapter.expire(resolved_id, @timeout_config.initial)
    end
  end

  @executor << subscribe
end

#socket_unsubscribe(*channels) ⇒ Object



89
90
91
92
93
# File 'lib/pakyow/realtime/server.rb', line 89

# Queues a task that forwards +channels+ to +@adapter.socket_unsubscribe+.
#
# @param channels [Array<Object>] channels forwarded to the adapter
# @return [Object] whatever +@executor <<+ returns
def socket_unsubscribe(*channels)
  unsubscribe = lambda { @adapter.socket_unsubscribe(*channels) }

  @executor << unsubscribe
end

#subscription_broadcast(channel, message) ⇒ Object



95
96
97
98
99
# File 'lib/pakyow/realtime/server.rb', line 95

# Queues a task that broadcasts +message+ for +channel+ through the adapter.
#
# When the task runs, the adapter receives +channel.to_s+ as its first
# argument, followed by +channel: channel.name+ and +message: message+.
#
# @param channel [Object] must respond to +to_s+ and +name+
# @param message [Object] payload forwarded to the adapter unchanged
# @return [Object] whatever +@executor <<+ returns
def subscription_broadcast(channel, message)
  broadcast = lambda do
    @adapter.subscription_broadcast(
      channel.to_s,
      channel: channel.name,
      message: message
    )
  end

  @executor << broadcast
end

#transmit_message_to_connection_ids(message, socket_ids, raw: false) ⇒ Object

Called by the adapter, which guarantees that this server has connections for these ids.



103
104
105
106
107
# File 'lib/pakyow/realtime/server.rb', line 103

# Sends +message+ to each socket found for the given ids.
#
# Every id is looked up with +find_socket_by_id+; ids without a matching
# socket are skipped.
#
# @param message [Object] payload passed to each socket's +transmit+
# @param socket_ids [Enumerable] ids of the sockets to send to
# @param raw [Boolean] forwarded to +transmit+ as its +raw:+ keyword
# @return [Object] the result of +socket_ids.each+
def transmit_message_to_connection_ids(message, socket_ids, raw: false)
  socket_ids.each do |id|
    recipient = find_socket_by_id(id)
    next if recipient.nil?

    recipient.transmit(message, raw: raw)
  end
end