Class: Miner

Inherits:
Object
  • Object
show all
Defined in:
lib/fileminer/miner.rb

Constant Summary collapse

# Fallback values applied by #initialize for any option the caller omits.
# Frozen so the shared defaults cannot be modified by accident at runtime.
DEFAULTS = {
  # Base directory; the registry and history files live here unless
  # :registry_path / :history_path are given explicitly.
  work_dir: '/var/lib/fileminer',
  # refresh_files moves EOF records to history once their count exceeds this.
  max_eof_files: 20,
  # NOTE(review): presumably the idle time (seconds) after which a file is
  # treated as finished; the code that consumes it is not visible here.
  eof_seconds: 86400,
  # Upper bound on the number of lines returned by a single read_lines call.
  batch_lines: 200,
}.freeze

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ Miner

Create a new file miner instance

Parameters:

  • options (Hash) (defaults to: {})

Options Hash (options):

  • :registry_path (String) — default: /var/lib/fileminer/registry
  • :paths (Array)
  • :eof_seconds (Integer) — default: 86400
  • :batch_lines (Integer) — default: 200
  • :host (String) — default: Socket.gethostname


25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# File 'lib/fileminer/miner.rb', line 25

# Create a new file miner instance.
#
# Options the caller omits are filled in from DEFAULTS. If the history or
# registry file already exists it is loaded from JSON; otherwise the file's
# parent directory is created so that a later save can succeed.
#
# @param options [Hash]
# @option options [String] :work_dir base directory for the state files
# @option options [String] :registry_path defaults to "<work_dir>/registry"
# @option options [String] :history_path defaults to "<work_dir>/history"
# @option options [Integer] :max_eof_files
# @option options [Array] :paths glob patterns expanded by refresh_files
# @option options [Integer] :eof_seconds
# @option options [Integer] :batch_lines
# @option options [String] :host defaults to Socket.gethostname
def initialize(options = {})
  # Loaded lazily, the same way 'socket' is pulled in further down.
  require 'fileutils'

  # Merge into a copy so the caller's hash is not modified.
  options = DEFAULTS.merge(options)

  @registry_path = options[:registry_path] || "#{options[:work_dir]}/registry"
  @history_path = options[:history_path] || "#{options[:work_dir]}/history"
  @max_eof_files = options[:max_eof_files]
  @paths = options[:paths]
  @eof_seconds = options[:eof_seconds]
  @batch_lines = options[:batch_lines]

  @host = options[:host]
  if @host.nil?
    require 'socket'
    @host = Socket.gethostname
  end

  @files = []
  @active_files = []
  @history = []

  if File.exist?(@history_path)
    @history = JSON.parse(File.read(@history_path))
  else
    # Dir has no `mkdirs`; mkdir_p creates every missing ancestor and is a
    # no-op when the directory is already there.
    FileUtils.mkdir_p(File.dirname(@history_path))
  end

  if File.exist?(@registry_path)
    # Registry records are accessed with symbol keys (:path, :pos, :eof).
    @files = JSON.parse(File.read(@registry_path), symbolize_names: true)
    @active_files = @files.reject { |record| record[:eof] }
  else
    FileUtils.mkdir_p(File.dirname(@registry_path))
  end
end

Instance Attribute Details

#active_files ⇒ Object (readonly)

Returns the value of attribute active_files.



15
16
17
# File 'lib/fileminer/miner.rb', line 15

# File records currently treated as active. After the registry is loaded
# these are the records whose :eof flag is not set; refresh_files replaces
# the list on every call.
#
# @return [Array<Hash>]
def active_files
  @active_files
end

#batch_lines ⇒ Object (readonly)

Returns the value of attribute batch_lines.



15
16
17
# File 'lib/fileminer/miner.rb', line 15

# Maximum number of lines a single read_lines call collects
# (taken from the :batch_lines option).
#
# @return [Integer]
def batch_lines
  @batch_lines
end

#eof_seconds ⇒ Object (readonly)

Returns the value of attribute eof_seconds.



15
16
17
# File 'lib/fileminer/miner.rb', line 15

# Value of the :eof_seconds option.
# NOTE(review): presumably the idle time in seconds after which a file is
# considered finished; the code that uses it is not visible here, so confirm.
#
# @return [Integer]
def eof_seconds
  @eof_seconds
end

#files ⇒ Object (readonly)

Returns the value of attribute files.



15
16
17
# File 'lib/fileminer/miner.rb', line 15

# All file records known to the miner: those loaded from the registry plus
# any appended by refresh_files. New records are created with the keys
# :path, :pos and :eof.
#
# @return [Array<Hash>]
def files
  @files
end

#paths ⇒ Object (readonly)

Returns the value of attribute paths.



15
16
17
# File 'lib/fileminer/miner.rb', line 15

# The :paths option as given to the constructor; refresh_files expands
# these entries with Dir[] to discover files.
#
# @return [Array]
def paths
  @paths
end

#registry_path ⇒ Object (readonly)

Returns the value of attribute registry_path.



15
16
17
# File 'lib/fileminer/miner.rb', line 15

# Location of the JSON registry file: the :registry_path option, or
# "<work_dir>/registry" when that option was not supplied.
#
# @return [String]
def registry_path
  @registry_path
end

Instance Method Details

#files_need_refresh?(refresh_files_time_trigger) ⇒ Boolean

Returns:

  • (Boolean)


126
127
128
# File 'lib/fileminer/miner.rb', line 126

# Tell whether the file list is due for another refresh_files run.
#
# @param refresh_files_time_trigger [Numeric] minimum number of seconds that
#   must have elapsed since the last refresh
# @return [Boolean] true when no refresh has happened yet, or when at least
#   refresh_files_time_trigger seconds have passed since the last one
def files_need_refresh?(refresh_files_time_trigger)
  # The timestamp is only set by refresh_files; before the first refresh it is
  # nil and subtracting it from a Time would raise TypeError.
  return true if @files_refresh_time.nil?

  Time.now - @files_refresh_time >= refresh_files_time_trigger
end

#read_lines(record) ⇒ Object

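Read up to batch_lines complete lines from the file described by record, starting at the byte offset record[:pos].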



101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
# File 'lib/fileminer/miner.rb', line 101

# Read the next batch of complete lines from a tracked file.
#
# Reading starts at record[:pos] and stops after @batch_lines lines, at end of
# file, or at a trailing fragment that is not yet terminated by "\n" (that
# fragment is left unread so a later call can pick it up once it is complete).
# The record itself is not modified.
#
# @param record [Hash] file record providing :path and :pos
# @return [Array<Hash>] one hash per line with :host, :path, :pos (offset
#   where the line starts), :end (offset just past it) and :data (the text,
#   including the newline)
def read_lines(record)
  file_path = record[:path]
  File.open(file_path) do |io|
    io.pos = record[:pos]
    batch = []
    until batch.size >= @batch_lines
      start = io.pos
      data = io.gets
      # nil means end of file; no trailing newline means a partial line.
      break if data.nil? || data[-1] != "\n"

      batch << {host: @host, path: file_path, pos: start, end: io.pos, data: data}
    end
    batch
  end
end

#refresh_files ⇒ Object

Rescan the configured paths, register newly discovered files, and rebuild the list of active files.



80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
# File 'lib/fileminer/miner.rb', line 80

# Rescan the configured paths and rebuild the active file list.
#
# Steps, in order:
# 1. Expand @paths with Dir[] and keep only regular files.
# 2. Ask select_active_files for the still-active records and the EOF count.
# 3. If the EOF count exceeds @max_eof_files, call move_eof_to_history.
# 4. Register every remaining path not in the history as a new record
#    starting at position 0.
# 5. Publish the new active list and remember when the refresh happened.
#
# NOTE(review): select_active_files receives the path set and is presumably
# expected to drop already-registered paths from it; it is not visible here.
#
# @return [Time] the timestamp stored as the refresh time
def refresh_files
  now = Time.now
  file_paths = Set.new(Dir[*@paths].select { |path| File.file?(path) })
  active_files, eof_size = select_active_files(now, file_paths)
  move_eof_to_history if eof_size > @max_eof_files

  # Built after the possible move above so freshly archived paths count too.
  history_files = Set.new(@history)
  file_paths.each do |path|
    next if history_files.member?(path)

    record = {path: path, pos: 0, eof: false}
    @files << record
    active_files << record
  end

  @active_files = active_files
  @files_refresh_time = now
end

#save_history ⇒ Object

Save history



75
76
77
# File 'lib/fileminer/miner.rb', line 75

# Serialize the history list to JSON and write it to @history_path,
# replacing whatever the file contained before.
#
# @return [Integer] number of bytes written
def save_history
  File.write(@history_path, @history.to_json)
end

#save_registry ⇒ Object

Save registry



70
71
72
# File 'lib/fileminer/miner.rb', line 70

# Serialize every known file record to JSON and write it to @registry_path,
# replacing whatever the file contained before.
#
# @return [Integer] number of bytes written
def save_registry
  File.write(@registry_path, @files.to_json)
end

#save_work_status ⇒ Object

Save work status



64
65
66
67
# File 'lib/fileminer/miner.rb', line 64

# Persist the miner's state to disk: the history list first, then the
# registry of file records.
#
# @return [Object] whatever save_registry returns
def save_work_status
  save_history
  save_registry
end