Class: Acquia::Cloud::Logs::Streamer

Inherits:
Object
  • Object
show all
Defined in:
lib/acquia/cloud/logs/streamer.rb

Overview

Constant Summary collapse

RECV_LENGTH =

Try to retrieve 16KB blocks from the socket.

16 * 1024

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(url, message) ⇒ Streamer

Returns a new instance of Streamer.



18
19
20
21
22
23
24
25
26
# File 'lib/acquia/cloud/logs/streamer.rb', line 18

# Builds a streamer for the given endpoint. No connection is opened here;
# that happens in #connect.
#
# @param url [String] endpoint URL, later parsed and used for the handshake in #connect
# @param message [Object] payload sent as the first frame once connected
def initialize(url, message)
  @url, @message = url, message
  @logs, @available, @enabled_types = [], [], []
  @keepalive_duration = 30

  # Both activity timestamps start from the same instant.
  started_at = Time.now
  @last_data = started_at
  @last_keepalive = started_at
end

Instance Attribute Details

#keepalive_duration ⇒ Object

Returns the value of attribute keepalive_duration.



16
17
18
# File 'lib/acquia/cloud/logs/streamer.rb', line 16

# Reads the keepalive interval held in @keepalive_duration. The constructor
# sets it to 30, and #update compares it against differences between Time
# values (i.e. seconds) to time keepalives and to detect a stalled stream.
#
# @return [Object] the current value of @keepalive_duration
def keepalive_duration
  @keepalive_duration
end

#remote_server ⇒ Object (readonly)

Returns the value of attribute remote_server.



15
16
17
# File 'lib/acquia/cloud/logs/streamer.rb', line 15

# Reads @remote_server. #update assigns it from the 'server' field of a
# 'connected' message when that value starts with 'logstream-api'; the
# constructor does not set it, so it is nil until such a message is seen.
#
# @return [Object, nil] the current value of @remote_server
def remote_server
  @remote_server
end

Instance Method Details

#close ⇒ Object



50
51
52
# File 'lib/acquia/cloud/logs/streamer.rb', line 50

# Closes the underlying socket, if one has been opened.
#
# Calling this before #connect used to fail with NoMethodError because
# @socket was still nil; it is now a no-op in that case.
#
# @return [Object, nil] whatever the socket's #close returns, or nil when
#   there is no socket
def close
  @socket.close if @socket
end

#connect ⇒ Object



32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# File 'lib/acquia/cloud/logs/streamer.rb', line 32

# Opens the TCP connection, performs the WebSocket client handshake and
# sends the initial message supplied to the constructor.
#
# If anything fails after the socket has been opened, the socket is closed
# and @socket is reset to nil, so the descriptor is not leaked and a later
# #connect is not rejected with 'Already connected.'.
#
# @raise [StreamerConnectionError] when a socket is already held, when the
#   remote end closes the connection mid-handshake, or when the handshake
#   response is not valid
# @return [Object] the result of sending the initial message
def connect
  raise StreamerConnectionError, 'Already connected.' unless @socket.nil?

  @handshake = ::WebSocket::Handshake::Client.new url: @url
  uri = ::URI.parse @url
  # Fall back to port 80 when the parsed URL carries no port.
  @socket = TCPSocket.new uri.host, uri.port || 80

  begin
    @socket.print @handshake.to_s

    until @handshake.finished?
      line = @socket.gets
      # gets returns nil at EOF; report that as a connection failure rather
      # than handing nil to the handshake parser.
      if line.nil?
        raise StreamerConnectionError, "Connection closed by the remote server during the WebSocket handshake: #{@url}"
      end
      @handshake << line
    end
    raise StreamerConnectionError, "Couldn't get a valid response while connecting to the remote server: #{@url}" unless @handshake.valid?
    @incoming = WebSocket::Frame::Incoming::Client.new(version: @handshake.version)

    # Initial authentication
    send @message
  rescue StandardError
    # Roll back to the disconnected state before re-raising.
    @socket.close unless @socket.closed?
    @socket = nil
    raise
  end
end

#debug ⇒ Object



28
29
30
# File 'lib/acquia/cloud/logs/streamer.rb', line 28

# Switches debug mode on by setting @debug. While it is set, #send and
# #update echo outgoing and incoming messages to STDERR, and #update reports
# unknown commands on STDERR instead of raising.
#
# @return [true]
def debug
  @debug = true
end

#disable(opts = {}) ⇒ Object



79
80
81
82
83
84
85
86
87
88
89
90
# File 'lib/acquia/cloud/logs/streamer.rb', line 79

# Sends a 'disable' command for one server/type pair.
#
# The pair is taken from opts[:source] (an object responding to #server and
# #type) when given, otherwise from opts[:server] and opts[:type]. The
# caller's hash is left untouched; earlier versions wrote :server and :type
# back into it.
#
# @param opts [Hash] :source, or :server and :type
# @return [Object] the result of #send
def disable(opts = {})
  source = opts[:source]
  server = source ? source.server : opts[:server]
  type = source ? source.type : opts[:type]

  send(
    cmd: 'disable',
    server: server,
    type: type,
  )
end

#disable_type(type) ⇒ Object



92
93
94
95
96
97
# File 'lib/acquia/cloud/logs/streamer.rb', line 92

# Disables every known source of the given type and forgets the type, so
# sources of that type announced later are no longer auto-enabled by #update.
#
# Earlier versions ignored +type+ while iterating and disabled every
# available source.
#
# @param type [Object] compared with each source's #type
# @return [Object, nil] the removed type, or nil if it was not enabled
def disable_type(type)
  @available.each do |source|
    disable source: source if source.type == type
  end
  @enabled_types.delete type
end

#each_log ⇒ Object



99
100
101
102
103
104
# File 'lib/acquia/cloud/logs/streamer.rb', line 99

# Runs #update once, then drains the buffered log entries in arrival order,
# yielding each one and removing it from the buffer as it goes.
#
# @yield [Object] each buffered entry, oldest first
# @return [nil]
def each_log
  update
  yield @logs.shift until @logs.empty?
end

#enable(opts = {}) ⇒ Object



59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/acquia/cloud/logs/streamer.rb', line 59

# Sends an 'enable' command for one server/type pair.
#
# The pair is taken from opts[:source] (an object responding to #server and
# #type) when given, otherwise from opts[:server] and opts[:type]. The
# caller's hash is left untouched; earlier versions wrote :server and :type
# back into it.
#
# @param opts [Hash] :source, or :server and :type
# @return [Object] the result of #send
def enable(opts = {})
  source = opts[:source]
  server = source ? source.server : opts[:server]
  type = source ? source.type : opts[:type]

  send(
    cmd: 'enable',
    server: server,
    type: type,
  )
end

#enable_type(type) ⇒ Object



72
73
74
75
76
77
# File 'lib/acquia/cloud/logs/streamer.rb', line 72

# Enables every known source of the given type and remembers the type, so
# sources of that type announced later are auto-enabled by #update.
#
# Earlier versions ignored +type+ while iterating and enabled every
# available source.
#
# @param type [Object] compared with each source's #type
# @return [Array] the list of enabled types
def enable_type(type)
  @available.each do |source|
    enable source: source if source.type == type
  end
  @enabled_types << type
end

#send(message) ⇒ Object

protected



108
109
110
111
112
113
# File 'lib/acquia/cloud/logs/streamer.rb', line 108

# Serialises +message+ to JSON, wraps it in an outgoing WebSocket text frame
# and writes the frame to the socket. In debug mode the JSON is echoed to
# STDERR first.
#
# @param message [#to_json] the command to transmit
# @return [Object] the result of the socket's #send
def send(message)
  payload = message.to_json
  STDERR.puts "-> #{payload}" if @debug

  outgoing = ::WebSocket::Frame::Outgoing::Client.new(
    version: @handshake.version,
    type: 'text',
    data: payload
  )
  @socket.send(outgoing.to_s, 0)
end

#sources ⇒ Object



54
55
56
57
# File 'lib/acquia/cloud/logs/streamer.rb', line 54

# Runs #update once and then hands back a shallow copy of the list of
# announced sources, so callers cannot modify the internal list.
#
# @return [Array] a clone of @available
def sources
  update
  snapshot = @available.clone
  snapshot
end

#update ⇒ Object



115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
# File 'lib/acquia/cloud/logs/streamer.rb', line 115

# Performs one polling pass over the stream: sends a keepalive when one is
# due, reads everything currently available from the socket without
# blocking, then decodes and dispatches each complete incoming message.
#
# Fixes over the earlier version: the keepalive test was inverted (it fired
# on every call while the interval had NOT elapsed and never afterwards), and
# @last_keepalive was never refreshed after sending.
#
# @raise [Errno::EPIPE] when no data has arrived for more than 1.5 times the
#   keepalive interval; the socket is closed first
# @raise [StreamerRemoteError] when the server sends an 'error' message
# @raise [StreamerUnrecognisedCommandError] on an unknown command outside
#   debug mode
# @return [nil]
def update
  # Send a keepalive once the keepalive interval has elapsed, and restart
  # the interval so the next one is not sent immediately.
  if Time.now - @last_keepalive >= @keepalive_duration
    send(cmd: 'keepalive')
    @last_keepalive = Time.now
  end

  # Read as much as possible. This loop has no normal exit: it ends when
  # read_nonblock raises, and IO::WaitReadable (nothing more to read right
  # now) is the expected case handled below. Any other exception propagates.
  loop do
    chunk = @socket.read_nonblock(RECV_LENGTH)
    @last_data = Time.now
    @incoming << chunk
  end
rescue ::IO::WaitReadable
  if Time.now - @last_data > @keepalive_duration * 1.5
    # There hasn't been a socket level error, but we're worried at this
    # point. Trigger a proper socket close 'just in case' then flag a
    # broken pipe to downstream code.
    close
    raise Errno::EPIPE
  end

  # Dispatch every complete frame buffered so far.
  loop do
    msg = @incoming.next
    break unless msg
    STDERR.puts "<- #{msg}" if @debug

    msg = JSON.parse(msg.data)
    case msg['cmd']
    when 'connected'
      # to_s guards against a 'connected' message without a server field.
      @remote_server = msg['server'] if msg['server'].to_s.start_with? 'logstream-api'
    when 'success'
      # Congratulations!
    when 'error'
      raise StreamerRemoteError, "#{msg['code']}: #{msg['description']} during #{msg['during'].inspect}"
    when 'available'
      source = Source.new(msg['server'], msg['type'], msg['display_type'])
      @available << source
      # Auto-enable sources whose type was requested via enable_type.
      enable source: source if @enabled_types.include? source.type
    when 'line'
      @logs << msg
    else
      if @debug
        STDERR.puts "Received unknown command: #{msg['cmd']}"
        STDERR.puts msg.inspect
      else
        raise StreamerUnrecognisedCommandError, "Unrecognised command: #{msg['cmd']}\n#{msg.inspect}"
      end
    end
  end
end