Class: One2Influx::Influx
- Inherits: Object
  - Object
    - One2Influx::Influx
- Defined in:
- lib/one2influx/influx.rb
Overview
Used for storing data in InfluxDB through its HTTP API.
Instance Method Summary
-
#db_exists? ⇒ boolean
Checks whether InfluxDB connection is possible, database @db exists and @retention_policy exists.
-
#initialize ⇒ Influx
constructor
Initializes class Influx.
- #store(points) ⇒ Object
Constructor Details
#initialize ⇒ Influx
Initializes class Influx.
6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
# File 'lib/one2influx/influx.rb', line 6
#
# Initializes class Influx from the global configuration object $CFG.
#
# Reads the following keys of $CFG.influx:
#   :authenticate - truthy when credentials should be loaded
#   :credentials  - 'user:password' string, only read when :authenticate is set
#   :endpoint     - URL parsed with URI.parse to obtain host and port
#   :database     - stored in @db
#   :policy       - stored in @retention_policy
#
# @raise [RuntimeError] if authentication is enabled and the credentials
#   do not contain a ':' separating user from password
def initialize
  @authenticate = $CFG.influx[:authenticate]
  if @authenticate
    # Split on the first colon only, so the password itself may contain ':'.
    # to_s makes missing (nil) credentials fail with the same descriptive
    # error instead of a NoMethodError.
    user, pass = $CFG.influx[:credentials].to_s.split(':', 2)
    raise 'InfluxDB credentials have invalid form!' if pass.nil?

    @user = user
    @pass = pass
  end

  uri = URI.parse($CFG.influx[:endpoint])
  @host = uri.host
  @port = uri.port
  @db = $CFG.influx[:database]
  @retention_policy = $CFG.influx[:policy]
end
Instance Method Details
#db_exists? ⇒ boolean
Checks whether InfluxDB connection is possible, database @db exists and @retention_policy exists.
57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 |
# File 'lib/one2influx/influx.rb', line 57
#
# Checks whether InfluxDB connection is possible, database @db exists and
# @retention_policy exists.
#
# Sends a "SHOW RETENTION POLICIES <db>" query through make_request and looks
# for @retention_policy in the first column of the returned rows. Every
# failure path is logged through $LOG and reported as false; nothing is raised
# for malformed or unexpected responses.
#
# @return [boolean] true only when the retention policy was found
def db_exists?
  uri = URI('/query')
  uri.query = URI.encode_www_form({ q: "SHOW RETENTION POLICIES #{@db}" })
  response = make_request(Net::HTTP::Get.new(uri.to_s))

  # Was request successful?
  return false if response.nil?

  # Check for invalid HTTP response codes
  status = response.code.to_i
  if status == 401
    $LOG.error "Unauthorized user '#{@user}', unable to verify connection" \
               ' to InfluxDB.'
    return false
  elsif status != 200
    # This method only verifies the connection, so the message says so
    # (it previously claimed that storing data had failed).
    $LOG.error 'Failed to verify connection to InfluxDB. Received HTTP code ' \
               "#{response.code}, body: #{response.body}"
    return false
  end

  # Parse response
  begin
    parsed = JSON.parse(response.body)
  rescue JSON::ParserError, TypeError => e
    # TypeError covers a response without a body (JSON.parse(nil)).
    $LOG.error 'Unable to parse InfluxDB response, while verifying ' \
               "connection to database '#{@db}': #{e}. " \
               "Received: #{response.body}"
    return false
  end

  begin
    result = parsed['results'][0]
    if result.key?('error')
      $LOG.error "Unable to verify connection to InfluxDB database '#{@db}'" \
                 " with message: #{result['error']}"
      return false
    end

    found = result['series'][0]['values'].any? do |policy|
      policy[0] == @retention_policy
    end
  rescue StandardError
    # Handle index out of bounds etc. exceptions. StandardError (not
    # Exception) so that signals and SystemExit are not swallowed.
    $LOG.error "Unable to verify connection to InfluxDB database '#{@db}'. " \
               'Invalid InfluxDB response format.'
    return false
  end

  if found
    $LOG.info "Connection to InfluxDB database '#{@db}' with retention " \
              "policy '#{@retention_policy}' verified."
    return true
  end

  $LOG.error "InfluxDB database '#{@db}' does not contain supplied " \
             "retention policy '#{@retention_policy}'."
  false
end
#store(points) ⇒ Object
24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 |
# File 'lib/one2influx/influx.rb', line 24
#
# Stores data points to InfluxDB by POSTing them to '/write' as JSON, in
# batches of at most 2500 points per request.
#
# @param points [Array] data points; each batch is embedded unchanged under
#   the :points key of the JSON payload
# @raise [RuntimeError] if a response with an HTTP code other than '200'
#   is received
# @return [Array] the batches the points were split into
def store(points)
  # Split points by 2500 for better performance
  points.each_slice(2500).to_a.each do |slice|
    # Default InfluxDB payload form
    payload = {
      database: @db,
      retentionPolicy: @retention_policy,
      points: slice
    }

    # Create InfluxDB write request
    req = Net::HTTP::Post.new('/write', { 'Content-Type' => 'application/json' })
    req.body = payload.to_json

    response = make_request(req)
    if response.nil?
      # db_exists? treats a nil result of make_request as a failed request,
      # so this batch must not be reported as stored. Kept non-fatal, as
      # before, so the remaining batches are still attempted.
      $LOG.error "Failed to store #{slice.length} data points to InfluxDB: " \
                 'no response received.'
      next
    end

    unless response.code == '200'
      raise 'Failed to store data to InfluxDB. Received HTTP code ' \
            "#{response.code}, body: #{response.body}"
    end

    $LOG.info "Successfully stored #{slice.length} data points."
  end
end