Class: MappedStream
Overview
Read from a stream. Data is written to a temporary file, which is mmapped. More data is read from the stream on demand, when an index operation fails.
Constant Summary
collapse
- DEFAULTS =
{
:buf_size => 64*1024,
:tmp_dir => Dir.tmpdir,
}
Instance Attribute Summary collapse
Instance Method Summary
collapse
#count_lines_upto, #parse_header
Constructor Details
#initialize(fd, args = {}) ⇒ MappedStream
Returns a new instance of MappedStream.
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
|
# File 'lib/cless/data.rb', line 51
# Builds a stream reader on top of +fd+.
#
# +fd+ is switched to non-blocking mode. Every key of DEFAULTS becomes an
# instance variable of the same name, taking its value from +args+ when a
# truthy value is supplied there and from DEFAULTS otherwise.
#
# When a block is given, the new object is yielded to it and munmap is
# called once the block finishes, whether or not it raised.
def initialize(fd, args = {})
  @fd = fd
  # Add O_NONBLOCK to whatever status flags the descriptor already has.
  current_flags = fd.fcntl(Fcntl::F_GETFL)
  fd.fcntl(Fcntl::F_SETFL, current_flags | Fcntl::O_NONBLOCK)
  @more = true
  @buf = ""
  @lines = nil
  DEFAULTS.each do |name, default|
    instance_variable_set("@#{name}", args[name] || default)
  end
  # The tempfile/mmap backing store is switched off by the constant
  # condition below; data is accumulated in a plain String instead.
  if false
    @tfd = Tempfile.new(Process.pid.to_s, @tmp_dir)
    @ptr = Mmap.new(@tfd.path, "w")
    @ptr.extend(10 * @buf_size)
  else
    @ptr = ""
  end
  return unless block_given?
  begin
    yield(self)
  ensure
    munmap
  end
end
|
Instance Attribute Details
#fd ⇒ Object
Returns the value of attribute fd.
50
51
52
|
# File 'lib/cless/data.rb', line 50
# Reader for @fd, the stream object handed to the constructor.
def fd; @fd; end
|
#more ⇒ Object
Returns the value of attribute more.
50
51
52
|
# File 'lib/cless/data.rb', line 50
# Reader for @more, which stays true until read_block sees end of file.
def more; @more; end
|
#ptr ⇒ Object
Returns the value of attribute ptr.
50
51
52
|
# File 'lib/cless/data.rb', line 50
# Reader for @ptr, the buffer holding everything read from the stream so far.
def ptr; @ptr; end
|
Instance Method Details
#[](*args) ⇒ Object
124
|
# File 'lib/cless/data.rb', line 124
# Subscript access, forwarded unchanged to the buffer in @ptr.
# Only data that has already been read is visible; no read is triggered.
def [](*args)
  @ptr[*args]
end
|
#each_line ⇒ Object
145
146
147
148
149
150
151
152
153
154
155
156
|
# File 'lib/cless/data.rb', line 145
# Yields every line of the stream in order, each including its trailing "\n".
#
# Lines already present in @ptr are yielded first. When no further newline
# is buffered, read_block is called to fetch more data; iteration stops as
# soon as read_block returns a false value.
#
# Once end of file has been reached (@more is false), any bytes left after
# the last newline are yielded as a final unterminated line, matching the
# way #lines counts such a line. If reading merely stopped because no data
# was available, the partial tail is NOT yielded, since it may still grow.
#
# Returns nil.
def each_line
  off = 0
  loop do
    newline_at = @ptr.index("\n", off)
    if newline_at
      yield(@ptr[off..newline_at])
      off = newline_at + 1
    elsif !read_block
      break
    end
  end
  # Flush the unterminated last line, but only when the stream is finished.
  yield(@ptr[off..-1]) if !@more && off < @ptr.size
  nil
end
|
#file_path ⇒ Object
80
|
# File 'lib/cless/data.rb', line 80
# Path of the backing temporary file held in @tfd, or nil when there is none.
def file_path
  @tfd.path if @tfd
end
|
#index(substr, off = 0) ⇒ Object
102
103
104
105
106
107
108
109
|
# File 'lib/cless/data.rb', line 102
# Finds the first occurrence of +substr+ in the buffer at or after +off+,
# reading more of the stream when the buffered data holds no match.
#
# substr - pattern passed straight to @ptr.index.
# off    - offset at which the search starts (default 0).
#
# Returns the offset of the match, or nil when the stream is at end of file
# (@more is false) or read_block returns a false value before a match is
# found.
def index(substr, off = 0)
  loop do
    found = @ptr.index(substr, off)
    return found if found
    return nil unless @more
    # Everything from off onwards has been searched. Resume from the start
    # of the last (possibly incomplete) buffered line so a match that
    # straddles two reads is still seen, but never move back before the
    # offset the caller asked for, otherwise an earlier match could be
    # returned.
    line_start = (@ptr.rindex("\n", @ptr.size) || -1) + 1
    off = line_start if line_start > off
    return nil unless read_block
  end
end
|
#lines(line_stop = nil, offset_stop = nil) ⇒ Object
Get the total number of lines. Stop if the line_stop or offset_stop limits are crossed.
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
|
# File 'lib/cless/data.rb', line 128
# Counts the lines in the stream, reading until end of file unless a limit
# is hit first.
#
# line_stop   - when set, return the running count as soon as it reaches
#               this value.
# offset_stop - when set, return @ptr.size as soon as the buffer is at
#               least this large.
#
# The final count is cached in @lines and reused once the stream is
# exhausted. Data after the last newline counts as one more line.
#
# NOTE(review): the offset_stop exit returns a byte size, not a line count;
# confirm callers expect that.
# NOTE(review): with an empty buffer @ptr[-1] is nil, so the result is 1.
def lines(line_stop = nil, offset_stop = nil)
  return @lines if !@more && !@lines.nil?
  total = @ptr.count("\n")
  while @more
    if read_block
      # @buf holds only the block that was just appended to @ptr.
      total += @buf.count("\n")
      return total if line_stop && total >= line_stop
      return @ptr.size if offset_stop && @ptr.size >= offset_stop
    else
      # Nothing was read: wait on the descriptor, giving up on a false result.
      break unless select_or_cancel(@fd)
    end
  end
  @lines = total
  @lines += 1 unless @ptr[-1] == ?\n
  @lines
end
|
#more_fd ⇒ Object
120
121
122
|
# File 'lib/cless/data.rb', line 120
# The underlying stream while more data may still arrive, nil afterwards.
def more_fd
  @fd if @more
end
|
#munmap ⇒ Object
82
83
84
85
|
# File 'lib/cless/data.rb', line 82
# Best-effort cleanup: unmaps @ptr and closes/unlinks @tfd.
# Any StandardError from either step is swallowed, which also covers the
# case where @ptr does not respond to munmap or @tfd is nil.
def munmap
  begin
    @ptr.munmap
  rescue StandardError
    nil
  end
  begin
    @tfd.close!
  rescue StandardError
    nil
  end
end
|
#read_block ⇒ Object
90
91
92
93
94
95
96
97
98
99
|
# File 'lib/cless/data.rb', line 90
# Reads up to @buf_size bytes from @fd without blocking and appends them to
# @ptr. @buf is the reusable buffer for the read, so afterwards it holds
# just the newly read block.
#
# Returns true when data was appended, and false when nothing could be read
# right now (EWOULDBLOCK / EAGAIN / EINTR) or the stream is at end of file.
# On end of file @more is also cleared.
def read_block
  # read_nonblock hands back the buffer it filled, i.e. @buf.
  @ptr << @fd.read_nonblock(@buf_size, @buf)
  true
rescue EOFError
  @more = false
  false
rescue Errno::EWOULDBLOCK, Errno::EAGAIN, Errno::EINTR
  false
end
|
#rindex(*args) ⇒ Object
88
|
# File 'lib/cless/data.rb', line 88
# Reverse search, forwarded unchanged to the buffer in @ptr.
# Only data that has already been read is searched.
def rindex(*args)
  @ptr.rindex(*args)
end
|
#search_index(substr, off = 0) ⇒ Object
112
113
114
115
116
117
118
119
|
# File 'lib/cless/data.rb', line 112
# Finds the first occurrence of +substr+ in the buffer at or after +off+,
# waiting for and reading more of the stream until a match appears.
#
# substr - pattern passed straight to @ptr.index.
# off    - offset at which the search starts (default 0).
#
# Returns the offset of the match, or nil when select_or_cancel(@fd) or
# read_block returns a false value before a match is found.
def search_index(substr, off = 0)
  loop do
    found = @ptr.index(substr, off)
    return found if found
    # Everything from off onwards has been searched. Resume from the start
    # of the last (possibly incomplete) buffered line so a match that
    # straddles two reads is still seen, but never move back before the
    # offset the caller asked for, otherwise an earlier match could be
    # returned.
    line_start = (@ptr.rindex("\n", @ptr.size) || -1) + 1
    off = line_start if line_start > off
    return nil unless select_or_cancel(@fd)
    return nil unless read_block
  end
end
|
#search_rindex(*args) ⇒ Object
111
|
# File 'lib/cless/data.rb', line 111
# Reverse counterpart of search_index; simply forwards to rindex.
def search_rindex(*args)
  rindex(*args)
end
|
#size ⇒ Object
87
|
# File 'lib/cless/data.rb', line 87
# Size of the buffer in @ptr, i.e. of the data read so far.
def size
  @ptr.size
end
|