Class: Pakyow::Realtime::Server
- Inherits:
-
Object
- Object
- Pakyow::Realtime::Server
- Defined in:
- lib/pakyow/realtime/server.rb,
lib/pakyow/realtime/server/adapters/redis.rb,
lib/pakyow/realtime/server/adapters/memory.rb
Defined Under Namespace
Modules: Adapters
Constant Summary collapse
- HEARTBEAT_INTERVAL =
3
Instance Attribute Summary collapse
-
#adapter ⇒ Object
readonly
Returns the value of attribute adapter.
Instance Method Summary collapse
- #connect ⇒ Object
- #disconnect ⇒ Object
- #find_socket(id_or_socket) {|socket| ... } ⇒ Object
- #find_socket_by_id(socket_id) ⇒ Object
- #find_socket_id(id_or_socket) {|socket_id| ... } ⇒ Object
-
#initialize(adapter = :memory, adapter_config, timeout_config) ⇒ Server
constructor
A new instance of Server.
- #shutdown ⇒ Object
- #socket_connect(id_or_socket) ⇒ Object
- #socket_disconnect(id_or_socket) ⇒ Object
- #socket_subscribe(id_or_socket, *channels) ⇒ Object
- #socket_unsubscribe(*channels) ⇒ Object
- #subscription_broadcast(channel, message) ⇒ Object
-
#transmit_message_to_connection_ids(message, socket_ids, raw: false) ⇒ Object
Called by the adapter, which guarantees that this server has connections for these ids.
Constructor Details
#initialize(adapter = :memory, adapter_config, timeout_config) ⇒ Server
Returns a new instance of Server.
18 19 20 21 22 23 24 25 26 27 28 29 30 31 |
# File 'lib/pakyow/realtime/server.rb', line 18 def initialize(adapter = :memory, adapter_config, timeout_config) require "pakyow/realtime/server/adapters/#{adapter}" @adapter = Adapters.const_get(adapter.to_s.capitalize).new(self, adapter_config) @sockets = Concurrent::Array.new @timeout_config = timeout_config @executor = Concurrent::SingleThreadExecutor.new( auto_terminate: false ) connect rescue LoadError => e Pakyow.logger.error "Failed to load data subscriber store adapter named `#{adapter}'" Pakyow.logger.error e. end |
Instance Attribute Details
#adapter ⇒ Object (readonly)
Returns the value of attribute adapter.
14 15 16 |
# File 'lib/pakyow/realtime/server.rb', line 14 def adapter @adapter end |
Instance Method Details
#connect ⇒ Object
40 41 42 43 44 |
# File 'lib/pakyow/realtime/server.rb', line 40 def connect @executor << -> { start_heartbeat; @adapter.connect } end |
#disconnect ⇒ Object
46 47 48 49 50 |
# File 'lib/pakyow/realtime/server.rb', line 46 def disconnect @executor << -> { stop_heartbeat; @adapter.disconnect } end |
#find_socket(id_or_socket) {|socket| ... } ⇒ Object
113 114 115 116 117 118 119 120 121 |
# File 'lib/pakyow/realtime/server.rb', line 113 def find_socket(id_or_socket) socket = if id_or_socket.is_a?(WebSocket) id_or_socket else find_socket_by_id(id_or_socket) end yield socket if socket end |
#find_socket_by_id(socket_id) ⇒ Object
109 110 111 |
# File 'lib/pakyow/realtime/server.rb', line 109 def find_socket_by_id(socket_id) @sockets.find { |socket| socket.id == socket_id } end |
#find_socket_id(id_or_socket) {|socket_id| ... } ⇒ Object
123 124 125 126 127 128 129 130 131 |
# File 'lib/pakyow/realtime/server.rb', line 123 def find_socket_id(id_or_socket) socket_id = if id_or_socket.is_a?(WebSocket) id_or_socket.id else id_or_socket end yield socket_id if socket_id end |
#shutdown ⇒ Object
33 34 35 36 37 38 |
# File 'lib/pakyow/realtime/server.rb', line 33 def shutdown disconnect @sockets.each(&:shutdown) @executor.shutdown @executor.wait_for_termination(30) end |
#socket_connect(id_or_socket) ⇒ Object
52 53 54 55 56 57 58 59 60 |
# File 'lib/pakyow/realtime/server.rb', line 52 def socket_connect(id_or_socket) @executor << -> { find_socket(id_or_socket) do |socket| @sockets << socket @adapter.persist(socket.id) @adapter.current!(socket.id, socket.object_id) end } end |
#socket_disconnect(id_or_socket) ⇒ Object
62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 |
# File 'lib/pakyow/realtime/server.rb', line 62 def socket_disconnect(id_or_socket) @executor << -> { find_socket(id_or_socket) do |socket| @sockets.delete(socket) # If this isn't the current instance for the socket id, it means that a # reconnect probably happened and the new socket connected before we # knew that the old one disconnected. Since there's a newer socket, # don't trigger leave events or expirations for the old one. # if @adapter.current?(socket.id, socket.object_id) socket.leave @adapter.expire(socket.id, @timeout_config.disconnect) end end } end |
#socket_subscribe(id_or_socket, *channels) ⇒ Object
80 81 82 83 84 85 86 87 |
# File 'lib/pakyow/realtime/server.rb', line 80 def socket_subscribe(id_or_socket, *channels) @executor << -> { find_socket_id(id_or_socket) do |socket_id| @adapter.socket_subscribe(socket_id, *channels) @adapter.expire(socket_id, @timeout_config.initial) end } end |
#socket_unsubscribe(*channels) ⇒ Object
89 90 91 92 93 |
# File 'lib/pakyow/realtime/server.rb', line 89 def socket_unsubscribe(*channels) @executor << -> { @adapter.socket_unsubscribe(*channels) } end |
#subscription_broadcast(channel, message) ⇒ Object
95 96 97 98 99 |
# File 'lib/pakyow/realtime/server.rb', line 95 def subscription_broadcast(channel, ) @executor << -> { @adapter.subscription_broadcast(channel.to_s, channel: channel.name, message: ) } end |
#transmit_message_to_connection_ids(message, socket_ids, raw: false) ⇒ Object
Called by the adapter, which guarantees that this server has connections for these ids.
103 104 105 106 107 |
# File 'lib/pakyow/realtime/server.rb', line 103 def (, socket_ids, raw: false) socket_ids.each do |socket_id| find_socket_by_id(socket_id)&.transmit(, raw: raw) end end |