Top Level Namespace

Constant Summary collapse

MAX_WS_FRAME_SIZE =

Key Variables

50.0

Instance Method Summary collapse

Instance Method Details

#accept_connection(socket, ws_key) ⇒ Object

accept_connection

Sends back the HTTP header to initialize the websocket connection.



196
197
198
199
200
201
202
203
204
205
# File 'lib/easel/websocket.rb', line 196

# Completes the WebSocket opening handshake by writing the
# "101 Switching Protocols" response to the socket.
#
# The Sec-WebSocket-Accept value is the base64-encoded SHA-1 digest of the
# client's key concatenated with the fixed handshake GUID.
#
# @param socket [#write] connection to the client
# @param ws_key [String] the client's Sec-WebSocket-Key value
# @return [Object] whatever socket.write returns
def accept_connection(socket, ws_key)
  handshake_guid = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"
  ws_accept_key = Digest::SHA1.base64digest(ws_key + handshake_guid)

  response_lines = [
    "HTTP/1.1 101 Switching Protocols",
    "Upgrade: websocket",
    "Connection: Upgrade",
    "Sec-WebSocket-Accept: #{ws_accept_key}",
    "", # terminates the last header line
    ""  # blank line that ends the header block
  ]
  socket.write response_lines.join("\r\n")
end

#build_appObject

build_app



21
22
23
24
25
26
27
28
29
30
31
# File 'lib/easel/build_pages.rb', line 21

# Builds the complete HTTP response for the main application page.
#
# Reads html/app.html.erb (located relative to this file), renders it through
# ERB, and returns it behind a "200 OK" header block.
#
# @return [String] header block followed by the rendered HTML
def build_app
  # File.read closes the file itself; File.new(...).read left the handle open
  # until the garbage collector got to it.
  app_erb = File.read("#{File.dirname(__FILE__)}/../html/app.html.erb")
  page = ERB.new(app_erb).result

  "HTTP/1.1 200 OK\r\n" +
  "Content-Type: text/html; charset=UTF-8\r\n" +
  "Content-Length: #{page.bytesize}\r\n" +
  "Connection: close\r\n" +
  "\r\n" +
  page
end

#build_cssObject

build_css



82
83
84
85
86
87
88
89
90
91
92
# File 'lib/easel/build_pages.rb', line 82

# Builds the complete HTTP response for the dashboard stylesheet.
#
# Reads html/app.css.erb (located relative to this file), renders it through
# ERB with this method's binding, and returns it behind a "200 OK" header
# block.
#
# @return [String] header block followed by the rendered CSS
def build_css
  # File.read closes the file itself; File.new(...).read left the handle open
  # until the garbage collector got to it.
  css_erb = File.read("#{File.dirname(__FILE__)}/../html/app.css.erb")
  css = ERB.new(css_erb).result(binding)

  "HTTP/1.1 200 OK\r\n" +
  "Content-Type: text/css; charset=UTF-8\r\n" +
  "Content-Length: #{css.bytesize}\r\n" +
  "Connection: close\r\n" +
  "\r\n" +
  css
end

#build_error(code) ⇒ Object

build_error



98
99
100
101
102
103
104
105
106
107
108
# File 'lib/easel/build_pages.rb', line 98

# Builds the complete HTTP response for an error page.
#
# Reads html/error.html.erb (located relative to this file) and renders it
# through ERB with this method's binding, so the template can see +code+.
# The reason phrase in the status line is looked up in @code_names.
#
# @param code [Integer] HTTP status code to report
# @return [String] header block followed by the rendered HTML
def build_error(code)
  # File.read closes the file itself; File.new(...).read left the handle open
  # until the garbage collector got to it.
  error_erb = File.read("#{File.dirname(__FILE__)}/../html/error.html.erb")
  page = ERB.new(error_erb).result(binding)

  "HTTP/1.1 #{code} #{@code_names[code]}\r\n" +
  "Content-Type: text/html; charset=UTF-8\r\n" +
  "Content-Length: #{page.bytesize}\r\n" +
  "Connection: close\r\n" +
  "\r\n" +
  page
end

#build_jsObject

build_js



36
37
38
39
40
41
42
43
44
45
46
# File 'lib/easel/build_pages.rb', line 36

# Builds the complete HTTP response for the client-side controller script.
#
# Reads html/controller.js.erb (located relative to this file), renders it
# through ERB, and returns it behind a "200 OK" header block.
#
# @return [String] header block followed by the rendered JavaScript
def build_js
  # File.read closes the file itself; File.new(...).read left the handle open
  # until the garbage collector got to it.
  js_erb = File.read("#{File.dirname(__FILE__)}/../html/controller.js.erb")
  page = ERB.new(js_erb).result

  "HTTP/1.1 200 OK\r\n" +
  "Content-Type: text/javascript; charset=UTF-8\r\n" +
  "Content-Length: #{page.bytesize}\r\n" +
  "Connection: close\r\n" +
  "\r\n" +
  page
end

#collect_dataObject

collect_data

Collects information on the current state of the system by running each configured dashboard command, and writes it to @collected_data (via write_data).



49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
# File 'lib/easel/data_gathering.rb', line 49

# Runs every data command configured under $config[:dashboards] and hands the
# results to write_data.
#
# The result is nested as dashboard id => element index => data index, and
# each leaf is a two-element array: the time the sample was taken
# ("HH:MM:SS") and the first capture group of the data entry's regex applied
# to the command output. When the regex does not match, an error is logged
# and the value is nil.
#
# @return [Object] whatever write_data returns
def collect_data
  new_data = {}

  # TODO: Check which element type something is to figure out how to handle it.

  log_info "Collecting data."
  $config[:dashboards].each do |dashboard|
    dashboard_data = new_data[dashboard[:id]] = {}

    dashboard[:elements].each_with_index do |element, e_index|
      element_data = dashboard_data[e_index] = {}

      element[:data].each_with_index do |data, index|
        output = `#{data[:cmd]}`
        log_info "Ran `#{data[:cmd]}`, got: #{output}"

        # Test the match explicitly instead of rescuing the NoMethodError that
        # calling [] on a nil match would raise.
        match = output.match(/#{data[:regex]}/)
        if match.nil?
          log_error "Data failed to be parsed. Regex: /#{data[:regex]}/ -- Output: #{output}"
        end
        value = match && match[1]

        element_data[index] = [Time.new.strftime("%H:%M:%S"), value]
      end
    end
  end
  write_data new_data
end

#get_command(cmd_id) ⇒ Object

get_command



184
185
186
187
188
189
190
191
# File 'lib/easel/websocket.rb', line 184

# Looks up the shell command registered under the given command ID.
#
# @param cmd_id [Object] ID compared against each command's :id entry
# @return [Object, nil] the :cmd entry of the first command whose :id equals
#   cmd_id, or nil when no command has that ID
def get_command(cmd_id)
  matching_entry = $config[:commands].find { |entry| entry[:id] == cmd_id }
  matching_entry ? matching_entry[:cmd] : nil
end

#handle_get(socket, request) ⇒ Object

handle_get

Handle a get request.



93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
# File 'lib/easel/server.rb', line 93

# Serves a GET request: picks the response for the requested URL, writes it to
# the socket and then closes the socket unless the request asks to keep the
# connection alive.
#
# NOTE(review): the keep-alive test reads request[:Connection] at the top
# level of the request hash; confirm that is where the caller stores the
# Connection header, otherwise the socket is always closed here.
#
# @param socket [#print, #close] connection to the client
# @param request [Hash] parsed request; :url selects the resource
# @return [Object] whatever log_info returns
def handle_get(socket, request)
  response =
    case request[:url]
    when "/", "/index.html"
      build_app
    when "/app.css"
      build_css
    when "/controller.js"
      log_info "building controller"
      build_js
    when "/dashboardElements.js"
      return_js 'dashboardElements.js'
    when "/createComponents.js"
      return_js 'createComponents.js'
    else
      # TODO: respond with favicon
      build_error 404
    end
  socket.print response

  connection = request[:Connection]
  keep_alive = connection.is_a?(String) && connection.downcase == "keep-alive\r\n"
  socket.close unless keep_alive

  log_info "Handled HTTP request: #{request}"
end

#handle_request(socket) ⇒ Object



45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
# File 'lib/easel/server.rb', line 45

require 'stringio' # in-memory buffer used to capture GET output for HEAD requests

# Reads one HTTP request from the socket and dispatches it.
#
# * An unparsable request gets a 400 response.
# * GET with "Upgrade: websocket" is handed to run_websocket; any other GET
#   goes to handle_get.
# * HEAD is answered with exactly the header block that the equivalent GET
#   would produce (status line, all headers, terminating blank line) and no
#   body. HEAD combined with a websocket upgrade gets a 400.
# * Every other method gets a 400.
#
# @param socket [#print, #close] connection to the client
# @return [void]
def handle_request(socket)
  log_info "Receieved request: #{socket}"
  request = read_HTTP_message socket

  if request.nil?
    socket.print build_error 400
    socket.close
    return
  end

  # Header values keep their trailing CRLF, hence the "\r\n" in the comparison.
  wants_websocket = request[:fields][:Upgrade] == "websocket\r\n"

  case request[:method]
  when "GET"
    if wants_websocket
      run_websocket(socket, request)
    else
      handle_get(socket, request)
    end
  when "HEAD"
    if wants_websocket
      # A websocket cannot be opened with HEAD; answer instead of leaving the
      # connection hanging open.
      socket.print build_error 400
    else
      # Render the GET response into memory and forward only its header block.
      # The buffer stays readable through #string even after handle_get closes it.
      buffer = StringIO.new
      handle_get(buffer, request)
      header_block = buffer.string[/\A.*?\r\n\r\n/m]
      socket.print(header_block || build_error(500))
    end
    socket.close
  else
    log_warning "Unsupported HTTP method '#{request[:method]}'; responding with 400."
    socket.print build_error 400
    socket.close
  end
end

#launchObject

launch

Launches the Easel Dashboard. Check the $config variable for defaults, although everything can be overridden by the YAML file (and some by the command line arguments).



22
23
24
25
26
27
28
29
30
31
32
33
34
35
# File 'lib/easel.rb', line 22

# Entry point: parses the command line, merges the YAML configuration into
# $config, numbers the configured commands, freezes $config and starts the
# server.
#
# @return [Object] whatever launch_server returns
def launch
  parse_ARGV

  # Merge the YAML file named on the command line over the current $config.
  overwrite_config($config[:yaml_file])
  log_info("YAML loaded successfully (from: #{$config[:yaml_file]})")

  # Each command's ID is its position in the configured list.
  $config[:commands].each.with_index do |command, position|
    command[:id] = position
  end

  # From here on the top-level configuration hash is read only.
  $config.freeze

  log_info("Launching server at #{$config[:hostname]}:#{$config[:port]}")
  launch_server
end

#launch_data_collectionObject

launch_data_collection

Launch a background thread to start collecting system info in the background.



35
36
37
38
39
40
41
42
# File 'lib/easel/data_gathering.rb', line 35

# Starts a background thread that calls collect_data over and over, pausing
# $config[:collect_data_period] seconds after each run.
#
# @return [Thread] the collector thread
def launch_data_collection
  Thread.new {
    loop {
      collect_data
      sleep $config[:collect_data_period]
    }
  }
end

#launch_easel(config) ⇒ Object



9
10
11
12
13
14
15
16
17
18
19
20
21
# File 'lib/easel/controller.rb', line 9

# Starts the server inside a Ractor and blocks until that Ractor hands back a
# value.
#
# NOTE(review): this calls launch_server with an argument; confirm the
# launch_server in scope here accepts one.
#
# @param config [Object] configuration passed into the Ractor
# @return [Object] the value obtained from the Ractor via #take
def launch_easel config

  # r_server_data = Ractor.new config do
  #   'this is a message' # TODO: Implement this.
  # end

  # config is passed as an argument to Ractor.new and received as the block
  # parameter; the block does not read the outer local directly.
  tcp_listener = Ractor.new(config) do |config|
    launch_server config
  end

  # Wait here for the listener Ractor to produce a value.
  tcp_listener.take

end

#launch_serverObject



15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# File 'lib/easel/server.rb', line 15

# Binds the TCP server, optionally starts background data collection, and
# serves each accepted connection on its own thread until interrupted.
#
# A bind failure is reported through log_fatal. An Interrupt (Ctrl-C) ends the
# accept loop quietly; any other exception that reaches the accept loop is
# logged with its backtrace. In every case the listening socket is closed
# before the method returns.
#
# @return [void]
def launch_server

  # Launch the TCPServer
  begin
    server = TCPServer.new($config[:hostname], $config[:port])
  rescue StandardError => e
    # Only genuine bind/lookup failures are handled here; an Interrupt or
    # SystemExit raised during start-up is left to propagate.
    log_fatal "Server could not start. Error message: #{e}"
  end

  # Launch data collection if turned on.
  launch_data_collection unless $config[:collect_data_period] == 0

  # An exception in any client thread is re-raised in this (main) thread.
  Thread.abort_on_exception = true

  # Main Loop
  begin
    loop do
      Thread.start(server.accept) do |client|
        handle_request client
      end
    end
  rescue Interrupt # Handle shutting down.
    log_info "Interrupt received, server shutting down..."
  rescue Exception => e
    # Deliberately broad: with abort_on_exception set, whatever a client
    # thread raised ends up here and should be logged before shutdown.
    log_error "Unexpected error occured and closed client connection. Error: #{e}"
    Array(e.backtrace).each { |trace| log_error "#{trace}" }
  ensure
    # Release the listening port however the loop ended.
    server.close if server && !server.closed?
  end
end

#log_error(*msg) ⇒ Object



19
20
21
22
23
# File 'lib/easel/logging.rb', line 19

# Writes an ERROR line to the configured log file unless the logging level is
# below 1.
#
# @param msg [Array<#to_s>] message parts, joined with single spaces
# @return [nil]
def log_error(*msg)
  return if $config[:logging] < 1

  timestamp = Time.new.strftime("%Y-%m-%d-%H:%M:%S")
  $config[:log_file].puts "[#{timestamp}] ERROR: #{msg.join(" ")}"
end

#log_fatal(*msg) ⇒ Object



12
13
14
15
16
17
# File 'lib/easel/logging.rb', line 12

# Writes a FATAL line to the configured log file (unless logging is switched
# off with level 0) and then terminates the process with exit status 1.
#
# @param msg [Array<#to_s>] message parts, joined with single spaces
# @return [void] never returns; always exits
def log_fatal(*msg)
  if $config[:logging] != 0
    timestamp = Time.new.strftime("%Y-%m-%d-%H:%M:%S")
    $config[:log_file].puts "[#{timestamp}] FATAL: #{msg.join(" ")}"
  end
  exit 1
end

#log_info(*msg) ⇒ Object



31
32
33
34
35
# File 'lib/easel/logging.rb', line 31

# Writes an INFO line to the configured log file unless the logging level is
# below 3.
#
# @param msg [Array<#to_s>] message parts, joined with single spaces
# @return [nil]
def log_info(*msg)
  return if $config[:logging] < 3

  timestamp = Time.new.strftime("%Y-%m-%d-%H:%M:%S")
  $config[:log_file].puts "[#{timestamp}] INFO: #{msg.join(" ")}"
end

#log_warning(*msg) ⇒ Object



25
26
27
28
29
# File 'lib/easel/logging.rb', line 25

# Writes a WARNING line to the configured log file unless the logging level is
# below 2.
#
# @param msg [Array<#to_s>] message parts, joined with single spaces
# @return [nil]
def log_warning(*msg)
  return if $config[:logging] < 2

  timestamp = Time.new.strftime("%Y-%m-%d-%H:%M:%S")
  $config[:log_file].puts "[#{timestamp}] WARNING: #{msg.join(" ")}"
end

#loop_overwrite(config, yaml) ⇒ Object



108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
# File 'lib/easel.rb', line 108

# Recursively copies the values of a parsed YAML mapping into a configuration
# hash, converting every key to a Symbol.
#
# * A nested mapping is merged into the hash already stored under that key; if
#   nothing (or a non-Hash) is stored there, a fresh Hash is created first.
# * A sequence replaces whatever is stored under that key. Mapping elements
#   are converted to symbol-keyed hashes; any other element is copied as is.
# * Any other value simply replaces the stored value.
#
# @param config [Hash] configuration hash to update in place
# @param yaml [Hash] parsed YAML mapping
# @return [Hash] the yaml argument
def loop_overwrite(config, yaml)
  yaml.each do |key, value|
    sym = key.to_s.to_sym

    case value
    when Hash
      # Keys that are missing from config used to crash with a NoMethodError on nil.
      config[sym] = {} unless config[sym].is_a?(Hash)
      loop_overwrite(config[sym], value)
    when Array
      config[sym] = value.map do |item|
        # Scalars inside a sequence (e.g. a list of strings) are kept verbatim.
        next item unless item.is_a?(Hash)

        {}.tap { |element| loop_overwrite(element, item) }
      end
    else
      config[sym] = value
    end
  end
end

#overwrite_config(yaml_filename) ⇒ Object

overwrite_config

Overwrites the $config fields that are set in the YAML file provided on input. TODO: Log (error?) every key in the YAML file that does not exist in $config.



93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
# File 'lib/easel.rb', line 93

# Loads the given YAML file and overwrites the matching entries of $config
# with its contents.
#
# A file that cannot be read or parsed, or whose top level is not a mapping,
# is reported through log_fatal.
#
# @param yaml_filename [String] path of the YAML configuration file
# @return [Object] whatever loop_overwrite returns
def overwrite_config(yaml_filename)

  # TODO: Rewrite using pattern matching to allow checking if the
  # yaml_contents.each is one of the base keys. If so, check that the associated
  # value matches the expected nesting. Do that 'no other values' check.

  # TODO: Ensure that the command names are less than 1020 bytes (because I'm
  # setting the max length of a single websocket message to 1024 (minus 'STOP:'))

  begin
    # Load the file the caller asked for rather than re-reading the global.
    yaml_contents = YAML.load_file yaml_filename
  rescue StandardError => e
    log_fatal "YAML failed to load. Error Message: #{e}"
  end

  # An empty or scalar-only file parses to something that is not a mapping.
  unless yaml_contents.is_a?(Hash)
    log_fatal "YAML failed to load. Error Message: #{yaml_filename} does not contain a mapping at its top level."
  end

  loop_overwrite($config, yaml_contents)
end

# Recursively copies the values of a parsed YAML mapping into a configuration
# hash, converting every key to a Symbol. Defined once at the top level rather
# than being redefined on every overwrite_config call.
#
# * A nested mapping is merged into the hash already stored under that key; if
#   nothing (or a non-Hash) is stored there, a fresh Hash is created first.
# * A sequence replaces whatever is stored under that key. Mapping elements
#   are converted to symbol-keyed hashes; any other element is copied as is.
# * Any other value simply replaces the stored value.
#
# @param config [Hash] configuration hash to update in place
# @param yaml [Hash] parsed YAML mapping
# @return [Hash] the yaml argument
def loop_overwrite(config, yaml)
  yaml.each do |key, value|
    sym = key.to_s.to_sym

    case value
    when Hash
      config[sym] = {} unless config[sym].is_a?(Hash)
      loop_overwrite(config[sym], value)
    when Array
      config[sym] = value.map do |item|
        next item unless item.is_a?(Hash)

        {}.tap { |element| loop_overwrite(element, item) }
      end
    else
      config[sym] = value
    end
  end
end

#parse_ARGVObject

parse_ARGV

Parses the command line arguments (ARGV) using the optparse library. The optional command line arguments can be listed by running this program with the --help flag.



43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
# File 'lib/easel.rb', line 43

# Parses the command line (ARGV) and stores the results in $config.
#
# Recognised switches:
#   -h, --help            print the usage text and exit
#   -l, --log LOG_LEVEL   logging level 0..3, stored in $config[:logging]
#   -p, --port PORT       port 0..65535, stored in $config[:port]
#   -H, --hostname HOST   hostname, stored in $config[:hostname]
#   -o, --output [FILE]   log file opened in append mode, stored in $config[:log_file]
#
# Exactly one positional argument must remain after the switches; it is stored
# in $config[:yaml_file]. Invalid values, unknown switches and a wrong number
# of positional arguments are reported through log_fatal.
#
# @return [void]
def parse_ARGV
  parser = OptionParser.new do |opts|
    opts.banner = "Useage: launch.rb [flags] configuration.yaml"

    opts.on("-h", "--help", "Prints this help message.") do
      puts opts
      exit
    end

    opts.on("-l LOG_LEVEL", "--log LOG_LEVEL", Integer, "Sets the logging level (default=2). 0=Silent, 1=Errors, 2=Warnings, 3=Info.") do |lvl|
      if [0, 1, 2, 3].include?(lvl)
        $config[:logging] = lvl
      else
        log_fatal "Command argument LOG_LEVEL '#{lvl}' not recognized. Expected 0, 1, 2, or 3."
      end
    end

    opts.on("-p PORT", "--port PORT", Integer, "Sets the port to bind to. Default is #{$config[:port]}.") do |port|
      if port.between?(0, 65535)
        $config[:port] = port
      else
        log_fatal "Command argument PORT '#{port}' not a valid port. Must be between 0 and 65535 (inclusive)"
      end
    end

    # The short switch is -H: registering a second -h here would shadow the
    # help switch above. The value goes under :hostname, the key the rest of
    # the program reads.
    opts.on("-H HOST", "--hostname HOST", "Sets the hostname. Default is '#{$config[:hostname]}'.") do |host|
      $config[:hostname] = host
    end

    opts.on("-o [FILE]", "--output [FILE]", "Set a log file.") do |filename|
      begin
        $config[:log_file] = File.new(filename, "a")
      rescue StandardError => e
        # $config[:log_file] is left untouched, so logging continues wherever
        # it was going before.
        log_error "Log file could not be opened. Keeping the current log output. Error message: #{e}"
      end
    end
  end

  begin
    parser.parse!
  rescue OptionParser::ParseError => e
    # Unknown switches, missing arguments and non-integer values end up here.
    log_fatal "Could not parse the command line arguments: #{e.message}. Try --help for more details."
  end

  if ARGV.length != 1
    log_fatal "launch.rb takes exactly one file. Try -h for more details."
  else
    $config[:yaml_file] = ARGV[0]
  end
end

#read_dataObject

read_data

Reads (copies) @collected_data, and handles the Readers/Writer problem by using semaphores and a mutex. Returns a shallow copy of @collected_data.



125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
# File 'lib/easel/data_gathering.rb', line 125

# Returns a shallow copy (dup) of @collected_data.
#
# Before copying, the caller registers itself by releasing one permit on
# @readers_semaphore; that only happens, under @join_mutex, while
# @writers_semaphore shows zero available permits. Otherwise the attempt is
# repeated after 50 ms. The permit is taken back once the copy has been made.
#
# @return [Object] shallow copy of @collected_data
def read_data
  loop do
    admitted = @join_mutex.synchronize do
      if @writers_semaphore.available_permits == 0
        @readers_semaphore.release 1  # Increment @readers_semaphore
        true
      else
        false
      end
    end
    break if admitted

    # Wait 50ms to give another thread time to lock the @join_mutex.
    sleep 0.05
  end

  snapshot = @collected_data.dup

  @readers_semaphore.acquire 1  # Decrement @readers_semaphore
  snapshot
end

#read_HTTP_message(socket) ⇒ Object

read_HTTP_message

Read an HTTP message from the socket, and parse it into a request Hash.



126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
# File 'lib/easel/server.rb', line 126

# Reads one HTTP request head (request line plus headers) from the socket and
# parses it into a Hash.
#
# The result has :method, :url and :protocol from the request line, and
# :fields, a Hash of the headers. Header names become Symbols with "-"
# replaced by "_" (e.g. :Sec_WebSocket_Key); header values are stored exactly
# as received, including the trailing "\r\n".
#
# @param socket [#gets] connection to read from
# @return [Hash, nil] the parsed request, or nil when the request line is
#   missing or malformed, or the connection ends before the blank line that
#   terminates the headers
def read_HTTP_message(socket)
  request_line = socket.gets
  if request_line.nil? || !request_line.match?(/^(GET|HEAD|POST|PUT|DELETE|OPTIONS|TRACE) .+ HTTP.+/)
    return nil
  end

  request = { fields: {} }
  request[:method], request[:url], request[:protocol] = request_line.split(" ")

  loop do
    line = socket.gets
    # gets keeps returning nil once the peer has gone away; without this check
    # the loop would never end.
    return nil if line.nil?
    break if line == "\r\n"

    # Split on the first ": " only, so values that contain ": " stay intact.
    key, value = line.split(": ", 2)
    request[:fields][key.tr("-", "_").to_sym] = value
  end

  request
end

#receive_msg(socket) ⇒ Object

receive_msg

Extremely naive websocket server. Requires masking, and a message with a length of at most 125. Needs to be updated to conform with RFC 6455.



212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
# File 'lib/easel/websocket.rb', line 212

# Reads one WebSocket frame from the socket and returns its text payload.
#
# Only a single, final (FIN), masked text frame whose 7-bit length is below
# MAX_WS_FRAME_SIZE is accepted. A close frame (first byte 0x88) closes the
# socket. Extended payload lengths (length codes 126 and 127) are not decoded.
#
# The payload is unmasked, tagged as UTF-8 and passed through String#inspect
# (minus the surrounding quotes), so special characters come back escaped.
#
# @param socket [#getbyte, #close] connection to read from
# @return [String, nil] the message, or nil when the connection ended, a close
#   frame arrived, or the frame was rejected
def receive_msg(socket)

  # Check first two bytes
  byte1 = socket.getbyte
  byte2 = socket.getbyte
  return nil if byte1.nil? || byte2.nil?

  if byte1 == 0x88  # Client is requesting that we close the connection.
    # TODO: Unsure how to properly handle this case. Right now the socket will close and
    # everything here will shut down - eventually? Kill all child threads first?
    log_info "Client requested the websocket be closed."
    socket.close
    return
  end

  # Turn the flag bits into real booleans: the Integer 0 is truthy in Ruby, so
  # testing the masked bits directly would accept every frame.
  fin = (byte1 & 0b10000000) != 0
  opcode = byte1 & 0b00001111
  msg_size = byte2 & 0b01111111
  is_masked = (byte2 & 0b10000000) != 0

  unless fin && opcode == 1 && is_masked && msg_size < MAX_WS_FRAME_SIZE
    log_error "Invalid websocket message received. #{fin}, #{opcode == 1}, #{is_masked}, #{msg_size}"
    # Drain the rejected frame (masking key, when present, plus payload).
    discard = msg_size
    discard += 4 if is_masked
    discard.times { socket.getbyte }
    return
  end

  # Get message
  mask = Array.new(4) { socket.getbyte }
  payload = Array.new(msg_size) { socket.getbyte }
  # A nil byte means the connection ended in the middle of the frame.
  return nil if mask.include?(nil) || payload.include?(nil)

  unmasked = payload.each_with_index.map { |byte, i| byte ^ mask[i % 4] }
  msg = unmasked.pack('C*').force_encoding('utf-8').inspect

  log_info "WebSocket received: #{msg}"

  msg[1..-2] # Remove quotation marks from message
end

#return_html(file) ⇒ Object

return_html



67
68
69
70
71
72
73
74
75
76
77
# File 'lib/easel/build_pages.rb', line 67

# Builds the complete HTTP response for a static HTML file.
#
# The file is read from the html directory located relative to this file and
# returned unchanged behind a "200 OK" header block.
#
# @param file [String] file name inside the html directory
# @return [String] header block followed by the file contents
def return_html(file)
  # File.read closes the file itself; File.new(...).read left the handle open
  # until the garbage collector got to it.
  page = File.read("#{File.dirname(__FILE__)}/../html/#{file}")

  "HTTP/1.1 200 OK\r\n" +
  "Content-Type: text/html; charset=UTF-8\r\n" +
  "Content-Length: #{page.bytesize}\r\n" +
  "Connection: close\r\n" +
  "\r\n" +
  page
end

#return_js(file) ⇒ Object

return_js



52
53
54
55
56
57
58
59
60
61
62
# File 'lib/easel/build_pages.rb', line 52

# Builds the complete HTTP response for a static JavaScript file.
#
# The file is read from the html directory located relative to this file and
# returned unchanged behind a "200 OK" header block.
#
# @param file [String] file name inside the html directory
# @return [String] header block followed by the file contents
def return_js(file)
  # File.read closes the file itself; File.new(...).read left the handle open
  # until the garbage collector got to it.
  page = File.read("#{File.dirname(__FILE__)}/../html/#{file}")

  "HTTP/1.1 200 OK\r\n" +
  "Content-Type: text/javascript; charset=UTF-8\r\n" +
  "Content-Length: #{page.bytesize}\r\n" +
  "Connection: close\r\n" +
  "\r\n" +
  page
end

#run_command_and_stream(socket, cmd_id, send_msg_mutex) ⇒ Object

run_command_and_stream

Run a command and stream the stdout and stderr through the websocket.



145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
# File 'lib/easel/websocket.rb', line 145

# Runs the command registered under cmd_id and streams its output through the
# websocket, one message per line: stdout lines as "OUT", stderr lines as
# "ERR". Once both streams have ended and the process has finished, a
# "FINISHED" message is sent.
#
# An unknown command ID is logged and nothing is run.
#
# @param socket [Object] websocket connection handed on to send_msg
# @param cmd_id [Integer] ID of the command to run
# @param send_msg_mutex [Mutex] mutex handed on to send_msg
# @return [Object, nil] the result of the final send_msg, or nil for an
#   unknown command ID
def run_command_and_stream(socket, cmd_id, send_msg_mutex)

  cmd = get_command cmd_id
  if cmd.nil?
    log_error "Client requested command ID #{cmd_id} be run, but that ID does not exist."
    return
  end

  Open3.popen3(cmd) do |stdin, stdout, stderr, cmd_thread|
    # Nothing is ever written to the command, so give it EOF on stdin right
    # away instead of letting it wait for input.
    stdin.close

    # Streams that have not reached EOF yet, mapped to their message type.
    open_streams = { stdout => "OUT", stderr => "ERR" }

    # Keep reading until BOTH streams are exhausted; stopping at the first EOF
    # would drop whatever is still queued on the other stream.
    until open_streams.empty?
      ready_fds, = IO.select(open_streams.keys)
      ready_fds.each do |fd|
        resp = fd.gets
        if resp.nil?
          open_streams.delete(fd)
        else
          send_msg(socket, send_msg_mutex, cmd_id, open_streams[fd], resp)
        end
      end
    end

    cmd_thread.join
    send_msg(socket, send_msg_mutex, cmd_id, "FINISHED")
  end
end

#run_websocket(socket, initial_request) ⇒ Object

run_websocket



78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
# File 'lib/easel/websocket.rb', line 78

# Upgrades the connection to a websocket and serves it until the client goes
# away.
#
# After the handshake, a background thread pushes the collected dashboard data
# to the client every $config[:collect_data_period] seconds (unless that
# period is 0). The main loop then handles incoming messages:
#
#   "RUN:<id>"   starts the command with that ID on its own thread, unless it
#                is already running
#   "STOP:<id>"  kills the thread running that command, if any
#
# Anything else is logged as unrecognized. The dashboard thread is stopped
# when the loop ends; command threads are left alone.
#
# @param socket [Object] connection to the client
# @param initial_request [Hash] parsed upgrade request; the websocket key is
#   read from [:fields][:Sec_WebSocket_Key]
# @return [void]
def run_websocket(socket, initial_request)

  # The header value still carries its trailing "\r\n"; strip it off.
  accept_connection(socket, initial_request[:fields][:Sec_WebSocket_Key][0..-3])
  log_info "Accepted WebSocket Connection"
  child_threads = {}
  send_msg_mutex = Mutex.new # One mutex per websocket to control sending messages.

  # Periodically update the generic dashboard if set.
  dashboard_thread = nil
  unless $config[:collect_data_period] == 0
    dashboard_thread = Thread.new do
      loop do
        data = read_data
        send_msg(socket, send_msg_mutex, nil, "DASH", data)
        sleep $config[:collect_data_period]
      end
    end
  end

  begin
    loop do
      begin
        msg = receive_msg socket
      rescue Errno::ECONNRESET
        log_error "Client reset the connection"
        socket.close
        msg = nil
      end
      break if msg.nil? # The socket was closed by the client.

      # Messages look like "RUN:<id>" / "STOP:<id>": the type comes first.
      msg_type, argument = msg.split(":", 2)
      cmd_id = Integer(argument.to_s, 10, exception: false)

      unless %w[RUN STOP].include?(msg_type) && cmd_id
        log_error "Received an unrecognized message over the websocket: #{msg}"
        next
      end

      if msg_type == "RUN"
        # Checking alive? (rather than the entry merely being present) keeps a
        # finished thread from blocking later runs of the same command.
        next if child_threads[cmd_id]&.alive?

        child_threads[cmd_id] = Thread.new do
          run_command_and_stream(socket, cmd_id, send_msg_mutex)
        end
      else
        running = child_threads.delete(cmd_id)
        running&.kill
      end
    end
  rescue Exception => e
    # Logged for diagnosis, then passed on unchanged.
    log_error "Wuh Woh #2: #{e}"
    Array(e.backtrace).each { |trace| log_error "#{trace}" }
    raise
  ensure
    # Without this the dashboard thread would keep writing to a dead socket.
    dashboard_thread&.kill
  end

end

#send_frame(socket, msg) ⇒ Object

send_frame

Sends a message over the websocket. Requires that the message be an appropriate length, and have the right format (eg. checks should be done before calling this function). TODO: Figure out the proper frame size (MAX_WS_FRAME_SIZE).



339
340
341
342
343
344
345
346
347
# File 'lib/easel/websocket.rb', line 339

# Sends +msg+ to the client as a single, final, unmasked WebSocket text frame.
#
# The length field is derived from the payload's size in bytes: one byte for
# payloads up to 125 bytes, the 16-bit extended form up to 65535 bytes, and
# the 64-bit extended form beyond that. Callers are still expected to keep
# messages within the protocol's own limits before calling this.
#
# A closed socket is logged rather than raised.
#
# @param socket [#write] connection to the client
# @param msg [String] text to send
# @return [Object] the result of socket.write, or of log_error on failure
def send_frame(socket, msg)
  # Work on bytes: a multi-byte character counts once in String#size but
  # occupies several bytes on the wire.
  payload = msg.b
  length = payload.bytesize

  header =
    if length < 126
      [0b10000001, length].pack("CC")
    elsif length <= 0xFFFF
      [0b10000001, 126, length].pack("CCn")
    else
      [0b10000001, 127, length].pack("CCQ>")
    end

  begin
    socket.write(header + payload)
  rescue IOError, Errno::EPIPE
    log_error "WebSocket is closed. Msg: #{msg}"
  end
end

#send_msg(socket, send_msg_mutex, cmd_id, msg_type, msg = nil) ⇒ Object

send_msg



258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
# File 'lib/easel/websocket.rb', line 258

# Sends one logical message to the client, splitting it into several frames
# when it does not fit into MAX_WS_FRAME_SIZE characters.
#
# Message types:
#   "OUT", "ERR"        a line of command output; every frame is
#                       "<cmd_id>:<type>:<text>"
#   "DASH"              collected dashboard data, a Hash nested as
#                       dashboard id => element index => key => value. Each
#                       value is sent as "<did>:A:<key>-><value>" when it fits,
#                       otherwise in numbered parts "<did>:<i>/<n>:<part>",
#                       where <did> is the dashboard id followed by the
#                       element index
#   "CLEAR", "FINISHED" a bare "<cmd_id>:<type>" notification without payload
#
# Problems (missing payload, oversized header, unknown type) are logged and
# the offending message is not sent.
#
# @param socket [Object] connection handed on to send_frame
# @param send_msg_mutex [Mutex] held while the frames of one message are sent
# @param cmd_id [Object, nil] command the message belongs to
# @param msg_type [String] one of the types listed above
# @param msg [String, Hash, nil] payload; must be nil for "CLEAR"/"FINISHED"
# @return [Object, nil]
def send_msg(socket, send_msg_mutex, cmd_id, msg_type, msg=nil)

  case msg_type
  when "OUT", "ERR" # See comments at the top of the file to explain this part of the protocol.
    header = "#{cmd_id}:#{msg_type}:"
    # ">=": a header that fills the whole frame leaves no room for any text.
    if header.length >= MAX_WS_FRAME_SIZE
      log_error "Message header '#{msg_type}' is too long. Msg: #{msg}."
    elsif msg.nil?
      log_error "Message of type '#{msg_type}' sent without a message."
    else
      room = (MAX_WS_FRAME_SIZE - header.length).to_i
      parts = msg.length > room ? split_ws_payload(msg, room) : [msg]
      send_msg_mutex.synchronize do
        parts.each { |part| send_frame(socket, header + part) }
      end
    end

  when "DASH"  # See comments at the top of the file to explain this part of the protocol.
    if msg.nil?
      log_error "Message of type '#{msg_type}' sent without a message."
      return # nothing to iterate over
    end

    msg.each do |dash_id, elements|
      elements.each do |ele_index, series|
        did = "#{dash_id}#{ele_index}"
        send_msg_mutex.synchronize do
          series.each do |key, value|
            data_fragment = "#{key}->#{value}"

            if data_fragment.length > MAX_WS_FRAME_SIZE - (did.length + 3)
              # TODO: Handle case where header is longer than DID:XX/XX:
              msg_part_len = (MAX_WS_FRAME_SIZE - (did.length + 7)).to_i
              if msg_part_len <= 0
                # No room left for data; splitting into zero-length parts would never end.
                log_error "Message header '#{msg_type}' is too long. Data: #{data_fragment}."
                next
              end

              msg_parts = split_ws_payload(data_fragment, msg_part_len)
              msg_parts.each_with_index do |part, index|
                header = did + ":#{index + 1}/#{msg_parts.length}:"
                if header.length > MAX_WS_FRAME_SIZE
                  log_error "Message header '#{msg_type}' is too long. Data: #{data_fragment}."
                end
                send_frame(socket, header + part)
              end
            else
              send_frame(socket, did + ":A:" + data_fragment)
            end
          end
        end
      end
    end

  when "CLEAR", "FINISHED"
    unless msg.nil?
      log_error "Message of type '#{msg_type}' must not carry a message. Msg: #{msg}."
    end
    to_send = "#{cmd_id}:#{msg_type}"
    send_msg_mutex.synchronize do
      if to_send.length > MAX_WS_FRAME_SIZE
        log_error "Message of type '#{msg_type}' is too long. Msg: #{to_send}."
      else
        send_frame(socket, to_send)
      end
    end
  else
    log_error "Trying to send a websocket message with unrecognized type: #{msg_type}"
  end

  log_info "Message sent via WebSocket: #{msg}"

end

# Splits +text+ into consecutive pieces of at most +size+ characters.
def split_ws_payload(text, size)
  text.chars.each_slice(size).map(&:join)
end

#write_data(data) ⇒ Object

write_data

Writes the data collected to @collected_data, and handles the Readers/Writer problem by using semaphores and a mutex.



84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
# File 'lib/easel/data_gathering.rb', line 84

# Merges freshly collected data into @collected_data.
#
# The writer first registers itself by releasing one permit on
# @writers_semaphore; that only happens, under @join_mutex, while both
# @readers_semaphore and @writers_semaphore show zero available permits.
# Otherwise the attempt is repeated after 50 ms. The permit is taken back when
# writing is done, even if writing raises.
#
# Values stored under :load are appended to a growing list; every other key
# replaces the stored value. A warning (more than 240 :load entries) or an
# error (more than 1000) is logged as that list grows.
#
# @param data [Hash] newly collected data
# @return [Object, nil]
def write_data(data)

  joined = false
  until joined
    @join_mutex.synchronize do
      if @readers_semaphore.available_permits == 0 && @writers_semaphore.available_permits == 0
        @writers_semaphore.release 1  # Increment @writers_semaphore
        joined = true
      end
    end
    # Wait 50ms to give another thread time to lock the @join_mutex; there is
    # nothing to wait for once this writer has joined.
    sleep 0.05 unless joined
  end

  begin
    # Write Data
    data.each_key do |key|
      case key
      when :load
        @collected_data[:load] = [] if @collected_data[:load].nil?
        @collected_data[:load] << data[:load]
      else
        @collected_data[key] = data[key]
      end
    end

    # Log if @collected_data has gotten too large.
    load_samples = @collected_data[:load]
    if load_samples && load_samples.length > 240
      if load_samples.length > 1000
        log_error "Easel dashboard has run collect_data more than a 1000 times. Memory size likely to be large."
      else
        log_warning "Easel dashboard starting to take up a lot of memory due to data_collection being turned on."
      end
    end
  ensure
    # Always give the writer slot back, otherwise every later reader and
    # writer would wait forever.
    @writers_semaphore.acquire 1  # Decrement @writers_semaphore
  end
end