Class: Avrodrome::Adaptor

Inherits:
Object
  • Object
show all
Defined in:
lib/avrodrome/adaptor.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(logger: Avrodrome.logger, registry: Avrodrome::Registry.new) ⇒ Adaptor

Returns a new instance of Adaptor.



5
6
7
8
# File 'lib/avrodrome/adaptor.rb', line 5

def initialize(logger: Avrodrome.logger, registry: Avrodrome::Registry.new)
  @logger   = logger
  @registry = registry
end

Instance Attribute Details

#loggerObject (readonly)

Returns the value of attribute logger.



3
4
5
# File 'lib/avrodrome/adaptor.rb', line 3

def logger
  @logger
end

#registryObject (readonly)

Returns the value of attribute registry.



3
4
5
# File 'lib/avrodrome/adaptor.rb', line 3

def registry
  @registry
end

Instance Method Details

#check(subject_name, schema) ⇒ Object

Check if a schema exists. Returns nil if not found.



45
46
47
48
49
# File 'lib/avrodrome/adaptor.rb', line 45

def check(subject_name, schema)
  subject = registry.subject(subject_name)
  return errors[:subject_not_found] unless subject
  subject.check(schema)&.to_hash
end

#compatible?(subject, schema, version = 'latest') ⇒ Boolean

AC: Assume short lived in memory store, no historic schemas for now

Check if a schema is compatible with the stored version. Returns:

  • true if compatible

  • nil if the subject or version does not exist

  • false if incompatible

docs.confluent.io/3.1.2/schema-registry/docs/api.html#compatibility

Returns:

  • (Boolean)


59
60
61
62
63
64
65
# File 'lib/avrodrome/adaptor.rb', line 59

def compatible?(subject, schema, version = 'latest')
  true
  # data = post("/compatibility/subjects/#{subject}/versions/#{version}",
  #             expects: [200, 404],
  #             body: { schema: schema.to_s }.to_json)
  # data.fetch('is_compatible', false) unless data.has_key?('error_code')
end

#fetch(id) ⇒ Object



10
11
12
13
# File 'lib/avrodrome/adaptor.rb', line 10

def fetch(id)
  logger.info "Fetching schema with id #{id}"
  registry.find(id)&.body&.to_s || errors[:not_found]
end

#global_configObject

Get global config



68
69
70
# File 'lib/avrodrome/adaptor.rb', line 68

def global_config
  @gloabl_config ||= { "compatibilityLevel" => "BACKWARD" }
end

#register(subject, schema) ⇒ Object



15
16
17
18
19
20
21
22
# File 'lib/avrodrome/adaptor.rb', line 15

def register(subject, schema)
  id = registry.subject(subject)&.check(schema)&.id
  return id if id

  registered_schema = registry.register!(subject, schema)
  logger.info "Registered schema for subject `#{subject}`; id = #{registered_schema.id}"
  registered_schema.id
end

#subject_config(subject) ⇒ Object

Get config for subject



78
79
80
81
82
# File 'lib/avrodrome/adaptor.rb', line 78

def subject_config(subject)
  subject = registry.subject(subject)
  return errors[:subject_not_found] unless subject
  subject.config
end

#subject_version(subject_name, version = 'latest') ⇒ Object

Get a specific version for a subject



37
38
39
40
41
42
# File 'lib/avrodrome/adaptor.rb', line 37

def subject_version(subject_name, version = 'latest')
  subject = registry.subject(subject_name)
  return errors[:subject_not_found] unless subject
  version = nil if version == 'latest'
  subject.version(version) || errors[:version_not_found]
end

#subject_versions(subject_name) ⇒ Object

List all versions for a subject



30
31
32
33
34
# File 'lib/avrodrome/adaptor.rb', line 30

def subject_versions(subject_name)
  subject = registry.subject(subject_name)
  return errors[:subject_not_found] unless subject
  subject.schemas.map(&:version)
end

#subjectsObject

List all subjects



25
26
27
# File 'lib/avrodrome/adaptor.rb', line 25

def subjects
  registry.subjects.map(&:name)
end

#update_global_config(config) ⇒ Object

Update global config



73
74
75
# File 'lib/avrodrome/adaptor.rb', line 73

def update_global_config(config)
  global_config.merge!(config)
end

#update_subject_config(subject, config) ⇒ Object

Update config for subject



85
86
87
88
89
# File 'lib/avrodrome/adaptor.rb', line 85

def update_subject_config(subject, config)
  subject = registry.subject(subject)
  return errors[:subject_not_found] unless subject
  subject.config.merge!(config)
end