Class: LogStash::Outputs::SumoLogic::Sender
- Inherits:
-
Object
- Object
- LogStash::Outputs::SumoLogic::Sender
- Includes:
- Common
- Defined in:
- lib/logstash/outputs/sumologic/sender.rb
Constant Summary
Constants included from Common
Common::CARBON2, Common::CATEGORY_HEADER, Common::CATEGORY_HEADER_DEFAULT, Common::CATEGORY_HEADER_DEFAULT_STATS, Common::CLIENT_HEADER, Common::CLIENT_HEADER_VALUE, Common::CONTENT_ENCODING, Common::CONTENT_TYPE, Common::CONTENT_TYPE_CARBON2, Common::CONTENT_TYPE_GRAPHITE, Common::CONTENT_TYPE_LOG, Common::DEFAULT_LOG_FORMAT, Common::DEFLATE, Common::GRAPHITE, Common::GZIP, Common::HOST_HEADER, Common::LOG_TO_CONSOLE, Common::METRICS_NAME_PLACEHOLDER, Common::NAME_HEADER, Common::NAME_HEADER_DEFAULT, Common::STATS_TAG, Common::STOP_TAG
Instance Method Summary collapse
-
#connect ⇒ Object
Pings the configured URL with an HTTP GET; returns true on a 200 response and false otherwise.
-
#initialize(client, queue, stats, config) ⇒ Sender
constructor
A new instance of Sender.
-
#start ⇒ Object
Starts the background sender thread, which dequeues batches and sends them until stopping is requested.
-
#stop ⇒ Object
def start.
Methods included from Common
#blank?, #log_dbg, #log_err, #log_info, #log_warn, #set_logger
Constructor Details
#initialize(client, queue, stats, config) ⇒ Sender
Returns a new instance of Sender.
18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 |
# File 'lib/logstash/outputs/sumologic/sender.rb', line 18

# Builds a sender around the given collaborators and configuration.
#
# Defaults are written back into +config+ (via ||=) for "sender_max",
# "sleep_before_requeue" and "stats_enabled" when those keys are unset.
#
# @param client the client object, stored as-is in @client
# @param queue  the queue object, stored as-is in @queue
# @param stats  the statistics object, stored as-is in @stats
# @param config hash-like settings; reads "url", "sender_max",
#   "sleep_before_requeue" and "stats_enabled", and is also handed to the
#   Compressor constructor
def initialize(client, queue, stats, config)
  @client = client
  @queue = queue
  @stats = stats
  @stopping = Concurrent::AtomicBoolean.new(false)
  @url = config["url"]

  # Default the configured value to 1, then use at least 1 locally; the
  # value stored in config is only defaulted, not raised to the minimum.
  requested_max = (config["sender_max"] ||= 1)
  @sender_max = if requested_max < 1
                  1
                else
                  config["sender_max"]
                end

  @sleep_before_requeue = (config["sleep_before_requeue"] ||= 30)
  @stats_enabled = (config["stats_enabled"] ||= false)

  # Pre-fill a bounded queue with one token per allowed sender (0...@sender_max).
  @tokens = SizedQueue.new(@sender_max)
  @sender_max.times do |slot|
    @tokens << slot
  end

  @compressor = LogStash::Outputs::SumoLogic::Compressor.new(config)
end
Instance Method Details
#connect ⇒ Object
Pings the configured URL with an HTTP GET; returns true on a 200 response and false otherwise.
63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 |
# File 'lib/logstash/outputs/sumologic/sender.rb', line 63

# Pings @url with a single HTTP GET and reports whether it was accepted.
#
# SSL is enabled when @url starts with "https" (case-insensitive). A
# response code other than 200 is logged as "ping rejected"; any exception
# raised while issuing the request is logged as "ping failed" together
# with its message, class name and backtrace.
#
# Note that URI parsing and Net::HTTP construction happen before the
# begin/rescue, so errors from those steps propagate to the caller.
#
# @return [Boolean] true when the response code is 200; false when the
#   response has any other code or the request raised
def connect()
  uri = URI.parse(@url)
  http = Net::HTTP.new(uri.host, uri.port)
  http.use_ssl = @url.downcase().start_with?("https")
  request = Net::HTTP::Get.new(uri.request_uri)
  begin
    res = http.request(request)
    if res.code.to_i != 200
      log_err("ping rejected",
        :url => @url,
        :code => res.code,
        :body => res.body)
      false
    else
      log_info("ping accepted",
        :url => @url)
      true
    end
  # NOTE(review): this rescues Exception rather than StandardError, so every
  # failure is logged and reported as a failed ping -- confirm that this
  # breadth is intended before narrowing it.
  rescue Exception => exception
    log_err("ping failed",
      :url => @url,
      :message => exception.message,
      :class => exception.class.name,
      :backtrace => exception.backtrace)
    false
  end
end
#start ⇒ Object
Starts the background sender thread, which dequeues batches and sends them until stopping is requested.
35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 |
# File 'lib/logstash/outputs/sumologic/sender.rb', line 35

# Launches the background sender thread and keeps it in @sender_t.
#
# The thread dequeues batches from @queue and passes each to send_request
# for as long as @stopping is false. Once stopping is observed it sends
# whatever @queue.drain returns, then polls once per second until the
# token queue holds @sender_max tokens again.
#
# @return [Thread] the newly created sender thread
def start()
  log_info("starting sender...",
    :max => @sender_max,
    :requeue => @sleep_before_requeue)
  @stopping.make_false()
  @sender_t = Thread.new do
    while @stopping.false?
      send_request(@queue.deq())
    end

    # Send everything still left in the queue after the stop was requested.
    @queue.drain().each { |leftover| send_request(leftover) }

    log_info("waiting while senders finishing...")
    # Wait until the token queue is full again.
    sleep 1 while @tokens.size < @sender_max
  end
end