Introduction
Using Apache Iceberg with Flink typically requires the following steps:
- Including the iceberg-flink-runtime JAR on the Flink classpath (FLINK_HOME/lib)
- Downloading the Hadoop common libraries and setting HADOOP_HOME
Working with Catalogs
The catalog considerations described in Apache Iceberg Catalogs apply to Flink too. The syntax is slightly different, and catalogs are typically created via the SQL API:
CREATE CATALOG local_catalog WITH (
'type'='iceberg',
'catalog-type'='hadoop',
'warehouse'='hdfs://nn:8020/warehouse/path'
);
Querying the metadata
In Flink SQL, a metadata table is accessed by appending a $ followed by the metadata table name to the table name. Metadata tables can help you understand the health of a table (e.g. orphan files). History, metadata logs, and snapshots are examples of metadata tables that can be queried in Flink.
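A metadata table only has something to show once the table has committed snapshots. The sketch below assumes the Flink SQL client and the local_catalog created above; the database db and table sample are hypothetical names. It switches to batch mode with synchronous DML so that each INSERT job finishes and commits before the next statement runs, then reads a metadata table using the $ suffix.

```sql
-- Run INSERTs as bounded batch jobs and wait for each one to finish (and commit).
SET 'execution.runtime-mode' = 'batch';
SET 'table.dml-sync' = 'true';

-- Hypothetical database and table in the catalog created earlier.
CREATE DATABASE IF NOT EXISTS local_catalog.db;

CREATE TABLE IF NOT EXISTS local_catalog.db.sample (
  id   BIGINT,
  data STRING
);

-- Each INSERT job commits one new snapshot to the table.
INSERT INTO local_catalog.db.sample VALUES (1, 'a'), (2, 'b');
INSERT INTO local_catalog.db.sample VALUES (3, 'c');

-- Metadata table access: table name, then '$', then the metadata table name.
SELECT snapshot_id, operation FROM local_catalog.db.sample$snapshots;
```

Without synchronous DML, the SQL client submits INSERT jobs asynchronously, so the final SELECT could run before the writes have committed.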
History
The history metadata table allows you to view the evolution of your table over time, including whether any transactions were rolled back:
SELECT * FROM catalog.database.table$history;
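Rolled-back snapshots can be isolated through the is_current_ancestor column, which is false for snapshots that were once made current but are no longer ancestors of the table's current state. A minimal sketch, reusing the catalog.database.table placeholder from the query above:

```sql
-- Snapshots that were made current at some point but later rolled back.
SELECT made_current_at, snapshot_id, parent_id
FROM catalog.database.table$history
WHERE NOT is_current_ancestor;
```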
Metadata logs
The metadata log entries table keeps track of all the metadata files, including the timestamp of each file and information such as its latest_snapshot_id and latest_schema_id:
SELECT * FROM catalog.database.table$metadata_log_entries;
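To see only the fields mentioned above, project them explicitly. In Flink SQL, timestamp is a reserved keyword, so that column name must be quoted with backticks. The column names below follow the Iceberg metadata_log_entries schema as we understand it (timestamp, file, latest_snapshot_id, latest_schema_id, latest_sequence_number); verify them against your Iceberg version.

```sql
-- One row per metadata file; `timestamp` is quoted because it is a reserved word.
SELECT `timestamp`, `file`, latest_snapshot_id, latest_schema_id
FROM catalog.database.table$metadata_log_entries;
```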
Snapshots
The snapshots table provides information such as the number of records added or deleted by each write operation and the ID of the Flink job that performed it:
SELECT * FROM catalog.database.table$snapshots;
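The record counts and the job ID are stored in each snapshot's summary column, a string-to-string map. The sketch below extracts them with Flink SQL map access. It assumes the summary keys written by Iceberg core (added-records, deleted-records) and by the Iceberg Flink committer (flink.job-id); a key that a given snapshot does not carry comes back as NULL.

```sql
-- Pull per-commit statistics out of the summary map.
SELECT
  committed_at,
  snapshot_id,
  operation,
  summary['added-records']   AS added_records,
  summary['deleted-records'] AS deleted_records,
  summary['flink.job-id']    AS flink_job_id
FROM catalog.database.table$snapshots;
```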