Class: Prefab::SSEConfigClient
- Inherits:
-
Object
- Object
- Prefab::SSEConfigClient
- Defined in:
- lib/prefab/sse_config_client.rb
Defined Under Namespace
Classes: Options
Constant Summary collapse
- AUTH_USER =
'authuser'
- LOG =
Prefab::InternalLogger.new(self)
Instance Method Summary collapse
- #close ⇒ Object
- #connect(&load_configs) ⇒ Object
- #headers ⇒ Object
-
#initialize(prefab_options, config_loader, options = nil, logger = nil) ⇒ SSEConfigClient
constructor
A new instance of SSEConfigClient.
- #source ⇒ Object
- #start(&load_configs) ⇒ Object
Constructor Details
#initialize(prefab_options, config_loader, options = nil, logger = nil) ⇒ SSEConfigClient
Returns a new instance of SSEConfigClient.
26 27 28 29 30 31 32 |
# File 'lib/prefab/sse_config_client.rb', line 26 def initialize(, config_loader, = nil, logger = nil) @prefab_options = @options = || Options.new @config_loader = config_loader @connected = false @logger = logger || LOG end |
Instance Method Details
#close ⇒ Object
34 35 36 37 |
# File 'lib/prefab/sse_config_client.rb', line 34 def close @retry_thread&.kill @client&.close end |
#connect(&load_configs) ⇒ Object
66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 |
# File 'lib/prefab/sse_config_client.rb', line 66 def connect(&load_configs) url = "#{source}/api/v1/sse/config" @logger.debug "SSE Streaming Connect to #{url} start_at #{@config_loader.highwater_mark}" SSE::Client.new(url, headers: headers, read_timeout: @options.sse_read_timeout, reconnect_time: @options.sse_default_reconnect_time, logger: Prefab::InternalLogger.new(SSE::Client)) do |client| client.on_event do |event| configs = PrefabProto::Configs.decode(Base64.decode64(event.data)) load_configs.call(configs, event, :sse) end client.on_error do |error| @logger.error "SSE Streaming Error: #{error.inspect} for url #{url}" if @options.errors_to_close_connection.any? { |klass| error.is_a?(klass) } @logger.debug "Closing SSE connection for url #{url}" client.close end end end end |
#headers ⇒ Object
91 92 93 94 95 96 97 98 99 100 |
# File 'lib/prefab/sse_config_client.rb', line 91 def headers auth = "#{AUTH_USER}:#{@prefab_options.api_key}" auth_string = Base64.strict_encode64(auth) return { 'x-prefab-start-at-id' => @config_loader.highwater_mark, 'Authorization' => "Basic #{auth_string}", 'Accept' => 'text/event-stream', 'X-PrefabCloud-Client-Version' => "prefab-cloud-ruby-#{Prefab::VERSION}" } end |
#source ⇒ Object
102 103 104 105 106 107 108 109 110 |
# File 'lib/prefab/sse_config_client.rb', line 102 def source @source_index = @source_index.nil? ? 0 : @source_index + 1 if @source_index >= @prefab_options.sse_sources.size @source_index = 0 end return @prefab_options.sse_sources[@source_index].sub(/(belt|suspenders)\./, 'stream.') end |
#start(&load_configs) ⇒ Object
39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 |
# File 'lib/prefab/sse_config_client.rb', line 39 def start(&load_configs) if @prefab_options.sse_sources.empty? @logger.debug 'No SSE sources configured' return end @client = connect(&load_configs) closed_count = 0 @retry_thread = Thread.new do loop do sleep @options.sleep_delay_for_new_connection_check if @client.closed? closed_count += @options.sleep_delay_for_new_connection_check if closed_count > @options.seconds_between_new_connection closed_count = 0 @logger.debug 'Reconnecting SSE client' @client = connect(&load_configs) end end end end end |