Class: LogStash::Inputs::Jms
- Inherits:
-
Threadable
- Object
- Threadable
- LogStash::Inputs::Jms
show all
- Extended by:
- PluginMixins::ValidatorSupport::FieldReferenceValidationAdapter
- Includes:
- PluginMixins::EventSupport::EventFactoryAdapter
- Defined in:
- lib/logstash/inputs/jms.rb
Overview
Read events from a Jms Broker. Supports both Jms Queues and Topics.
For more information about JMS, see <https://docs.oracle.com/javaee/6/tutorial/doc/bncdq.html>. For more information about the Ruby gem used, see <https://github.com/reidmorrison/jruby-jms>. Here is a config example to pull from a queue:
jms {
include_headers => false
include_properties => false
include_body => true
use_jms_timestamp => false
interval => 10
destination => "myqueue"
pub_sub => false
yaml_file => "~/jms.yml"
yaml_section => "mybroker"
}
Defined Under Namespace
Classes: HeadersMapper, LegacyHeadersMapper
Constant Summary
collapse
- TARGET_NOT_SET_MESSAGE =
LogStash::PluginMixins::ECSCompatibilitySupport::TargetCheck::TARGET_NOT_SET_MESSAGE
Instance Method Summary
collapse
Constructor Details
#initialize(*params) ⇒ Jms
Returns a new instance of Jms.
157
158
159
160
161
162
163
164
165
166
167
168
169
|
# File 'lib/logstash/inputs/jms.rb', line 157
# Builds the plugin instance and prepares the helpers used to attach JMS
# headers and properties to events.
#
# When ECS compatibility is not disabled, `headers_target` and
# `properties_target` default to sub-fields of `[@metadata][input][jms]`,
# unless the user configured them explicitly.
#
# @param params [Array] constructor arguments, forwarded unchanged to the superclass
def initialize(*params)
  super

  if ecs_compatibility != :disabled
    # Only fill in ECS defaults for targets that were not set in the pipeline config.
    @headers_target = '[@metadata][input][jms][headers]' unless original_params.include?('headers_target')
    @properties_target = '[@metadata][input][jms][properties]' unless original_params.include?('properties_target')
  end

  @headers_setter = event_setter_for(@headers_target)
  @properties_setter = event_setter_for(@properties_target)

  # Legacy (non-ECS) mode uses a different header mapper than the ECS modes.
  @headers_mapper = ecs_compatibility == :disabled ? LegacyHeadersMapper::INSTANCE : HeadersMapper::INSTANCE
end
|
Instance Method Details
#check_config! ⇒ Object
247
248
249
250
251
252
253
254
255
256
257
258
|
# File 'lib/logstash/inputs/jms.rb', line 247
# Validates option combinations that cannot be checked one option at a time.
#
# The `include_header` option is mapped onto `@include_headers`; setting both
# spellings at once is rejected. Durable-subscription settings and the
# threads/pub_sub combination are validated afterwards.
#
# NOTE(review): `include_header` / `include_headers` are read through the
# plugin's config accessors — confirm both accessors exist on this class.
#
# @raise [LogStash::ConfigurationError] when both `include_header` and
#   `include_headers` are set, or when `threads > 1` together with `pub_sub`
def check_config!
  if original_params.include?('include_header')
    if original_params.include?('include_headers')
      raise(LogStash::ConfigurationError, "Both `include_headers => #{include_headers}` and `include_header => #{include_header}`" +
            " options are specified, please only set one")
    end
    @include_headers = include_header
  end

  check_durable_subscription_config
  raise(LogStash::ConfigurationError, "Threads cannot be > 1 if pub_sub is set") if @threads > 1 && @pub_sub
end
|
#check_durable_subscription_config ⇒ Object
260
261
262
263
264
265
|
# File 'lib/logstash/inputs/jms.rb', line 260
# Validates and completes the durable-subscriber settings.
#
# Does nothing unless `@durable_subscriber` is set. Otherwise `pub_sub` must be
# enabled, the client id falls back to 'Logstash' and the subscriber name falls
# back to the configured destination.
#
# @raise [LogStash::ConfigurationError] when durable_subscriber is set without pub_sub
def check_durable_subscription_config
  if @durable_subscriber
    unless @pub_sub
      raise(LogStash::ConfigurationError, "pub_sub must be true if durable_subscriber is set")
    end

    @durable_subscriber_client_id ||= 'Logstash'
    @durable_subscriber_name ||= destination
  end
end
|
#correct_factory_hash(original, value) ⇒ Object
211
212
213
214
215
216
217
218
219
220
221
222
|
# File 'lib/logstash/inputs/jms.rb', line 211
# Normalises factory settings before they are handed to the JMS factory.
#
# Strings spelling "true"/"false" (case-insensitive) become real booleans, and
# Hash input is copied into `original` with symbol keys, converting nested
# values recursively. Anything else is returned untouched.
#
# @param original [Hash] hash that receives the converted entries when `value` is a Hash
# @param value [Object] the setting (or hash of settings) to convert
# @return [Object] `true`/`false` for boolean-like strings, `original` for
#   Hash input, otherwise `value` itself
def correct_factory_hash(original, value)
  # The type check must look at `value`; `hash` would resolve to Object#hash
  # (an Integer), so the boolean conversion would never run.
  if value.is_a?(String)
    case value.downcase
    when "true" then return true
    when "false" then return false
    end
  end

  if value.is_a?(Hash)
    value.each { |key, nested| original[key.to_sym] = correct_factory_hash({}, nested) }
    return original
  end

  value
end
|
#decode_message(msg) ⇒ LogStash::Event?
368
369
370
371
372
373
|
# File 'lib/logstash/inputs/jms.rb', line 368
# Decodes a message's string form with the configured codec.
#
# If the codec yields more than one event, only the last one is kept, because
# the block overwrites `event` on every yield.
#
# @param msg [#to_s] the message to decode
# @return [LogStash::Event, nil] the decoded event, or nil when the codec yields nothing
def decode_message(msg)
  text = msg.to_s
  event = nil
  @codec.decode(text) { |e| event = e } unless text.nil?
  event
end
|
#do_target_check ⇒ Object
360
361
362
363
364
|
# File 'lib/logstash/inputs/jms.rb', line 360
# Checks whether a `target` is configured and logs an informational message
# when it is missing while ECS compatibility is enabled.
#
# @return [true, nil, Object] true when a target is set, nil when ECS
#   compatibility is disabled, otherwise whatever `logger.info` returns
def do_target_check
  return true unless target.nil?
  return nil if ecs_compatibility == :disabled

  # No target configured while in an ECS mode: let the user know.
  logger.info(TARGET_NOT_SET_MESSAGE)
end
|
#do_target_check_once_and_get_event_factory ⇒ Object
351
352
353
354
355
356
|
# File 'lib/logstash/inputs/jms.rb', line 351
# Returns the targeted event factory, running the target check only the first
# time a (truthy) factory is obtained; the result is cached in `@target_checked`.
#
# @return [Object] the value of `targeted_event_factory`
def do_target_check_once_and_get_event_factory
  return @target_checked if @target_checked

  do_target_check
  @target_checked = targeted_event_factory
end
|
#durable_subscriber(session, queue_or_topic, params) ⇒ Object
384
385
386
387
|
# File 'lib/logstash/inputs/jms.rb', line 384
# Creates a durable subscriber named `@durable_subscriber_name` on the session.
#
# @param session [Object] session responding to `create_durable_subscriber`
# @param queue_or_topic [Object] destination to subscribe to
# @param params [Hash] subscriber options; `:selector` is honoured when present
# @return [Object] whatever `create_durable_subscriber` returns
def durable_subscriber(session, queue_or_topic, params)
  if params[:selector]
    # NOTE(review): the trailing `false` is presumably the JMS noLocal flag — confirm.
    session.create_durable_subscriber(queue_or_topic, @durable_subscriber_name, params[:selector], false)
  else
    session.create_durable_subscriber(queue_or_topic, @durable_subscriber_name)
  end
end
|
#error_hash(e) ⇒ Object
394
395
396
397
398
399
|
# File 'lib/logstash/inputs/jms.rb', line 394
# Builds the structured payload logged when an exception is reported.
#
# @param e [Exception] the exception to describe
# @return [Hash] `:exception`, `:exception_message` and `:backtrace`, plus
#   `:root_cause` when `get_root_cause` finds one
def error_hash(e)
  details = {
    exception: e.class.name,
    exception_message: e.message,
    backtrace: e.backtrace
  }

  cause_details = get_root_cause(e)
  return details if cause_details.nil?

  details[:root_cause] = cause_details
  details
end
|
#get_root_cause(e) ⇒ Object
JMS exceptions can contain chains of nested exceptions, which makes it difficult to determine the root cause of an error from the top-level exception alone. This method walks the cause chain to find the root cause, and protects against Java exceptions whose cause methods loop. If a cause loop is detected, the cause at which the loop was detected is returned, along with an `exception_loop` entry in the root-cause hash indicating that a loop was found. In that case the reported root cause may not be the actual root cause of the problem, and further investigation is required.
407
408
409
410
411
412
413
414
415
416
417
418
419
420
|
# File 'lib/logstash/inputs/jms.rb', line 407
# Walks an exception's `get_cause` chain and describes the deepest cause.
#
# A second, half-speed pointer follows the chain so that a cyclic cause chain
# is detected instead of looping forever; in that case the returned hash also
# carries `:exception_loop => true`.
#
# @param e [Object] the exception to inspect; objects that do not respond to
#   `get_cause` (or whose cause is nil) have no root cause
# @return [Hash, nil] `:exception` and `:exception_message` of the root cause
#   (plus `:exception_loop` when a cycle was found), or nil when there is none
def get_root_cause(e)
  return nil unless e.respond_to?(:get_cause) && !e.get_cause.nil?

  cause = e
  slow_pointer = e
  move_slow = false

  until (next_cause = cause.get_cause).nil?
    cause = next_cause
    if cause == slow_pointer
      return {:exception => cause.class.name, :exception_message => cause.message, :exception_loop => true }
    end

    # Advance the slow pointer on every other step, through the same accessor
    # the guard above verified (`get_cause`).
    slow_pointer = slow_pointer.get_cause if move_slow
    move_slow = !move_slow
  end

  {:exception => cause.class.name, :exception_message => cause.message }
end
|
#jms_config ⇒ Object
190
191
192
193
194
|
# File 'lib/logstash/inputs/jms.rb', line 190
# Picks the source of the JMS connection settings.
#
# A YAML file takes precedence, then a JNDI name, and finally the settings
# given directly in the plugin configuration.
#
# @return [Object] the settings produced by the selected source
def jms_config
  if @yaml_file
    jms_config_from_yaml(@yaml_file, @yaml_section)
  elsif @jndi_name
    jms_config_from_jndi
  else
    jms_config_from_configuration
  end
end
|
#jms_config_from_configuration ⇒ Object
197
198
199
200
201
202
203
204
205
206
207
208
209
|
# File 'lib/logstash/inputs/jms.rb', line 197
# Assembles the JMS settings from the options given directly to the plugin.
#
# The broker URL is exposed under both `:broker_url` and `:url`. The password
# is only included when one is configured, and factory settings (when present)
# are merged in through `correct_factory_hash`.
#
# @return [Hash] the connection settings
def jms_config_from_configuration
  {
    require_jars: @require_jars,
    factory: @factory,
    username: @username,
    broker_url: @broker_url,
    url: @broker_url
  }.tap do |settings|
    settings[:password] = @password.value unless @password.nil?
    correct_factory_hash(settings, @factory_settings) unless @factory_settings.nil?
  end
end
|
#jms_config_from_jndi ⇒ Object
224
225
226
227
228
229
230
|
# File 'lib/logstash/inputs/jms.rb', line 224
# Assembles the JMS settings used when the connection factory is looked up
# through JNDI.
#
# @return [Hash] `:require_jars`, `:jndi_name` and `:jndi_context`
def jms_config_from_jndi
  jndi_settings = {}
  jndi_settings[:require_jars] = @require_jars
  jndi_settings[:jndi_name] = @jndi_name
  jndi_settings[:jndi_context] = @jndi_context
  jndi_settings
end
|
#jms_config_from_yaml(file, section) ⇒ Object
232
233
234
|
# File 'lib/logstash/inputs/jms.rb', line 232
# Loads the JMS settings from one section of a YAML file.
#
# @param file [String] path of the YAML file
# @param section [String] top-level key whose value holds the settings
# @return [Object, nil] the section's content, or nil when the key is absent
def jms_config_from_yaml(file, section)
  sections = YAML.load_file(file)
  sections[section]
end
|
#load_ssl_properties ⇒ Object
236
237
238
239
240
241
|
# File 'lib/logstash/inputs/jms.rb', line 236
# Copies the configured key store / trust store settings into the
# `javax.net.ssl.*` Java system properties. Settings that are not configured
# are left untouched.
def load_ssl_properties
  if @keystore
    java.lang.System.setProperty("javax.net.ssl.keyStore", @keystore)
  end
  if @keystore_password
    java.lang.System.setProperty("javax.net.ssl.keyStorePassword", @keystore_password.value)
  end
  if @truststore
    java.lang.System.setProperty("javax.net.ssl.trustStore", @truststore)
  end
  if @truststore_password
    java.lang.System.setProperty("javax.net.ssl.trustStorePassword", @truststore_password.value)
  end
end
|
#load_system_properties ⇒ Object
243
244
245
|
# File 'lib/logstash/inputs/jms.rb', line 243
# Sets every entry of `@system_properties` as a Java system property, with the
# value converted to its string form.
#
# @return [Object] `@system_properties`, as returned by `each`
def load_system_properties
  @system_properties.each do |name, setting|
    java.lang.System.set_property(name, setting.to_s)
  end
end
|
#obfuscatable?(setting) ⇒ Boolean
186
187
188
|
# File 'lib/logstash/inputs/jms.rb', line 186
# Tells whether a setting holds a secret that must be masked before logging.
#
# The name is compared by its string form, so String keys (for example from a
# YAML file written without symbol keys) are recognised as well as Symbols.
#
# @param setting [Symbol, String] the setting name
# @return [Boolean] true for the password-like settings
def obfuscatable?(setting)
  %w[password keystore_password truststore_password].include?(setting.to_s)
end
|
#obfuscate_jms_config(config) ⇒ Object
182
183
184
|
# File 'lib/logstash/inputs/jms.rb', line 182
# Returns a copy of the settings that is safe to log: every setting flagged by
# `obfuscatable?` has its value replaced with 'xxxxx'. The input is not modified.
#
# @param config [Hash] the JMS settings
# @return [Hash] a new hash with secrets masked
def obfuscate_jms_config(config)
  masked = {}
  config.each do |name, setting|
    masked[name] = obfuscatable?(name) ? 'xxxxx' : setting
  end
  masked
end
|
#process_map_message(msg) ⇒ LogStash::Event
346
347
348
349
|
# File 'lib/logstash/inputs/jms.rb', line 346
# Turns a map message into an event built from its key/value data.
#
# @param msg [Object] message whose `data` holds the map content
# @return [LogStash::Event] the event created by the targeted event factory
def process_map_message(msg)
  payload = to_string_keyed_hash(msg.data)
  factory = do_target_check_once_and_get_event_factory
  factory.new_event(payload)
end
|
#queue_event(msg, output_queue) ⇒ Object
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
|
# File 'lib/logstash/inputs/jms.rb', line 299
# Converts one JMS message into an event and pushes it onto the output queue.
#
# The body is decoded only when `@include_body` is set; unsupported message
# types are logged and an empty event is emitted instead. Headers and
# properties are attached through their setters after the configured skip
# lists have been applied. Any StandardError raised along the way is logged
# and the message is dropped rather than propagating the error.
#
# @param msg [Object] the received JMS message
# @param output_queue [#<<] queue receiving the resulting event
def queue_event(msg, output_queue)
  begin
    if @include_body
      if msg.java_kind_of?(JMS::MapMessage)
        event = process_map_message(msg)
      elsif msg.java_kind_of?(JMS::TextMessage) || msg.java_kind_of?(JMS::BytesMessage)
        event = decode_message(msg)
      else
        @logger.error( "Unsupported message type #{msg.data.class.to_s}" )
      end
    end

    # Fall back to an empty event when the body is excluded or yielded nothing.
    event ||= event_factory.new_event

    if @use_jms_timestamp && msg.jms_timestamp
      # NOTE(review): presumably jms_timestamp is epoch milliseconds, split here
      # into whole seconds and microseconds — confirm.
      event.set("@timestamp", LogStash::Timestamp.at(msg.jms_timestamp / 1000, (msg.jms_timestamp % 1000) * 1000))
    end

    if @include_headers
      # NOTE(review): the header-mapping call was reconstructed from the mapper
      # chosen in #initialize; confirm the mapper is invoked through `call`.
      headers = @headers_mapper.call(msg)
      @skip_headers.each { |key| headers.delete(key) }
      @headers_setter.call(event, headers)
    end

    if @include_properties
      properties = to_string_keyed_hash(msg.properties)
      @skip_properties.each { |key| properties.delete(key) }
      @properties_setter.call(event, properties)
    end

    decorate(event)
    output_queue << event
  rescue => e
    @logger.error("Failed to create event", :message => msg, :exception => e,
                  :backtrace => e.backtrace)
  end
end
|
#register ⇒ Object
171
172
173
174
175
176
177
178
179
180
|
# File 'lib/logstash/inputs/jms.rb', line 171
# Prepares the plugin for use: loads the JMS library, validates the
# configuration, applies SSL and system properties, and resolves the JMS
# settings into `@jms_config`.
def register
  require "jms"

  check_config!
  load_ssl_properties
  if @system_properties
    load_system_properties
  end

  @jms_config = jms_config
  # Secrets are masked before the settings are written to the debug log.
  loggable_config = obfuscate_jms_config(@jms_config)
  @logger.debug("JMS Config being used ", :context => loggable_config)
end
|
#regular_subscriber(session, queue_or_topic, params) ⇒ Object
389
390
391
392
|
# File 'lib/logstash/inputs/jms.rb', line 389
# Creates a plain (non-durable) consumer on the session.
#
# @param session [Object] session responding to `create_consumer`
# @param queue_or_topic [Object] destination to consume from
# @param params [Hash] subscriber options; `:selector` is honoured when present
# @return [Object] whatever `create_consumer` returns
def regular_subscriber(session, queue_or_topic, params)
  if params[:selector]
    session.create_consumer(queue_or_topic, params[:selector])
  else
    session.create_consumer(queue_or_topic)
  end
end
|
#run(output_queue) ⇒ Object
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
|
# File 'lib/logstash/inputs/jms.rb', line 267
# Main loop of the input: connects to the broker, subscribes to the configured
# destination and hands every received message to #queue_event until the
# plugin is asked to stop.
#
# When a StandardError is raised, a warning is logged; unless the plugin is
# stopping, the method waits five seconds, closes whatever was opened and
# starts over from the connection step.
#
# @param output_queue [Object] queue passed through to #queue_event
def run(output_queue)
begin
connection = JMS::Connection.new(@jms_config)
# The client id is only assigned when one is configured.
connection.client_id = @durable_subscriber_client_id if @durable_subscriber_client_id
session = connection.create_session(@jms_config)
connection.start
# NOTE(review): the `* 1000` presumably converts seconds to milliseconds — confirm units.
params = {:timeout => @timeout * 1000, :selector => @selector}
# The parenthesised call invokes the #subscriber method; its result is then
# stored in a local variable of the same name.
subscriber = subscriber(session, params)
until stop?
subscriber.each({:timeout => @interval * 1000}) do |message|
queue_event(message, output_queue)
# Leave the inner iteration promptly once a stop has been requested.
break if stop?
end
end
rescue => e
logger.warn("JMS Consumer Died", error_hash(e))
unless stop?
sleep(5)
# Close what was opened (each may still be nil if the failure came early)
# before retrying from the top of the begin block.
subscriber && subscriber.close
session && session.close
connection && connection.close
retry
end
ensure
# Runs when the method is left, whether normally or through an exception.
subscriber && subscriber.close
session && session.close
connection && connection.close
end
end
|
#subscriber(session, params) ⇒ Object
375
376
377
378
379
380
381
|
# File 'lib/logstash/inputs/jms.rb', line 375
# Creates the destination and the matching subscriber for it.
#
# The destination name is stored in `params` under `:topic_name` when pub_sub
# is enabled and under `:queue_name` otherwise (the hash passed in is
# modified). A durable subscriber is created when `@durable_subscriber` is
# set, a regular consumer otherwise.
#
# @param session [Object] session responding to `create_destination`
# @param params [Hash] subscriber options, extended with the destination name
# @return [Object] the subscriber created for the destination
def subscriber(session, params)
  if @pub_sub
    params[:topic_name] = @destination
  else
    params[:queue_name] = @destination
  end

  endpoint = session.create_destination(params)
  return durable_subscriber(session, endpoint, params) if @durable_subscriber

  regular_subscriber(session, endpoint, params)
end
|