Class: LogStash::Outputs::ADLS

Inherits:
Base
  • Object
show all
Defined in:
lib/logstash/outputs/adls.rb

Overview

Usage

This is an example Logstash configuration:

input {
  ...
}

filter {
  ...
}

output {
  adls {
    adls_fqdn => "XXXXXXXXXXX.azuredatalakestore.net"                                         # (required)
    adls_token_endpoint => "https://login.microsoftonline.com/XXXXXXXXXX/oauth2/token"        # (required)
    adls_client_id => "00000000-0000-0000-0000-000000000000"                                  # (required)
    adls_client_key => "XXXXXXXXXXXXXXXXXXXXXX"                                               # (required)
    path => "/logstash/%{+YYYY}/%{+MM}/%{+dd}/logstash-%{+HH}-%{[@metadata][cid]}.log"      # (required; must contain %{[@metadata][cid]} when single_file_per_thread is true)
    line_separator => "\n"                                                                    # (optional, default: "\n")
    created_files_permission => 755                                                           # (optional, default: 755)
    adls_token_expire_security_margin => 300                                                  # (optional, default: 300)
    single_file_per_thread => true                                                            # (optional, default: true)
    retry_interval => 0.5                                                                     # (optional, default: 0.5)
    max_retry_interval => 10                                                                  # (optional, default: 10)
    retry_times => 3                                                                          # (optional, default: 3)
    exit_if_retries_exceeded => false                                                         # (optional, default: false)
    codec => "json"                                                                           # (optional, default: default codec defined by Logstash)
  }
}


Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#azureOauthTokenRefreshDate ⇒ Object

Returns the value of attribute azureOauthTokenRefreshDate.



90
91
92
# File 'lib/logstash/outputs/adls.rb', line 90

# Reader for @azureOauthTokenRefreshDate: the moment the ADLS OAuth token is next due for refresh.
def azureOauthTokenRefreshDate; @azureOauthTokenRefreshDate; end

#client ⇒ Object

Returns the value of attribute client.



89
90
91
# File 'lib/logstash/outputs/adls.rb', line 89

# Reader for @client, the ADLS store client used for all writes.
def client; @client; end

#timer ⇒ Object

Returns the value of attribute timer.



92
93
94
# File 'lib/logstash/outputs/adls.rb', line 92

# Reader for @timer, the timer that schedules OAuth token refreshes.
def timer; @timer; end

#timerTaskClass ⇒ Object

Returns the value of attribute timerTaskClass.



91
92
93
# File 'lib/logstash/outputs/adls.rb', line 91

# Reader for @timerTaskClass, the TimerTask subclass instantiated to (re)arm token refreshes.
def timerTaskClass; @timerTaskClass; end

Instance Method Details

#close ⇒ Object



142
143
144
# File 'lib/logstash/outputs/adls.rb', line 142

# Called by Logstash when the plugin shuts down.
#
# Besides logging, cancels the OAuth token refresh timer created in register.
# This terminates the timer's thread and discards any pending refresh, so no
# further token refreshes are scheduled after shutdown.
def close
  @logger.info("Logstash ADLS output plugin is shutting down...")
  # @timer is nil when register did not get far enough to create it.
  @timer.cancel if @timer
end

#multi_receive(events) ⇒ Object



164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
# File 'lib/logstash/outputs/adls.rb', line 164

# Writes a batch of events to ADLS.
#
# Events are grouped by their resolved output path (event.sprintf(@path)) so that
# each file receives a single write. Each event is encoded with @codec and
# terminated with @line_separator if it does not already end with it.
#
# events - batch of Logstash events; nil is ignored.
#
# Each file write is retried up to @retry_times times, or forever when
# @retry_times is -1. Between attempts the method sleeps
# @retry_interval * attempt seconds, capped at @max_retry_interval. When retries
# are exhausted, that file's data is discarded and logged, and the process exits
# with status 1 if @exit_if_retries_exceeded is set.
def multi_receive(events)
  return unless events

  started_at = Time.now

  output_files = Hash.new { |hash, key| hash[key] = "" }
  events.each do |event|
    if @single_file_per_thread
      # Random per-instance value + current thread id, so each worker thread writes its own file.
      event.set("[@metadata][cid]", "#{@randomValuePerInstance.to_s}#{Thread.current.object_id.to_s}")
    end

    path = event.sprintf(@path)
    event_as_string = @codec.encode(event)
    event_as_string += @line_separator unless event_as_string.end_with?(@line_separator)
    output_files[path] << event_as_string
  end

  output_files.each do |path, output|
    # Retrying can resolve transient problems such as leases being held by another process.
    write_tries = 0
    begin
      write_data(path, output)
    rescue SignalException, SystemExit, NoMemoryError
      # Interrupts, exit requests and fatal VM errors are not write failures: never retry or swallow them.
      raise
    rescue Exception => e
      # NOTE(review): broad rescue kept on purpose. Under JRuby the ADLS SDK raises Java
      # exceptions that a bare `rescue` may not catch; confirm before narrowing.
      if write_tries < @retry_times || @retry_times == -1
        # Linear backoff that starts at one retry_interval, so even the first retry waits.
        sleepTime = [@retry_interval * (write_tries + 1), @max_retry_interval].min
        @logger.warn("ADLS write caused an exception: #{e.message}. Maybe you should increase retry_interval or reduce number of workers. Attempt: #{write_tries.to_s}. Retrying in #{sleepTime.to_s} seconds...")
        sleep(sleepTime)
        write_tries += 1
        retry
      else
        if e.is_a?(com.microsoft.azure.datalake.store.ADLException)
          @logger.error("Max write retries reached. Events discarded! ADLS_RemoteMessage: #{e.remoteExceptionMessage}; Exception: #{e.message};  ADLS_Path: #{path}; StackTrace:#{e.backtrace.join("\n\t")}")
        else
          @logger.error("Max write retries reached. Events discarded! Exception: #{e.message}; StackTrace:#{e.backtrace.join("\n\t")}")
        end
        exit 1 if @exit_if_retries_exceeded
      end
    end
  end
  @logger.debug("#{events.length.to_s} events written on ADLS in #{Time.now-started_at} seconds.")
end

#prepare_client(accountFQDN, clientId, authTokenEndpoint, clientKey) ⇒ Object



146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
# File 'lib/logstash/outputs/adls.rb', line 146

# Obtains an Azure AD OAuth token with client credentials and builds an ADLS store client.
#
# Side effect: sets @azureOauthTokenRefreshDate to the token's expiry minus
# @adls_token_expire_security_margin seconds. The refresh timer set up in
# register uses this date.
#
# accountFQDN       - ADLS account host (e.g. "XXXXXXXXXXX.azuredatalakestore.net")
# clientId          - Azure AD application (client) id
# authTokenEndpoint - Azure AD OAuth2 token endpoint URL
# clientKey         - Azure AD application key
#
# Returns the configured ADLStoreClient. Whatever the SDK raises when the token
# request or the connectivity probe fails is propagated to the caller.
def prepare_client(accountFQDN, clientId, authTokenEndpoint, clientKey)
  azureToken = com.microsoft.azure.datalake.store.oauth2.AzureADAuthenticator.getTokenUsingClientCreds(authTokenEndpoint, clientId, clientKey)

  # Refresh a safety margin before the token actually expires.
  # Calendar#add performs the field arithmetic (carrying into minutes/hours) explicitly,
  # rather than relying on a lenient Calendar to normalize a negative SECOND value.
  calendar = java.util.Calendar.getInstance()
  calendar.setTime(azureToken.expiry)
  calendar.add(java.util.Calendar::SECOND, -@adls_token_expire_security_margin)
  @azureOauthTokenRefreshDate = calendar.getTime()

  @logger.info("Got ADLS OAuth Token with expire date #{azureToken.expiry.to_s}. Token will be refreshed at #{@azureOauthTokenRefreshDate.to_s}")

  client = com.microsoft.azure.datalake.store.ADLStoreClient.createClient(accountFQDN, azureToken)
  options = com.microsoft.azure.datalake.store.ADLStoreOptions.new()
  options.setUserAgentSuffix("Logstash-ADLS-Output-Plugin")
  client.setOptions(options)
  # Probe call so bad credentials/FQDN fail here rather than on the first write; the result is irrelevant.
  client.checkExists("testFile")
  client
end

#register ⇒ Object



96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
# File 'lib/logstash/outputs/adls.rb', line 96

# Plugin setup, in order:
# - validates the path template;
# - logs in to ADLS;
# - wires the codec callback;
# - arms the OAuth token refresh timer.
#
# Raises LogStash::ConfigurationError when single_file_per_thread is enabled but
# @path lacks the %{[@metadata][cid]} placeholder. Exits the process with status 1
# when the initial ADLS login fails.
def register()
  # Validate configuration before touching the network, so a bad path fails fast.
  # With single_file_per_thread, @path must contain %{[@metadata][cid]} so each
  # worker thread writes to its own file.
  if @single_file_per_thread && !@path.include?("%{[@metadata][cid]}")
    message = "Please set %{[@metadata][cid]} format value in @path to avoid file locks in ADLS."
    @logger.error(message)
    raise LogStash::ConfigurationError, message
  end

  begin
    @client = prepare_client(@adls_fqdn, @adls_client_id, @adls_token_endpoint, @adls_client_key)
  rescue => e
    @logger.error("Cannot Login in ADLS. Aborting.... Exception: #{e.message}; Trace:#{e.backtrace.join("\n\t")}")
    exit 1
  end

  # The callback just hands back the encoded string. multi_receive uses the value
  # returned by @codec.encode (assumes encode returns the callback's result; TODO confirm per codec).
  @codec.on_event do |event, encoded_event|
    encoded_event
  end

  # TimerTask that refreshes the ADLS client/token, then re-arms itself for the
  # refresh date that prepare_client recomputed.
  @timerTaskClass = Class.new(java.util.TimerTask) do
    def setContext(parent)
      @parent = parent
    end

    def run
      begin
        @parent.client = @parent.prepare_client(@parent.adls_fqdn, @parent.adls_client_id, @parent.adls_token_endpoint, @parent.adls_client_key)
      rescue => e
        sleepTime = [@parent.retry_interval, @parent.max_retry_interval].min
        @parent.logger.error("ADLS Refresh OAuth Token Failed! Retrying in #{sleepTime.to_s} seconds... Exception: #{e.message}; Trace:#{e.backtrace.join("\n\t")}")
        sleep(sleepTime)
      end
      # After a failure azureOauthTokenRefreshDate is unchanged (already reached), so
      # Timer#schedule runs the new task immediately: the sleep above is the retry delay.
      timerTask = @parent.timerTaskClass.new
      timerTask.setContext(@parent)
      @parent.timer.schedule(timerTask, @parent.azureOauthTokenRefreshDate) # Rearm timer
    end
  end

  timerTask = @timerTaskClass.new
  timerTask.setContext(self)

  # Named daemon timer: the refresh thread must not keep the JVM alive on shutdown.
  @timer = java.util.Timer.new("adls-oauth-token-refresh", true)
  @timer.schedule(timerTask, @azureOauthTokenRefreshDate)

  @randomValuePerInstance = rand(10..10000) # To make sure different instances in different machines don't generate the same threadId.
end

#write_data(path, data) ⇒ Object



210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
# File 'lib/logstash/outputs/adls.rb', line 210

# Appends data to the ADLS file at path, creating the file if it does not exist yet.
#
# path - ADLS path of the target file.
# data - String to write. Its bytes are written as-is (no charset re-encoding).
#
# Raises the SDK's ADLException (or other I/O errors) on failure, so the caller's
# retry loop (multi_receive) can handle it.
def write_data(path, data)
  adlsClient = @client

  # Write the raw bytes straight to the ADLS stream and always close it.
  # java.io.PrintStream (used previously) swallows IOExceptions and re-encodes text
  # with the platform default charset. Writing directly lets failures propagate.
  write_and_close = lambda do |stream|
    begin
      stream.write(data.to_java_bytes)
    ensure
      stream.close
    end
  end

  begin
    # Appending to an already existing file is the common case.
    write_and_close.call(adlsClient.getAppendStream(path))
  rescue com.microsoft.azure.datalake.store.ADLException => e
    # Anything other than "file not found" is a real failure: re-raise for the caller to retry.
    raise unless e.httpResponseCode == 404

    # File does not exist, so create it. IfExists::FAIL (instead of OVERWRITE) means a
    # file that a concurrent writer created in the meantime is never clobbered. createFile
    # should then raise, and the caller's retry appends instead.
    createStream = adlsClient.createFile(path, com.microsoft.azure.datalake.store.IfExists::FAIL, @created_files_permission.to_s, true)
    write_and_close.call(createStream)
    @logger.debug("File #{path} created.")
  end
end