Class: Miner
- Inherits:
-
Object
- Object
- Miner
- Defined in:
- lib/fileminer/miner.rb
Constant Summary collapse
- DEFAULTS =
{ work_dir: '/var/lib/fileminer', max_eof_files: 20, eof_seconds: 86400, batch_lines: 200, }
Instance Attribute Summary collapse
-
#active_files ⇒ Object
readonly
Returns the value of attribute active_files.
-
#batch_lines ⇒ Object
readonly
Returns the value of attribute batch_lines.
-
#eof_seconds ⇒ Object
readonly
Returns the value of attribute eof_seconds.
-
#files ⇒ Object
readonly
Returns the value of attribute files.
-
#paths ⇒ Object
readonly
Returns the value of attribute paths.
-
#registry_path ⇒ Object
readonly
Returns the value of attribute registry_path.
Instance Method Summary collapse
- #files_need_refresh?(refresh_files_time_trigger) ⇒ Boolean
-
#initialize(options = {}) ⇒ Miner
constructor
Create a new file miner instance.
-
#read_lines(record) ⇒ Object
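Read up to batch_lines complete (newline-terminated) lines from the file described by record, starting at the record's saved position.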
-
#refresh_files ⇒ Object
Rescan the configured paths, register files that are not in the history, and rebuild the active file list.
-
#save_history ⇒ Object
Save history.
-
#save_registry ⇒ Object
Save registry.
-
#save_work_status ⇒ Object
Save work status.
Constructor Details
#initialize(options = {}) ⇒ Miner
Create a new file miner instance
25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 |
# File 'lib/fileminer/miner.rb', line 25
#
# Create a new file miner instance.
#
# Keys missing from +options+ are filled in from DEFAULTS. The caller's hash
# is left untouched. Previously saved history and registry files are loaded
# when they exist; otherwise their parent directories are created so later
# saves can succeed.
#
# @param options [Hash] settings read by this constructor:
#   :work_dir, :registry_path, :history_path, :max_eof_files, :paths,
#   :eof_seconds, :batch_lines, :host
def initialize(options = {})
  # Required locally: the file's top-level requires are not visible here.
  require 'fileutils'

  # Merge into a new hash instead of writing defaults into the caller's hash.
  options = DEFAULTS.merge(options)

  @registry_path = options[:registry_path] || "#{options[:work_dir]}/registry"
  @history_path = options[:history_path] || "#{options[:work_dir]}/history"
  @max_eof_files = options[:max_eof_files]
  @paths = options[:paths]
  @eof_seconds = options[:eof_seconds]
  @batch_lines = options[:batch_lines]

  @host = options[:host]
  if @host.nil?
    # Only load socket support when the host name has to be looked up.
    require 'socket'
    @host = Socket.gethostname
  end

  @files = []
  @active_files = []
  @history = []

  if File.exist?(@history_path)
    @history = JSON.parse(File.read(@history_path))
  else
    # mkdir_p creates missing parents and is a no-op for existing directories.
    FileUtils.mkdir_p(File.dirname(@history_path))
  end

  if File.exist?(@registry_path)
    @files = JSON.parse(File.read(@registry_path), symbolize_names: true)
    @active_files = @files.reject { |record| record[:eof] }
  else
    FileUtils.mkdir_p(File.dirname(@registry_path))
  end
end
Instance Attribute Details
#active_files ⇒ Object (readonly)
Returns the value of attribute active_files.
15 16 17 |
# File 'lib/fileminer/miner.rb', line 15
#
# Reader for the +@active_files+ instance variable.
#
# @return [Object] the current value of +@active_files+
def active_files
  @active_files
end
#batch_lines ⇒ Object (readonly)
Returns the value of attribute batch_lines.
15 16 17 |
# File 'lib/fileminer/miner.rb', line 15
#
# Reader for the +@batch_lines+ instance variable.
#
# @return [Object] the current value of +@batch_lines+
def batch_lines
  @batch_lines
end
#eof_seconds ⇒ Object (readonly)
Returns the value of attribute eof_seconds.
15 16 17 |
# File 'lib/fileminer/miner.rb', line 15
#
# Reader for the +@eof_seconds+ instance variable.
#
# @return [Object] the current value of +@eof_seconds+
def eof_seconds
  @eof_seconds
end
#files ⇒ Object (readonly)
Returns the value of attribute files.
15 16 17 |
# File 'lib/fileminer/miner.rb', line 15
#
# Reader for the +@files+ instance variable.
#
# @return [Object] the current value of +@files+
def files
  @files
end
#paths ⇒ Object (readonly)
Returns the value of attribute paths.
15 16 17 |
# File 'lib/fileminer/miner.rb', line 15
#
# Reader for the +@paths+ instance variable.
#
# @return [Object] the current value of +@paths+
def paths
  @paths
end
#registry_path ⇒ Object (readonly)
Returns the value of attribute registry_path.
15 16 17 |
# File 'lib/fileminer/miner.rb', line 15
#
# Reader for the +@registry_path+ instance variable.
#
# @return [Object] the current value of +@registry_path+
def registry_path
  @registry_path
end
Instance Method Details
#files_need_refresh?(refresh_files_time_trigger) ⇒ Boolean
126 127 128 |
# File 'lib/fileminer/miner.rb', line 126
#
# Tell whether the file list is due for another refresh.
#
# @param refresh_files_time_trigger [Numeric] minimum number of seconds that
#   must have passed since the last recorded refresh
# @return [Boolean] true when no refresh has been recorded yet, or when at
#   least +refresh_files_time_trigger+ seconds have elapsed since the last one
def files_need_refresh?(refresh_files_time_trigger)
  # @files_refresh_time is only assigned by refresh_files; before the first
  # refresh there is no timestamp to subtract, so a refresh is always due.
  return true if @files_refresh_time.nil?

  Time.now - @files_refresh_time >= refresh_files_time_trigger
end
#read_lines(record) ⇒ Object
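Read up to batch_lines complete (newline-terminated) lines from the record's file, starting at the record's saved position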
101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 |
# File 'lib/fileminer/miner.rb', line 101
#
# Read the next batch of complete lines from a tracked file.
#
# Reading starts at +record[:pos]+ and stops after +@batch_lines+ lines, at
# end of file, or at a trailing line that does not end in "\n" yet. Such a
# partial line is not returned.
#
# @param record [Hash] file record providing :path and :pos
# @return [Array<Hash>] one hash per line with :host, :path, :pos (offset
#   where the line starts), :end (offset just past the line) and :data
def read_lines(record)
  file_path = record[:path]
  File.open(file_path) do |io|
    io.pos = record[:pos]
    lines = []
    while lines.size < @batch_lines
      start_pos = io.pos
      # gets returns nil at end of file, so no EOFError handling is needed.
      data = io.gets
      # Stop at EOF or at a line whose terminating newline is still missing.
      break if data.nil? || !data.end_with?("\n")

      lines << { host: @host, path: file_path, pos: start_pos, end: io.pos, data: data }
    end
    lines
  end
end
#refresh_files ⇒ Object
Rescan the configured paths, register files that are not in the history, and rebuild the active file list
80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 |
# File 'lib/fileminer/miner.rb', line 80
#
# Rescan +@paths+ and update the tracked and active file lists.
#
# Regular files matching the configured globs are collected and handed to
# select_active_files together with the current time. When the returned EOF
# count exceeds +@max_eof_files+, move_eof_to_history is invoked. Every
# collected path that is not in +@history+ is then registered as a new
# record starting at position 0.
#
# @return [Time] the timestamp stored in +@files_refresh_time+
def refresh_files
  refreshed_at = Time.now
  candidates = Set.new(Dir[*@paths].select { |path| File.file?(path) })

  active, eof_count = select_active_files(refreshed_at, candidates)
  move_eof_to_history if eof_count > @max_eof_files

  # Built after the possible move above so it reflects the current history.
  archived = Set.new(@history)
  candidates.each do |path|
    next if archived.member?(path)

    record = { path: path, pos: 0, eof: false }
    @files << record
    active << record
  end

  @active_files = active
  @files_refresh_time = refreshed_at
end
#save_history ⇒ Object
Save history
75 76 77 |
# File 'lib/fileminer/miner.rb', line 75
#
# Write +@history+ to +@history_path+ as JSON, replacing the file's content.
#
# The JSON text is built before the file is opened, so a serialization
# failure no longer leaves a truncated, empty history file behind.
#
# @return [Integer] number of bytes written
def save_history
  File.write(@history_path, @history.to_json)
end
#save_registry ⇒ Object
Save registry
70 71 72 |
# File 'lib/fileminer/miner.rb', line 70
#
# Write +@files+ to +@registry_path+ as JSON, replacing the file's content.
#
# The JSON text is built before the file is opened, so a serialization
# failure no longer leaves a truncated, empty registry file behind.
#
# @return [Integer] number of bytes written
def save_registry
  File.write(@registry_path, @files.to_json)
end
#save_work_status ⇒ Object
Save work status
64 65 66 67 |
# File 'lib/fileminer/miner.rb', line 64
#
# Persist the miner's working state by calling save_history and then
# save_registry.
#
# @return [Object] whatever save_registry returns
def save_work_status
  save_history
  save_registry
end