Introduction

Using Apache Iceberg with Flink typically requires the following steps:

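  • Adding the iceberg-flink-runtime JAR that matches your Flink version to the Flink classpath (FLINK_HOME/lib)
  • Downloading the Hadoop common libraries, setting HADOOP_HOME, and exporting HADOOP_CLASSPATH so Flink can load them (e.g. export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`)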

Working with Catalogs

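The catalog considerations that apply to Apache Iceberg in general apply to Flink too. The syntax is slightly different, and catalogs are typically created through the Flink SQL API. For example, the following statement registers a Hadoop catalog whose warehouse lives on HDFS: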

CREATE CATALOG local_catalog WITH (
	'type'='iceberg',
	'catalog-type'='hadoop',
	'warehouse'='hdfs://nn:8020/warehouse/path'
);
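A minimal sketch of putting the registered catalog to use from the Flink SQL client. The database and table names (db, sample) are hypothetical and only serve as illustration; each write committed to the table produces a new Iceberg snapshot that the metadata tables can then report on.

```sql
-- Make the Iceberg catalog registered above the session's current catalog.
USE CATALOG local_catalog;

-- Hypothetical database and table names, used for illustration only.
CREATE DATABASE IF NOT EXISTS db;

CREATE TABLE IF NOT EXISTS db.sample (
  id   BIGINT,
  data STRING
);

-- The Iceberg sink commits the written files as a new snapshot.
INSERT INTO db.sample VALUES (1, 'a'), (2, 'b');
```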

Querying the metadata

In Flink SQL, a metadata table is accessed by appending a $ followed by the metadata table name to the table name. Metadata tables help you understand the health of a table (e.g. when investigating orphan files). History, metadata logs, and snapshots are examples of metadata tables that can be queried from Flink.

History

The history metadata table shows how the table has evolved over time, including whether any snapshots were rolled back:

SELECT * FROM catalog.database.table$history;
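As a sketch, assuming the hypothetical table local_catalog.db.sample, the history table's is_current_ancestor column can be filtered to list snapshots that were once current but are no longer ancestors of the current snapshot, which is typically the result of a rollback:

```sql
-- Snapshots that were made current at some point but are no longer
-- part of the current snapshot's lineage (typically rolled back).
SELECT made_current_at, snapshot_id, parent_id
FROM local_catalog.db.sample$history
WHERE NOT is_current_ancestor;
```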

Metadata logs

The metadata log entries table tracks all of the table's metadata files, recording for each one its timestamp along with the latest_snapshot_id and latest_schema_id at that point:

SELECT * FROM catalog.database.table$metadata_log_entries;
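A sketch that selects only the columns mentioned above, again assuming the hypothetical table local_catalog.db.sample. Because timestamp is a reserved keyword in Flink SQL, that column name is quoted with backticks:

```sql
-- One row per metadata file; `timestamp` must be backtick-quoted.
SELECT `timestamp`, file, latest_snapshot_id, latest_schema_id
FROM local_catalog.db.sample$metadata_log_entries;
```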

Snapshots

The snapshots table provides information such as the number of records added or deleted by each write operation and the ID of the Flink job that performed the write:

SELECT * FROM catalog.database.table$snapshots;
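These details live in the snapshot's summary column, a string-to-string map. A minimal sketch, assuming the hypothetical table local_catalog.db.sample and the summary keys added-records, deleted-records, and flink.job-id; a key that is absent for a given snapshot (for example, deleted-records on an append-only commit) simply yields NULL:

```sql
-- Pull per-commit record counts and the writing Flink job out of the summary map.
SELECT
  committed_at,
  snapshot_id,
  operation,
  summary['added-records']   AS added_records,
  summary['deleted-records'] AS deleted_records,
  summary['flink.job-id']    AS flink_job_id
FROM local_catalog.db.sample$snapshots;
```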