
Getting started with Iceberg using Python & DuckDB

Getting started with Iceberg has never been easier! With a few lines of Python, we can query our Iceberg tables with DuckDB and convert the results to pandas dataframes!

Introduction

The purpose of this article is to get you started with Iceberg using Python. We'll start with a quick introduction to Iceberg & table formats in general, then we'll get started with a simple example using Python & DuckDB to query Iceberg tables.


Let's start with the basics


First of all, we need data. One of the most common places to store data is an object storage system like S3 or GCS. If there is some organization to how the files are stored, we can even go as far as calling this a data lake! Having all of our data available in a data lake is a great start, but definitely not enough for modern analytical use cases.


Data Lakes by themselves are great for the storage layer, but they do not provide any consistency guarantees or schema information that we would expect from a more structured data store. To help with this problem, we can use a table format!

What is a table format?

A table format is a specification for how to store data in a collection of files. The files are usually stored in an object store like S3 or GCS. The table format specification defines how to store data in the files, how to store metadata about the files, and how to store metadata about the table itself.


Think of a table format as a protocol for storing data in a collection of files. The files themselves are the same .csv or .parquet files you're used to, but instead of loading them into a data warehouse like Redshift or Snowflake, you define a structure over them using a table format.
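
To make this less abstract, here's a toy sketch of the idea in plain Python. It is not how Iceberg lays out its files; the `metadata.json` layout and the helper names `write_toy_table` and `read_toy_table` are made up for illustration. The point is only that readers follow the metadata to find the data files instead of listing the directory.

```python
import csv
import json
import tempfile
from pathlib import Path


def write_toy_table(root: Path, schema: list, batches: list) -> Path:
    """Write each batch as a CSV data file and record them all in one metadata file."""
    data_dir = root / "data"
    data_dir.mkdir(parents=True, exist_ok=True)
    data_files = []
    for i, rows in enumerate(batches):
        path = data_dir / f"part-{i:05d}.csv"
        with path.open("w", newline="") as f:
            csv.writer(f).writerows(rows)
        data_files.append({"path": str(path.relative_to(root)), "record_count": len(rows)})
    metadata_path = root / "metadata.json"
    metadata_path.write_text(json.dumps({"schema": schema, "data_files": data_files}, indent=2))
    return metadata_path


def read_toy_table(root: Path) -> list:
    """Read the table by following the metadata, never by listing the directory."""
    metadata = json.loads((root / "metadata.json").read_text())
    rows = []
    for entry in metadata["data_files"]:
        with (root / entry["path"]).open(newline="") as f:
            for values in csv.reader(f):
                # The schema lives in the metadata, not in the data files.
                rows.append(dict(zip(metadata["schema"], values)))
    return rows


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    write_toy_table(root, ["trip_id", "trip_distance"], [[(1, 2.5), (2, 11.0)], [(3, 0.7)]])
    # A stray file in the data folder is not part of the table: the metadata never mentions it.
    (root / "data" / "stray.csv").write_text("999,0.0\n")
    toy_rows = read_toy_table(root)
    print(len(toy_rows), "rows read through the metadata")
```

Real table formats add a lot on top of this (atomic commits, statistics, snapshots), but the basic contract is the same: the metadata decides what the table contains.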


Table formats have been around for a long time, but Iceberg and Delta Lake have been gaining popularity lately as the two most prominent contenders, with Hudi coming in as a close third. The differences between them are very interesting, but not the focus of this article; maybe a future one!

Why Iceberg?

Iceberg is a table format designed for huge analytic datasets. It offers a clean SQL interface for querying data, full schema evolution, time travel, and rollback, among other killer features.

For an exhaustive comparison of table formats like Iceberg and Delta Lake, check out the benchmark analysis by Brooklyn Data Co.


Now that we have a basic understanding of the tools we'll be using, let's get started!

Getting started with Iceberg using Python


Let's get started! Our goal is to spin up a local Iceberg table using Python and DuckDB. We'll use MinIO as our object storage system, Spark to ingest our data, and DuckDB to query the table through the pyiceberg library.

The Data Lake

First of all, we'll need a Data Lake to store our data in. We'll use MinIO, an open-source S3-compatible object storage system. We'll use Docker to run MinIO, but you can also run it locally if you'd like. I've prepared a docker-compose.yml file with all the necessary services for the demo for convenience.

The MinIO section looks like this:

  minio:
    hostname: "minio"
    image: "minio/minio:latest"
    container_name: "minio"
    ports:
      - "9001:9001"
      - "9000:9000"
    command:
      - "server"
      - "/data"
      - "--console-address"
      - ":9001"
    volumes:
      - "minio:/data"
    environment:
      MINIO_ROOT_USER: "minio"
      MINIO_ROOT_PASSWORD: "minio123"
  mc:
    depends_on:
      - "minio"
    image: "minio/mc"
    container_name: "mc"
    entrypoint: >
      /bin/sh -c " until (/usr/bin/mc config host add minio http://minio:9000
      minio minio123) do echo '...waiting...' && sleep 1; done; /usr/bin/mc rm
      -r --force minio/nyc-taxi-trips; /usr/bin/mc mb minio/nyc-taxi-trips;
      /usr/bin/mc policy set public minio/nyc-taxi-trips; exit 0; "


We'll use the `mc` container to create a bucket in MinIO that we can use to store our data. We'll map the `/data/` folder in the MinIO container to a volume so that we can access the data later on. For the demo, I included one `.parquet` file with some data from the NYC Taxi Trips dataset.

The Iceberg table

Now that we have some raw data in our Data Lake, we can start creating our Iceberg table. We'll use the iceberg-spark image to run a small pyspark script that will create the table for us.

The script is basically just two lines of code:

df = spark.read.parquet("/home/iceberg/warehouse/yellow_tripdata_2022-01.parquet")
df.write.saveAsTable("nyctaxi3.trips", format="iceberg")


The first line reads the data from the parquet file and the second line creates the Iceberg table.


To verify that the table was created successfully, we can head over to the MinIO console and see that there is a new folder called `metadata` in the bucket, next to our `data` folder. The `metadata` folder contains all the metadata about the table, including the schema, the partitioning information, and the location of the data files.


We can even peek into the `<uuid>.metadata.json` file to get an idea of how Iceberg works under the hood.


A short explanation of the different parts of an Iceberg table straight from the docs:


Snapshot


A snapshot is the state of a table at some time.
Each snapshot lists all of the data files that make up the table’s contents at the time of the snapshot. Data files are stored across multiple manifest files, and the manifests for a snapshot are listed in a single manifest list file.


Manifest list


A manifest list is a metadata file that lists the manifests that make up a table snapshot. Each manifest file in the manifest list is stored with information about its contents, like partition value ranges, used to speed up metadata operations.


Manifest file


A manifest file is a metadata file that lists a subset of data files that make up a snapshot. Each data file in a manifest is stored with a partition tuple, column-level stats, and summary information used to prune splits during scan planning.


Partition spec


A partition spec is a description of how to partition data in a table. A spec consists of a list of source columns and transforms. A transform produces a partition value from a source value. For example, date(ts) produces the date associated with a timestamp column named ts.

Partition tuple


A partition tuple is a tuple or struct of partition data stored with each data file.
All values in a partition tuple are the same for all rows stored in a data file. Partition tuples are produced by transforming values from row data using a partition spec. Iceberg stores partition values unmodified, unlike Hive tables that convert values to and from strings in file system paths and keys.


Snapshot log (history table)

The snapshot log is a metadata log of how the table’s current snapshot has changed over time. The log is a list of timestamp and ID pairs: when the current snapshot changed and the snapshot ID the current snapshot was changed to. The snapshot log is stored in table metadata as snapshot-log.
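
To get a feel for how these pieces fit together, here's a small sketch that walks a trimmed-down metadata dictionary. The field names (`current-snapshot-id`, `snapshots`, `snapshot-log`, `timestamp-ms`, `manifest-list`) come from the Iceberg table spec, but the IDs, timestamps, and file names are invented, and a real `<uuid>.metadata.json` holds many more fields.

```python
from datetime import datetime, timezone

# Hypothetical, heavily trimmed table metadata; all values are made up.
SAMPLE_METADATA = {
    "format-version": 1,
    "current-snapshot-id": 2002,
    "snapshots": [
        {
            "snapshot-id": 1001,
            "timestamp-ms": 1672531200000,
            "manifest-list": "snap-1001.avro",
            "summary": {"operation": "append"},
        },
        {
            "snapshot-id": 2002,
            "timestamp-ms": 1672617600000,
            "manifest-list": "snap-2002.avro",
            "summary": {"operation": "overwrite"},
        },
    ],
    "snapshot-log": [
        {"timestamp-ms": 1672531200000, "snapshot-id": 1001},
        {"timestamp-ms": 1672617600000, "snapshot-id": 2002},
    ],
}


def current_snapshot(metadata: dict) -> dict:
    """Look up the snapshot that the table currently points at."""
    by_id = {snap["snapshot-id"]: snap for snap in metadata["snapshots"]}
    return by_id[metadata["current-snapshot-id"]]


def snapshot_history(metadata: dict) -> list:
    """Turn the snapshot log into (UTC datetime, snapshot ID) pairs."""
    return [
        (datetime.fromtimestamp(entry["timestamp-ms"] / 1000, tz=timezone.utc), entry["snapshot-id"])
        for entry in metadata["snapshot-log"]
    ]


snap = current_snapshot(SAMPLE_METADATA)
print("current manifest list:", snap["manifest-list"])
for changed_at, snapshot_id in snapshot_history(SAMPLE_METADATA):
    print(changed_at.isoformat(), "->", snapshot_id)
```

To try it on the real thing, download the metadata file from the MinIO console and pass the result of `json.load` to the same two functions.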

And now, finally,

Time to query!


Alright, now that we got all that out of the way, let’s query the table! Iceberg provides a REST catalog to interact with the tables, which we can use through the pyiceberg library to extract data. The REST catalog is a useful tool that abstracts away the actual catalog implementation.


The relevant `docker-compose.yml` snippet is:

  rest:
    hostname: "iceberg"
    image: "tabulario/iceberg-rest:latest"
    container_name: "iceberg"
    ports:
      - "8181:8181"
    environment:
      CATALOG_S3_ENDPOINT: "http://minio:9000"
      CATALOG_IO__IMPL: "org.apache.iceberg.aws.s3.S3FileIO"
      CATALOG_WAREHOUSE: "s3a://nyc-taxi-trips"
      AWS_REGION: "us-east-1"
      AWS_ACCESS_KEY_ID: "minio"
      AWS_SECRET_ACCESS_KEY: "minio123"


The REST catalog is configured to use the S3FileIO implementation, which is the Iceberg implementation for reading and writing data from S3. The S3FileIO implementation requires the AWS credentials to be passed as environment variables, which in our case are the MinIO credentials.

After our REST catalog is up and running, we can use the pyiceberg library to query the table, but first, we need to configure it. This can be easily done by creating a .pyiceberg.yaml file in our home directory with the following contents:

catalog:
  default:
    uri:  http://0.0.0.0:8181/
    s3.endpoint: http://localhost:9000


The `uri` is the REST catalog endpoint, and the `s3.endpoint` is the S3 endpoint that pyiceberg will use to read the table's files from the bucket, which in our case is MinIO.
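
If you'd rather not keep a config file around, the same settings can be passed to `load_catalog` as keyword properties. The sketch below assumes the ports and credentials from the docker-compose.yml above and uses `localhost` for both endpoints. The helper names `catalog_properties` and `connect` are just for illustration, and the `s3.access-key-id` / `s3.secret-access-key` property names are worth double-checking against the pyiceberg version you have installed.

```python
def catalog_properties(
    rest_uri: str = "http://localhost:8181",
    s3_endpoint: str = "http://localhost:9000",
    access_key: str = "minio",
    secret_key: str = "minio123",
) -> dict:
    """Build the same settings as .pyiceberg.yaml, plus the MinIO credentials."""
    return {
        "uri": rest_uri,
        "s3.endpoint": s3_endpoint,
        "s3.access-key-id": access_key,
        "s3.secret-access-key": secret_key,
    }


def connect(name: str = "default"):
    """Open the catalog; needs pyiceberg installed and the REST container running."""
    # Imported here so the module can be loaded without pyiceberg installed.
    from pyiceberg.catalog import load_catalog

    return load_catalog(name, **catalog_properties())
```

With the containers up, `connect().list_namespaces()` is a quick way to check that the catalog is reachable.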


Now we can finally query the table! We will use the `pyiceberg` library to do so. The most basic example looks like this:

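import os

from pyiceberg.catalog import load_catalog

# MinIO credentials, as set in docker-compose.yml
os.environ["AWS_ACCESS_KEY_ID"] = "minio"
os.environ["AWS_SECRET_ACCESS_KEY"] = "minio123"

# "default" matches the catalog name in .pyiceberg.yaml
CATALOG = load_catalog("default")

# The namespace and table name used in the Spark script above
namespace, table_name = "nyctaxi3", "trips"
table = CATALOG.load_table((namespace, table_name))

# Printing the current schema lists every field with its ID and type
print(table.schema())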


This will print the table schema so we can verify that all fields are present, with the correct datatypes!


table {
 1: VendorID: optional long
 2: tpep_pickup_datetime: optional timestamptz
 3: tpep_dropoff_datetime: optional timestamptz
 4: passenger_count: optional double
 5: trip_distance: optional double
 6: RatecodeID: optional double
 7: store_and_fwd_flag: optional string
 8: PULocationID: optional long
 9: DOLocationID: optional long
 10: payment_type: optional long
 11: fare_amount: optional double
 12: extra: optional double
 13: mta_tax: optional double
 14: tip_amount: optional double
 15: tolls_amount: optional double
 16: improvement_surcharge: optional double
 17: total_amount: optional double
 18: congestion_surcharge: optional double
 19: airport_fee: optional double
}


Looks good, now let's take it a step further by running some analytical queries. By creating a table scan, we can easily load the results into an in-memory DuckDB connection.

con = table.scan().to_duckdb(table_name="trips")


Using this connection we are free to run any SQL query we want! As a bonus, we can convert the results to a pandas dataframe by just calling `df()` on the result.

df = con.execute("SELECT * FROM trips").df()

print(df.head(4))

This will print the first 4 rows of the table:

   VendorID      tpep_pickup_datetime  ... congestion_surcharge  airport_fee
0         1 2022-01-01 00:35:40+00:00  ...                  2.5          0.0
1         1 2022-01-01 00:33:43+00:00  ...                  0.0          0.0
2         2 2022-01-01 00:53:21+00:00  ...                  0.0          0.0
3         2 2022-01-01 00:25:21+00:00  ...                  2.5          0.0

[4 rows x 19 columns]


See how easy that was!? DuckDB alone already makes the table easy to query, but converting the results to a pandas dataframe opens up a whole new world of possibilities. We can now use any of the pandas functions to analyze the data or even use it to train a machine learning model!
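
As a slightly more analytical example, the scan can filter rows and select columns before anything reaches DuckDB. The sketch below assumes `table` is the pyiceberg table loaded earlier and that your pyiceberg version accepts a string `row_filter`. The function name `distant_trips`, the 10-mile threshold, and the SQL are my own choices for illustration.

```python
SUMMARY_SQL = """
    SELECT VendorID,
           count(*)           AS trips,
           avg(trip_distance) AS avg_distance,
           avg(total_amount)  AS avg_total
    FROM distant_taxi_trips
    GROUP BY VendorID
    ORDER BY VendorID
"""


def distant_trips(table, min_distance: float = 10.0):
    """Summarise long trips per vendor and return the result as a pandas dataframe."""
    # Iceberg applies the filter and column selection while planning the scan,
    # so only matching rows and the three needed columns are loaded.
    scan = table.scan(
        row_filter=f"trip_distance >= {min_distance}",
        selected_fields=("VendorID", "trip_distance", "total_amount"),
    )
    # Register the scan result as a DuckDB table and aggregate it with SQL.
    con = scan.to_duckdb(table_name="distant_taxi_trips")
    return con.execute(SUMMARY_SQL).df()
```

Calling `distant_trips(table)` returns a regular pandas dataframe, so everything from plotting to feature engineering works as usual from there.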


Conclusion


In this post, we have seen how to use Iceberg to create a table, and how to query it using the REST catalog and the pyiceberg library. We have also seen how to load a table scan into DuckDB and convert the results of a query into a pandas dataframe.

I hope you can see how easy it is to use Iceberg to create tables and query them. I find that it's not easy to wrap your head around the concepts of Iceberg, especially without playing around with it - but once you do, it's a breeze to use.