Class: LogStash::Outputs::Redis

Inherits:
Base
  • Object
show all
Includes:
Stud::Buffer
Defined in:
lib/logstash/outputs/redis.rb

Overview

This output will send events to a Redis queue using RPUSH. The RPUSH command is supported in Redis v0.0.7+. Using PUBLISH to a channel requires at least v1.3.8+. While you may be able to make these Redis versions work, the best performance and stability will be found in more recent stable versions. Versions 2.6.0+ are recommended.

For more information, see the Redis homepage (https://redis.io/).

Instance Method Summary collapse

Instance Method Details

#close ⇒ Object



164
165
166
167
168
169
170
171
172
# File 'lib/logstash/outputs/redis.rb', line 164

# Shuts the output down.
#
# When batching is enabled, a final flush is requested through
# buffer_flush(:final => true). After that, for the 'channel' data type only,
# an existing Redis connection is quit and @redis is reset to nil.
def close
  buffer_flush(:final => true) if @batch

  # Only a 'channel' output holding a connection is disconnected here.
  return unless @data_type == 'channel' && @redis

  @redis.quit
  @redis = nil
end

#congestion_check(key) ⇒ Object

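Waits while the Redis list stored at key is longer than the configured congestion threshold; does nothing when the threshold is 0.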



135
136
137
138
139
140
141
142
143
144
# File 'lib/logstash/outputs/redis.rb', line 135

# Holds the caller back while the Redis list stored at +key+ is longer than
# @congestion_threshold.
#
# Does nothing when @congestion_threshold is 0. Otherwise the list length is
# polled only if at least @congestion_interval seconds have passed since the
# timestamp recorded for +key+ in @congestion_check_times. While the list is
# over the threshold, a warning is logged (if warn logging is enabled) and the
# method sleeps @congestion_interval seconds per iteration. Once the loop
# ends, the timestamp recorded for +key+ is refreshed.
def congestion_check(key)
  return if @congestion_threshold == 0

  # Skip the check when the previous one for this key was too recent.
  elapsed = Time.now.to_i - @congestion_check_times[key]
  return if elapsed < @congestion_interval

  # Don't push to a Redis key that is still above the threshold.
  until @redis.llen(key) <= @congestion_threshold
    if @logger.warn?
      @logger.warn("Redis key size has hit a congestion threshold #{@congestion_threshold} suspending output for #{@congestion_interval} seconds")
    end
    sleep @congestion_interval
  end

  @congestion_check_times[key] = Time.now.to_i
end

#flush(events, key, close = false) ⇒ Object

called from Stud::Buffer#buffer_flush when there are events to flush



147
148
149
150
151
152
153
# File 'lib/logstash/outputs/redis.rb', line 147

# Called from Stud::Buffer#buffer_flush when there are events to flush.
#
# Opens the Redis connection through connect if none is held yet, runs the
# congestion check for +key+ unless +close+ is truthy, then pushes +events+
# onto +key+ with RPUSH and returns the result of that call.
def flush(events, key, close=false)
  @redis = connect unless @redis

  # A closing flush should not block on congestion. For that to work,
  # Stud::Buffer#buffer_flush should pass its :final boolean value as +close+.
  unless close
    congestion_check(key)
  end

  @redis.rpush(key, events)
end

#on_flush_error(e) ⇒ Object

called from Stud::Buffer#buffer_flush when an error occurs



155
156
157
158
159
160
161
162
# File 'lib/logstash/outputs/redis.rb', line 155

# Called from Stud::Buffer#buffer_flush when an error occurs.
#
# Logs the failure at warn level together with this output's identity, the
# exception and its backtrace, then replaces @redis with whatever connect
# returns.
def on_flush_error(e)
  @logger.warn(
    "Failed to send backlog of events to Redis",
    :identity => identity, :exception => e, :backtrace => e.backtrace
  )

  # Replace the connection handle after the failed flush.
  @redis = connect
end

#receive(event) ⇒ Object

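Encodes the event with the configured codec; an encoding failure is logged as a warning instead of being raised.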



117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
# File 'lib/logstash/outputs/redis.rb', line 117

# Hands +event+ to @codec for encoding.
#
# A StandardError raised while encoding is logged at warn level and not
# re-raised; a LocalJumpError is always re-raised.
#
# TODO(sissel): We really should not drop an event, but historically
# we have dropped events that fail to be converted to json.
# TODO(sissel): Find a way to continue passing events through even
# if they fail to convert properly.
def receive(event)
  @codec.encode(event)
rescue LocalJumpError
  # This clause must stay ahead of the StandardError one: it is needed to
  # test for regressions of
  # https://github.com/logstash-plugins/logstash-output-redis/issues/26
  # (see specs). Without it the LocalJumpError would be caught below.
  raise
rescue StandardError => error
  @logger.warn("Error encoding event", :exception => error, :event => event)
end

#register ⇒ Object



90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
# File 'lib/logstash/outputs/redis.rb', line 90

# Prepares the output before any event is handled.
#
# Loads the redis library, sets up the Stud::Buffer batching state when
# @batch is enabled, resets the connection handle and host index, optionally
# shuffles @host in place, builds the per-key congestion timestamp table and
# registers send_to_redis as the codec's on_event callback.
#
# Raises RuntimeError when @batch is enabled with a data_type other than
# "list".
def register
  require 'redis'

  if @batch
    unless @data_type == "list"
      raise RuntimeError, "batch is not supported with data_type #{@data_type}"
    end

    buffer_initialize(:max_items => @batch_events,
                      :max_interval => @batch_timeout,
                      :logger => @logger)
  end

  @redis = nil
  @host.shuffle! if @shuffle_hosts
  @host_idx = 0

  # A key seen for the first time gets a timestamp one congestion interval in
  # the past.
  @congestion_check_times = Hash.new do |times, name|
    times[name] = Time.now.to_i - @congestion_interval
  end

  @codec.on_event(&method(:send_to_redis))
end