Class: LogStash::Outputs::Application_insights::Channel

Inherits:
Object
  • Object
show all
Defined in:
lib/logstash/outputs/application_insights/channel.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(instrumentation_key, table_id) ⇒ Channel

Returns a new instance of Channel.



33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
# File 'lib/logstash/outputs/application_insights/channel.rb', line 33

# Builds a channel bound to one instrumentation_key / table_id pair.
#
# Reads its settings from Config.current, prepares the per-thread
# sub-channel registry, starts the recovery threads that match the selected
# upload mode, and creates the first upload pipe.
#
# @param instrumentation_key [Object] key this channel uploads for
# @param table_id [Object] table this channel uploads to
def initialize(instrumentation_key, table_id)
  @closing = false
  settings = Config.current

  # A single setting drives both flags: file mode and gzip are both enabled
  # unless compression is disabled.
  compression_enabled = !settings[:disable_compression]
  @file_pipe = compression_enabled
  @gzip_file = compression_enabled
  @blob_max_bytesize = settings[:blob_max_bytesize]
  @blob_max_events = settings[:blob_max_events]

  @logger = settings[:logger]
  @logger.debug { "Create a new channel, instrumentation_key / table_id : #{instrumentation_key} / #{table_id}" }

  @instrumentation_key = instrumentation_key
  @table_id = table_id
  # NOTE(review): @event_format (used below) is presumably assigned by
  # set_table_properties — confirm.
  set_table_properties(settings)
  @semaphore = Mutex.new
  @workers_channel = {}

  # The retry queue must exist before the thread that services it is started.
  @failed_on_notify_retry_Q = Queue.new
  launch_notify_recovery_thread

  @blob_extension = file_pipe? && gzip_file? ? "_#{@event_format}.gz" : ".#{@event_format}"

  if file_pipe?
    @add_pipe_threshold = 0
    @file_prefix = settings[:local_file_prefix]
    @file = nil
    @failed_on_file_upload_retry_Q = Queue.new
    launch_file_upload_recovery_thread
  else
    @add_pipe_threshold = CHANNEL_THRESHOLD_TO_ADD_UPLOAD_PIPE
    @failed_on_block_upload_retry_Q = Queue.new
    launch_block_upload_recovery_thread
  end

  @active_upload_pipes = [Upload_pipe.new(self, 1)]
end

Instance Attribute Details

#blob_extension ⇒ Object (readonly)

Returns the value of attribute blob_extension.



28
29
30
# File 'lib/logstash/outputs/application_insights/channel.rb', line 28

# Reader for @blob_extension.
# The constructor sets it to ".<event_format>", or to "_<event_format>.gz"
# when both file_pipe? and gzip_file? are true.
def blob_extension
  @blob_extension
end

#blob_max_delay ⇒ Object (readonly)

Returns the value of attribute blob_max_delay.



27
28
29
# File 'lib/logstash/outputs/application_insights/channel.rb', line 27

# Reader for @blob_max_delay.
# NOTE(review): the assignment is not visible in this section; presumably it
# is set by set_table_properties during construction — confirm.
def blob_max_delay
  @blob_max_delay
end

#event_format ⇒ Object (readonly)

Returns the value of attribute event_format.



29
30
31
# File 'lib/logstash/outputs/application_insights/channel.rb', line 29

# Reader for @event_format, the value interpolated into the blob extension
# and compared against EXT_EVENT_FORMAT_CSV when serializing events.
# NOTE(review): the assignment is not visible in this section; presumably it
# is set by set_table_properties during construction — confirm.
def event_format
  @event_format
end

#instrumentation_key ⇒ Object (readonly)

Returns the value of attribute instrumentation_key.



25
26
27
# File 'lib/logstash/outputs/application_insights/channel.rb', line 25

# Reader for @instrumentation_key, which the constructor stores from its
# instrumentation_key argument.
def instrumentation_key
  @instrumentation_key
end

#table_id ⇒ Object (readonly)

Returns the value of attribute table_id.



26
27
28
# File 'lib/logstash/outputs/application_insights/channel.rb', line 26

# Reader for @table_id, which the constructor stores from its table_id
# argument.
def table_id
  @table_id
end

Instance Method Details

#<<(data) ⇒ Object

Received data is a hash of the event (does not include metadata).



92
93
94
95
96
97
98
99
100
101
102
103
104
105
# File 'lib/logstash/outputs/application_insights/channel.rb', line 92

# Serializes one event and appends it to the calling thread's sub-channel.
#
# Serializer choice: when @serialized_event_field is set and the event has a
# truthy value under it, that value goes through
# serialize_serialized_event_field; otherwise the whole event is serialized
# as CSV or JSON depending on @event_format.
#
# @param data [Hash] the event (does not include metadata)
# @return [Object] result of the sub-channel append, or of the logger call
#   when the event serialized to nothing
def <<(data)
  serialized_event =
    if @serialized_event_field && data[@serialized_event_field]
      serialize_serialized_event_field(data[@serialized_event_field])
    elsif EXT_EVENT_FORMAT_CSV == @event_format
      serialize_to_csv(data)
    else
      serialize_to_json(data)
    end

  # Nothing to upload: log it and stop.
  unless serialized_event
    return @logger.warn { "event not uploaded, no relevant data in event. table_id: #{table_id}, event: #{data}" }
  end

  # One sub-channel per worker thread; it is created under the mutex on
  # first use.
  worker = Thread.current
  sub_channel = @workers_channel[worker]
  sub_channel ||= @semaphore.synchronize { @workers_channel[worker] = Sub_channel.new(@event_separator) }
  sub_channel << serialized_event
end

#close ⇒ Object



80
81
82
83
84
85
# File 'lib/logstash/outputs/application_insights/channel.rb', line 80

# Marks the channel as closing (see stopped?) and closes every active
# upload pipe.
#
# @return [Array] the list of active upload pipes
def close
  @closing = true
  @active_upload_pipes.each(&:close)
end

#file_pipe? ⇒ Boolean

Returns:

  • (Boolean)


76
77
78
# File 'lib/logstash/outputs/application_insights/channel.rb', line 76

# Whether the channel runs in file mode.
# Returns @file_pipe, which the constructor sets to the negation of the
# :disable_compression configuration value.
def file_pipe?
  @file_pipe
end

#flush ⇒ Object



108
109
110
111
112
113
114
115
116
117
118
119
# File 'lib/logstash/outputs/application_insights/channel.rb', line 108

# Moves pending data towards the upload pipe.
#
# Block mode (file_pipe? is false): collects the pending blocks and enqueues
# them. File mode: compresses pending blocks into the current file, and hands
# the file to the pipe only once file_expired_or_full? reports true, after
# which @file is reset to nil.
#
# @return [Object, nil] the enqueue_to_pipe result in block mode, nil in
#   file mode
def flush
  return enqueue_to_pipe(collect_blocks) unless file_pipe?

  gz_collect_and_compress_blocks_to_file
  return unless file_expired_or_full?

  enqueue_to_pipe([@file])
  @file = nil
end

#gzip_file? ⇒ Boolean

Returns:

  • (Boolean)


72
73
74
# File 'lib/logstash/outputs/application_insights/channel.rb', line 72

# Whether files are gzip-compressed.
# Returns @gzip_file, which the constructor sets to the negation of the
# :disable_compression configuration value.
def gzip_file?
  @gzip_file
end

#recover_later_block_upload(block_to_upload) ⇒ Object



127
128
129
# File 'lib/logstash/outputs/application_insights/channel.rb', line 127

# Queues a block for a later upload retry.
#
# The retry queue is only created by the constructor when file_pipe? is
# false. NOTE(review): presumably drained by the thread started in
# launch_block_upload_recovery_thread — confirm.
#
# @param block_to_upload [Object] the block to retry
# @return [Queue] the retry queue
def recover_later_block_upload(block_to_upload)
  @failed_on_block_upload_retry_Q.push(block_to_upload)
end

#recover_later_file_upload(file_to_upload) ⇒ Object



131
132
133
# File 'lib/logstash/outputs/application_insights/channel.rb', line 131

# Queues a file for a later upload retry.
#
# The retry queue is only created by the constructor when file_pipe? is
# true. NOTE(review): presumably drained by the thread started in
# launch_file_upload_recovery_thread — confirm.
#
# @param file_to_upload [Object] the file to retry
# @return [Queue] the retry queue
def recover_later_file_upload(file_to_upload)
  @failed_on_file_upload_retry_Q.push(file_to_upload)
end

#recover_later_notification(tuple) ⇒ Object



122
123
124
# File 'lib/logstash/outputs/application_insights/channel.rb', line 122

# Queues a notification tuple for a later retry.
#
# NOTE(review): presumably drained by the thread started in
# launch_notify_recovery_thread — confirm.
#
# @param tuple [Object] the notification payload to retry
# @return [Queue] the retry queue
def recover_later_notification(tuple)
  @failed_on_notify_retry_Q.push(tuple)
end

#stopped? ⇒ Boolean

Returns:

  • (Boolean)


87
88
89
# File 'lib/logstash/outputs/application_insights/channel.rb', line 87

# Whether close has been called on this channel.
# Returns @closing, which is false after construction and set to true by
# close.
def stopped?
  @closing
end