Top Level Namespace
Constant Summary collapse
- MAX_WS_FRAME_SIZE =
Key Variables
50.0
Instance Method Summary collapse
-
#accept_connection(socket, ws_key) ⇒ Object
accept_connection.
-
#build_app ⇒ Object
build_app.
-
#build_css ⇒ Object
build_css.
-
#build_error(code) ⇒ Object
build_error.
-
#build_js ⇒ Object
build_js.
-
#collect_data ⇒ Object
collect_data.
-
#get_command(cmd_id) ⇒ Object
get_command.
-
#handle_get(socket, request) ⇒ Object
handle_get.
- #handle_request(socket) ⇒ Object
-
#launch ⇒ Object
launch.
-
#launch_data_collection ⇒ Object
launch_data_collection.
- #launch_easel(config) ⇒ Object
- #launch_server ⇒ Object
- #log_error(*msg) ⇒ Object
- #log_fatal(*msg) ⇒ Object
- #log_info(*msg) ⇒ Object
- #log_warning(*msg) ⇒ Object
- #loop_overwrite(config, yaml) ⇒ Object
-
#overwrite_config(yaml_filename) ⇒ Object
overwrite_config.
-
#parse_ARGV ⇒ Object
parse_ARGV.
-
#read_data ⇒ Object
read_data.
-
#read_HTTP_message(socket) ⇒ Object
read_HTTP_message.
-
#receive_msg(socket) ⇒ Object
receive_msg.
-
#return_html(file) ⇒ Object
return_html.
-
#return_js(file) ⇒ Object
return_js.
-
#run_command_and_stream(socket, cmd_id, send_msg_mutex) ⇒ Object
run_command_and_stream.
-
#run_websocket(socket, initial_request) ⇒ Object
run_websocket.
-
#send_frame(socket, msg) ⇒ Object
send_frame.
-
#send_msg(socket, send_msg_mutex, cmd_id, msg_type, msg = nil) ⇒ Object
send_msg.
-
#write_data(data) ⇒ Object
write_data.
Instance Method Details
#accept_connection(socket, ws_key) ⇒ Object
accept_connection
Sends back the HTTP header to initialize the websocket connection.
196 197 198 199 200 201 202 203 204 205 |
# File 'lib/easel/websocket.rb', line 196
# Writes the HTTP 101 response that completes the WebSocket opening handshake.
#
# @param socket [#write] stream connected to the client
# @param ws_key [String] the client's Sec-WebSocket-Key value
# @return [Object] whatever socket.write returns
def accept_connection(socket, ws_key)
  # The accept token is the Base64 SHA-1 digest of the key followed by the fixed GUID.
  accept_token = Digest::SHA1.base64digest(ws_key + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11")
  handshake = [
    "HTTP/1.1 101 Switching Protocols",
    "Upgrade: websocket",
    "Connection: Upgrade",
    "Sec-WebSocket-Accept: #{accept_token}",
    "",
    ""
  ].join("\r\n")
  socket.write handshake
end
#build_app ⇒ Object
build_app
21 22 23 24 25 26 27 28 29 30 31 |
# File 'lib/easel/build_pages.rb', line 21
# Renders html/app.html.erb and wraps it in a complete HTTP 200 response.
#
# @return [String] status line, headers and the rendered page
def build_app
  # File.read closes the template once read; File.new(...).read left the handle open until GC.
  template = File.read("#{File.dirname(__FILE__)}/../html/app.html.erb")
  page = ERB.new(template).result
  "HTTP/1.1 200 OK\r\n" \
  "Content-Type: text/html; charset=UTF-8\r\n" \
  "Content-Length: #{page.bytesize}\r\n" \
  "Connection: close\r\n" \
  "\r\n" + page
end
#build_css ⇒ Object
build_css
82 83 84 85 86 87 88 89 90 91 92 |
# File 'lib/easel/build_pages.rb', line 82
# Renders html/app.css.erb (evaluated with this method's binding) and wraps it
# in a complete HTTP 200 response.
#
# @return [String] status line, headers and the rendered stylesheet
def build_css
  # File.read closes the template once read; File.new(...).read left the handle open until GC.
  template = File.read("#{File.dirname(__FILE__)}/../html/app.css.erb")
  css = ERB.new(template).result(binding)
  "HTTP/1.1 200 OK\r\n" \
  "Content-Type: text/css; charset=UTF-8\r\n" \
  "Content-Length: #{css.bytesize}\r\n" \
  "Connection: close\r\n" \
  "\r\n" + css
end
#build_error(code) ⇒ Object
build_error
98 99 100 101 102 103 104 105 106 107 108 |
# File 'lib/easel/build_pages.rb', line 98
# Renders html/error.html.erb for the given status code and wraps it in a
# complete HTTP response. The template is evaluated with this method's binding,
# so `code` is visible to it.
#
# @param code [Integer] HTTP status code; its reason phrase is read from @code_names
# @return [String] status line, headers and the rendered error page
def build_error code
  # File.read closes the template once read; File.new(...).read left the handle open until GC.
  template = File.read("#{File.dirname(__FILE__)}/../html/error.html.erb")
  page = ERB.new(template).result(binding)
  "HTTP/1.1 #{code} #{@code_names[code]}\r\n" \
  "Content-Type: text/html; charset=UTF-8\r\n" \
  "Content-Length: #{page.bytesize}\r\n" \
  "Connection: close\r\n" \
  "\r\n" + page
end
#build_js ⇒ Object
build_js
36 37 38 39 40 41 42 43 44 45 46 |
# File 'lib/easel/build_pages.rb', line 36
# Renders html/controller.js.erb and wraps it in a complete HTTP 200 response.
#
# @return [String] status line, headers and the rendered script
def build_js
  # File.read closes the template once read; File.new(...).read left the handle open until GC.
  template = File.read("#{File.dirname(__FILE__)}/../html/controller.js.erb")
  page = ERB.new(template).result
  "HTTP/1.1 200 OK\r\n" \
  "Content-Type: text/javascript; charset=UTF-8\r\n" \
  "Content-Length: #{page.bytesize}\r\n" \
  "Connection: close\r\n" \
  "\r\n" + page
end
#collect_data ⇒ Object
collect_data
Collects information on the current state of the system, and writes it to @collected_data (via write_data).
49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 |
# File 'lib/easel/data_gathering.rb', line 49
# Runs every data command configured in $config[:dashboards], extracts the
# first capture group of the configured regex from each command's output, and
# passes the results to write_data.
#
# The result is keyed by dashboard id, then element index, then data index;
# each leaf is [time "HH:MM:SS", captured value or nil].
#
# @return [Object] whatever write_data returns
def collect_data
  new_data = {}

  # TODO: Check which element type something is to figure out how to handle it.
  log_info "Collecting data."
  $config[:dashboards].each do |dashboard|
    dashboard_data = new_data[dashboard[:id]] = {}
    dashboard[:elements].each_with_index do |element, e_index|
      element_data = dashboard_data[e_index] = {}
      element[:data].each_with_index do |data, index|
        output = `#{data[:cmd]}`
        log_info "Ran `#{data[:cmd]}`, got: #{output}"

        # Check the match explicitly instead of rescuing NoMethodError on nil.
        matched = output.match(/#{data[:regex]}/)
        value = nil
        if matched
          value = matched[1]
        else
          log_error "Data failed to be parsed. Regex: /#{data[:regex]}/ -- Output: #{output}"
        end

        element_data[index] = [Time.new.strftime("%H:%M:%S"), value]
      end
    end
  end

  write_data new_data
end
#get_command(cmd_id) ⇒ Object
get_command
184 185 186 187 188 189 190 191 |
# File 'lib/easel/websocket.rb', line 184
# Looks up the shell command registered under the given id in $config[:commands].
#
# @param cmd_id [Object] id to look for (compared with ==)
# @return [Object, nil] the entry's :cmd value, or nil when no entry has that id
def get_command cmd_id
  entry = $config[:commands].find { |cmd| cmd[:id] == cmd_id }
  entry && entry[:cmd]
end
#handle_get(socket, request) ⇒ Object
handle_get
Handle a get request.
93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 |
# File 'lib/easel/server.rb', line 93
# Serves a GET request: picks the response for request[:url], prints it to the
# socket, and closes the socket unless the request asks for keep-alive.
#
# @param socket [#print, #close] stream to answer on
# @param request [Hash] parsed request; :url selects the resource
# @return [Object] whatever log_info returns
def handle_get(socket, request)
  response =
    case request[:url]
    when "/", "/index.html"
      build_app
    when "/app.css"
      build_css
    when "/controller.js"
      log_info "building controller"
      build_js
    when "/dashboardElements.js"
      return_js 'dashboardElements.js'
    when "/createComponents.js"
      return_js 'createComponents.js'
    # TODO: respond with favicon
    else
      build_error 404
    end
  socket.print response

  # NOTE(review): read_HTTP_message stores headers under request[:fields], so
  # request[:Connection] looks like it is never set and the socket is always
  # closed. Every response built here also says "Connection: close", so the
  # lookup is left unchanged -- confirm before honouring keep-alive.
  connection = request[:Connection]
  keep_alive = connection.is_a?(String) && connection.downcase == "keep-alive\r\n"
  socket.close unless keep_alive

  log_info "Handled HTTP request: #{request}"
end
#handle_request(socket) ⇒ Object
45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 |
# File 'lib/easel/server.rb', line 45
# Reads one HTTP request from the socket and dispatches it.
#
# * unparsable request          -> 400
# * GET with Upgrade: websocket -> run_websocket
# * GET                         -> handle_get
# * HEAD                        -> the headers handle_get would send, no body
# * anything else               -> 400
#
# @param socket [IO] client connection
# @return [void]
def handle_request socket
  log_info "Receieved request: #{socket}"

  request = read_HTTP_message socket
  if request.nil?
    socket.print build_error 400
    socket.close
    return
  end

  # Header values keep their trailing CRLF (see read_HTTP_message).
  wants_websocket = request[:fields][:Upgrade] == "websocket\r\n"

  case request[:method]
  when "GET"
    if wants_websocket
      run_websocket(socket, request)
    else
      handle_get(socket, request)
    end
  when "HEAD"
    if wants_websocket
      # A websocket cannot be opened with HEAD; answer instead of leaving the socket open.
      socket.print build_error 400
      socket.close
    else
      # Render the GET response into memory and forward only the header section.
      # An in-memory buffer avoids blocking on a full pipe for large pages.
      require 'stringio'
      buffer = StringIO.new
      handle_get(buffer, request)
      head, separator, _body = buffer.string.partition("\r\n\r\n")
      if separator.empty?
        socket.print build_error 500
      else
        socket.print head + separator
      end
      socket.close
    end
  else
    log_error "Unsupported HTTP method: #{request[:method]}"
    socket.print build_error 400
    socket.close
  end
end
#launch ⇒ Object
launch
Launches the Easel Dashboard. Check the $config variable for defaults, although everything can be overridden by the YAML file (and some by the command line arguments).
22 23 24 25 26 27 28 29 30 31 32 33 34 35 |
# File 'lib/easel.rb', line 22
# Entry point: parses the command line, applies the YAML configuration, gives
# every command a numeric id, freezes $config and starts the server.
def launch
  parse_ARGV

  # Apply the YAML file named on the command line.
  overwrite_config $config[:yaml_file]
  log_info("YAML loaded successfully (from: #{$config[:yaml_file]})")

  # A command's id is its position in the configured list.
  $config[:commands].each.with_index do |command, position|
    command[:id] = position
  end
  $config.freeze # Configuration is read only from here on.

  log_info("Launching server at #{$config[:hostname]}:#{$config[:port]}")
  launch_server
end
#launch_data_collection ⇒ Object
launch_data_collection
Launch a background thread to start collecting system info in the background.
35 36 37 38 39 40 41 42 |
# File 'lib/easel/data_gathering.rb', line 35
# Starts a background thread that calls collect_data, sleeps for
# $config[:collect_data_period], and repeats indefinitely.
#
# @return [Thread] the collector thread
def launch_data_collection
  Thread.new {
    loop {
      collect_data
      sleep $config[:collect_data_period]
    }
  }
end
#launch_easel(config) ⇒ Object
9 10 11 12 13 14 15 16 17 18 19 20 21 |
# File 'lib/easel/controller.rb', line 9
# Starts the server inside a Ractor and waits for that Ractor's result.
#
# NOTE(review): launch_server in server.rb is defined without parameters, so
# the call below looks like it would raise ArgumentError -- confirm whether
# this entry point is still in use.
#
# @param config [Object] configuration handed to the Ractor
def launch_easel config
  # r_server_data = Ractor.new config do
  #   'this is a message' # TODO: Implement this.
  # end

  listener = Ractor.new(config) { |cfg| launch_server cfg }
  listener.take
end
#launch_server ⇒ Object
15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 |
# File 'lib/easel/server.rb', line 15
# Binds the TCP server, optionally starts background data collection, and
# serves each accepted connection on its own thread until interrupted.
#
# @return [void]
def launch_server
  # Launch the TCPServer. log_fatal terminates the process on failure.
  begin
    server = TCPServer.new($config[:hostname], $config[:port])
  rescue StandardError => e
    log_fatal "Server could not start. Error message: #{e}"
  end

  # Launch data collection if turned on.
  launch_data_collection unless $config[:collect_data_period] == 0

  # A failure in any connection thread is re-raised here in the main thread.
  Thread.abort_on_exception = true

  # Main loop
  begin
    loop do
      Thread.start(server.accept) { |client| handle_request client }
    end
  rescue Interrupt # Handle shutting down.
    log_info "Interrupt received, server shutting down..."
  rescue StandardError => e
    # StandardError only, so SystemExit and signals are not logged as unexpected errors.
    log_error "Unexpected error occured and closed client connection. Error: #{e}"
    e.backtrace.each { |trace| log_error "#{trace}" }
  ensure
    # Release the listening socket on every exit path.
    server&.close
  end
end
#log_error(*msg) ⇒ Object
19 20 21 22 23 |
# File 'lib/easel/logging.rb', line 19
# Writes a timestamped ERROR line to $config[:log_file] when
# $config[:logging] is 1 or higher.
#
# @param msg [Array] message fragments, joined with single spaces
def log_error *msg
  return if $config[:logging] < 1

  timestamp = Time.new.strftime("%Y-%m-%d-%H:%M:%S")
  $config[:log_file].puts "[#{timestamp}] ERROR: #{msg.join(" ")}"
end
#log_fatal(*msg) ⇒ Object
12 13 14 15 16 17 |
# File 'lib/easel/logging.rb', line 12
# Writes a timestamped FATAL line to $config[:log_file] (unless logging is
# set to 0) and then terminates the process with exit status 1.
#
# @param msg [Array] message fragments, joined with single spaces
def log_fatal *msg
  if $config[:logging] != 0
    timestamp = Time.new.strftime("%Y-%m-%d-%H:%M:%S")
    $config[:log_file].puts "[#{timestamp}] FATAL: #{msg.join(" ")}"
  end
  exit 1
end
#log_info(*msg) ⇒ Object
31 32 33 34 35 |
# File 'lib/easel/logging.rb', line 31
# Writes a timestamped INFO line to $config[:log_file] when
# $config[:logging] is 3 or higher.
#
# @param msg [Array] message fragments, joined with single spaces
def log_info *msg
  return if $config[:logging] < 3

  timestamp = Time.new.strftime("%Y-%m-%d-%H:%M:%S")
  $config[:log_file].puts "[#{timestamp}] INFO: #{msg.join(" ")}"
end
#log_warning(*msg) ⇒ Object
25 26 27 28 29 |
# File 'lib/easel/logging.rb', line 25
# Writes a timestamped WARNING line to $config[:log_file] when
# $config[:logging] is 2 or higher.
#
# @param msg [Array] message fragments, joined with single spaces
def log_warning *msg
  return if $config[:logging] < 2

  timestamp = Time.new.strftime("%Y-%m-%d-%H:%M:%S")
  $config[:log_file].puts "[#{timestamp}] WARNING: #{msg.join(" ")}"
end
#loop_overwrite(config, yaml) ⇒ Object
108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 |
# File 'lib/easel.rb', line 108
# Recursively copies the parsed YAML structure into the config hash,
# converting every key to a Symbol.
#
# * Hash values are merged into the matching nested config hash (created when
#   config has no hash under that key).
# * Array values replace the config entry; Hash elements are converted
#   recursively, any other element is copied as is.
# * Everything else is assigned directly.
#
# @param config [Hash] destination, modified in place
# @param yaml [Hash] parsed YAML mapping
# @return [Hash] the yaml argument
def loop_overwrite(config, yaml)
  yaml.each do |key, value|
    sym = key.to_sym
    case value
    when Hash
      # Keys absent from the defaults used to crash with NoMethodError on nil.
      config[sym] = {} unless config[sym].is_a?(Hash)
      loop_overwrite(config[sym], value)
    when Array
      config[sym] = value.map do |item|
        next item unless item.is_a?(Hash) # lists of scalars are copied as is

        {}.tap { |converted| loop_overwrite(converted, item) }
      end
    else
      config[sym] = value
    end
  end
end
#overwrite_config(yaml_filename) ⇒ Object
overwrite_config
Overwrites the $config fields that are set in the YAML file provided on input. TODO: Log (error?) every key in the YAML file that does not exist in $config.
93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 |
# File 'lib/easel.rb', line 93
# Recursively copies the parsed YAML structure into the config hash,
# converting every key to a Symbol. Hash values are merged into nested config
# hashes, Array values replace the config entry (Hash elements are converted
# recursively), and everything else is assigned directly.
#
# @param config [Hash] destination, modified in place
# @param yaml [Hash] parsed YAML mapping
# @return [Hash] the yaml argument
def loop_overwrite(config, yaml)
  yaml.each do |key, value|
    sym = key.to_sym
    case value
    when Hash
      config[sym] = {} unless config[sym].is_a?(Hash)
      loop_overwrite(config[sym], value)
    when Array
      config[sym] = value.map do |item|
        next item unless item.is_a?(Hash)

        {}.tap { |converted| loop_overwrite(converted, item) }
      end
    else
      config[sym] = value
    end
  end
end

# Loads the given YAML file and overwrites the matching fields of $config.
# Calls log_fatal when the file cannot be loaded or does not contain a mapping.
#
# @param yaml_filename [String] path of the YAML configuration file
# @return [Hash] the parsed YAML contents
def overwrite_config yaml_filename
  # TODO: Rewrite using pattern matching to allow checking if the
  #   yaml_contents.each is one of the base keys. If so, check that the associated
  #   value matches the expected nesting. Do that 'no other values' check.

  # TODO: Ensure that the command names are less than 1020 bytes (because I'm
  #   setting the max length of a single websocket message to 1024 (minus 'STOP:'))

  begin
    # Read the file named by the argument (previously ignored in favour of $config[:yaml_file]).
    yaml_contents = YAML.load_file yaml_filename
  rescue StandardError => e
    log_fatal "YAML failed to load. Error Message: #{e}"
  end

  # An empty or scalar document has nothing to merge and would crash the merge below.
  unless yaml_contents.is_a? Hash
    log_fatal "YAML failed to load. Error Message: #{yaml_filename} does not contain a mapping"
  end

  loop_overwrite($config, yaml_contents)
end
#parse_ARGV ⇒ Object
parse_ARGV
Parses the command line arguments (ARGV) using the optparse gem. Optional command line arguments can be seen by running this program with the -h (or --help) flag.
43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 |
# File 'lib/easel.rb', line 43
# Parses ARGV with OptionParser and stores the results in $config.
#
# Flags: -h/--help, -l/--log LOG_LEVEL, -p/--port PORT, -H/--hostname HOST,
# -o/--output [FILE]. Exactly one positional argument (the YAML file) must
# remain; it is stored in $config[:yaml_file].
#
# @return [Object] the YAML path when one is given
def parse_ARGV
  OptionParser.new do |opts|
    opts.banner = "Usage: launch.rb [flags] configuration.yaml"

    opts.on("-h", "--help", "Prints this help message.") do
      puts opts
      exit
    end

    opts.on("-l LOG_LEVEL", "--log LOG_LEVEL", Integer, "Sets the logging level (default=2). 0=Silent, 1=Errors, 2=Warnings, 3=Info.") do |lvl|
      if [0, 1, 2, 3].include?(lvl)
        $config[:logging] = lvl
      else
        log_fatal "Command argument LOG_LEVEL '#{lvl}' not recognized. Expected 0, 1, 2, or 3."
      end
    end

    opts.on("-p PORT", "--port PORT", Integer, "Sets the port to bind to. Default is #{$config[:port]}.") do |port|
      if port.between?(0, 65535)
        $config[:port] = port
      else
        log_fatal "Command argument PORT '#{port}' not a valid port. Must be between 0 and 65535 (inclusive)"
      end
    end

    # -h belongs to --help, so the hostname uses -H. The value is stored under
    # :hostname, the key launch and launch_server read.
    opts.on("-H HOST", "--hostname HOST", "Sets the hostname. Default is '#{$config[:hostname]}'.") do |host|
      $config[:hostname] = host
    end

    opts.on("-o [FILE]", "--output [FILE]", "Set a log file.") do |filename|
      begin
        # The handle stays open on purpose: the log_* helpers write to it.
        $config[:log_file] = File.new(filename, "a")
      rescue StandardError => e
        log_error "Log file could not be open. Sending log to STDIN. Error message: #{e}"
      end
    end
  end.parse!

  if ARGV.length != 1
    log_fatal "launch.rb takes exactly one file. Try -h for more details."
  else
    $config[:yaml_file] = ARGV[0]
  end
end
#read_data ⇒ Object
read_data
Reads (copies) @collected_data, and handles the Readers/Writer problem by using semaphores and a mutex. Returns a copy of @collected_data.
125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 |
# File 'lib/easel/data_gathering.rb', line 125
# Returns a shallow copy (dup) of @collected_data.
#
# Under @join_mutex the reader registers itself by releasing one permit on
# @readers_semaphore, but only while @writers_semaphore reports zero permits.
# Otherwise it retries every 50ms. The permit is taken back after the copy.
#
# @return [Object] a dup of @collected_data
def read_data
  registered = false
  until registered
    @join_mutex.synchronize do
      if @writers_semaphore.available_permits == 0
        @readers_semaphore.release 1 # Increment @readers_semaphore
        # TODO: likely need to release the mutex.
        registered = true
      end
    end
    # Wait 50ms to give another thread time to lock the @join_mutex.
    sleep 0.05 unless registered
  end

  # Read data
  snapshot = @collected_data.dup

  @readers_semaphore.acquire 1 # Decrement @readers_semaphore
  snapshot
end
#read_HTTP_message(socket) ⇒ Object
read_HTTP_message
Read an HTTP message from the socket, and parse it into a request Hash.
126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 |
# File 'lib/easel/server.rb', line 126
# Reads one HTTP request head from the socket and parses it.
#
# The result has :method, :url and :protocol from the request line, plus
# :fields, which maps each header name (dashes turned into underscores, as a
# Symbol) to its value. Values keep their trailing "\r\n" because callers
# compare against strings such as "websocket\r\n".
#
# @param socket [#gets] stream to read from
# @return [Hash, nil] the parsed request, or nil when the stream is empty or
#   the first line is not a recognisable request line
def read_HTTP_message socket
  request_line = socket.gets
  return nil if request_line.nil? or not request_line.match(/^(GET|HEAD|POST|PUT|DELETE|OPTIONS|TRACE) .+ HTTP.+/)

  header_lines = []
  # gets returns nil when the peer closes; stop then instead of spinning forever.
  while (line = socket.gets)
    break if line == "\r\n"

    header_lines << line
  end

  request = { fields: {} }
  request[:method], request[:url], request[:protocol] = request_line.split(" ")
  header_lines.each do |header|
    # Limit of 2 keeps values that themselves contain ": " intact.
    key, value = header.split(": ", 2)
    next if value.nil? # not a "Name: value" line

    request[:fields][key.split("-").join("_").to_sym] = value
  end
  request
end
#receive_msg(socket) ⇒ Object
receive_msg
Extremely naive websocket server. Requires masking, and a message with a length of at most 125 (in practice shorter than MAX_WS_FRAME_SIZE). Needs to be updated to conform with RFC 6455.
212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 |
# File 'lib/easel/websocket.rb', line 212
# Reads one WebSocket frame from the socket and returns its text payload.
#
# Only final (FIN), masked text frames (opcode 1) with a 7-bit payload length
# below MAX_WS_FRAME_SIZE are accepted. A close frame (first byte 0x88)
# closes the socket.
#
# @param socket [#getbyte, #close] client connection
# @return [String, nil] the unmasked payload in its String#inspect-escaped
#   form without the surrounding quotes; nil on end of stream, close frame,
#   invalid frame or truncated frame
def receive_msg socket
  # Check first two bytes
  byte1 = socket.getbyte
  byte2 = socket.getbyte
  return nil if byte1.nil? or byte2.nil?

  if byte1 == 0x88 # Client is requesting that we close the connection.
    # TODO: Unsure how to properly handle this case. Right now the socket will close and
    #   everything here will shut down - eventually? Kill all child threads first?
    log_info "Client requested the websocket be closed."
    socket.close
    return
  end

  # Compare against zero: in Ruby the Integer 0 is truthy, so the raw masked
  # bits could never fail the validity check below.
  fin = (byte1 & 0b10000000) != 0
  opcode = byte1 & 0b00001111
  is_masked = (byte2 & 0b10000000) != 0
  msg_size = byte2 & 0b01111111

  unless fin and opcode == 1 and is_masked and msg_size < MAX_WS_FRAME_SIZE
    log_error "Invalid websocket message received. #{fin}, #{opcode == 1}, #{is_masked}, #{msg_size}"
    # Drain the rest of the frame: the 4 mask bytes (when present) and the payload.
    (is_masked ? msg_size + 4 : msg_size).times { socket.getbyte }
    return
  end

  # Get message
  mask = Array.new(4) { socket.getbyte }
  payload = Array.new(msg_size) { socket.getbyte }
  # The stream ended in the middle of the frame.
  return nil if mask.include?(nil) || payload.include?(nil)

  msg = payload.each_with_index.map { |byte, i| byte ^ mask[i % 4] }
               .pack('C*').force_encoding('utf-8').inspect
  log_info "WebSocket received: #{msg}"

  msg[1..-2] # Remove quotation marks from message
end
#return_html(file) ⇒ Object
return_html
67 68 69 70 71 72 73 74 75 76 77 |
# File 'lib/easel/build_pages.rb', line 67
# Reads a static file from the html directory and wraps it in a complete
# HTTP 200 text/html response.
#
# @param file [String] file name relative to the html directory
# @return [String] status line, headers and the file contents
def return_html file
  # File.read closes the file once read; File.new(...).read left the handle open until GC.
  page = File.read("#{File.dirname(__FILE__)}/../html/#{file}")
  "HTTP/1.1 200 OK\r\n" \
  "Content-Type: text/html; charset=UTF-8\r\n" \
  "Content-Length: #{page.bytesize}\r\n" \
  "Connection: close\r\n" \
  "\r\n" + page
end
#return_js(file) ⇒ Object
return_js
52 53 54 55 56 57 58 59 60 61 62 |
# File 'lib/easel/build_pages.rb', line 52
# Reads a static file from the html directory and wraps it in a complete
# HTTP 200 text/javascript response.
#
# @param file [String] file name relative to the html directory
# @return [String] status line, headers and the file contents
def return_js file
  # File.read closes the file once read; File.new(...).read left the handle open until GC.
  page = File.read("#{File.dirname(__FILE__)}/../html/#{file}")
  "HTTP/1.1 200 OK\r\n" \
  "Content-Type: text/javascript; charset=UTF-8\r\n" \
  "Content-Length: #{page.bytesize}\r\n" \
  "Connection: close\r\n" \
  "\r\n" + page
end
#run_command_and_stream(socket, cmd_id, send_msg_mutex) ⇒ Object
run_command_and_stream
Run a command and stream the stdout and stderr through the websocket.
145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 |
# File 'lib/easel/websocket.rb', line 145
# Runs the command registered under cmd_id and streams its output through
# send_msg: stdout lines as "OUT", stderr lines as "ERR", then a final
# "FINISHED" once the process has exited.
#
# @param socket [Object] websocket connection, passed through to send_msg
# @param cmd_id [Integer] id resolved through get_command
# @param send_msg_mutex [Mutex] per-connection mutex, passed through to send_msg
# @return [Object, nil] nil when the id is unknown
def run_command_and_stream(socket, cmd_id, send_msg_mutex)
  cmd = get_command cmd_id
  if cmd.nil?
    log_error "Client requested command ID #{cmd_id} be run, but that ID does not exist."
    return
  end

  Open3::popen3(cmd) do |stdin, stdout, stderr, cmd_thread|
    # Nothing is ever written to the command, so give it EOF on stdin rather
    # than letting it wait for input.
    stdin.close

    labels = { stdout => "OUT", stderr => "ERR" }
    open_streams = labels.keys

    # Keep reading until BOTH pipes reach EOF. Stopping at the first EOF
    # dropped output still pending on the other pipe.
    until open_streams.empty?
      ready_fds, = IO.select(open_streams)
      ready_fds.each do |fd|
        resp = fd.gets
        if resp.nil?
          open_streams.delete(fd)
        else
          send_msg(socket, send_msg_mutex, cmd_id, labels[fd], resp)
        end
      end
    end

    cmd_thread.join
    send_msg(socket, send_msg_mutex, cmd_id, "FINISHED")
  end
end
#run_websocket(socket, initial_request) ⇒ Object
run_websocket
78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 |
# File 'lib/easel/websocket.rb', line 78
# Upgrades the connection to a WebSocket and serves it until the client goes away.
#
# While connected:
# * if $config[:collect_data_period] is non-zero, a background thread
#   periodically pushes read_data to the client as "DASH" messages;
# * "RUN:<id>" starts run_command_and_stream for that id on its own thread
#   (ignored while that id is still running);
# * "STOP:<id>" kills the thread running that id.
#
# @param socket [IO] client connection
# @param initial_request [Hash] parsed upgrade request (see read_HTTP_message)
# @return [void]
def run_websocket(socket, initial_request)
  # The header value still carries its trailing "\r\n"; strip it before hashing.
  accept_connection(socket, initial_request[:fields][:Sec_WebSocket_Key][0..-3])
  log_info "Accepted WebSocket Connection"

  child_threads = {}
  send_msg_mutex = Mutex.new # One mutex per websocket to control sending messages.

  # Periodically update the generic dashboard if set.
  dashboard_thread = nil
  unless $config[:collect_data_period] == 0
    dashboard_thread = Thread.new do
      loop do
        data = read_data
        send_msg(socket, send_msg_mutex, nil, "DASH", data)
        sleep $config[:collect_data_period]
      end
    end
  end

  begin
    loop do
      begin
        msg = receive_msg socket
      rescue Errno::ECONNRESET
        log_error "Client reset the connection"
        socket.close
        msg = nil
      end
      break if msg.nil? # The socket was closed by the client.

      # Messages look like "RUN:<id>" / "STOP:<id>": the verb is the FIRST field.
      verb, argument = msg.split(":", 2)
      # Only accept plain decimal ids; String#to_i would turn junk into command 0.
      cmd_id = argument.to_i if argument&.match?(/\A\d+\z/)

      case verb
      when "RUN"
        if cmd_id.nil?
          log_error "Received an unrecognized message over the websocket: #{msg}"
        elsif !child_threads[cmd_id]&.alive?
          # alive? (not nil?) so an already finished thread never blocks a re-run.
          child_threads[cmd_id] = Thread.new do
            run_command_and_stream(socket, cmd_id, send_msg_mutex)
            child_threads[cmd_id] = nil
          end
        end
      when "STOP"
        if cmd_id.nil?
          log_error "Received an unrecognized message over the websocket: #{msg}"
        elsif !child_threads[cmd_id].nil?
          child_threads[cmd_id].kill
          child_threads[cmd_id] = nil
        end
      else
        log_error "Received an unrecognized message over the websocket: #{msg}"
      end
    end
  rescue StandardError => e
    log_error "Wuh Woh #2: #{e}"
    e.backtrace.each { |trace| log_error "#{trace}" }
    raise
  ensure
    # Stop pushing dashboard data to a dead connection and release the socket.
    # Running commands are left alone, as before.
    dashboard_thread&.kill
    socket.close unless socket.closed?
  end
end
#send_frame(socket, msg) ⇒ Object
send_frame
Sends a message over the websocket. Requires that the message be an appropriate length, and have the right format (eg. checks should be done before calling this function). TODO: Figure out the proper frame size (MAX_WS_FRAME_SIZE).
339 340 341 342 343 344 345 346 347 |
# File 'lib/easel/websocket.rb', line 339
# Sends msg to the client as a single unmasked, final WebSocket text frame.
#
# The length field is derived from the payload's byte size. Payloads of 126
# bytes or more use the 16-bit or 64-bit extended length forms.
# TODO: Figure out the proper frame size (MAX_WS_FRAME_SIZE).
#
# @param socket [#write] client connection
# @param msg [String] text payload
# @return [Object] whatever socket.write returns; a closed socket is logged, not raised
def send_frame(socket, msg)
  payload = msg.b # byte view: String#size counts characters, not bytes
  length = payload.bytesize
  header =
    if length < 126
      [0b10000001, length].pack("CC")
    elsif length < 65_536
      [0b10000001, 126, length].pack("CCn")
    else
      [0b10000001, 127, length].pack("CCQ>")
    end

  begin
    socket.write header + payload
  rescue IOError, Errno::EPIPE
    log_error "WebSocket is closed. Msg: #{msg}"
  end
end
#send_msg(socket, send_msg_mutex, cmd_id, msg_type, msg = nil) ⇒ Object
send_msg
258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 |
# File 'lib/easel/websocket.rb', line 258
# Formats a protocol message and sends it through send_frame, splitting it
# into several frames when it does not fit into MAX_WS_FRAME_SIZE characters.
#
# Wire formats produced here:
# * "OUT"/"ERR":        "<cmd_id>:<type>:<chunk>"
# * "DASH":             "<dash_id><element_index>:A:<key>-><value>" when it
#                       fits, otherwise "<dash_id><element_index>:<i>/<n>:<chunk>"
# * "CLEAR"/"FINISHED": "<cmd_id>:<type>"
#
# @param socket [Object] client connection, passed to send_frame
# @param send_msg_mutex [Mutex] per-connection mutex held while frames are sent
# @param cmd_id [Object] command id used in the header (unused for "DASH")
# @param msg_type [String] "OUT", "ERR", "DASH", "CLEAR" or "FINISHED"
# @param msg [String, Hash, nil] payload: a String for OUT/ERR, a nested Hash
#   (dashboard id -> element index -> key -> value) for DASH, nil otherwise
# @return [Object] whatever log_info returns
def send_msg(socket, send_msg_mutex, cmd_id, msg_type, msg=nil)
  # Integer limit, so chunk sizes and slice counts are whole numbers.
  frame_limit = MAX_WS_FRAME_SIZE.to_i

  case msg_type
  when "OUT", "ERR" # See comments at the top of the file to explain this part of the protocol.
    header = "#{cmd_id}:#{msg_type}:"
    # >= rather than >: a header that fills the whole frame leaves no room for payload.
    if header.length >= frame_limit
      log_error "Message header '#{msg_type}' is too long. Msg: #{msg}."
    elsif msg.nil?
      log_error "Message of type '#{msg_type}' sent without a message."
    else
      part_len = frame_limit - header.length
      send_msg_mutex.synchronize do
        if msg.length > part_len
          msg.chars.each_slice(part_len) { |slice| send_frame(socket, header + slice.join) }
        else
          send_frame(socket, header + msg)
        end
      end
    end

  when "DASH" # See comments at the top of the file to explain this part of the protocol.
    if msg.nil?
      # Nothing to iterate; previously this fell through and crashed on nil.each_key.
      log_error "Message of type '#{msg_type}' sent without a message."
    else
      msg.each_key do |dash_id|
        msg[dash_id].each_key do |ele_index|
          did = "#{dash_id}#{ele_index}"
          send_msg_mutex.synchronize do
            msg[dash_id][ele_index].each_key do |key|
              data_fragment = "#{key}->#{msg[dash_id][ele_index][key]}"

              if data_fragment.length > frame_limit - (did.length + 3)
                part_len = frame_limit - (did.length + 7) # TODO: Handle case where header is longer than DID:XX/XX:
                if part_len <= 0
                  # The id alone fills the frame; slicing with a non-positive size cannot work.
                  log_error "Message header '#{msg_type}' is too long. Data: #{data_fragment}."
                  next
                end

                msg_parts = data_fragment.chars.each_slice(part_len).map(&:join)
                msg_parts.each_with_index do |part, index|
                  header = did + ":#{index + 1}/#{msg_parts.length}:"
                  if header.length > frame_limit
                    log_error "Message header '#{msg_type}' is too long. Data: #{data_fragment}."
                  end
                  send_frame(socket, header + part)
                end
              else
                send_frame(socket, did + ":A:" + data_fragment)
              end
            end
          end
        end
      end
    end

  when "CLEAR", "FINISHED"
    unless msg.nil?
      log_error "Message of type '#{msg_type}' does not take a message, but one was passed. Msg: #{msg}."
    end

    to_send = "#{cmd_id}:#{msg_type}"
    send_msg_mutex.synchronize do
      if to_send.length > frame_limit
        log_error "Message of type '#{msg_type}' is too long. Msg: #{to_send}."
      else
        send_frame(socket, to_send)
      end
    end

  else
    log_error "Trying to send a websocket message with unrecognized type: #{msg_type}"
  end

  log_info "Message sent via WebSocket: #{msg}"
end
#write_data(data) ⇒ Object
write_data
Writes the data collected to @collected_data, and handles the Readers/Writer problem by using semaphores and a mutex.
84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 |
# File 'lib/easel/data_gathering.rb', line 84
# Merges freshly collected data into @collected_data.
#
# Under @join_mutex the writer registers itself by releasing one permit on
# @writers_semaphore, but only while both @readers_semaphore and
# @writers_semaphore report zero permits. Otherwise it retries every 50ms.
# The permit is taken back once the data has been written.
#
# Values under :load are appended to a growing list; every other key
# replaces the stored value.
#
# @param data [Hash] newly collected data
# @return [Object] whatever @writers_semaphore.acquire returns
def write_data data
  joined = false
  until joined
    @join_mutex.synchronize do
      if @readers_semaphore.available_permits == 0 and @writers_semaphore.available_permits == 0
        @writers_semaphore.release 1 # Increment @writers_semaphore
        joined = true
      end
    end
    # Wait 50ms to give another thread time to lock the @join_mutex. Only when
    # we still have to retry; sleeping after joining just delayed the write.
    sleep 0.05 unless joined
  end

  # Write data
  data.each_key do |key|
    case key
    when :load
      @collected_data[:load] = [] if @collected_data[:load].nil?
      @collected_data[:load] << data[:load]
    else
      @collected_data[key] = data[key]
    end
  end

  # Log if @collected_data has gotten too large.
  if not @collected_data[:load].nil? and @collected_data[:load].length > 240
    if @collected_data[:load].length > 1000
      log_error "Easel dashboard has run collect_data more than a 1000 times. Memory size likely to be large."
    else
      log_warning "Easel dashboard starting to take up a lot of memory due to data_collection being turned on."
    end
  end

  @writers_semaphore.acquire 1 # Decrement @writers_semaphore
end