Class: KRPC::Streaming::StreamsManager

Inherits:
Object
  • Object
show all
Defined in:
lib/krpc/streaming.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(client) ⇒ StreamsManager

Returns a new instance of StreamsManager.



10
11
12
13
14
15
# File 'lib/krpc/streaming.rb', line 10

# Build a streams manager bound to +client+.
#
# @param client the client object this manager works for; it is stored
#   as-is and exposed through the +client+ reader.
def initialize(client)
  # Placeholder thread with an empty body, so @streaming_thread always
  # holds a Thread object from the moment the manager is constructed.
  @streaming_thread = Thread.new {}
  # Lock used by the other methods of this class around @streams access.
  @streams_mutex = Mutex.new
  # Registry of streams; starts out empty.
  @streams = {}
  @client = client
end

Instance Attribute Details

#client ⇒ Object (readonly)

Returns the value of attribute client.



8
9
10
# File 'lib/krpc/streaming.rb', line 8

# Reader for the client object that was handed to the constructor.
#
# @return the value stored in @client
def client
  @client
end

Instance Method Details

#create_stream(call, return_type, method, *args, **kwargs) ⇒ Object

Send the streaming request, create the related Stream object, and return it. If an identical Stream already exists, no new Stream is created and the existing one is returned.



19
20
21
22
23
24
25
26
27
28
29
30
31
# File 'lib/krpc/streaming.rb', line 19

# Send the streaming request, create the related Stream object and return it.
# If a Stream is already registered under the id returned by the server, that
# existing Stream is returned and no new one is created.
#
# @param call the call message forwarded to +client.core.add_stream+
# @param return_type passed through to +Stream.new+
# @param method a callable responding to +name+ and +call+; it is invoked
#   once with +args+/+kwargs+ to obtain the stream's initial value
# @param args positional arguments forwarded to +method+ and to +Stream.new+
# @param kwargs keyword arguments forwarded to +method+ and to +Stream.new+
# @return the Stream registered under the returned stream id
# @raise [RuntimeError] if +method+'s name ends with '=' (a property setter)
def create_stream(call, return_type, method, *args, **kwargs)
  # Use the `raise Class, message` form: writing RuntimeError(...) would be a
  # call to an undefined method and surface as NoMethodError instead.
  raise RuntimeError, "Cannot stream a property setter" if method.name.to_s.end_with?('=')

  stream_msg = client.core.add_stream(call)
  id = stream_msg.id
  @streams_mutex.synchronize do
    if @streams.include? id
      @streams[id]
    else
      # First time this id is seen: evaluate the call once for the initial
      # value, then register the new Stream under its id.
      value = method.call(*args, **kwargs)
      @streams[id] = Stream.new(self, id, return_type, value, method, *args, **kwargs)
    end
  end
end

#remove_all_streams ⇒ Object

Remove all streams created by this streams manager.



48
49
50
# File 'lib/krpc/streaming.rb', line 48

# Remove all streams created by this streams manager.
#
# The streams to remove are copied out of @streams while holding the mutex,
# and the copy is iterated instead of the live hash. remove_stream deletes
# entries from @streams (and takes the same, non-reentrant mutex itself), so
# walking the live hash would mean mutating it mid-iteration, and any
# insertion during the walk would raise.
#
# @return the @streams hash, as before
def remove_all_streams
  snapshot = @streams_mutex.synchronize { @streams.values }
  snapshot.each { |stream| remove_stream(stream) }
  @streams
end

#remove_stream(stream) ⇒ Object

Remove the streaming request and deactivate the Stream object. Returns `true` if the streaming request has been removed, or `false` if the passed Stream object is already inactive.



35
36
37
38
39
40
41
42
43
44
45
# File 'lib/krpc/streaming.rb', line 35

# Remove the streaming request for +stream+ and deactivate the Stream object.
#
# @param stream an object responding to +active?+, +id+, +value=+ and
#   +mark_as_inactive+
# @return [Boolean] +true+ when the streaming request was removed; +false+
#   when the stream is already inactive or its id is not registered here
def remove_stream(stream)
  return false unless stream.active?

  # Do the registry lookup, the server-side removal and the registry delete
  # as one step under the lock; the block reports whether anything was removed.
  was_registered = @streams_mutex.synchronize do
    if @streams.key?(stream.id)
      client.core.remove_stream(stream.id)
      @streams.delete(stream.id)
      true
    else
      false
    end
  end
  return false unless was_registered

  # Leave an error object as the stream's value, then flag it inactive.
  stream.value = RuntimeError.new("Stream has been removed")
  stream.mark_as_inactive
  true
end

#start_streaming_thread ⇒ Object

Start the streaming thread. It receives stream data and updates each Stream object’s `value` attribute.



53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
# File 'lib/krpc/streaming.rb', line 53

# Start the streaming thread. Any previously started thread is stopped first.
#
# The new thread repeatedly reads a varint length and then that many bytes
# from +client.stream_connection+, decodes them as a PB::StreamUpdate, and
# assigns a fresh +value+ to every registered Stream named in the update:
# the decoded result when the error field is empty, otherwise the exception
# built by +client.build_exception+.
#
# @return [Thread] the newly started thread (also kept in @streaming_thread)
def start_streaming_thread
  stop_streaming_thread
  @streaming_thread = Thread.new do
    conn = client.stream_connection
    loop do
      length = conn.recv_varint
      payload = conn.recv(length)
      update = PB::StreamUpdate.decode(payload)
      @streams_mutex.synchronize do
        update.results.each do |entry|
          # Updates for ids that are not registered here are ignored.
          next unless @streams.key?(entry.id)

          target = @streams[entry.id]
          outcome = entry.result
          target.value =
            if outcome.field_empty?(:error)
              Decoder.decode(outcome.value, target.return_type, client)
            else
              client.build_exception(outcome.error)
            end
        end
      end
    end
  end
end

#stop_streaming_thread ⇒ Object

Stop streaming thread.



77
78
79
# File 'lib/krpc/streaming.rb', line 77

# Stop the streaming thread by terminating the Thread held in
# @streaming_thread.
#
# @return [Thread] the terminated thread object
def stop_streaming_thread
  @streaming_thread.kill
end