quickavro examples

Reading an avro file

quickavro reads the schema from the Avro file header, so there is no need to specify a schema when using quickavro.FileReader.

import quickavro

# No schema is assigned here: it comes from the file header.
with quickavro.FileReader("example.avro") as avro_reader:
    for person in avro_reader.records():
        print(person)

Writing an avro file

import quickavro

records = [
    {"name": "Larry", "age": 21},
    {"name": "Gary", "age": 34},
    {"name": "Barry", "age": 27},
    {"name": "Dark Larry", "age": 1134},
    {"name": "Larry of the Void", "age": None},
]

# The "age" field is a union of "int" and "null", which matches the
# record above whose age is None.
person_schema = {
    "type": "record",
    "name": "Person",
    "fields": [
        {"name": "name", "type": "string"},
        {"name": "age", "type": ["int", "null"]},
    ],
}

with quickavro.FileWriter("example.avro") as avro_writer:
    # The schema is assigned before any record is written.
    avro_writer.schema = person_schema
    for person in records:
        avro_writer.write_record(person)

Reading an avro file with BinaryEncoder

The quickavro.BinaryEncoder can also be used directly to read and write Avro files, for extra flexibility.

import quickavro

person_schema = {
    "type": "record",
    "name": "Person",
    "fields": [
        {"name": "name", "type": "string"},
        {"name": "age", "type": ["int", "null"]},
    ],
}

with quickavro.BinaryEncoder() as encoder:
    encoder.schema = person_schema

    # Load the whole file into memory as raw bytes.
    with open("example.avro", "rb") as avro_file:
        raw = avro_file.read()

    # read_header returns two values; the second is what gets handed
    # to read_blocks below.
    header, body = encoder.read_header(raw)

    for person in encoder.read_blocks(body):
        print(person)

Writing an avro file with BinaryEncoder

import quickavro

records = [
    {"name": "Larry", "age": 21},
    {"name": "Gary", "age": 34},
    {"name": "Barry", "age": 27},
    {"name": "Dark Larry", "age": 1134},
    {"name": "Larry of the Void", "age": None},
]

with quickavro.BinaryEncoder() as encoder:
    encoder.schema = {
      "type": "record",
      "name": "Person",
      "fields": [
        {"name": "name", "type": "string"},
        {"name": "age",  "type": ["int", "null"]}
      ]
    }
    # Avro is a binary format, so the output file must be opened in
    # binary mode ("wb"); text mode would reject or corrupt the data.
    with open("example.avro", "wb") as f:
        # The header goes first, followed by each encoded block.
        f.write(encoder.header)
        for block in encoder.write_blocks(records):
            f.write(block)

Using a deflate/snappy codec

import quickavro

records = [
    {"name": "Larry", "age": 21},
    {"name": "Gary", "age": 34},
    {"name": "Barry", "age": 27},
    {"name": "Dark Larry", "age": 1134},
    {"name": "Larry of the Void", "age": None},
]

# The codec is selected when the encoder is constructed.
with quickavro.BinaryEncoder(codec="deflate") as encoder:
    encoder.schema = {
      "type": "record",
      "name": "Person",
      "fields": [
        {"name": "name", "type": "string"},
        {"name": "age",  "type": ["int", "null"]}
      ]
    }
    # Avro output is binary, so the file must be opened in binary
    # mode ("wb"); text mode would reject or corrupt the data.
    with open("example.avro", "wb") as f:
        f.write(encoder.header)
        for block in encoder.write_blocks(records):
            f.write(block)

# Same as the deflate example, but with the "snappy" codec.
with quickavro.BinaryEncoder(codec="snappy") as encoder:
    encoder.schema = {
      "type": "record",
      "name": "Person",
      "fields": [
        {"name": "name", "type": "string"},
        {"name": "age",  "type": ["int", "null"]}
      ]
    }
    # Avro output is binary, so the file must be opened in binary
    # mode ("wb"); text mode would reject or corrupt the data.
    with open("example.avro", "wb") as f:
        f.write(encoder.header)
        for block in encoder.write_blocks(records):
            f.write(block)

Without context handling

When not using context handling with quickavro.FileWriter, blocks must be created manually by calling quickavro.FileWriter.flush(), and quickavro.FileWriter.close() must be called when finished:

# NOTE: `avro_file` and `records` are not defined in this snippet;
# presumably an output path and a list of dicts like the `records`
# used in the other examples.
#
# Avro output is binary, so the file must be opened in binary mode
# ('wb'); text mode would reject or corrupt the data.
with open(avro_file, 'wb') as f:
    writer = quickavro.FileWriter(f)
    writer.schema = {
      "type": "record",
      "name": "Person",
      "fields": [
        {"name": "name", "type": "string"},
        {"name": "age",  "type": ["int", "null"]}
      ]
    }
    for record in records:
        if writer.block_size >= quickavro.DEFAULT_SYNC_INTERVAL:
            # This ensures that blocks of records are created
            # correctly.
            writer.flush()
        writer.write_record(record)
    # This ensures that any records left in the current block are
    # written.
    writer.close()