Class: Fluent::HRForecastOutput

Inherits:
Output
  • Object
show all
Defined in:
lib/fluent/plugin/out_hrforecast.rb

Defined Under Namespace

Classes: PostThread

Instance Method Summary collapse

Constructor Details

#initialize ⇒ HRForecastOutput

Returns a new instance of HRForecastOutput.



6
7
8
9
10
11
# File 'lib/fluent/plugin/out_hrforecast.rb', line 6

# Sets up the plugin instance and loads the libraries used by the other
# methods of this class (HTTP client, URI handling, hostname resolver).
#
# The libraries are required at construction time rather than when the file
# itself is loaded.
def initialize
  super
  %w[net/http uri resolve/hostname].each { |library| require library }
end

Instance Method Details

#configure(conf) ⇒ Object



44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
# File 'lib/fluent/plugin/out_hrforecast.rb', line 44

# Validates the plugin settings and derives the values used at runtime.
#
# After the parent class has processed +conf+, this method:
# * checks that @hrfapi_url ends with "/api/" and that @graph_path has
#   exactly three slash-separated segments,
# * requires exactly one of @name_keys / @name_key_pattern,
# * requires @datetime_key_format whenever @datetime_key is given,
# * stores the API host and port, turns @name_keys ("a,b=>label") into a
#   Hash, compiles @name_key_pattern into a Regexp, precomputes the prefix
#   string/length used for tag stripping, and picks the auth mode.
#
# @param conf the configuration object handed to the parent implementation
# @raise [Fluent::ConfigError] when one of the checks above fails
def configure(conf)
  super

  raise Fluent::ConfigError, "hrfapi_url must end with /api/" if @hrfapi_url !~ %r|/api/\z|
  if @graph_path !~ %r|\A[^/]+/[^/]+/[^/]+\z|
    raise Fluent::ConfigError, "graph_path #{@graph_path}  must be like 'service/section/${tag}_${key_name}'"
  end

  keys_given = !@name_keys.nil?
  pattern_given = !@name_key_pattern.nil?
  raise Fluent::ConfigError, "missing both of name_keys and name_key_pattern" unless keys_given || pattern_given
  raise Fluent::ConfigError, "cannot specify both of name_keys and name_key_pattern" if keys_given && pattern_given

  if !@datetime_key.nil? && @datetime_key_format.nil?
    raise Fluent::ConfigError, "missing datetime_key_format"
  end

  endpoint = URI.parse(@hrfapi_url)
  @host = endpoint.host
  @port = endpoint.port

  if @name_keys
    # Each comma-separated entry is either "key" or "key=>graph_name";
    # a bare key is used as its own graph name.
    pairs = @name_keys.split(',').map do |entry|
      parts = entry.split('=>', 2)
      parts.size == 1 ? parts + [parts.first] : parts
    end
    @name_keys = Hash[pairs]
  end

  @name_key_pattern = Regexp.new(@name_key_pattern) if @name_key_pattern

  if @remove_prefix
    # The stripped prefix includes the trailing dot separator.
    @removed_prefix_string = @remove_prefix + '.'
    @removed_length = @removed_prefix_string.length
  end

  @auth = 'basic' == @authentication ? :basic : :none
  @resolver = Resolve::Hostname.new(:system_resolver => true)
end

#decide_time(time, record) ⇒ Object



210
211
212
213
214
215
# File 'lib/fluent/plugin/out_hrforecast.rb', line 210

# Chooses the timestamp to report for a record.
#
# When @datetime_key is configured and the record holds a truthy value under
# that key, the value is parsed with @datetime_key_format; otherwise the
# event time passed in is returned untouched.
#
# @param time the event time supplied by the caller
# @param record [Hash] the event record
# @return the parsed Time, or +time+ unchanged
def decide_time(time, record)
  return time unless @datetime_key

  raw = record[@datetime_key]
  return time unless raw

  Time.strptime(raw, @datetime_key_format)
end

#emit(tag, es, chain) ⇒ Object



217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
# File 'lib/fluent/plugin/out_hrforecast.rb', line 217

# Converts an event stream into graph updates and hands them off for posting.
#
# For every record, one event hash (:tag, :name, :value, :time) is produced
# per matching field:
# * with @name_keys, each configured key whose value is truthy, named by the
#   mapped graph name;
# * otherwise, each key matching @name_key_pattern whose value is truthy,
#   named by the first capture group, or by the key when there is none.
#
# The collected events are pushed onto the background thread's queue when
# @post_thread is set; otherwise they are posted inline. Inline failures are
# logged and re-raised only when @retry is set.
#
# @param tag the tag of the incoming events
# @param es the event stream; must yield (time, record) pairs from #each
# @param chain the output chain; +next+ is called once posting is handed off
def emit(tag, es, chain)
  events = []

  es.each do |time, record|
    stamp = decide_time(time, record)

    if @name_keys
      @name_keys.each do |key, name|
        value = record[key]
        next unless value
        events << {:tag => tag, :name => name, :value => value, :time => stamp}
      end
    else # for name_key_pattern
      record.keys.each do |key|
        matched = @name_key_pattern.match(key)
        next unless matched
        value = record[key]
        next unless value
        events << {:tag => tag, :name => matched[1] || key, :value => value, :time => stamp}
      end
    end
  end

  if @post_thread
    @post_thread.queue << events
  else
    begin
      post_events(events)
    rescue => e
      log.warn "HTTP POST Error occures to HRforecast server", :error_class => e.class, :error => e.message
      raise if @retry
    end
  end

  chain.next
end

#format_url(tag, name) ⇒ Object



150
151
152
153
# File 'lib/fluent/plugin/out_hrforecast.rb', line 150

# Builds the full API URL of the graph a value should be posted to.
#
# Placeholders of the form ${...} in @graph_path are substituted using the
# table from placeholder_mapping (a placeholder missing from the table is
# replaced with an empty string), the result is percent-escaped, and it is
# appended to @hrfapi_url.
#
# @param tag the event tag, used for the ${tag} placeholder
# @param name the graph name, used for the ${key_name} placeholder
# @return [String] the URL to POST to
def format_url(tag, name)
  graph_path = @graph_path.gsub(/(\${[_a-z]+})/, placeholder_mapping(tag, name))
  # URI.escape no longer exists on Ruby 3.0+; the default parser's #escape
  # provides the same percent-escaping and is present on older versions too.
  @hrfapi_url + URI::DEFAULT_PARSER.escape(graph_path)
end

#make_http_connection ⇒ Object



155
156
157
158
159
160
161
162
163
164
165
166
167
168
# File 'lib/fluent/plugin/out_hrforecast.rb', line 155

# Creates a (not yet started) HTTP client for the configured API server.
#
# The host name is resolved through @resolver first, so the client connects
# to the resolved address. @timeout, when set, is applied to both the open
# and the read timeout. With @ssl the client uses TLS, and certificate
# verification is turned off unless @verify_ssl is set.
#
# @return [Net::HTTP] the configured client
def make_http_connection
  address = @resolver.getaddress(@host)

  Net::HTTP.new(address, @port).tap do |conn|
    if @timeout
      conn.open_timeout = @timeout
      conn.read_timeout = @timeout
    end

    next unless @ssl

    conn.use_ssl = true
    conn.verify_mode = OpenSSL::SSL::VERIFY_NONE unless @verify_ssl
  end
end

#make_post_request(tag, name, value, time) ⇒ Object



170
171
172
173
174
175
176
177
178
179
180
181
182
183
# File 'lib/fluent/plugin/out_hrforecast.rb', line 170

# Builds the POST request that reports one value to a graph.
#
# The request targets the path of the URL returned by format_url and carries
# two form fields: "number" (the value as a float when @enable_float_number
# is set, otherwise as an integer) and "datetime" (+time+ rendered with
# @datetime_format). Basic auth credentials are attached when @auth is
# :basic, and a Keep-Alive header when @keepalive is set.
#
# @param tag the event tag
# @param name the graph name
# @param value the value to report; converted with to_f / to_i
# @param time the timestamp, anything Time.at accepts
# @return [Net::HTTP::Post] the prepared request
def make_post_request(tag, name, value, time)
  target = URI.parse(format_url(tag, name))

  Net::HTTP::Post.new(target.path).tap do |post|
    post.basic_auth(@username, @password) if @auth == :basic
    post['Host'] = target.host
    post['Connection'] = 'Keep-Alive' if @keepalive

    number = @enable_float_number ? value.to_f : value.to_i
    post.set_form_data({
      'number' => number,
      'datetime' => Time.at(time).strftime(@datetime_format),
    })
  end
end

#placeholder_mapping(tag, name) ⇒ Object



142
143
144
145
146
147
148
# File 'lib/fluent/plugin/out_hrforecast.rb', line 142

# Returns the substitution table for the placeholders used in the graph path.
#
# When @remove_prefix is configured, the prefix plus its trailing dot is cut
# from the front of +tag+ if the tag starts with it and has something after
# it. A tag exactly equal to @remove_prefix is sliced as well; because the
# slice starts one past the end of the tag, the resulting tag is nil.
#
# @param tag [String] the event tag
# @param name the graph name
# @return [Hash] values for the '${tag}' and '${key_name}' placeholders
def placeholder_mapping(tag, name)
  effective_tag = tag

  if @remove_prefix
    prefixed = tag.start_with?(@removed_prefix_string) && tag.length > @removed_length
    effective_tag = tag[@removed_length..-1] if prefixed || tag == @remove_prefix
  end

  {'${tag}' => effective_tag, '${key_name}' => name}
end

#post_events(events) ⇒ Object



185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
# File 'lib/fluent/plugin/out_hrforecast.rb', line 185

# Posts a batch of events to the HRForecast server over one HTTP client.
#
# A request is built for every event up front, then the requests are sent
# one by one. A non-success response or an I/O-level error is logged as a
# warning and the remaining requests are still attempted. Without
# @keepalive the connection is closed after each request; with it the
# connection is reused for the whole batch and closed once the batch is done.
#
# @param events [Array<Hash>] hashes with :tag, :name, :value and :time
# @return [nil, Array] nil for an empty batch, otherwise the requests sent
def post_events(events)
  return if events.size < 1

  requests = events.map do |e|
    make_post_request(e[:tag], e[:name], e[:value], e[:time])
  end

  http = make_http_connection()
  begin
    requests.each do |req|
      begin
        http.start unless http.started?
        res = http.request(req)
        unless res and res.is_a?(Net::HTTPSuccess)
          # @host/@port are the API endpoint parsed from hrfapi_url in configure.
          log.warn "failed to post to HRforecast: #{@host}:#{@port}#{req.path}, post_data: #{req.body} code: #{res && res.code}"
        end
      rescue IOError, SystemCallError => e
        # IOError covers EOFError; SystemCallError covers the Errno::* family
        # (connection reset, timed out, ...). Drop the broken connection so
        # the next request starts a fresh one.
        log.warn "net/http POST raises exception: #{e.class}, '#{e.message}'"
        http.finish if http.started?
      end
      if not @keepalive and http.started?
        http.finish
      end
    end
  ensure
    # With keep-alive the connection is still open here; close it so the
    # socket is not left behind after the batch (also on unexpected errors).
    http.finish if http.started?
  end
end

#shutdown ⇒ Object



135
136
137
138
139
140
# File 'lib/fluent/plugin/out_hrforecast.rb', line 135

# Stops the plugin: shuts down the background posting thread, if one was
# created, and then runs the parent class's shutdown.
def shutdown
  @post_thread.shutdown if @post_thread
  super
end

#start ⇒ Object



126
127
128
129
130
131
132
133
# File 'lib/fluent/plugin/out_hrforecast.rb', line 126

# Starts the plugin. After the parent class's start has run, a PostThread is
# created for this instance when @background_post is set; otherwise
# @post_thread is reset to nil so that events are posted inline.
def start
  super

  @post_thread = @background_post ? PostThread.new(self) : nil
end