Class: LogStash::Outputs::Application_insights::Storage_cleanup

Inherits:
Blob
  • Object
show all
Defined in:
lib/logstash/outputs/application_insights/storage_cleanup.rb

Constant Summary

Constants inherited from Blob

Blob::CREATE_EXIST_ERRORS

Instance Attribute Summary

Attributes inherited from Blob

#last_io_exception

Class Method Summary collapse

Instance Method Summary collapse

Methods inherited from Blob

close, #create_container_exist_recovery, #create_exist_recovery, #create_table_exist_recovery, #state_table_delete, #state_table_insert, #state_table_query, #state_table_update, stopped?, #update_commited_or_uncommited_list

Methods inherited from Context

#clear_context, #context_to_table_entity, #context_to_tuple, #table_entity_to_context, #table_entity_to_tuple, #tuple_to_context

Constructor Details

#initialize(storage_account_name) ⇒ Storage_cleanup

Returns a new instance of Storage_cleanup.


42
43
44
45
46
47
48
49
50
51
# File 'lib/logstash/outputs/application_insights/storage_cleanup.rb', line 42

# Creates the cleanup worker for one storage account and immediately launches
# its background cleanup thread.
#
# @param storage_account_name [String] storage account this instance cleans up
def initialize( storage_account_name )
  # super first parameter must be nil. Blob's first parameter is channel,
  # otherwise storage_account_name would be passed as the channel
  super( nil )
  @storage_account_name = storage_account_name
  # NOTE(review): @configuration is not assigned in this method; presumably the
  # superclass constructor provides it - confirm
  @azure_storage_container_prefix = "#{AZURE_STORAGE_CONTAINER_LOGSTASH_PREFIX}#{@configuration[:azure_storage_container_prefix]}"
  # containers are kept 24 hours longer than the configured blob retention time
  @retention_time = @configuration[:blob_retention_time] + 24 * 60 * 60
  # container that receives copies of blobs that were never notified
  @not_notified_container = "#{AZURE_STORAGE_CONTAINER_LOGSTASH_PREFIX}#{@configuration[:azure_storage_container_prefix]}-#{AZURE_STORAGE_ORPHAN_BLOBS_CONTAINER_NAME}"
  # launch the thread that cleans the storage
  periodic_storage_cleanup
end

Class Method Details

.start ⇒ Object


27
28
29
30
31
32
33
34
35
36
37
38
39
# File 'lib/logstash/outputs/application_insights/storage_cleanup.rb', line 27

# Reads the current configuration and, unless cleanup is disabled, creates one
# Storage_cleanup instance per configured storage account (the constructor of
# each instance launches its periodic cleanup thread). Afterwards +new+ is made
# private, so instances are only created through this method.
def self.start
  configuration = Config.current
  @disable_cleanup = configuration[:disable_cleanup]
  # NOTE(review): this is stored on the class object, while container_cleanup
  # reads an instance variable of the same name - confirm the instances get it
  @delete_not_notified_blobs = configuration[:delete_not_notified_blobs]

  unless @disable_cleanup
    @@list = [ ]
    # assumes every entry yields the storage account name followed by its keys - TODO confirm
    configuration[:storage_account_name_key].each do |storage_account_name, _storage_account_keys|
      @@list << Storage_cleanup.new( storage_account_name )
    end
  end
  private_class_method :new
end

Instance Method Details

#container_cleanup(container_name) ⇒ Object


101
102
103
104
105
106
107
# File 'lib/logstash/outputs/application_insights/storage_cleanup.rb', line 101

# Cleans up a single container.
#
# Unless @delete_not_notified_blobs is set, the not-notified blobs are copied
# and the container's state table entities are removed first; the container is
# deleted only when both steps report success.
#
# @param container_name [String] name of the container to clean up
# @return the result of delete_container, or nil when a preparation step failed
def container_cleanup( container_name )
  ready_to_delete = @delete_not_notified_blobs ||
                    ( copy_not_notified_blobs( container_name ) && delete_container_entities( container_name ) )
  delete_container( container_name ) if ready_to_delete
end

#copy_not_notified_blob(container_name, blob_name) ⇒ Object

return status or nil if failed


136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
# File 'lib/logstash/outputs/application_insights/storage_cleanup.rb', line 136

# Starts copying one blob into the not-notified container.
#
# The not-notified container is ensured through create_exist_recovery before
# the copy is requested. When the recovery state is :blob_exit no copy is
# requested and the status is reported as :pending.
#
# @param container_name [String] container that currently holds the blob
# @param blob_name [String] name of the blob to copy
# @return [Symbol, nil] copy status as a symbol, or nil when nothing was obtained
def copy_not_notified_blob( container_name, blob_name )
  @action = :copy_blob_to_not_notified_container
  @recoverable = [ :invalid_storage_key, :io_failure, :service_unavailable, :blob_exit, :create_container, :container_exist ]
  @info  = "#{@action} #{@storage_account_name}/#{container_name}/#{blob_name}"

  copy_result = nil
  storage_io_block do
    create_exist_recovery( :container, @not_notified_container ) { |name| @client.blobClient.create_container( name ) }
    copy_result = if :blob_exit == @recovery
                    [ "", :pending ]
                  else
                    # the second element of the returned pair is used as the copy status
                    @client.blobClient.copy_blob( @not_notified_container, blob_name, container_name, blob_name )
                  end
  end
  return nil unless copy_result

  copy_result[1].to_sym
end

#copy_not_notified_blobs(container_name) ⇒ Object

Returns true if all not-notified blobs of the container were copied (no copy is still pending); returns nil if a storage operation failed.


111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
# File 'lib/logstash/outputs/application_insights/storage_cleanup.rb', line 111

# Copies every blob of the container whose state table entity is not in the
# notified state into the not-notified container.
#
# All result pages of the state table query are walked: the continuation token
# of each page is passed to the next query until no token is returned.
#
# @param container_name [String] container whose not-notified blobs are copied
# @return [Boolean, nil] true when every blob is copied, false when at least
#   one copy is still pending, nil when a storage operation failed
def copy_not_notified_blobs( container_name )
  pending = false
  continuation_token = nil
  filter = "container_name eq '#{container_name}' and log_state ne 'notified'"
  loop do
    entities = state_table_query( @storage_account_name, filter, continuation_token )
    return nil unless entities

    # remember where the next page starts; without this only the first page is processed
    continuation_token = entities.continuation_token
    entities.each do |entity|
      blob_name = entity.properties[ "blob_name" ]
      return nil unless ( status = not_notified_blob_copy_status( blob_name ) )

      if :pending == status
        pending = true
      elsif :success != status
        # no finished or running copy yet, so start one now
        return nil unless ( status = copy_not_notified_blob( container_name, blob_name ) )
        pending = true unless :success == status
      end
      @logger.warn { "copied blob: #{@storage_account_name}/#{container_name}/#{blob_name} to #{@not_notified_container} container because cannot notify" } if :success == status
    end
    # a nil or empty token means there are no more pages
    break if continuation_token.nil? || ( continuation_token.respond_to?( :empty? ) && continuation_token.empty? )
  end
  !pending
end

#delete_container(container_name) ⇒ Object

return true if container deleted


191
192
193
194
195
196
197
198
199
200
201
# File 'lib/logstash/outputs/application_insights/storage_cleanup.rb', line 191

# Deletes a blob container of this storage account.
#
# @param container_name [String] name of the container to delete
# @return whatever storage_io_block returns for the delete operation
def delete_container( container_name )
  @action = :delete_container
  @recoverable = [ :invalid_storage_key, :io_failure, :service_unavailable, :create_container ]
  @info  = "#{@action} #{@storage_account_name}/#{container_name}"

  storage_io_block do
    # delete container, if not found, skip
    next if :create_container == @recovery

    @client.blobClient.delete_container( container_name )
  end
end

#delete_container_entities(container_name) ⇒ Object

return true if all container entities were removed from log table


174
175
176
177
178
179
180
181
182
183
184
185
186
187
# File 'lib/logstash/outputs/application_insights/storage_cleanup.rb', line 174

# Removes every state table entity that belongs to the container.
#
# All result pages of the state table query are walked: the continuation token
# of each page is passed to the next query until no token is returned.
#
# @param container_name [String] container whose entities are removed
# @return [true, nil] true when all entities were deleted, nil when a query or
#   a delete failed
def delete_container_entities( container_name )
  continuation_token = nil
  filter = "container_name eq '#{container_name}'"
  loop do
    entities = state_table_query( @storage_account_name, filter, continuation_token )
    return nil unless entities

    # remember where the next page starts; without this only the first page is processed
    continuation_token = entities.continuation_token
    entities.each do |entity|
      # NOTE(review): state_table_delete takes no argument, so it presumably
      # works on the context loaded by table_entity_to_context - confirm
      table_entity_to_context( entity.properties )
      return nil unless state_table_delete
    end
    # a nil or empty token means there are no more pages
    break if continuation_token.nil? || ( continuation_token.respond_to?( :empty? ) && continuation_token.empty? )
  end
  true
end

#list_container_names(azure_storage_container_prefix = nil, token = nil) ⇒ Object

return blob containers


85
86
87
88
89
90
91
92
93
94
95
96
97
98
# File 'lib/logstash/outputs/application_insights/storage_cleanup.rb', line 85

# Requests one page of the storage account's blob containers, with metadata.
#
# @param azure_storage_container_prefix [String, nil] when given, passed to the
#   listing as the :prefix option
# @param token [Object, nil] when given, passed to the listing as the :marker option
# @return the value returned by the blob client's list_containers, or nil when
#   the listing was not obtained
def list_container_names( azure_storage_container_prefix = nil, token = nil )
  @action = :list_container_names
  @recoverable = [ :invalid_storage_key, :io_failure, :service_unavailable ]
  @info  = "#{@action} #{@storage_account_name}"

  listing = nil
  storage_io_block do
    # optional settings are sent only when a value was supplied
    request_options = { :metadata => true, :marker => token, :prefix => azure_storage_container_prefix }.select { |_key, value| value }
    listing = @client.blobClient.list_containers( request_options )
  end
  listing
end

#list_containers_to_cleanup ⇒ Object

return list of containers ready to be cleaned up, return empty list in case failed to get list


68
69
70
71
72
73
74
75
76
77
78
79
80
81
# File 'lib/logstash/outputs/application_insights/storage_cleanup.rb', line 68

# Collects the names of the containers whose retention period has elapsed.
#
# A container qualifies when its :last_modified time plus @retention_time is
# not later than the current time. All listing pages are walked: the
# continuation token of each page is passed to the next request.
#
# @return [Array<String>] names of containers ready to be cleaned up; when a
#   page cannot be listed, the names collected so far (possibly none)
def list_containers_to_cleanup
  continuation_token = nil
  container_names_to_delete = [ ]
  loop do
    containers = list_container_names( @azure_storage_container_prefix, continuation_token )
    break unless containers

    # remember where the next page starts; without this only the first page is examined
    continuation_token = containers.continuation_token
    containers.each do |container|
      expiration_time = Time.parse( container.properties[:last_modified] ) + @retention_time
      container_names_to_delete << container.name if expiration_time <= Time.now.utc
    end
    # a nil or empty token means there are no more pages
    break if continuation_token.nil? || ( continuation_token.respond_to?( :empty? ) && continuation_token.empty? )
  end
  container_names_to_delete
end

#not_notified_blob_copy_status(blob_name) ⇒ Object

return copy status, if failed return nil


153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
# File 'lib/logstash/outputs/application_insights/storage_cleanup.rb', line 153

# Looks up the copy status of a blob inside the not-notified container.
#
# The not-notified container is ensured through create_exist_recovery first.
# When the recovery state is :create_resource the status is :not_started and
# the blob properties are not requested.
#
# @param blob_name [String] name of the blob in the not-notified container
# @return [Symbol, nil] :not_started, the blob's :copy_status as a symbol
#   (:success when the property is absent), or nil when no status was obtained
def not_notified_blob_copy_status( blob_name )
  @action = :check_not_notified_blob_copy_status
  @recoverable = [ :invalid_storage_key, :io_failure, :service_unavailable, :create_resource, :create_container, :container_exist ]
  @info  = "#{@action} #{@storage_account_name}/#{@not_notified_container}/#{blob_name}"

  status = nil
  storage_io_block do
    create_exist_recovery( :container, @not_notified_container ) { |name| @client.blobClient.create_container( name ) }
    if :create_resource == @recovery
      status = :not_started
    else
      # explicit else: a bare elsif would take the next statement as its condition
      result = @client.blobClient.get_blob_properties( @not_notified_container, blob_name )
      status = ( result.properties[:copy_status] || :success ).to_sym if result
    end
  end
  status
end

#periodic_storage_cleanup ⇒ Object


54
55
56
57
58
59
60
61
62
63
64
# File 'lib/logstash/outputs/application_insights/storage_cleanup.rb', line 54

# Starts the background thread that repeatedly cleans up every container
# reported by list_containers_to_cleanup, sleeping one hour between passes.
#
# @return [Thread] the thread running the cleanup loop
def periodic_storage_cleanup
  Thread.new do
    loop do
      list_containers_to_cleanup.each { |expired_container| container_cleanup( expired_container ) }
      sleep( 60 * 60 )
    end
  end
end