Class: Kafkr::Consumer

Inherits:
Object
  • Object
show all
Defined in:
lib/kafkr/consumer.rb

Defined Under Namespace

Classes: Handler

Constant Summary collapse

HANDLERS_DIRECTORY =
"./handlers"

Class Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(host = Consumer.configuration.host, port = Consumer.configuration.port) ⇒ Consumer

Returns a new instance of Consumer.



82
83
84
85
# File 'lib/kafkr/consumer.rb', line 82

def initialize(host = Consumer.configuration.host, port = Consumer.configuration.port)
  @host = host
  @port = port
end

Class Attribute Details

.handlersObject (readonly)

Returns the value of attribute handlers.



16
17
18
# File 'lib/kafkr/consumer.rb', line 16

def handlers
  @handlers
end

Class Method Details

.configurationObject



18
19
20
21
22
23
24
25
26
# File 'lib/kafkr/consumer.rb', line 18

def configuration
  FileUtils.mkdir_p "./.kafkr"
  @configuration ||= OpenStruct.new
  @configuration.host = ENV.fetch("KAFKR_HOST", "localhost")
  @configuration.port = ENV.fetch("KAFKR_PORT", 4000)
  @configuration.timeout = ENV.fetch("KAFKR_CONSUMER_TIMEOUT", 300) 
  @configuration.suggest_handlers = false
  @configuration
end

.configure {|configuration| ... } ⇒ Object

Yields:



28
29
30
# File 'lib/kafkr/consumer.rb', line 28

def configure
  yield(configuration) if block_given?
end

.list_registered_handlersObject



39
40
41
42
# File 'lib/kafkr/consumer.rb', line 39

def list_registered_handlers
  Kafkr.log "Registered handlers:"
  $loaded_handlers.keys.each { |handler| Kafkr.log "- #{handler}" }
end

.load_handlers(directory = "./handlers") ⇒ Object



44
45
46
47
48
49
50
51
52
53
# File 'lib/kafkr/consumer.rb', line 44

def load_handlers(directory = "./handlers")
  Dir.glob("#{directory}/*.rb").each do |file|
    handler_name = File.basename(file, ".rb")
    unless $loaded_handlers[handler_name]
      require file
      $loaded_handlers[handler_name] = true
      $handlers_changed = true
    end
  end
end

.register_handler(handler) ⇒ Object



32
33
34
# File 'lib/kafkr/consumer.rb', line 32

def register_handler(handler)
  @handlers << handler
end

Instance Method Details

#backoff_time(attempt) ⇒ Object



91
92
93
# File 'lib/kafkr/consumer.rb', line 91

def backoff_time(attempt)
  [fibonacci(attempt), fibonacci(5)].min
end

#fibonacci(n) ⇒ Object



87
88
89
# File 'lib/kafkr/consumer.rb', line 87

def fibonacci(n)
  (n <= 1) ? n : fibonacci(n - 1) + fibonacci(n - 2)
end

#listenObject Also known as: consume, receive, connect, monitor, observe



128
129
130
131
132
133
134
135
136
# File 'lib/kafkr/consumer.rb', line 128

def listen
  attempt = 0
  loop do
    listen_for("dummy", ->(msg) {  }) do |message|
      Kafkr.log ">> #{message}"
      dispatch_to_handlers(message)
    end
  end
end

#listen_for(message, send_message) ⇒ Object



99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
# File 'lib/kafkr/consumer.rb', line 99

def listen_for(message, send_message)
  attempt = 0
  begin
    socket = TCPSocket.new(@host, @port)
    attempt = 0

    Timeout.timeout(Kafkr::Consumer.configuration.timeout) do
      sync_uid = send_message.call(message)

      loop do
        received_message = socket.gets
        raise LostConnection if received_message.nil?
        received_message = Kafkr::Encryptor.new.decrypt(received_message.chomp)
        payload = yield received_message, sync_uid if block_given?
        return payload
      end
    end
  rescue Timeout::Error, LostConnection, Errno::ECONNREFUSED
    attempt += 1
    wait_time = backoff_time(attempt)
    sleep(wait_time)
    retry
  rescue Interrupt
    Kafkr.log "Received interrupt signal. Shutting down consumer gracefully..."
    socket&.close
    exit(0)
  end
end

#valid_class_name?(name) ⇒ Boolean

Returns:

  • (Boolean)


95
96
97
# File 'lib/kafkr/consumer.rb', line 95

def valid_class_name?(name)
  /^[A-Z]\w*$/.match?(name)
end

#valid_json?(json) ⇒ Boolean

Returns:

  • (Boolean)


138
139
140
141
142
143
# File 'lib/kafkr/consumer.rb', line 138

def valid_json?(json)
  JSON.parse(json)
  true
rescue JSON::ParserError
  false
end