Class: Tros::IO::DatumReader
- Inherits: Object
- Ancestors: Object → Tros::IO::DatumReader
- Defined in:
- lib/tros/io.rb
Instance Attribute Summary collapse
-
#readers_schema ⇒ Object
Returns the value of attribute readers_schema.
-
#writers_schema ⇒ Object
Returns the value of attribute writers_schema.
Class Method Summary collapse
- .match_schemas(writers_schema, readers_schema) ⇒ Object
Instance Method Summary collapse
-
#initialize(writers_schema = nil, readers_schema = nil) ⇒ DatumReader (constructor)
Returns a new instance of DatumReader.
- #read(decoder) ⇒ Object
- #read_array(writers_schema, readers_schema, decoder) ⇒ Object
- #read_data(writers_schema, readers_schema, decoder) ⇒ Object
- #read_default_value(field_schema, default_value) ⇒ Object
- #read_enum(writers_schema, readers_schema, decoder) ⇒ Object
- #read_fixed(writers_schema, readers_schema, decoder) ⇒ Object
- #read_map(writers_schema, readers_schema, decoder) ⇒ Object
- #read_record(writers_schema, readers_schema, decoder) ⇒ Object
- #read_union(writers_schema, readers_schema, decoder) ⇒ Object
- #skip_array(writers_schema, decoder) ⇒ Object
- #skip_data(writers_schema, decoder) ⇒ Object
- #skip_enum(writers_schema, decoder) ⇒ Object
- #skip_fixed(writers_schema, decoder) ⇒ Object
- #skip_map(writers_schema, decoder) ⇒ Object
- #skip_record(writers_schema, decoder) ⇒ Object
- #skip_union(writers_schema, decoder) ⇒ Object
Constructor Details
#initialize(writers_schema = nil, readers_schema = nil) ⇒ DatumReader
Returns a new instance of DatumReader.
263 264 265 266 |
# File 'lib/tros/io.rb', line 263
# Builds a reader for data written with +writers_schema+, to be resolved
# against +readers_schema+.
#
# @param writers_schema [Object, nil] schema the data was written with
# @param readers_schema [Object, nil] schema to resolve the data against;
#   when nil, #read adopts +writers_schema+ in its place
def initialize(writers_schema = nil, readers_schema = nil)
  @writers_schema, @readers_schema = writers_schema, readers_schema
end
Instance Attribute Details
#readers_schema ⇒ Object
Returns the value of attribute readers_schema.
261 262 263 |
# File 'lib/tros/io.rb', line 261
# Schema that decoded data is resolved against.
#
# @return [Object, nil] the reader's schema, or nil if none has been set
def readers_schema
  @readers_schema
end
#writers_schema ⇒ Object
Returns the value of attribute writers_schema.
261 262 263 |
# File 'lib/tros/io.rb', line 261
# Schema the encoded data was written with.
#
# @return [Object, nil] the writer's schema, or nil if none has been set
def writers_schema
  @writers_schema
end
Class Method Details
.match_schemas(writers_schema, readers_schema) ⇒ Object
218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 |
# File 'lib/tros/io.rb', line 218
# Decides whether data written with +writers_schema+ may be read with
# +readers_schema+ (Avro schema-resolution matching).
#
# A union on either side always matches here; the concrete branch is
# chosen per datum when the data is read. Arrays and maps match when their
# item / value schemas match *recursively*, so numeric promotion
# (e.g. array<int> read as array<long>) and union item schemas are accepted.
#
# @param writers_schema [Object] schema the data was written with
# @param readers_schema [Object] schema the caller wants to read as
# @return [Boolean] true when the schemas are compatible
def self.match_schemas(writers_schema, readers_schema)
  w_type = writers_schema.type_sym
  r_type = readers_schema.type_sym

  # Unions are resolved per datum, so defer the decision.
  return true if w_type == :union || r_type == :union

  if w_type == r_type
    return true if Schema::PRIMITIVE_TYPES_SYM.include?(r_type)

    case r_type
    when :record, :error, :enum
      return writers_schema.fullname == readers_schema.fullname
    when :request
      return true
    when :fixed
      return writers_schema.fullname == readers_schema.fullname &&
             writers_schema.size == readers_schema.size
    when :map
      # Recurse rather than comparing type names, so promotable or union
      # value schemas still match.
      return match_schemas(writers_schema.values, readers_schema.values)
    when :array
      return match_schemas(writers_schema.items, readers_schema.items)
    end
  end

  # Numeric promotion: int -> long/float/double, long -> float/double,
  # float -> double.
  case w_type
  when :int then [:long, :float, :double].include?(r_type)
  when :long then [:float, :double].include?(r_type)
  when :float then r_type == :double
  else false
  end
end
Instance Method Details
#read(decoder) ⇒ Object
268 269 270 271 |
# File 'lib/tros/io.rb', line 268
# Decodes one datum from +decoder+.
#
# If no reader's schema has been set, the writer's schema is stored as the
# reader's schema, so the data is read back exactly as it was written.
#
# @param decoder [Object] source of the encoded data
# @return [Object] the decoded datum
def read(decoder)
  self.readers_schema ||= writers_schema
  read_data(writers_schema, readers_schema, decoder)
end
#read_array(writers_schema, readers_schema, decoder) ⇒ Object
328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 |
# File 'lib/tros/io.rb', line 328
# Decodes an array, which is encoded as a sequence of blocks.
#
# Each block begins with an item count; a count of zero ends the array.
# A negative count means "abs(count) items", followed by an extra long
# holding the block's byte size. The size is not needed for reading, so it
# is consumed and discarded.
#
# @param writers_schema [Object] array schema the data was written with
# @param readers_schema [Object] array schema the items are resolved against
# @param decoder [Object] source of the encoded data
# @return [Array] the decoded items, in encoded order
def read_array(writers_schema, readers_schema, decoder)
  items = []
  count = decoder.read_long
  until count.zero?
    if count < 0
      count = -count
      decoder.read_long # block byte size: consumed, not needed here
    end
    count.times do
      items << read_data(writers_schema.items, readers_schema.items, decoder)
    end
    count = decoder.read_long
  end
  items
end
#read_data(writers_schema, readers_schema, decoder) ⇒ Object
273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 |
# File 'lib/tros/io.rb', line 273
# Decodes one value described by +writers_schema+ and resolves it against
# +readers_schema+, dispatching on the writer's schema type.
#
# @param writers_schema [Object] schema the value was written with
# @param readers_schema [Object] schema to resolve the value against
# @param decoder [Object] source of the encoded data
# @return [Object] the decoded value
# @raise [SchemaMatchException] if the schemas are incompatible, or if a
#   non-union value meets a reader union with no compatible branch
# @raise [AvroError] if the writer's schema type is not recognised
def read_data(writers_schema, readers_schema, decoder)
  matcher = self.class
  unless matcher.match_schemas(writers_schema, readers_schema)
    raise SchemaMatchException.new(writers_schema, readers_schema)
  end

  writer_type = writers_schema.type_sym

  # Plain value read through a reader union: retry with the first reader
  # branch that accepts the writer's schema.
  if writer_type != :union && readers_schema.type_sym == :union
    branch = readers_schema.schemas.find do |candidate|
      matcher.match_schemas(writers_schema, candidate)
    end
    raise SchemaMatchException.new(writers_schema, readers_schema) unless branch
    return read_data(writers_schema, branch, decoder)
  end

  case writer_type
  when :null    then decoder.read_null
  when :boolean then decoder.read_boolean
  when :string  then decoder.read_string
  when :int     then decoder.read_int
  when :long    then decoder.read_long
  when :float   then decoder.read_float
  when :double  then decoder.read_double
  when :bytes   then decoder.read_bytes
  when :fixed   then read_fixed(writers_schema, readers_schema, decoder)
  when :enum    then read_enum(writers_schema, readers_schema, decoder)
  when :array   then read_array(writers_schema, readers_schema, decoder)
  when :map     then read_map(writers_schema, readers_schema, decoder)
  when :union   then read_union(writers_schema, readers_schema, decoder)
  when :record, :error, :request
    read_record(writers_schema, readers_schema, decoder)
  else
    raise AvroError, "Cannot read unknown schema type: #{writers_schema.type}"
  end
end
#read_default_value(field_schema, default_value) ⇒ Object
404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 |
# File 'lib/tros/io.rb', line 404
# Converts a JSON-decoded default value into the in-memory value for
# +field_schema+, recursing through arrays, maps, unions and records.
#
# @param field_schema [Object] schema the default is declared against
# @param default_value [Object] the JSON-decoded default
# @return [Object] the default converted to the schema's representation
# @raise [AvroError] if +field_schema+ has an unsupported type
def read_default_value(field_schema, default_value)
  case field_schema.type_sym
  when :null
    nil
  when :boolean, :enum, :fixed, :string, :bytes
    default_value
  when :int, :long
    Integer(default_value)
  when :float, :double
    Float(default_value)
  when :array
    default_value.map { |json_val| read_default_value(field_schema.items, json_val) }
  when :map
    default_value.each_with_object({}) do |(key, json_val), result|
      result[key] = read_default_value(field_schema.values, json_val)
    end
  when :union
    # A union's default corresponds to its first branch.
    read_default_value(field_schema.schemas[0], default_value)
  when :record, :error
    field_schema.fields.each_with_object({}) do |field, result|
      json_val = default_value[field.name]
      # Fall back only when the value is absent/nil: a truthiness test
      # would also throw away an explicit `false` in the record default.
      json_val = field.default if json_val.nil?
      result[field.name] = read_default_value(field.type, json_val)
    end
  else
    raise AvroError, "Unknown type: #{field_schema.type}"
  end
end
#read_enum(writers_schema, readers_schema, decoder) ⇒ Object
315 316 317 318 319 320 321 322 323 324 325 326 |
# File 'lib/tros/io.rb', line 315
# Decodes an enum value. The encoded int is an index into the writer's
# symbol list, and the writer's symbol is returned.
#
# NOTE(review): the symbol is returned even if the reader's enum lacks it.
# Avro schema resolution expects that case to be an error (or, in newer
# spec versions, to resolve to the reader's default); this is still
# unimplemented (the original TODO(jmhodges) about 'unset').
#
# @param writers_schema [Object] enum schema the data was written with
# @param readers_schema [Object] enum schema being read as (currently unused)
# @param decoder [Object] source of the encoded data
# @return [Object] the writer's symbol at the decoded index
# @raise [AvroError] if the index is outside the writer's symbol list
#   (a negative index would otherwise wrap around silently)
def read_enum(writers_schema, readers_schema, decoder)
  index = decoder.read_int
  symbols = writers_schema.symbols
  unless index >= 0 && index < symbols.size
    raise AvroError, "Enum index #{index} out of range for #{symbols.size} symbols"
  end
  symbols[index]
end
#read_fixed(writers_schema, readers_schema, decoder) ⇒ Object
311 312 313 |
# File 'lib/tros/io.rb', line 311
# Reads a fixed value by asking +decoder+ for the writer's declared size.
# The reader's schema is not consulted here.
#
# @return [Object] whatever +decoder.read+ returns for that size
def read_fixed(writers_schema, readers_schema, decoder)
  byte_count = writers_schema.size
  decoder.read(byte_count)
end
#read_map(writers_schema, readers_schema, decoder) ⇒ Object
347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 |
# File 'lib/tros/io.rb', line 347
# Decodes a map, which is encoded as a sequence of blocks of
# (string key, value) entries.
#
# Each block begins with an entry count; a count of zero ends the map.
# A negative count means "abs(count) entries", followed by an extra long
# holding the block's byte size. The size is not needed for reading, so it
# is consumed and discarded.
#
# @param writers_schema [Object] map schema the data was written with
# @param readers_schema [Object] map schema the values are resolved against
# @param decoder [Object] source of the encoded data
# @return [Hash] decoded key => value pairs
def read_map(writers_schema, readers_schema, decoder)
  entries = {}
  count = decoder.read_long
  until count.zero?
    if count < 0
      count = -count
      decoder.read_long # block byte size: consumed, not needed here
    end
    count.times do
      key = decoder.read_string
      entries[key] = read_data(writers_schema.values, readers_schema.values, decoder)
    end
    count = decoder.read_long
  end
  entries
end
#read_record(writers_schema, readers_schema, decoder) ⇒ Object
374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 |
# File 'lib/tros/io.rb', line 374
# Decodes a record, resolving the writer's fields against the reader's.
#
# Fields are decoded in the writer's order. A writer field unknown to the
# reader is skipped. A reader field missing from the writer is filled
# from its default when that default is non-nil. Otherwise it is left out
# of the result (FIXME(jmhodges): another 'unset' case).
#
# @param writers_schema [Object] record schema the data was written with
# @param readers_schema [Object] record schema to resolve against
# @param decoder [Object] source of the encoded data
# @return [Hash] field name => value
def read_record(writers_schema, readers_schema, decoder)
  reader_fields = readers_schema.fields_hash
  record = {}

  writers_schema.fields.each do |wfield|
    rfield = reader_fields[wfield.name]
    unless rfield
      skip_data(wfield.type, decoder)
      next
    end
    record[wfield.name] = read_data(wfield.type, rfield.type, decoder)
  end

  # If every reader field was already read from the data, nothing to fill.
  return record unless reader_fields.size > record.size

  writer_fields = writers_schema.fields_hash
  reader_fields.each do |name, rfield|
    next if writer_fields.key?(name) || rfield.default.nil?
    record[rfield.name] = read_default_value(rfield.type, rfield.default)
  end
  record
end
#read_union(writers_schema, readers_schema, decoder) ⇒ Object
367 368 369 370 371 372 |
# File 'lib/tros/io.rb', line 367
# Decodes a union value. A long selects the writer's branch, and the value
# is then decoded with that branch against the reader's schema.
#
# @param writers_schema [Object] union schema the data was written with
# @param readers_schema [Object] schema to resolve the value against
# @param decoder [Object] source of the encoded data
# @return [Object] the decoded value
# @raise [AvroError] if the branch index is outside the writer's union.
#   Without this check a negative index would wrap around via Array#[] and
#   silently mis-decode corrupt data.
def read_union(writers_schema, readers_schema, decoder)
  index = decoder.read_long
  branches = writers_schema.schemas
  unless index >= 0 && index < branches.size
    raise AvroError, "Union index #{index} out of range for #{branches.size} branches"
  end
  read_data(branches[index], readers_schema, decoder)
end
#skip_array(writers_schema, decoder) ⇒ Object
496 497 498 |
# File 'lib/tros/io.rb', line 496
# Advances +decoder+ past an encoded array without materialising it.
# Each item is skipped according to the writer's item schema, and block
# framing is handled by skip_blocks.
def skip_array(writers_schema, decoder)
  skip_blocks(decoder) do
    skip_data(writers_schema.items, decoder)
  end
end
#skip_data(writers_schema, decoder) ⇒ Object
448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 |
# File 'lib/tros/io.rb', line 448
# Advances +decoder+ past one value described by +writers_schema+
# without decoding it, dispatching on the writer's schema type.
#
# @raise [AvroError] if the schema type is not recognised
def skip_data(writers_schema, decoder)
  case writers_schema.type_sym
  when :null    then decoder.skip_null
  when :boolean then decoder.skip_boolean
  when :string  then decoder.skip_string
  when :int     then decoder.skip_int
  when :long    then decoder.skip_long
  when :float   then decoder.skip_float
  when :double  then decoder.skip_double
  when :bytes   then decoder.skip_bytes
  when :fixed   then skip_fixed(writers_schema, decoder)
  when :enum    then skip_enum(writers_schema, decoder)
  when :array   then skip_array(writers_schema, decoder)
  when :map     then skip_map(writers_schema, decoder)
  when :union   then skip_union(writers_schema, decoder)
  when :record, :error, :request
    skip_record(writers_schema, decoder)
  else
    raise AvroError, "Unknown schema type: #{writers_schema.type}"
  end
end
#skip_enum(writers_schema, decoder) ⇒ Object
487 488 489 |
# File 'lib/tros/io.rb', line 487
# Skips an enum value by skipping its encoded int index. The writer's
# schema is accepted for signature symmetry but not needed.
def skip_enum(writers_schema, decoder)
  decoder.skip_int
end
#skip_fixed(writers_schema, decoder) ⇒ Object
483 484 485 |
# File 'lib/tros/io.rb', line 483
# Skips a fixed value by asking +decoder+ to skip the writer's declared
# size.
def skip_fixed(writers_schema, decoder)
  width = writers_schema.size
  decoder.skip(width)
end
#skip_map(writers_schema, decoder) ⇒ Object
500 501 502 503 504 505 |
# File 'lib/tros/io.rb', line 500
# Advances +decoder+ past an encoded map without materialising it. For
# each entry the string key is skipped, then the value is skipped per the
# writer's value schema. Block framing is handled by skip_blocks.
def skip_map(writers_schema, decoder)
  skip_blocks(decoder) do
    decoder.skip_string # entry key
    skip_data(writers_schema.values, decoder)
  end
end
#skip_record(writers_schema, decoder) ⇒ Object
507 508 509 |
# File 'lib/tros/io.rb', line 507
# Advances +decoder+ past a record by skipping each of the writer's fields
# in declaration order.
#
# @return [Object] the writer's field list (the receiver of #each)
def skip_record(writers_schema, decoder)
  fields = writers_schema.fields
  fields.each do |field|
    skip_data(field.type, decoder)
  end
end
#skip_union(writers_schema, decoder) ⇒ Object
491 492 493 494 |
# File 'lib/tros/io.rb', line 491
# Skips a union value. A long selects the writer's branch, and that
# branch's value is then skipped.
#
# @param writers_schema [Object] union schema the data was written with
# @param decoder [Object] source of the encoded data
# @raise [AvroError] if the branch index is outside the writer's union.
#   Without this check a negative index would wrap around via Array#[] and
#   skip the wrong number of bytes on corrupt data.
def skip_union(writers_schema, decoder)
  index = decoder.read_long
  branches = writers_schema.schemas
  unless index >= 0 && index < branches.size
    raise AvroError, "Union index #{index} out of range for #{branches.size} branches"
  end
  skip_data(branches[index], decoder)
end