Class: Prefab::SSEConfigClient

Inherits:
Object
  • Object
show all
Defined in:
lib/prefab/sse_config_client.rb

Defined Under Namespace

Classes: Options

Constant Summary collapse

# Username half of the HTTP Basic credential assembled in #headers
# (joined with the API key as "user:key").
AUTH_USER =
'authuser'
# Fallback logger: #initialize uses it when no logger argument is supplied.
LOG =
Prefab::InternalLogger.new(self)

Instance Method Summary collapse

Constructor Details

#initialize(prefab_options, config_loader, options = nil, logger = nil) ⇒ SSEConfigClient

Returns a new instance of SSEConfigClient.



26
27
28
29
30
31
32
# File 'lib/prefab/sse_config_client.rb', line 26

# Stores the collaborators needed to stream configs over SSE. Nothing is
# connected here; see #start and #connect.
#
# @param prefab_options [Object] client-wide options; other methods read
#   +sse_sources+ and +api_key+ from it
# @param config_loader [Object] other methods read +highwater_mark+ from it
# @param options [Options, nil] SSE tuning options; a new +Options+ is built
#   when this is nil
# @param logger [Object, nil] logger to use; +LOG+ is used when this is nil
def initialize(prefab_options, config_loader, options = nil, logger = nil)
  @connected = false
  @prefab_options = prefab_options
  @config_loader = config_loader

  # Defaults are only constructed when the caller did not supply a value.
  @options = options
  @options ||= Options.new

  @logger = logger
  @logger ||= LOG
end

Instance Method Details

#close ⇒ Object



34
35
36
37
# File 'lib/prefab/sse_config_client.rb', line 34

# Stops the reconnect thread (if one was started) and closes the current
# SSE client (if one exists). Safe to call before #start.
#
# @return [Object, nil] whatever the client's +close+ returns, or nil when
#   there is no client
def close
  @retry_thread.kill unless @retry_thread.nil?
  @client.close unless @client.nil?
end

#connect(&load_configs) ⇒ Object



66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
# File 'lib/prefab/sse_config_client.rb', line 66

# Opens an SSE stream against the next source (see #source) and wires up
# its event and error handlers.
#
# Each event's data is Base64-decoded, parsed as +PrefabProto::Configs+ and
# handed to the block as +(configs, event, :sse)+. Errors are logged; when
# the error is an instance of any class in
# +@options.errors_to_close_connection+ the client is closed.
#
# @yield [configs, event, source] called for every received event
# @return [SSE::Client] the newly created client
def connect(&load_configs)
  stream_url = "#{source}/api/v1/sse/config"
  @logger.debug "SSE Streaming Connect to #{stream_url} start_at #{@config_loader.highwater_mark}"

  client_settings = {
    headers: headers,
    read_timeout: @options.sse_read_timeout,
    reconnect_time: @options.sse_default_reconnect_time,
    logger: Prefab::InternalLogger.new(SSE::Client)
  }

  SSE::Client.new(stream_url, **client_settings) do |sse|
    sse.on_event do |message|
      payload = Base64.decode64(message.data)
      load_configs.call(PrefabProto::Configs.decode(payload), message, :sse)
    end

    sse.on_error do |failure|
      @logger.error "SSE Streaming Error: #{failure.inspect} for url #{stream_url}"

      # Only the configured error classes tear the connection down.
      should_close = @options.errors_to_close_connection.any? { |klass| failure.is_a?(klass) }
      next unless should_close

      @logger.debug "Closing SSE connection for url #{stream_url}"
      sse.close
    end
  end
end

#headers ⇒ Object



91
92
93
94
95
96
97
98
99
100
# File 'lib/prefab/sse_config_client.rb', line 91

# Builds the HTTP request headers for the SSE stream.
#
# Authorization is HTTP Basic with +AUTH_USER+ as the user and the API key
# from +@prefab_options+ as the password. The config loader's current
# +highwater_mark+ is sent as +x-prefab-start-at-id+.
#
# @return [Hash{String => Object}] header name to header value
def headers
  credentials = Base64.strict_encode64("#{AUTH_USER}:#{@prefab_options.api_key}")

  {
    'x-prefab-start-at-id' => @config_loader.highwater_mark,
    'Authorization' => "Basic #{credentials}",
    'Accept' => 'text/event-stream',
    'X-PrefabCloud-Client-Version' => "prefab-cloud-ruby-#{Prefab::VERSION}"
  }
end

#source ⇒ Object



102
103
104
105
106
107
108
109
110
# File 'lib/prefab/sse_config_client.rb', line 102

# Picks the next SSE source, cycling through +@prefab_options.sse_sources+.
#
# The first call uses index 0; each later call advances by one and wraps
# back to 0 past the end of the list. The first occurrence of +belt.+ or
# +suspenders.+ in the chosen source is replaced with +stream.+.
#
# @return [String] the source URL to stream from
def source
  if @source_index.nil?
    @source_index = 0
  else
    @source_index += 1
  end

  # Wrap around once we run off the end of the configured list.
  @source_index = 0 unless @source_index < @prefab_options.sse_sources.size

  @prefab_options.sse_sources[@source_index].sub(/(belt|suspenders)\./, 'stream.')
end

#start(&load_configs) ⇒ Object



39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# File 'lib/prefab/sse_config_client.rb', line 39

# Connects to the SSE stream and starts a background thread that replaces
# the client after it has stayed closed for long enough.
#
# Does nothing (beyond a debug log) when no SSE sources are configured.
# The thread wakes every +@options.sleep_delay_for_new_connection_check+
# seconds; while the client reports +closed?+ it adds that delay to a running
# total, and once the total exceeds +@options.seconds_between_new_connection+
# it resets the total and calls #connect again.
#
# @yield passed through to #connect for every (re)connection
# @return [Thread, nil] the retry thread, or nil when there are no sources
def start(&load_configs)
  if @prefab_options.sse_sources.empty?
    @logger.debug 'No SSE sources configured'
    return
  end

  @client = connect(&load_configs)

  @retry_thread = Thread.new do
    # Approximate seconds the client has been observed closed.
    seconds_closed = 0

    loop do
      sleep @options.sleep_delay_for_new_connection_check
      next unless @client.closed?

      seconds_closed += @options.sleep_delay_for_new_connection_check
      next unless seconds_closed > @options.seconds_between_new_connection

      seconds_closed = 0
      @logger.debug 'Reconnecting SSE client'
      @client = connect(&load_configs)
    end
  end
end