Class: LogStash::Outputs::Redis
- Inherits: Base
  - Object
  - Base
  - LogStash::Outputs::Redis
- Includes: Stud::Buffer
- Defined in: lib/logstash/outputs/redis.rb
Overview
This output sends events to a Redis queue using RPUSH. The RPUSH command is supported in Redis v0.0.7+. Using PUBLISH to a channel requires v1.3.8+. While you may be able to make these Redis versions work, the best performance and stability will be found in more recent stable versions. Versions 2.6.0+ are recommended.
For more information, see the Redis homepage (redis.io).
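The difference between the two delivery modes named above can be sketched without a running server. The `FakeRedis` class below is a hypothetical in-memory stand-in written for this illustration, not the redis gem; only the method names `rpush`, `llen`, `lpop` and `publish` mirror real Redis commands, and `on_message` is an invented substitute for SUBSCRIBE.

```ruby
require 'json'

# Hypothetical in-memory stand-in for a Redis client (illustration only).
class FakeRedis
  def initialize
    @lists = Hash.new { |h, k| h[k] = [] }
    @subscribers = Hash.new { |h, k| h[k] = [] }
  end

  # RPUSH: append one or more values to the tail of a list; returns the new length.
  def rpush(key, values)
    @lists[key].concat(Array(values))
    @lists[key].length
  end

  # LLEN: number of items currently stored in the list.
  def llen(key)
    @lists[key].length
  end

  # LPOP: remove and return the item at the head of the list.
  def lpop(key)
    @lists[key].shift
  end

  # Invented helper standing in for SUBSCRIBE: register a callback for a channel.
  def on_message(channel, &block)
    @subscribers[channel] << block
  end

  # PUBLISH: hand the message to current subscribers only; returns how many got it.
  def publish(channel, message)
    @subscribers[channel].each { |callback| callback.call(message) }
    @subscribers[channel].length
  end
end

redis = FakeRedis.new
payload = JSON.generate("message" => "hello")

# A list keeps the event until a consumer pops it.
redis.rpush("logstash", payload)
queued = redis.llen("logstash")

# A channel message published with no subscriber attached reaches nobody.
delivered = redis.publish("logstash-channel", payload)
```

In this sketch the list still holds the event after the push, while the published message is gone because nothing was subscribed, which is the practical trade-off between the two data types.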
Instance Method Summary
- #close ⇒ Object
- #congestion_check(key) ⇒ Object
  Blocks while the Redis list at key exceeds the congestion threshold.
- #flush(events, key, close = false) ⇒ Object
  Called from Stud::Buffer#buffer_flush when there are events to flush.
- #on_flush_error(e) ⇒ Object
  Called from Stud::Buffer#buffer_flush when an error occurs.
- #receive(event) ⇒ Object
  Encodes the event with the configured codec.
- #register ⇒ Object
Instance Method Details
#close ⇒ Object
    # File 'lib/logstash/outputs/redis.rb', line 164
    def close
      if @batch
        buffer_flush(:final => true)
      end
      if @data_type == 'channel' and @redis
        @redis.quit
        @redis = nil
      end
    end
#congestion_check(key) ⇒ Object
Blocks while the Redis list at key holds more than @congestion_threshold items, sleeping @congestion_interval seconds between polls. The check runs at most once per @congestion_interval seconds for each key and is disabled when @congestion_threshold is 0.

    # File 'lib/logstash/outputs/redis.rb', line 135
    def congestion_check(key)
      return if @congestion_threshold == 0
      # Check congestion only if enough time has passed since last check.
      if (Time.now.to_i - @congestion_check_times[key]) >= @congestion_interval
        # Don't push event to Redis key which has reached @congestion_threshold.
        while @redis.llen(key) > @congestion_threshold
          @logger.warn? and @logger.warn("Redis key size has hit a congestion threshold #{@congestion_threshold} suspending output for #{@congestion_interval} seconds")
          sleep @congestion_interval
        end
        @congestion_check_times[key] = Time.now.to_i
      end
    end
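The throttling behaviour of #congestion_check can be traced with a small, self-contained sketch. Everything here is an assumption made for illustration: `DrainingList` and `CongestionGate` are hypothetical classes, the clock is injected so the run is deterministic, and a counter replaces the real `sleep` call.

```ruby
# Hypothetical stand-in: a list whose length shrinks each time it is polled,
# as if a consumer were draining it between polls.
class DrainingList
  def initialize(length, drain_per_poll)
    @length = length
    @drain_per_poll = drain_per_poll
  end

  def llen(_key)
    current = @length
    @length = [@length - @drain_per_poll, 0].max
    current
  end
end

# Hypothetical re-statement of the congestion check with an injectable clock.
class CongestionGate
  attr_reader :waits

  def initialize(redis, threshold:, interval:, clock: -> { Time.now.to_i })
    @redis = redis
    @threshold = threshold
    @interval = interval
    @clock = clock
    @waits = 0
    @last_check = Hash.new { |h, k| h[k] = @clock.call - @interval }
  end

  def check(key)
    return if @threshold == 0                              # 0 disables the check
    return if (@clock.call - @last_check[key]) < @interval # checked too recently
    while @redis.llen(key) > @threshold
      @waits += 1                                          # the plugin sleeps here instead
    end
    @last_check[key] = @clock.call
  end
end

now = 1_000
list = DrainingList.new(10, 3)
gate = CongestionGate.new(list, threshold: 5, interval: 2, clock: -> { now })

gate.check("logstash")            # polls 10, then 7, then 4: waits twice
waits_after_first = gate.waits
gate.check("logstash")            # still inside the interval: skipped entirely
```

The second call returns without polling the list at all, which is why a congested key is only re-examined once per interval rather than on every flush.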
#flush(events, key, close = false) ⇒ Object
Called from Stud::Buffer#buffer_flush when there are events to flush.

    # File 'lib/logstash/outputs/redis.rb', line 147
    def flush(events, key, close=false)
      @redis ||= connect
      # we should not block due to congestion on close
      # to support this Stud::Buffer#buffer_flush should pass here the :final boolean value.
      congestion_check(key) unless close
      @redis.rpush(key, events)
    end
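To make the callback contract of #flush easier to picture, here is a much-simplified sketch of a batching buffer that invokes a block with the same three arguments (events, key, and a final flag). `TinyBatcher` is a hypothetical class written for this page; it is not Stud::Buffer and ignores timeouts, threading and error handling.

```ruby
# Hypothetical, much-simplified stand-in for the batching a buffer provides.
class TinyBatcher
  def initialize(max_items, &on_flush)
    @max_items = max_items
    @on_flush = on_flush                       # called as (events, key, final)
    @pending = Hash.new { |h, k| h[k] = [] }
  end

  # Queue an event under a key and flush that key once it is full.
  def add(event, key)
    @pending[key] << event
    flush_key(key, false) if @pending[key].length >= @max_items
  end

  # Flush whatever is still pending, for example on shutdown.
  def finish
    @pending.keys.each { |key| flush_key(key, true) }
  end

  private

  def flush_key(key, final)
    events = @pending.delete(key)
    @on_flush.call(events, key, final) unless events.nil? || events.empty?
  end
end

calls = []
batcher = TinyBatcher.new(2) { |events, key, final| calls << [events, key, final] }

batcher.add("e1", "logstash")
batcher.add("e2", "logstash")   # reaches max_items, so the pair is flushed
batcher.add("e3", "logstash")
batcher.finish                  # remainder is flushed with final = true
```

A receiver shaped like #flush would skip its congestion check only for the last call, where the final flag is true, so that shutdown is never blocked by a full queue.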
#on_flush_error(e) ⇒ Object
Called from Stud::Buffer#buffer_flush when an error occurs.

    # File 'lib/logstash/outputs/redis.rb', line 155
    def on_flush_error(e)
      @logger.warn("Failed to send backlog of events to Redis",
        :identity => identity,
        :exception => e,
        :backtrace => e.backtrace
      )
      @redis = connect
    end
#receive(event) ⇒ Object
Encodes the event with the configured codec. Encoding failures are logged as warnings and the event is dropped; a LocalJumpError is re-raised.

    # File 'lib/logstash/outputs/redis.rb', line 117
    def receive(event)
      # TODO(sissel): We really should not drop an event, but historically
      # we have dropped events that fail to be converted to json.
      # TODO(sissel): Find a way to continue passing events through even
      # if they fail to convert properly.
      begin
        @codec.encode(event)
      rescue LocalJumpError
        # This LocalJumpError rescue clause is required to test for regressions
        # for https://github.com/logstash-plugins/logstash-output-redis/issues/26
        # see specs. Without it the LocalJumpError is rescued by the StandardError
        raise
      rescue StandardError => e
        @logger.warn("Error encoding event", :exception => e, :event => event)
      end
    end
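The order of the two rescue clauses in #receive matters because LocalJumpError is a subclass of StandardError in Ruby. The sketch below isolates that pattern; `encode_or_warn` is a hypothetical helper, and a plain array stands in for the logger.

```ruby
# Hypothetical helper mirroring the rescue order used in #receive.
def encode_or_warn(warnings)
  yield
rescue LocalJumpError
  # Listed first so it is re-raised instead of being swallowed below.
  raise
rescue StandardError => e
  warnings << "Error encoding event: #{e.class}"
  nil
end

warnings = []

# An ordinary encoding failure is logged and the call returns nil.
result = encode_or_warn(warnings) { raise ArgumentError, "bad event" }

# A LocalJumpError escapes the helper and reaches the caller.
propagated = begin
  encode_or_warn(warnings) { raise LocalJumpError, "no block given" }
  false
rescue LocalJumpError
  true
end
```

If the first clause were removed, the second would match LocalJumpError as well and turn it into a warning, which is the regression the comment in the source refers to.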
#register ⇒ Object
    # File 'lib/logstash/outputs/redis.rb', line 90
    def register
      require 'redis'

      if @batch
        if @data_type != "list"
          raise RuntimeError.new(
            "batch is not supported with data_type #{@data_type}"
          )
        end
        buffer_initialize(
          :max_items => @batch_events,
          :max_interval => @batch_timeout,
          :logger => @logger
        )
      end

      @redis = nil
      if @shuffle_hosts
        @host.shuffle!
      end
      @host_idx = 0

      @congestion_check_times = Hash.new { |h,k| h[k] = Time.now.to_i - @congestion_interval }

      @codec.on_event(&method(:send_to_redis))
    end
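The default block given to `Hash.new` in #register seeds each new key with a timestamp one interval in the past. A short worked example, using a fixed `now` and an `interval` of 5 as stand-ins for `Time.now.to_i` and `@congestion_interval`, shows the effect of that choice.

```ruby
interval = 5      # stands in for @congestion_interval
now = 1_000       # fixed "current time" so the arithmetic is easy to follow

# Same shape as @congestion_check_times: unseen keys start one interval ago.
check_times = Hash.new { |h, k| h[k] = now - interval }

first_seen = check_times["logstash"]      # default block runs and stores the value
due = (now - first_seen) >= interval      # a check is due on the very first call

check_times["logstash"] = now             # what a completed check records
due_again = (now + 2 - check_times["logstash"]) >= interval   # only 2 seconds later
```

Seeding the entry this way means a key that has never been checked is treated as overdue, so the first flush for any key is always preceded by a congestion check.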