Class: Fluent::HashForwardOutput

Inherits:
ForwardOutput
  • Object
show all
Defined in:
lib/fluent/plugin/out_hash_forward.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#hash_key_slice_lindexObject

Returns the value of attribute hash_key_slice_lindex.



43
44
45
# File 'lib/fluent/plugin/out_hash_forward.rb', line 43

def hash_key_slice_lindex
  @hash_key_slice_lindex
end

#hash_key_slice_rindexObject

Returns the value of attribute hash_key_slice_rindex.



44
45
46
# File 'lib/fluent/plugin/out_hash_forward.rb', line 44

def hash_key_slice_rindex
  @hash_key_slice_rindex
end

#regular_nodesObject (readonly)

for test



39
40
41
# File 'lib/fluent/plugin/out_hash_forward.rb', line 39

def regular_nodes
  @regular_nodes
end

#regular_weight_arrayObject (readonly)

Returns the value of attribute regular_weight_array.



41
42
43
# File 'lib/fluent/plugin/out_hash_forward.rb', line 41

def regular_weight_array
  @regular_weight_array
end

#standby_nodesObject (readonly)

Returns the value of attribute standby_nodes.



40
41
42
# File 'lib/fluent/plugin/out_hash_forward.rb', line 40

def standby_nodes
  @standby_nodes
end

#standby_weight_arrayObject (readonly)

Returns the value of attribute standby_weight_array.



42
43
44
# File 'lib/fluent/plugin/out_hash_forward.rb', line 42

def standby_weight_array
  @standby_weight_array
end

#watcher_intervalObject

Returns the value of attribute watcher_interval.



45
46
47
# File 'lib/fluent/plugin/out_hash_forward.rb', line 45

def watcher_interval
  @watcher_interval
end

Instance Method Details

#build_weight_array(nodes) ⇒ Object

This is just a partial copy from ForwardOuput#rebuild_weight_array



101
102
103
104
105
106
107
108
109
110
# File 'lib/fluent/plugin/out_hash_forward.rb', line 101

def build_weight_array(nodes)
  weight_array = []
  gcd = nodes.map {|n| n.weight }.inject(0) {|r,w| r.gcd(w) }
  nodes.each {|n|
    (n.weight / gcd).times {
      weight_array << n
    }
  }
  weight_array
end

#configure(conf) ⇒ Object



15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# File 'lib/fluent/plugin/out_hash_forward.rb', line 15

def configure(conf)
  super
  if @hash_key_slice
    lindex, rindex = @hash_key_slice.split('..', 2)
    if lindex.nil? or rindex.nil? or lindex !~ /^-?\d+$/ or rindex !~ /^-?\d+$/
      raise Fluent::ConfigError, "out_hash_forard: hash_key_slice must be formatted like [num]..[num]"
    else
      @hash_key_slice_lindex = lindex.to_i
      @hash_key_slice_rindex = rindex.to_i
    end
  end

  @standby_nodes, @regular_nodes = @nodes.partition {|n| n.standby? }
  @regular_weight_array = build_weight_array(@regular_nodes)
  @standby_weight_array = build_weight_array(@standby_nodes)

  @cache_nodes = {}
  @sock = {}
  @sock_expired_at = {}
  @mutex = {}
  @watcher_interval = 1
end

#get_index(key, size) ⇒ Object

hashing(key) mod N



125
126
127
# File 'lib/fluent/plugin/out_hash_forward.rb', line 125

def get_index(key, size)
  str_hash(key) % size
end

#get_mutex(node) ⇒ Object



219
220
221
222
223
# File 'lib/fluent/plugin/out_hash_forward.rb', line 219

def get_mutex(node)
  thread_id = Thread.current.object_id
  @mutex[thread_id] ||= {}
  @mutex[thread_id][node] ||= Mutex.new
end

#get_sockObject



225
226
227
# File 'lib/fluent/plugin/out_hash_forward.rb', line 225

def get_sock
  @sock[Thread.current.object_id] ||= {}
end

#get_sock_expired_atObject



229
230
231
# File 'lib/fluent/plugin/out_hash_forward.rb', line 229

def get_sock_expired_at
  @sock_expired_at[Thread.current.object_id] ||= {}
end

#nodes(tag) ⇒ Object

Get nodes (a regular_node and a standby_node if available) using hash algorithm



113
114
115
116
117
118
119
120
121
122
# File 'lib/fluent/plugin/out_hash_forward.rb', line 113

def nodes(tag)
  if nodes = @cache_nodes[tag]
    return nodes
  end
  hash_key = @hash_key_slice ? perform_hash_key_slice(tag) : tag
  regular_index = @regular_weight_array.size > 0 ? get_index(hash_key, @regular_weight_array.size) : 0
  standby_index = @standby_weight_array.size > 0 ? get_index(hash_key, @standby_weight_array.size) : 0
  nodes = [@regular_weight_array[regular_index], @standby_weight_array[standby_index]].compact
  @cache_nodes[tag] = nodes
end

#perform_hash_key_slice(tag) ⇒ Object



135
136
137
138
139
# File 'lib/fluent/plugin/out_hash_forward.rb', line 135

def perform_hash_key_slice(tag)
  tags = tag.split('.')
  sliced = tags[@hash_key_slice_lindex..@hash_key_slice_rindex]
  return sliced.nil? ? "" : sliced.join('.')
end

#rebuild_weight_arrayObject

Override: I change weight algorithm



97
98
# File 'lib/fluent/plugin/out_hash_forward.rb', line 97

def rebuild_weight_array
end

#reconnect(node) ⇒ Object



160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
# File 'lib/fluent/plugin/out_hash_forward.rb', line 160

def reconnect(node)
  sock = connect(node)
  opt = [1, @send_timeout.to_i].pack('I!I!')  # { int l_onoff; int l_linger; }
  sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_LINGER, opt)

  opt = [@send_timeout.to_i, 0].pack('L!L!')  # struct timeval
  sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_SNDTIMEO, opt)

  if @keepalive
    get_sock[node] = sock
    get_sock_expired_at[node] = Time.now + @keepalive_time if @keepalive_time
  end

  sock
end

#send_data(node, tag, chunk) ⇒ Object

Override for keepalive



142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
# File 'lib/fluent/plugin/out_hash_forward.rb', line 142

def send_data(node, tag, chunk)
  get_mutex(node).synchronize do
    sock = get_sock[node]
    unless sock
      sock = reconnect(node)
    end

    begin
      sock_write(sock, tag, chunk)
      node.heartbeat(false)
    rescue Errno::EPIPE, Errno::ECONNRESET, Errno::ECONNABORTED, Errno::ETIMEDOUT => e
      log.warn "out_hash_forward: #{e.class} #{e.message}"
      sock = reconnect(node)
      retry
    end
  end
end

#shutdownObject



52
53
54
55
# File 'lib/fluent/plugin/out_hash_forward.rb', line 52

def shutdown
  super
  stop_watcher
end

#sock_write(sock, tag, chunk) ⇒ Object



176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
# File 'lib/fluent/plugin/out_hash_forward.rb', line 176

def sock_write(sock, tag, chunk)
  # beginArray(2)
  sock.write FORWARD_HEADER

  # writeRaw(tag)
  sock.write tag.to_msgpack  # tag

  # beginRaw(size)
  sz = chunk.size
  #if sz < 32
  #  # FixRaw
  #  sock.write [0xa0 | sz].pack('C')
  #elsif sz < 65536
  #  # raw 16
  #  sock.write [0xda, sz].pack('Cn')
  #else
  # raw 32
  sock.write [0xdb, sz].pack('CN')
  #end

  # writeRawBody(packed_es)
  chunk.write_to(sock)
end

#startObject



47
48
49
50
# File 'lib/fluent/plugin/out_hash_forward.rb', line 47

def start
  super
  start_watcher
end

#start_watcherObject



57
58
59
60
61
# File 'lib/fluent/plugin/out_hash_forward.rb', line 57

def start_watcher
  if @keepalive and @keepalive_time
    @watcher = Thread.new(&method(:watch_keepalive_time))
  end
end

#stop_watcherObject



63
64
65
66
67
68
# File 'lib/fluent/plugin/out_hash_forward.rb', line 63

def stop_watcher
  if @watcher
    @watcher.terminate
    @watcher.join
  end
end

#str_hash(key) ⇒ Object

the simplest hashing ever gist.github.com/sonots/7263495



131
132
133
# File 'lib/fluent/plugin/out_hash_forward.rb', line 131

def str_hash(key)
  key.bytes.inject(&:+)
end

#watch_keepalive_timeObject

watcher thread callback



201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
# File 'lib/fluent/plugin/out_hash_forward.rb', line 201

def watch_keepalive_time
  while true
    sleep @watcher_interval
    thread_ids = @sock.keys
    thread_ids.each do |thread_id|
      @sock[thread_id].each do |node, sock|
        @mutex[thread_id][node].synchronize do
          next unless sock_expired_at = @sock_expired_at[thread_id][node]
          next unless Time.now >= sock_expired_at
          sock.close rescue IOError if sock
          @sock[thread_id][node] = nil
          @sock_expired_at[thread_id][node] = nil
        end
      end
    end
  end
end

#write_objects(tag, chunk) ⇒ Object

Override



71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
# File 'lib/fluent/plugin/out_hash_forward.rb', line 71

def write_objects(tag, chunk)
  return if chunk.empty?
  error = nil
  nodes = nodes(tag)

  # below is just copy from out_forward
  nodes.each do |node|
    if node.available?
      begin
        send_data(node, tag, chunk)
        return
      rescue
        # for load balancing during detecting crashed servers
        error = $!  # use the latest error
      end
    end
  end

  if error
    raise error
  else
    raise "no nodes are available"  # TODO message
  end
end