Class: KRPC::Streaming::StreamsManager
- Inherits:
-
Object
- Object
- KRPC::Streaming::StreamsManager
- Defined in:
- lib/krpc/streaming.rb
Instance Attribute Summary collapse
-
#client ⇒ Object
readonly
Returns the value of attribute client.
Instance Method Summary collapse
-
#create_stream(call, return_type, method, *args, **kwargs) ⇒ Object
Send the streaming request, create related Stream object and return it.
-
#initialize(client) ⇒ StreamsManager
constructor
A new instance of StreamsManager.
-
#remove_all_streams ⇒ Object
Remove all streams created by this streams manager.
-
#remove_stream(stream) ⇒ Object
Remove the streaming request and deactivate the Stream object.
-
#start_streaming_thread ⇒ Object
Start streaming thread.
-
#stop_streaming_thread ⇒ Object
Stop streaming thread.
Constructor Details
#initialize(client) ⇒ StreamsManager
Returns a new instance of StreamsManager.
10 11 12 13 14 15 |
# File 'lib/krpc/streaming.rb', line 10 def initialize(client) @client = client @streams = {} @streams_mutex = Mutex.new @streaming_thread = Thread.new {} end |
Instance Attribute Details
#client ⇒ Object (readonly)
Returns the value of attribute client.
8 9 10 |
# File 'lib/krpc/streaming.rb', line 8 def client @client end |
Instance Method Details
#create_stream(call, return_type, method, *args, **kwargs) ⇒ Object
Send the streaming request, create related Stream object and return it. If identical Stream already exists, doesn’t create new Stream and return the existing one.
19 20 21 22 23 24 25 26 27 28 29 30 31 |
# File 'lib/krpc/streaming.rb', line 19 def create_stream(call, return_type, method, *args, **kwargs) raise RuntimeError("Cannot stream a property setter") if method.name.to_s.end_with? '=' stream_msg = client.core.add_stream(call) id = stream_msg.id @streams_mutex.synchronize do if @streams.include? id @streams[id] else value = method.call(*args, **kwargs) @streams[id] = Stream.new(self, id, return_type, value, method, *args, **kwargs) end end end |
#remove_all_streams ⇒ Object
Remove all streams created by this streams manager.
48 49 50 |
# File 'lib/krpc/streaming.rb', line 48 def remove_all_streams @streams.each {|_,stream| remove_stream(stream)} end |
#remove_stream(stream) ⇒ Object
Remove the streaming request and deactivate the Stream object. Returns ‘true` if the streaming request has been removed or `false` if passed Stream object is already inactive.
35 36 37 38 39 40 41 42 43 44 45 |
# File 'lib/krpc/streaming.rb', line 35 def remove_stream(stream) return false unless stream.active? @streams_mutex.synchronize do return false unless @streams.include? stream.id client.core.remove_stream stream.id @streams.delete stream.id end stream.value = RuntimeError.new("Stream has been removed") stream.mark_as_inactive true end |
#start_streaming_thread ⇒ Object
Start streaming thread. It receives stream data, and updates Stream object’s ‘value` attribute.
53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 |
# File 'lib/krpc/streaming.rb', line 53 def start_streaming_thread stop_streaming_thread @streaming_thread = Thread.new do connection = client.stream_connection loop do size = connection.recv_varint data = connection.recv(size) stream_msg = PB::StreamUpdate.decode(data) @streams_mutex.synchronize do stream_msg.results.each do |result| next unless @streams.include? result.id stream = @streams[result.id] if result.result.field_empty? :error stream.value = Decoder.decode(result.result.value, stream.return_type, client) else stream.value = client.build_exception(result.result.error) end end end end end end |
#stop_streaming_thread ⇒ Object
Stop streaming thread.
77 78 79 |
# File 'lib/krpc/streaming.rb', line 77 def stop_streaming_thread @streaming_thread.terminate end |