Class: Fluent::HashForwardOutput
- Inherits:
-
ForwardOutput
- Object
- ForwardOutput
- Fluent::HashForwardOutput
- Defined in:
- lib/fluent/plugin/out_hash_forward.rb
Instance Attribute Summary collapse
-
#hash_key_slice_lindex ⇒ Object
Returns the value of attribute hash_key_slice_lindex.
-
#hash_key_slice_rindex ⇒ Object
Returns the value of attribute hash_key_slice_rindex.
-
#regular_nodes ⇒ Object
readonly
for test.
-
#regular_weight_array ⇒ Object
readonly
Returns the value of attribute regular_weight_array.
-
#standby_nodes ⇒ Object
readonly
Returns the value of attribute standby_nodes.
-
#standby_weight_array ⇒ Object
readonly
Returns the value of attribute standby_weight_array.
-
#watcher_interval ⇒ Object
Returns the value of attribute watcher_interval.
Instance Method Summary collapse
-
#build_weight_array(nodes) ⇒ Object
This is just a partial copy from ForwardOuput#rebuild_weight_array.
- #configure(conf) ⇒ Object
-
#get_index(key, size) ⇒ Object
hashing(key) mod N.
- #get_mutex(node) ⇒ Object
- #get_sock ⇒ Object
- #get_sock_expired_at ⇒ Object
-
#nodes(tag) ⇒ Object
Get nodes (a regular_node and a standby_node if available) using hash algorithm.
- #perform_hash_key_slice(tag) ⇒ Object
-
#rebuild_weight_array ⇒ Object
Override: I change weight algorithm.
- #reconnect(node) ⇒ Object
-
#send_data(node, tag, chunk) ⇒ Object
Override for keepalive.
- #shutdown ⇒ Object
- #sock_write(sock, tag, chunk) ⇒ Object
- #start ⇒ Object
- #start_watcher ⇒ Object
- #stop_watcher ⇒ Object
-
#str_hash(key) ⇒ Object
the simplest hashing ever gist.github.com/sonots/7263495.
-
#watch_keepalive_time ⇒ Object
watcher thread callback.
-
#write_objects(tag, chunk) ⇒ Object
Override.
Instance Attribute Details
#hash_key_slice_lindex ⇒ Object
Returns the value of attribute hash_key_slice_lindex.
43 44 45 |
# File 'lib/fluent/plugin/out_hash_forward.rb', line 43 def hash_key_slice_lindex @hash_key_slice_lindex end |
#hash_key_slice_rindex ⇒ Object
Returns the value of attribute hash_key_slice_rindex.
44 45 46 |
# File 'lib/fluent/plugin/out_hash_forward.rb', line 44 def hash_key_slice_rindex @hash_key_slice_rindex end |
#regular_nodes ⇒ Object (readonly)
for test
39 40 41 |
# File 'lib/fluent/plugin/out_hash_forward.rb', line 39 def regular_nodes @regular_nodes end |
#regular_weight_array ⇒ Object (readonly)
Returns the value of attribute regular_weight_array.
41 42 43 |
# File 'lib/fluent/plugin/out_hash_forward.rb', line 41 def regular_weight_array @regular_weight_array end |
#standby_nodes ⇒ Object (readonly)
Returns the value of attribute standby_nodes.
40 41 42 |
# File 'lib/fluent/plugin/out_hash_forward.rb', line 40 def standby_nodes @standby_nodes end |
#standby_weight_array ⇒ Object (readonly)
Returns the value of attribute standby_weight_array.
42 43 44 |
# File 'lib/fluent/plugin/out_hash_forward.rb', line 42 def standby_weight_array @standby_weight_array end |
#watcher_interval ⇒ Object
Returns the value of attribute watcher_interval.
45 46 47 |
# File 'lib/fluent/plugin/out_hash_forward.rb', line 45 def watcher_interval @watcher_interval end |
Instance Method Details
#build_weight_array(nodes) ⇒ Object
This is just a partial copy from ForwardOuput#rebuild_weight_array
101 102 103 104 105 106 107 108 109 110 |
# File 'lib/fluent/plugin/out_hash_forward.rb', line 101 def build_weight_array(nodes) weight_array = [] gcd = nodes.map {|n| n.weight }.inject(0) {|r,w| r.gcd(w) } nodes.each {|n| (n.weight / gcd).times { weight_array << n } } weight_array end |
#configure(conf) ⇒ Object
15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 |
# File 'lib/fluent/plugin/out_hash_forward.rb', line 15 def configure(conf) super if @hash_key_slice lindex, rindex = @hash_key_slice.split('..', 2) if lindex.nil? or rindex.nil? or lindex !~ /^-?\d+$/ or rindex !~ /^-?\d+$/ raise Fluent::ConfigError, "out_hash_forard: hash_key_slice must be formatted like [num]..[num]" else @hash_key_slice_lindex = lindex.to_i @hash_key_slice_rindex = rindex.to_i end end @standby_nodes, @regular_nodes = @nodes.partition {|n| n.standby? } @regular_weight_array = build_weight_array(@regular_nodes) @standby_weight_array = build_weight_array(@standby_nodes) @cache_nodes = {} @sock = {} @sock_expired_at = {} @mutex = {} @watcher_interval = 1 end |
#get_index(key, size) ⇒ Object
hashing(key) mod N
125 126 127 |
# File 'lib/fluent/plugin/out_hash_forward.rb', line 125 def get_index(key, size) str_hash(key) % size end |
#get_mutex(node) ⇒ Object
219 220 221 222 223 |
# File 'lib/fluent/plugin/out_hash_forward.rb', line 219 def get_mutex(node) thread_id = Thread.current.object_id @mutex[thread_id] ||= {} @mutex[thread_id][node] ||= Mutex.new end |
#get_sock ⇒ Object
225 226 227 |
# File 'lib/fluent/plugin/out_hash_forward.rb', line 225 def get_sock @sock[Thread.current.object_id] ||= {} end |
#get_sock_expired_at ⇒ Object
229 230 231 |
# File 'lib/fluent/plugin/out_hash_forward.rb', line 229 def get_sock_expired_at @sock_expired_at[Thread.current.object_id] ||= {} end |
#nodes(tag) ⇒ Object
Get nodes (a regular_node and a standby_node if available) using hash algorithm
113 114 115 116 117 118 119 120 121 122 |
# File 'lib/fluent/plugin/out_hash_forward.rb', line 113 def nodes(tag) if nodes = @cache_nodes[tag] return nodes end hash_key = @hash_key_slice ? perform_hash_key_slice(tag) : tag regular_index = @regular_weight_array.size > 0 ? get_index(hash_key, @regular_weight_array.size) : 0 standby_index = @standby_weight_array.size > 0 ? get_index(hash_key, @standby_weight_array.size) : 0 nodes = [@regular_weight_array[regular_index], @standby_weight_array[standby_index]].compact @cache_nodes[tag] = nodes end |
#perform_hash_key_slice(tag) ⇒ Object
135 136 137 138 139 |
# File 'lib/fluent/plugin/out_hash_forward.rb', line 135 def perform_hash_key_slice(tag) = tag.split('.') sliced = [@hash_key_slice_lindex..@hash_key_slice_rindex] return sliced.nil? ? "" : sliced.join('.') end |
#rebuild_weight_array ⇒ Object
Override: I change weight algorithm
97 98 |
# File 'lib/fluent/plugin/out_hash_forward.rb', line 97 def rebuild_weight_array end |
#reconnect(node) ⇒ Object
160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 |
# File 'lib/fluent/plugin/out_hash_forward.rb', line 160 def reconnect(node) sock = connect(node) opt = [1, @send_timeout.to_i].pack('I!I!') # { int l_onoff; int l_linger; } sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_LINGER, opt) opt = [@send_timeout.to_i, 0].pack('L!L!') # struct timeval sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_SNDTIMEO, opt) if @keepalive get_sock[node] = sock get_sock_expired_at[node] = Time.now + @keepalive_time if @keepalive_time end sock end |
#send_data(node, tag, chunk) ⇒ Object
Override for keepalive
142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 |
# File 'lib/fluent/plugin/out_hash_forward.rb', line 142 def send_data(node, tag, chunk) get_mutex(node).synchronize do sock = get_sock[node] unless sock sock = reconnect(node) end begin sock_write(sock, tag, chunk) node.heartbeat(false) rescue Errno::EPIPE, Errno::ECONNRESET, Errno::ECONNABORTED, Errno::ETIMEDOUT => e log.warn "out_hash_forward: #{e.class} #{e.}" sock = reconnect(node) retry end end end |
#shutdown ⇒ Object
52 53 54 55 |
# File 'lib/fluent/plugin/out_hash_forward.rb', line 52 def shutdown super stop_watcher end |
#sock_write(sock, tag, chunk) ⇒ Object
176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 |
# File 'lib/fluent/plugin/out_hash_forward.rb', line 176 def sock_write(sock, tag, chunk) # beginArray(2) sock.write FORWARD_HEADER # writeRaw(tag) sock.write tag.to_msgpack # tag # beginRaw(size) sz = chunk.size #if sz < 32 # # FixRaw # sock.write [0xa0 | sz].pack('C') #elsif sz < 65536 # # raw 16 # sock.write [0xda, sz].pack('Cn') #else # raw 32 sock.write [0xdb, sz].pack('CN') #end # writeRawBody(packed_es) chunk.write_to(sock) end |
#start ⇒ Object
47 48 49 50 |
# File 'lib/fluent/plugin/out_hash_forward.rb', line 47 def start super start_watcher end |
#start_watcher ⇒ Object
57 58 59 60 61 |
# File 'lib/fluent/plugin/out_hash_forward.rb', line 57 def start_watcher if @keepalive and @keepalive_time @watcher = Thread.new(&method(:watch_keepalive_time)) end end |
#stop_watcher ⇒ Object
63 64 65 66 67 68 |
# File 'lib/fluent/plugin/out_hash_forward.rb', line 63 def stop_watcher if @watcher @watcher.terminate @watcher.join end end |
#str_hash(key) ⇒ Object
the simplest hashing ever gist.github.com/sonots/7263495
131 132 133 |
# File 'lib/fluent/plugin/out_hash_forward.rb', line 131 def str_hash(key) key.bytes.inject(&:+) end |
#watch_keepalive_time ⇒ Object
watcher thread callback
201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 |
# File 'lib/fluent/plugin/out_hash_forward.rb', line 201 def watch_keepalive_time while true sleep @watcher_interval thread_ids = @sock.keys thread_ids.each do |thread_id| @sock[thread_id].each do |node, sock| @mutex[thread_id][node].synchronize do next unless sock_expired_at = @sock_expired_at[thread_id][node] next unless Time.now >= sock_expired_at sock.close rescue IOError if sock @sock[thread_id][node] = nil @sock_expired_at[thread_id][node] = nil end end end end end |
#write_objects(tag, chunk) ⇒ Object
Override
71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 |
# File 'lib/fluent/plugin/out_hash_forward.rb', line 71 def write_objects(tag, chunk) return if chunk.empty? error = nil nodes = nodes(tag) # below is just copy from out_forward nodes.each do |node| if node.available? begin send_data(node, tag, chunk) return rescue # for load balancing during detecting crashed servers error = $! # use the latest error end end end if error raise error else raise "no nodes are available" # TODO message end end |