Class: MappedStream

Inherits:
Object
  • Object
show all
Includes:
MappedCommon
Defined in:
lib/cless/data.rb

Overview

Read from a stream. Write the data to a temporary file, which is mmapped. Read more data from the stream on an as-needed basis, when an index operation fails.

Constant Summary collapse

# Fallback values for the options read by #initialize: each key becomes an
# instance variable (@buf_size, @tmp_dir) unless the caller supplies a
# truthy replacement in the args hash.
DEFAULTS = {
  :buf_size => 64 * 1024,  # number of bytes requested per read
  :tmp_dir  => Dir.tmpdir, # directory for the temporary file
}

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from MappedCommon

#count_lines_upto, #parse_header

Constructor Details

#initialize(fd, args = {}) ⇒ MappedStream

Returns a new instance of MappedStream.



51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
# File 'lib/cless/data.rb', line 51

# Wrap +fd+ and prepare an empty buffer for the data read from it.
#
# The descriptor is switched to non-blocking mode. Every key of DEFAULTS
# becomes an instance variable, taken from +args+ when a truthy value is
# given there and from DEFAULTS otherwise.
#
# When a block is given, the new object is yielded to it and #munmap is
# called once the block exits, whether normally or through an exception.
def initialize(fd, args = {})
  @fd = fd
  # Add O_NONBLOCK on top of whatever flags the descriptor already has.
  current_flags = fd.fcntl(Fcntl::F_GETFL)
  fd.fcntl(Fcntl::F_SETFL, current_flags | Fcntl::O_NONBLOCK)
  @more = true
  @buf = ""
  @lines = nil

  DEFAULTS.each do |name, default|
    instance_variable_set("@#{name}", args[name] || default)
  end

  # The mmap-backed temporary file is switched off (the condition used to be
  # `$have_mmap`), so the data is accumulated in a plain String.
  if false
    @tfd = Tempfile.new(Process.pid.to_s, @tmp_dir)
    @ptr = Mmap.new(@tfd.path, "w")
    @ptr.extend(10 * @buf_size)
  else
    @ptr = ""
  end

  return unless block_given?

  begin
    yield(self)
  ensure
    munmap
  end
end

Instance Attribute Details

#fd ⇒ Object (readonly)

Returns the value of attribute fd.



50
51
52
# File 'lib/cless/data.rb', line 50

# The object handed to the constructor as +fd+; data is read from it.
def fd; @fd; end

#more ⇒ Object (readonly)

Returns the value of attribute more.



50
51
52
# File 'lib/cless/data.rb', line 50

# True from construction until #read_block sees end of file, false afterwards.
def more; @more; end

#ptr ⇒ Object (readonly)

Returns the value of attribute ptr.



50
51
52
# File 'lib/cless/data.rb', line 50

# The buffer holding everything read from the stream so far.
def ptr; @ptr; end

Instance Method Details

#[](*args) ⇒ Object



124
# File 'lib/cless/data.rb', line 124

# Index into the data buffered so far; all arguments are passed straight to
# the buffer's own #[]. No additional data is read from the stream.
def [](*args)
  @ptr[*args]
end

#each_line ⇒ Object



145
146
147
148
149
150
151
152
153
154
155
156
# File 'lib/cless/data.rb', line 145

# Yield every line of the stream in order, reading more data with
# #read_block whenever the buffered data holds no further newline.
#
# Each yielded string includes its trailing "\n". Once the stream has reached
# end of file (@more is false), a final line that lacks a trailing newline is
# yielded as well, so no data is dropped and the number of yielded lines
# agrees with what #lines counts. If #read_block fails while the stream is
# still open (no data available yet), iteration stops and the incomplete
# line is not yielded.
#
# Returns nil.
def each_line
  off = 0
  loop do
    newline_at = @ptr.index("\n", off)
    if newline_at
      yield(@ptr[off..newline_at])
      off = newline_at + 1
    elsif !read_block
      break
    end
  end
  # Flush the unterminated tail only when no more data can ever arrive.
  yield(@ptr[off..-1]) if !@more && off < @ptr.size
  nil
end

#file_path ⇒ Object



80
# File 'lib/cless/data.rb', line 80

# Path of the backing temporary file, or nil when no temporary file is set.
def file_path
  return nil unless @tfd
  @tfd.path
end

#index(substr, off = 0) ⇒ Object



102
103
104
105
106
107
108
109
# File 'lib/cless/data.rb', line 102

# Find +substr+ in the stream at or after offset +off+, reading more data
# with #read_block when it is not found in what is buffered so far.
#
# Returns the offset of the match, or nil when the stream is exhausted
# (@more is false) or #read_block could not supply more data.
#
# After a failed search the next attempt resumes from the start of the last
# buffered line, so the already-searched region is skipped while a match
# that straddles the old/new data boundary on that line is still found. The
# resume point is never moved before the current +off+, so a match located
# before the requested offset is never returned.
def index(substr, off = 0)
  loop do
    found = @ptr.index(substr, off)
    return found if found
    return nil unless @more
    line_start = (@ptr.rindex("\n", @ptr.size) || -1) + 1
    off = line_start if line_start > off
    read_block or return nil
  end
end

#lines(line_stop = nil, offset_stop = nil) ⇒ Object

Get the total number of lines. Stop if the line_stop or offset_stop limits are crossed.



128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
# File 'lib/cless/data.rb', line 128

# Count the lines of the stream, reading it to the end unless a limit stops
# the scan earlier.
#
# line_stop::   when given, return the running newline count as soon as it
#               reaches this value.
# offset_stop:: when given, return @ptr.size (the amount of buffered data,
#               not a line count) as soon as it reaches this value.
#
# Both limits are checked before each read, so a call whose limit is already
# satisfied by buffered data returns immediately instead of reading or
# waiting for more input.
#
# When #read_block yields nothing, select_or_cancel(@fd) is called; a falsy
# result ends the scan. After the scan the total is stored in @lines and is
# returned directly by later calls once the stream is exhausted. A last line
# without a trailing newline counts as a line; an empty stream has 0 lines.
def lines(line_stop = nil, offset_stop = nil)
  return @lines unless @more || @lines.nil?
  count = @ptr.count("\n")
  while @more
    return count if line_stop && count >= line_stop
    return @ptr.size if offset_stop && @ptr.size >= offset_stop
    unless read_block
      select_or_cancel(@fd) or break
      next
    end
    count += @buf.count("\n")
  end
  @lines = count
  # Count an unterminated last line, but do not invent one for empty input.
  @lines += 1 unless @ptr.empty? || @ptr[-1] == ?\n
  @lines
end

#more_fd ⇒ Object



120
121
122
# File 'lib/cless/data.rb', line 120

# The stream descriptor while more data may still arrive (@more is truthy),
# nil otherwise.
def more_fd
  return @fd if @more
  nil
end

#munmap ⇒ Object



82
83
84
85
# File 'lib/cless/data.rb', line 82

# Best-effort cleanup: unmap the buffer, then close and remove the temporary
# file. Each step is attempted independently and any StandardError it raises
# is discarded (for instance when the buffer is a plain String or no
# temporary file was created).
def munmap
  begin
    @ptr.munmap
  rescue StandardError
    nil
  end

  begin
    @tfd.close!
  rescue StandardError
    nil
  end
end

#read_block ⇒ Object



90
91
92
93
94
95
96
97
98
99
# File 'lib/cless/data.rb', line 90

# Try to read up to @buf_size bytes from @fd without blocking and append
# them to the buffer.
#
# Returns true when data was read and appended to @ptr. Returns false when
# no data is available right now (the read would block or was interrupted)
# or when end of file is reached; in the end-of-file case @more is also set
# to false.
#
# IO::WaitReadable is rescued alongside the Errno classes so that
# would-block errors which are not Errno subclasses are handled the same way.
def read_block
  @fd.read_nonblock(@buf_size, @buf)
  @ptr << @buf
  true
rescue IO::WaitReadable, Errno::EWOULDBLOCK, Errno::EAGAIN, Errno::EINTR
  false
rescue EOFError
  @more = false
  false
end

#rindex(*args) ⇒ Object



88
# File 'lib/cless/data.rb', line 88

# Reverse search within the data buffered so far; arguments are forwarded
# unchanged to the buffer's #rindex. No additional data is read.
def rindex(*args)
  @ptr.rindex(*args)
end

#search_index(substr, off = 0) ⇒ Object



112
113
114
115
116
117
118
119
# File 'lib/cless/data.rb', line 112

# Find +substr+ in the stream at or after offset +off+, calling
# select_or_cancel(@fd) and then #read_block for more data each time the
# search fails on what is buffered so far.
#
# Returns the offset of the match, or nil when select_or_cancel returns a
# falsy value or #read_block supplies no more data.
#
# After a failed search the next attempt resumes from the start of the last
# buffered line, so the already-searched region is skipped while a match
# that straddles the old/new data boundary on that line is still found. The
# resume point is never moved before the current +off+, so a match located
# before the requested offset is never returned.
def search_index(substr, off = 0)
  loop do
    found = @ptr.index(substr, off)
    return found if found
    line_start = (@ptr.rindex("\n", @ptr.size) || -1) + 1
    off = line_start if line_start > off
    select_or_cancel(@fd) or return nil
    read_block or return nil
  end
end

#search_rindex(*args) ⇒ Object



111
# File 'lib/cless/data.rb', line 111

# Reverse counterpart of #search_index: simply delegates to #rindex with the
# same arguments.
def search_rindex(*args)
  rindex(*args)
end

#size ⇒ Object



87
# File 'lib/cless/data.rb', line 87

# Size of the data buffered so far, as reported by the buffer's own #size.
def size
  @ptr.size
end