quickavro examples
Reading an avro file
quickavro reads the schema from the Avro file header, so there is no need to specify a schema when using quickavro.FileReader.
import quickavro

with quickavro.FileReader("example.avro") as reader:
    for record in reader.records():
        print(record)
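If all of the records are needed at once rather than one at a time, the iterable returned by records() can simply be collected into a list. A minimal sketch reusing the same example.avro file:

import quickavro

with quickavro.FileReader("example.avro") as reader:
    # Collect every record into a list instead of iterating lazily.
    people = list(reader.records())

print(len(people))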
Writing an avro file
import quickavro

records = [
    {"name": "Larry", "age": 21},
    {"name": "Gary", "age": 34},
    {"name": "Barry", "age": 27},
    {"name": "Dark Larry", "age": 1134},
    {"name": "Larry of the Void", "age": None},
]

with quickavro.FileWriter("example.avro") as writer:
    writer.schema = {
        "type": "record",
        "name": "Person",
        "fields": [
            {"name": "name", "type": "string"},
            {"name": "age", "type": ["int", "null"]}
        ]
    }
    for record in records:
        writer.write_record(record)
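The file written above can be read straight back with quickavro.FileReader to confirm that the ["int", "null"] union round-trips the missing age. A minimal sketch:

import quickavro

with quickavro.FileReader("example.avro") as reader:
    for record in reader.records():
        # "Larry of the Void" should come back with age set to None
        print(record["name"], record["age"])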
Reading an avro file with BinaryEncoder
quickavro.BinaryEncoder can also be used directly for reading and writing Avro files, which gives extra flexibility.
import quickavro

with quickavro.BinaryEncoder() as encoder:
    encoder.schema = {
        "type": "record",
        "name": "Person",
        "fields": [
            {"name": "name", "type": "string"},
            {"name": "age", "type": ["int", "null"]}
        ]
    }
    with open("example.avro", "rb") as f:
        data = f.read()
    header, data = encoder.read_header(data)
    for record in encoder.read_blocks(data):
        print(record)
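Because read_header() and read_blocks() operate on plain bytes, the same pattern works for Avro data that never touches the filesystem, such as a payload already held in memory. A minimal sketch, assuming avro_bytes is a bytes object containing a complete Avro file (the name is only for illustration):

import quickavro

def decode_avro_bytes(avro_bytes):
    # avro_bytes is assumed to hold the full contents of an Avro file,
    # e.g. received over the network or read from object storage.
    with quickavro.BinaryEncoder() as encoder:
        encoder.schema = {
            "type": "record",
            "name": "Person",
            "fields": [
                {"name": "name", "type": "string"},
                {"name": "age", "type": ["int", "null"]}
            ]
        }
        header, remaining = encoder.read_header(avro_bytes)
        return list(encoder.read_blocks(remaining))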
Writing an avro file with BinaryEncoder
import quickavro

records = [
    {"name": "Larry", "age": 21},
    {"name": "Gary", "age": 34},
    {"name": "Barry", "age": 27},
    {"name": "Dark Larry", "age": 1134},
    {"name": "Larry of the Void", "age": None},
]

with quickavro.BinaryEncoder() as encoder:
    encoder.schema = {
        "type": "record",
        "name": "Person",
        "fields": [
            {"name": "name", "type": "string"},
            {"name": "age", "type": ["int", "null"]}
        ]
    }
    # Avro files are binary, so the output file is opened in "wb" mode.
    with open("example.avro", "wb") as f:
        f.write(encoder.header)
        for block in encoder.write_blocks(records):
            f.write(block)
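The same flexibility applies when writing: encoder.header and encoder.write_blocks() just produce bytes, so they can be sent to any sink rather than only a file. A minimal sketch that builds the Avro payload in memory with io.BytesIO, reusing the records list from the example above:

import io

import quickavro

buffer = io.BytesIO()

with quickavro.BinaryEncoder() as encoder:
    encoder.schema = {
        "type": "record",
        "name": "Person",
        "fields": [
            {"name": "name", "type": "string"},
            {"name": "age", "type": ["int", "null"]}
        ]
    }
    # Write the header followed by the encoded record blocks into the buffer.
    buffer.write(encoder.header)
    for block in encoder.write_blocks(records):
        buffer.write(block)

avro_bytes = buffer.getvalue()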
Using a deflate/snappy codec
import quickavro

records = [
    {"name": "Larry", "age": 21},
    {"name": "Gary", "age": 34},
    {"name": "Barry", "age": 27},
    {"name": "Dark Larry", "age": 1134},
    {"name": "Larry of the Void", "age": None},
]

with quickavro.BinaryEncoder(codec="deflate") as encoder:
    encoder.schema = {
        "type": "record",
        "name": "Person",
        "fields": [
            {"name": "name", "type": "string"},
            {"name": "age", "type": ["int", "null"]}
        ]
    }
    with open("example.avro", "wb") as f:
        f.write(encoder.header)
        for block in encoder.write_blocks(records):
            f.write(block)

with quickavro.BinaryEncoder(codec="snappy") as encoder:
    encoder.schema = {
        "type": "record",
        "name": "Person",
        "fields": [
            {"name": "name", "type": "string"},
            {"name": "age", "type": ["int", "null"]}
        ]
    }
    with open("example.avro", "wb") as f:
        f.write(encoder.header)
        for block in encoder.write_blocks(records):
            f.write(block)
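The Avro file format records the codec name in the file header, so a deflate- or snappy-compressed file should read back the same way as an uncompressed one, without the codec being specified again. A minimal sketch:

import quickavro

# The codec is taken from the file header, so reading looks identical
# regardless of whether deflate, snappy, or no compression was used.
with quickavro.FileReader("example.avro") as reader:
    for record in reader.records():
        print(record)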
Without context handling
When not using context handling with quickavro.FileWriter, blocks must be created manually by calling quickavro.FileWriter.flush(), and quickavro.FileWriter.close() must be called when finished:
import quickavro

with open(avro_file, 'wb') as f:
    writer = quickavro.FileWriter(f)
    writer.schema = {
        "type": "record",
        "name": "Person",
        "fields": [
            {"name": "name", "type": "string"},
            {"name": "age", "type": ["int", "null"]}
        ]
    }
    for record in records:
        if writer.block_size >= quickavro.DEFAULT_SYNC_INTERVAL:
            # This ensures that blocks of records are created
            # correctly.
            writer.flush()
        writer.write_record(record)
    # This ensures that any records left in the current block are
    # written.
    writer.close()
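Since close() should run even if writing fails partway through, one way to harden this pattern without relying on the writer's context handling is a try/finally block. A minimal sketch reusing the avro_file and records names from the example above:

import quickavro

with open(avro_file, 'wb') as f:
    writer = quickavro.FileWriter(f)
    writer.schema = {
        "type": "record",
        "name": "Person",
        "fields": [
            {"name": "name", "type": "string"},
            {"name": "age", "type": ["int", "null"]}
        ]
    }
    try:
        for record in records:
            if writer.block_size >= quickavro.DEFAULT_SYNC_INTERVAL:
                writer.flush()
            writer.write_record(record)
    finally:
        # Flush the remaining block and close the writer even if an
        # exception interrupts the loop above.
        writer.close()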