Class: LogStash::Outputs::Application_insights::Channel
- Inherits:
-
Object
- Object
- LogStash::Outputs::Application_insights::Channel
- Defined in:
- lib/logstash/outputs/application_insights/channel.rb
Instance Attribute Summary collapse
-
#blob_extension ⇒ Object
readonly
Returns the value of attribute blob_extension.
-
#blob_max_delay ⇒ Object
readonly
Returns the value of attribute blob_max_delay.
-
#event_format ⇒ Object
readonly
Returns the value of attribute event_format.
-
#instrumentation_key ⇒ Object
readonly
Returns the value of attribute instrumentation_key.
-
#table_id ⇒ Object
readonly
Returns the value of attribute table_id.
Instance Method Summary collapse
-
#<<(data) ⇒ Object
received data is an hash of the event (does not include metadata).
- #close ⇒ Object
- #file_pipe? ⇒ Boolean
- #flush ⇒ Object
- #gzip_file? ⇒ Boolean
-
#initialize(instrumentation_key, table_id) ⇒ Channel
constructor
A new instance of Channel.
- #recover_later_block_upload(block_to_upload) ⇒ Object
- #recover_later_file_upload(file_to_upload) ⇒ Object
- #recover_later_notification(tuple) ⇒ Object
- #stopped? ⇒ Boolean
Constructor Details
#initialize(instrumentation_key, table_id) ⇒ Channel
Returns a new instance of Channel.
33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 |
# File 'lib/logstash/outputs/application_insights/channel.rb', line 33 def initialize ( instrumentation_key, table_id ) @closing = false configuration = Config.current @file_pipe = !configuration[:disable_compression] @gzip_file = !configuration[:disable_compression] @blob_max_bytesize = configuration[:blob_max_bytesize] @blob_max_events = configuration[:blob_max_events] @logger = configuration[:logger] @logger.debug { "Create a new channel, instrumentation_key / table_id : #{instrumentation_key} / #{table_id}" } @instrumentation_key = instrumentation_key @table_id = table_id set_table_properties( configuration ) @semaphore = Mutex.new @workers_channel = { } @failed_on_notify_retry_Q = Queue.new launch_notify_recovery_thread @blob_extension = ".#{@event_format}" if file_pipe? @blob_extension = "_#{@event_format}.gz" if gzip_file? @add_pipe_threshold = 0 @file_prefix = configuration[:local_file_prefix] @file = nil @failed_on_file_upload_retry_Q = Queue.new launch_file_upload_recovery_thread else @add_pipe_threshold = CHANNEL_THRESHOLD_TO_ADD_UPLOAD_PIPE @failed_on_block_upload_retry_Q = Queue.new launch_block_upload_recovery_thread end @active_upload_pipes = [ Upload_pipe.new( self, 1 ) ] end |
Instance Attribute Details
#blob_extension ⇒ Object (readonly)
Returns the value of attribute blob_extension.
28 29 30 |
# File 'lib/logstash/outputs/application_insights/channel.rb', line 28 def blob_extension @blob_extension end |
#blob_max_delay ⇒ Object (readonly)
Returns the value of attribute blob_max_delay.
27 28 29 |
# File 'lib/logstash/outputs/application_insights/channel.rb', line 27 def blob_max_delay @blob_max_delay end |
#event_format ⇒ Object (readonly)
Returns the value of attribute event_format.
29 30 31 |
# File 'lib/logstash/outputs/application_insights/channel.rb', line 29 def event_format @event_format end |
#instrumentation_key ⇒ Object (readonly)
Returns the value of attribute instrumentation_key.
25 26 27 |
# File 'lib/logstash/outputs/application_insights/channel.rb', line 25 def instrumentation_key @instrumentation_key end |
#table_id ⇒ Object (readonly)
Returns the value of attribute table_id.
26 27 28 |
# File 'lib/logstash/outputs/application_insights/channel.rb', line 26 def table_id @table_id end |
Instance Method Details
#<<(data) ⇒ Object
received data is an hash of the event (does not include metadata)
92 93 94 95 96 97 98 99 100 101 102 103 104 105 |
# File 'lib/logstash/outputs/application_insights/channel.rb', line 92 def << ( data ) if @serialized_event_field && data[@serialized_event_field] serialized_event = serialize_serialized_event_field( data[@serialized_event_field] ) else serialized_event = ( EXT_EVENT_FORMAT_CSV == @event_format ? serialize_to_csv( data ) : serialize_to_json( data ) ) end if serialized_event sub_channel = @workers_channel[Thread.current] || @semaphore.synchronize { @workers_channel[Thread.current] = Sub_channel.new( @event_separator ) } sub_channel << serialized_event else @logger.warn { "event not uploaded, no relevant data in event. table_id: #{table_id}, event: #{data}" } end end |
#close ⇒ Object
80 81 82 83 84 85 |
# File 'lib/logstash/outputs/application_insights/channel.rb', line 80 def close @closing = true @active_upload_pipes.each do |upload_pipe| upload_pipe.close end end |
#file_pipe? ⇒ Boolean
76 77 78 |
# File 'lib/logstash/outputs/application_insights/channel.rb', line 76 def file_pipe? @file_pipe end |
#flush ⇒ Object
108 109 110 111 112 113 114 115 116 117 118 119 |
# File 'lib/logstash/outputs/application_insights/channel.rb', line 108 def flush if file_pipe? gz_collect_and_compress_blocks_to_file if file_expired_or_full? enqueue_to_pipe( [ @file ] ) @file = nil end else list = collect_blocks enqueue_to_pipe( list ) end end |
#gzip_file? ⇒ Boolean
72 73 74 |
# File 'lib/logstash/outputs/application_insights/channel.rb', line 72 def gzip_file? @gzip_file end |
#recover_later_block_upload(block_to_upload) ⇒ Object
127 128 129 |
# File 'lib/logstash/outputs/application_insights/channel.rb', line 127 def recover_later_block_upload( block_to_upload ) @failed_on_block_upload_retry_Q << block_to_upload end |
#recover_later_file_upload(file_to_upload) ⇒ Object
131 132 133 |
# File 'lib/logstash/outputs/application_insights/channel.rb', line 131 def recover_later_file_upload( file_to_upload ) @failed_on_file_upload_retry_Q << file_to_upload end |
#recover_later_notification(tuple) ⇒ Object
122 123 124 |
# File 'lib/logstash/outputs/application_insights/channel.rb', line 122 def recover_later_notification( tuple ) @failed_on_notify_retry_Q << tuple end |
#stopped? ⇒ Boolean
87 88 89 |
# File 'lib/logstash/outputs/application_insights/channel.rb', line 87 def stopped? @closing end |