Class: Kafkr::Consumer
- Inherits:
-
Object
- Object
- Kafkr::Consumer
- Defined in:
- lib/kafkr/consumer.rb
Defined Under Namespace
Classes: Handler
Constant Summary collapse
- HANDLERS_DIRECTORY =
"./handlers"
Class Attribute Summary collapse
-
.handlers ⇒ Object
readonly
Returns the value of attribute handlers.
Class Method Summary collapse
- .configuration ⇒ Object
- .configure {|configuration| ... } ⇒ Object
- .list_registered_handlers ⇒ Object
- .load_handlers(directory = "./handlers") ⇒ Object
- .register_handler(handler) ⇒ Object
Instance Method Summary collapse
- #backoff_time(attempt) ⇒ Object
- #fibonacci(n) ⇒ Object
-
#initialize(host = Consumer.configuration.host, port = Consumer.configuration.port) ⇒ Consumer
constructor
A new instance of Consumer.
- #listen ⇒ Object (also: #consume, #receive, #connect, #monitor, #observe)
- #listen_for(message, send_message) ⇒ Object
- #valid_class_name?(name) ⇒ Boolean
- #valid_json?(json) ⇒ Boolean
Constructor Details
#initialize(host = Consumer.configuration.host, port = Consumer.configuration.port) ⇒ Consumer
Returns a new instance of Consumer.
82 83 84 85 |
# File 'lib/kafkr/consumer.rb', line 82 def initialize(host = Consumer.configuration.host, port = Consumer.configuration.port) @host = host @port = port end |
Class Attribute Details
.handlers ⇒ Object (readonly)
Returns the value of attribute handlers.
16 17 18 |
# File 'lib/kafkr/consumer.rb', line 16 def handlers @handlers end |
Class Method Details
.configuration ⇒ Object
18 19 20 21 22 23 24 25 26 |
# File 'lib/kafkr/consumer.rb', line 18 def configuration FileUtils.mkdir_p "./.kafkr" @configuration ||= OpenStruct.new @configuration.host = ENV.fetch("KAFKR_HOST", "localhost") @configuration.port = ENV.fetch("KAFKR_PORT", 4000) @configuration.timeout = ENV.fetch("KAFKR_CONSUMER_TIMEOUT", 300) @configuration.suggest_handlers = false @configuration end |
.configure {|configuration| ... } ⇒ Object
28 29 30 |
# File 'lib/kafkr/consumer.rb', line 28 def configure yield(configuration) if block_given? end |
.list_registered_handlers ⇒ Object
39 40 41 42 |
# File 'lib/kafkr/consumer.rb', line 39 def list_registered_handlers Kafkr.log "Registered handlers:" $loaded_handlers.keys.each { |handler| Kafkr.log "- #{handler}" } end |
.load_handlers(directory = "./handlers") ⇒ Object
44 45 46 47 48 49 50 51 52 53 |
# File 'lib/kafkr/consumer.rb', line 44 def load_handlers(directory = "./handlers") Dir.glob("#{directory}/*.rb").each do |file| handler_name = File.basename(file, ".rb") unless $loaded_handlers[handler_name] require file $loaded_handlers[handler_name] = true $handlers_changed = true end end end |
.register_handler(handler) ⇒ Object
32 33 34 |
# File 'lib/kafkr/consumer.rb', line 32 def register_handler(handler) @handlers << handler end |
Instance Method Details
#backoff_time(attempt) ⇒ Object
91 92 93 |
# File 'lib/kafkr/consumer.rb', line 91 def backoff_time(attempt) [fibonacci(attempt), fibonacci(5)].min end |
#fibonacci(n) ⇒ Object
87 88 89 |
# File 'lib/kafkr/consumer.rb', line 87 def fibonacci(n) (n <= 1) ? n : fibonacci(n - 1) + fibonacci(n - 2) end |
#listen ⇒ Object Also known as: consume, receive, connect, monitor, observe
128 129 130 131 132 133 134 135 136 |
# File 'lib/kafkr/consumer.rb', line 128 def listen attempt = 0 loop do listen_for("dummy", ->(msg) { }) do || Kafkr.log ">> #{message}" dispatch_to_handlers() end end end |
#listen_for(message, send_message) ⇒ Object
99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 |
# File 'lib/kafkr/consumer.rb', line 99 def listen_for(, ) attempt = 0 begin socket = TCPSocket.new(@host, @port) attempt = 0 Timeout.timeout(Kafkr::Consumer.configuration.timeout) do sync_uid = .call() loop do = socket.gets raise LostConnection if .nil? = Kafkr::Encryptor.new.decrypt(.chomp) payload = yield , sync_uid if block_given? return payload end end rescue Timeout::Error, LostConnection, Errno::ECONNREFUSED attempt += 1 wait_time = backoff_time(attempt) sleep(wait_time) retry rescue Interrupt Kafkr.log "Received interrupt signal. Shutting down consumer gracefully..." socket&.close exit(0) end end |
#valid_class_name?(name) ⇒ Boolean
95 96 97 |
# File 'lib/kafkr/consumer.rb', line 95 def valid_class_name?(name) /^[A-Z]\w*$/.match?(name) end |
#valid_json?(json) ⇒ Boolean
138 139 140 141 142 143 |
# File 'lib/kafkr/consumer.rb', line 138 def valid_json?(json) JSON.parse(json) true rescue JSON::ParserError false end |