Class: Avro::DataFile::Writer

Inherits:
Object
  • Object
show all
Defined in:
lib/avro/data_file.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(writer, datum_writer, writers_schema = nil, codec = nil, meta = {}) ⇒ Writer

Returns a new instance of Writer.



97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
# File 'lib/avro/data_file.rb', line 97

# Sets up a data-file writer on top of +writer+.
#
# With a +writers_schema+ a new file is started: a sync marker is generated,
# the codec name and schema are stored in the metadata and +write_header+ is
# invoked. Without one the call is treated as an append: the sync marker,
# codec name and schema are read back from +writer+ through a Reader and the
# stream is moved to its end.
#
# @param writer stream the file is written to (also read from and seeked
#   when appending)
# @param datum_writer object used by #<< to encode datums; its
#   +writers_schema+ is assigned here
# @param writers_schema schema for a new file, or nil to append
# @param codec codec designator handed to DataFile.get_codec (new files only)
# @param meta [Hash] metadata; this same object is stored and receives the
#   'avro.codec' and 'avro.schema' entries
def initialize(writer, datum_writer, writers_schema=nil, codec=nil, meta={})
  @writer = writer
  @datum_writer = datum_writer
  @meta = meta
  @block_count = 0

  # Encoder that targets the output stream directly.
  @encoder = IO::BinaryEncoder.new(@writer)

  # In-memory staging buffer (binary when the StringIO supports encodings)
  # with an encoder of its own; #<< encodes datums into it.
  @buffer_writer = StringIO.new(+'', 'w')
  @buffer_writer.set_encoding('BINARY') if @buffer_writer.respond_to?(:set_encoding)
  @buffer_encoder = IO::BinaryEncoder.new(@buffer_writer)

  if writers_schema
    # Fresh file: record codec and schema, then emit the header.
    @sync_marker = Writer.generate_sync_marker
    @codec = DataFile.get_codec(codec)
    @meta['avro.codec'] = @codec.codec_name.to_s
    @meta['avro.schema'] = writers_schema.to_s
    datum_writer.writers_schema = writers_schema
    write_header
  else
    # No schema given, so presume an append: open the existing content for
    # reading to recover the values it was written with.
    existing = Reader.new(writer, Avro::IO::DatumReader.new)

    # FIXME(jmhodges): collect arbitrary metadata
    @sync_marker = existing.sync_marker
    @meta['avro.codec'] = existing.meta['avro.codec']
    @codec = DataFile.get_codec(@meta['avro.codec'])

    # Reuse the schema the existing file was written with.
    stored_schema = existing.meta['avro.schema']
    @meta['avro.schema'] = stored_schema
    datum_writer.writers_schema = Schema.parse(stored_schema)

    # whence 2 = seek relative to end of stream, ready for new blocks
    writer.seek(0, 2)
  end
end

Instance Attribute Details

#block_count ⇒ Object

Returns the value of attribute block_count.



95
96
97
# File 'lib/avro/data_file.rb', line 95

# Datum counter: the constructor sets it to 0 and #<< adds 1 for every datum
# appended.
# NOTE(review): presumably reset when a block is written out; write_block is
# not visible here, so confirm.
def block_count
  @block_count
end

#buffer_encoder ⇒ Object (readonly)

Returns the value of attribute buffer_encoder.



94
95
96
# File 'lib/avro/data_file.rb', line 94

# The IO::BinaryEncoder the constructor builds around #buffer_writer; #<<
# passes it to the datum writer so datums are encoded into the buffer.
def buffer_encoder
  @buffer_encoder
end

#buffer_writer ⇒ Object (readonly)

Returns the value of attribute buffer_writer.



94
95
96
# File 'lib/avro/data_file.rb', line 94

# The in-memory StringIO created by the constructor (set to BINARY encoding
# when supported). #<< compares its position with SYNC_INTERVAL to decide
# when to write a block.
def buffer_writer
  @buffer_writer
end

#codec ⇒ Object (readonly)

Returns the value of attribute codec.



94
95
96
# File 'lib/avro/data_file.rb', line 94

# The codec object the constructor obtained from DataFile.get_codec, either
# from the +codec+ argument (new file) or from the 'avro.codec' metadata of
# the file being appended to.
def codec
  @codec
end

#datum_writer ⇒ Object (readonly)

Returns the value of attribute datum_writer.



94
95
96
# File 'lib/avro/data_file.rb', line 94

# The datum writer supplied to the constructor, which also assigns its
# +writers_schema+. #<< calls its +write+ method for each datum.
def datum_writer
  @datum_writer
end

#encoder ⇒ Object (readonly)

Returns the value of attribute encoder.



94
95
96
# File 'lib/avro/data_file.rb', line 94

# The IO::BinaryEncoder the constructor builds directly around #writer.
def encoder
  @encoder
end

#meta ⇒ Object (readonly)

Returns the value of attribute meta.



94
95
96
# File 'lib/avro/data_file.rb', line 94

# The metadata hash passed to the constructor (same object, not a copy); the
# constructor stores the 'avro.codec' and 'avro.schema' entries in it.
def meta
  @meta
end

#sync_marker ⇒ Object (readonly)

Returns the value of attribute sync_marker.



94
95
96
# File 'lib/avro/data_file.rb', line 94

# The sync marker chosen by the constructor: freshly generated through
# Writer.generate_sync_marker for a new file, or copied from the Reader of
# the existing file when appending.
def sync_marker
  @sync_marker
end

#writer ⇒ Object (readonly)

Returns the value of attribute writer.



94
95
96
# File 'lib/avro/data_file.rb', line 94

# The underlying output stream handed to the constructor; #flush, #close and
# #sync delegate to its +flush+, +close+ and +tell+ methods.
def writer
  @writer
end

Class Method Details

.generate_sync_marker ⇒ Object



90
91
92
# File 'lib/avro/data_file.rb', line 90

# Produces a fresh random sync marker; the constructor calls this when
# starting a new file.
#
# @return [String] 16 random bytes from OpenSSL::Random
def self.generate_sync_marker
  marker_length = 16
  OpenSSL::Random.random_bytes(marker_length)
end

Instance Method Details

#<<(datum) ⇒ Object

Append a datum to the file



136
137
138
139
140
141
142
143
144
145
# File 'lib/avro/data_file.rb', line 136

# Appends one datum: it is encoded into the in-memory buffer through the
# datum writer and the block counter is bumped.
#
# @param datum value handed to +datum_writer.write+
# @return the result of +write_block+ when a block was written, otherwise nil
def <<(datum)
  datum_writer.write(datum, buffer_encoder)
  self.block_count += 1

  # Once the buffer position has reached SYNC_INTERVAL, write the block out.
  buffered = buffer_writer.tell
  write_block if buffered >= SYNC_INTERVAL
end

#close ⇒ Object



161
162
163
164
# File 'lib/avro/data_file.rb', line 161

# Flushes pending data and closes the underlying writer.
#
# The writer is closed even when the flush raises, so a failed final write
# does not leave the underlying stream open; the flush error still
# propagates to the caller.
#
# @return whatever +writer.close+ returns
def close
  begin
    flush
  ensure
    # Runs on both the success and the failure path.
    result = writer.close
  end
  result
end

#flush ⇒ Object

Flush the current state of the file, including metadata



156
157
158
159
# File 'lib/avro/data_file.rb', line 156

# Writes out the pending block via write_block, then flushes the underlying
# writer. The order matters: the block must reach the writer before the
# writer itself is flushed.
# NOTE(review): write_block is defined outside this excerpt; what it emits is
# not verified here.
#
# @return whatever +writer.flush+ returns
def flush
  write_block
  writer.flush
end

#sync ⇒ Object

Return the current position as a value that may be passed to DataFileReader.seek(long). Forces the end of the current block, emitting a synchronization marker.



150
151
152
153
# File 'lib/avro/data_file.rb', line 150

# Ends the current block by calling write_block, then reports the underlying
# writer's current position.
#
# @return whatever +writer.tell+ returns (presumably an Integer offset;
#   confirm against the writer type in use)
def sync
  write_block
  writer.tell
end