Flink Queries
Iceberg support streaming and batch read With Apache Flink's DataStream API and Table API.
Reading with SQL
Iceberg support both streaming and batch read in Flink. Execute the following sql command to switch execution mode from streaming
to batch
, and vice versa:
-- Execute the flink job in streaming mode for current session context
SET execution.runtime-mode = streaming;
-- Execute the flink job in batch mode for current session context
SET execution.runtime-mode = batch;
Flink batch read
Submit a Flink batch job using the following sentences:
-- Execute the flink job in batch mode for current session context
SET execution.runtime-mode = batch;
SELECT * FROM sample;
Flink streaming read
Iceberg supports processing incremental data in Flink streaming jobs which starts from a historical snapshot-id:
-- Submit the flink job in streaming mode for current session.
SET execution.runtime-mode = streaming;
-- Enable this switch because streaming read SQL will provide few job options in flink SQL hint options.
SET table.dynamic-table-options.enabled=true;
-- Read all the records from the iceberg current snapshot, and then read incremental data starting from that snapshot.
SELECT * FROM sample /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s')*/ ;
-- Read all incremental data starting from the snapshot-id '3821550127947089987' (records from this snapshot will be excluded).
SELECT * FROM sample /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s', 'start-snapshot-id'='3821550127947089987')*/ ;
There are some options that could be set in Flink SQL hint options for streaming job, see read options for details.
FLIP-27 source for SQL
Here are the SQL settings for the FLIP-27 source. All other SQL settings and options documented above are applicable to the FLIP-27 source.
Reading branches and tags with SQL
Branch and tags can be read via SQL by specifying options. For more details refer to Flink Configuration
--- Read from branch b1
SELECT * FROM table /*+ OPTIONS('branch'='b1') */ ;
--- Read from tag t1
SELECT * FROM table /*+ OPTIONS('tag'='t1') */;
--- Incremental scan from tag t1 to tag t2
SELECT * FROM table /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s', 'start-tag'='t1', 'end-tag'='t2') */;
Reading with DataStream
Iceberg support streaming or batch read in Java API now.
Batch Read
This example will read all records from iceberg table and then print to the stdout console in flink batch job:
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path");
DataStream<RowData> batch = FlinkSource.forRowData()
.env(env)
.tableLoader(tableLoader)
.streaming(false)
.build();
// Print all records to stdout.
batch.print();
// Submit and execute this batch read job.
env.execute("Test Iceberg Batch Read");
Streaming read
This example will read incremental records which start from snapshot-id '3821550127947089987' and print to stdout console in flink streaming job:
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path");
DataStream<RowData> stream = FlinkSource.forRowData()
.env(env)
.tableLoader(tableLoader)
.streaming(true)
.startSnapshotId(3821550127947089987L)
.build();
// Print all records to stdout.
stream.print();
// Submit and execute this streaming read job.
env.execute("Test Iceberg Streaming Read");
There are other options that can be set, please see the FlinkSource#Builder.
Reading with DataStream (FLIP-27 source)
FLIP-27 source interface
was introduced in Flink 1.12. It aims to solve several shortcomings of the old SourceFunction
streaming source interface. It also unifies the source interfaces for both batch and streaming executions.
Most source connectors (like Kafka, file) in Flink repo have migrated to the FLIP-27 interface.
Flink is planning to deprecate the old SourceFunction
interface in the near future.
A FLIP-27 based Flink IcebergSource
is added in iceberg-flink
module. The FLIP-27 IcebergSource
is currently an experimental feature.
Batch Read
This example will read all records from iceberg table and then print to the stdout console in flink batch job:
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path");
IcebergSource<RowData> source = IcebergSource.forRowData()
.tableLoader(tableLoader)
.assignerFactory(new SimpleSplitAssignerFactory())
.build();
DataStream<RowData> batch = env.fromSource(
source,
WatermarkStrategy.noWatermarks(),
"My Iceberg Source",
TypeInformation.of(RowData.class));
// Print all records to stdout.
batch.print();
// Submit and execute this batch read job.
env.execute("Test Iceberg Batch Read");
Streaming read
This example will start the streaming read from the latest table snapshot (inclusive). Every 60s, it polls Iceberg table to discover new append-only snapshots. CDC read is not supported yet.
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path");
IcebergSource source = IcebergSource.forRowData()
.tableLoader(tableLoader)
.assignerFactory(new SimpleSplitAssignerFactory())
.streaming(true)
.streamingStartingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_LATEST_SNAPSHOT)
.monitorInterval(Duration.ofSeconds(60))
.build()
DataStream<RowData> stream = env.fromSource(
source,
WatermarkStrategy.noWatermarks(),
"My Iceberg Source",
TypeInformation.of(RowData.class));
// Print all records to stdout.
stream.print();
// Submit and execute this streaming read job.
env.execute("Test Iceberg Streaming Read");
There are other options that could be set by Java API, please see the IcebergSource#Builder.
Reading branches and tags with DataStream
Branches and tags can also be read via the DataStream API
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path");
// Read from branch
DataStream<RowData> batch = FlinkSource.forRowData()
.env(env)
.tableLoader(tableLoader)
.branch("test-branch")
.streaming(false)
.build();
// Read from tag
DataStream<RowData> batch = FlinkSource.forRowData()
.env(env)
.tableLoader(tableLoader)
.tag("test-tag")
.streaming(false)
.build();
// Streaming read from start-tag
DataStream<RowData> batch = FlinkSource.forRowData()
.env(env)
.tableLoader(tableLoader)
.streaming(true)
.startTag("test-tag")
.build();
Read as Avro GenericRecord
FLIP-27 Iceberg source provides AvroGenericRecordReaderFunction
that converts
Flink RowData
Avro GenericRecord
. You can use the convert to read from
Iceberg table as Avro GenericRecord DataStream.
Please make sure flink-avro
jar is included in the classpath.
Also iceberg-flink-runtime
shaded bundle jar can't be used
because the runtime jar shades the avro package.
Please use non-shaded iceberg-flink
jar instead.
TableLoader tableLoader = ...;
Table table;
try (TableLoader loader = tableLoader) {
loader.open();
table = loader.loadTable();
}
AvroGenericRecordReaderFunction readerFunction = AvroGenericRecordReaderFunction.fromTable(table);
IcebergSource<GenericRecord> source =
IcebergSource.<GenericRecord>builder()
.tableLoader(tableLoader)
.readerFunction(readerFunction)
.assignerFactory(new SimpleSplitAssignerFactory())
...
.build();
DataStream<Row> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(),
"Iceberg Source as Avro GenericRecord", new GenericRecordAvroTypeInfo(avroSchema));
Options
Read options
Flink read options are passed when configuring the Flink IcebergSource:
IcebergSource.forRowData()
.tableLoader(TableLoader.fromCatalog(...))
.assignerFactory(new SimpleSplitAssignerFactory())
.streaming(true)
.streamingStartingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_LATEST_SNAPSHOT)
.startSnapshotId(3821550127947089987L)
.monitorInterval(Duration.ofMillis(10L)) // or .set("monitor-interval", "10s") \ set(FlinkReadOptions.MONITOR_INTERVAL, "10s")
.build()
For Flink SQL, read options can be passed in via SQL hints like this:
Options can be passed in via Flink configuration, which will be applied to current session. Note that not all options support this mode.
Check out all the options here: read-options
Inspecting tables
To inspect a table's history, snapshots, and other metadata, Iceberg supports metadata tables.
Metadata tables are identified by adding the metadata table name after the original table name. For example, history for db.table
is read using db.table$history
.
History
To show table history:
made_current_at | snapshot_id | parent_id | is_current_ancestor |
---|---|---|---|
2019-02-08 03:29:51.215 | 5781947118336215154 | NULL | true |
2019-02-08 03:47:55.948 | 5179299526185056830 | 5781947118336215154 | true |
2019-02-09 16:24:30.13 | 296410040247533544 | 5179299526185056830 | false |
2019-02-09 16:32:47.336 | 2999875608062437330 | 5179299526185056830 | true |
2019-02-09 19:42:03.919 | 8924558786060583479 | 2999875608062437330 | true |
2019-02-09 19:49:16.343 | 6536733823181975045 | 8924558786060583479 | true |
Info
This shows a commit that was rolled back. In this example, snapshot 296410040247533544 and 2999875608062437330 have the same parent snapshot 5179299526185056830. Snapshot 296410040247533544 was rolled back and is not an ancestor of the current table state.
Metadata Log Entries
To show table metadata log entries:
timestamp | file | latest_snapshot_id | latest_schema_id | latest_sequence_number |
---|---|---|---|---|
2022-07-28 10:43:52.93 | s3://.../table/metadata/00000-9441e604-b3c2-498a-a45a-6320e8ab9006.metadata.json | null | null | null |
2022-07-28 10:43:57.487 | s3://.../table/metadata/00001-f30823df-b745-4a0a-b293-7532e0c99986.metadata.json | 170260833677645300 | 0 | 1 |
2022-07-28 10:43:58.25 | s3://.../table/metadata/00002-2cc2837a-02dc-4687-acc1-b4d86ea486f4.metadata.json | 958906493976709774 | 0 | 2 |
Snapshots
To show the valid snapshots for a table:
committed_at | snapshot_id | parent_id | operation | manifest_list | summary |
---|---|---|---|---|---|
2019-02-08 03:29:51.215 | 57897183625154 | null | append | s3://.../table/metadata/snap-57897183625154-1.avro | { added-records -> 2478404, total-records -> 2478404, added-data-files -> 438, total-data-files -> 438, flink.job-id -> 2e274eecb503d85369fb390e8956c813 } |
You can also join snapshots to table history. For example, this query will show table history, with the application ID that wrote each snapshot:
select
h.made_current_at,
s.operation,
h.snapshot_id,
h.is_current_ancestor,
s.summary['flink.job-id']
from prod.db.table$history h
join prod.db.table$snapshots s
on h.snapshot_id = s.snapshot_id
order by made_current_at
made_current_at | operation | snapshot_id | is_current_ancestor | summary[flink.job-id] |
---|---|---|---|---|
2019-02-08 03:29:51.215 | append | 57897183625154 | true | 2e274eecb503d85369fb390e8956c813 |
Files
To show a table's current data files:
content | file_path | file_format | spec_id | partition | record_count | file_size_in_bytes | column_sizes | value_counts | null_value_counts | nan_value_counts | lower_bounds | upper_bounds | key_metadata | split_offsets | equality_ids | sort_order_id |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
0 | s3:/.../table/data/00000-3-8d6d60e8-d427-4809-bcf0-f5d45a4aad96.parquet | PARQUET | 0 | {1999-01-01, 01} | 1 | 597 | [1 -> 90, 2 -> 62] | [1 -> 1, 2 -> 1] | [1 -> 0, 2 -> 0] | [] | [1 -> , 2 -> c] | [1 -> , 2 -> c] | null | [4] | null | null |
0 | s3:/.../table/data/00001-4-8d6d60e8-d427-4809-bcf0-f5d45a4aad96.parquet | PARQUET | 0 | {1999-01-01, 02} | 1 | 597 | [1 -> 90, 2 -> 62] | [1 -> 1, 2 -> 1] | [1 -> 0, 2 -> 0] | [] | [1 -> , 2 -> b] | [1 -> , 2 -> b] | null | [4] | null | null |
0 | s3:/.../table/data/00002-5-8d6d60e8-d427-4809-bcf0-f5d45a4aad96.parquet | PARQUET | 0 | {1999-01-01, 03} | 1 | 597 | [1 -> 90, 2 -> 62] | [1 -> 1, 2 -> 1] | [1 -> 0, 2 -> 0] | [] | [1 -> , 2 -> a] | [1 -> , 2 -> a] | null | [4] | null | null |
Manifests
To show a table's current file manifests:
path | length | partition_spec_id | added_snapshot_id | added_data_files_count | existing_data_files_count | deleted_data_files_count | partition_summaries |
---|---|---|---|---|---|---|---|
s3://.../table/metadata/45b5290b-ee61-4788-b324-b1e2735c0e10-m0.avro | 4479 | 0 | 6668963634911763636 | 8 | 0 | 0 | [[false,null,2019-05-13,2019-05-15]] |
Note:
- Fields within
partition_summaries
column of the manifests table correspond tofield_summary
structs within manifest list, with the following order:contains_null
contains_nan
lower_bound
upper_bound
contains_nan
could return null, which indicates that this information is not available from the file's metadata. This usually occurs when reading from V1 table, wherecontains_nan
is not populated.
Partitions
To show a table's current partitions:
partition | record_count | file_count | spec_id |
---|---|---|---|
{20211001, 11} | 1 | 1 | 0 |
{20211002, 11} | 1 | 1 | 0 |
{20211001, 10} | 1 | 1 | 0 |
{20211002, 10} | 1 | 1 | 0 |
Note: For unpartitioned tables, the partitions table will contain only the record_count and file_count columns.
All Metadata Tables
These tables are unions of the metadata tables specific to the current snapshot, and return metadata across all snapshots.
Danger
The "all" metadata tables may produce more than one row per data file or manifest file because metadata files may be part of more than one table snapshot.
All Data Files
To show all of the table's data files and each file's metadata:
content | file_path | file_format | partition | record_count | file_size_in_bytes | column_sizes | value_counts | null_value_counts | nan_value_counts | lower_bounds | upper_bounds | key_metadata | split_offsets | equality_ids | sort_order_id |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
0 | s3://.../dt=20210102/00000-0-756e2512-49ae-45bb-aae3-c0ca475e7879-00001.parquet | PARQUET | {20210102} | 14 | 2444 | {1 -> 94, 2 -> 17} | {1 -> 14, 2 -> 14} | {1 -> 0, 2 -> 0} | {} | {1 -> 1, 2 -> 20210102} | {1 -> 2, 2 -> 20210102} | null | [4] | null | 0 |
0 | s3://.../dt=20210103/00000-0-26222098-032f-472b-8ea5-651a55b21210-00001.parquet | PARQUET | {20210103} | 14 | 2444 | {1 -> 94, 2 -> 17} | {1 -> 14, 2 -> 14} | {1 -> 0, 2 -> 0} | {} | {1 -> 1, 2 -> 20210103} | {1 -> 3, 2 -> 20210103} | null | [4] | null | 0 |
0 | s3://.../dt=20210104/00000-0-a3bb1927-88eb-4f1c-bc6e-19076b0d952e-00001.parquet | PARQUET | {20210104} | 14 | 2444 | {1 -> 94, 2 -> 17} | {1 -> 14, 2 -> 14} | {1 -> 0, 2 -> 0} | {} | {1 -> 1, 2 -> 20210104} | {1 -> 3, 2 -> 20210104} | null | [4] | null | 0 |
All Manifests
To show all of the table's manifest files:
path | length | partition_spec_id | added_snapshot_id | added_data_files_count | existing_data_files_count | deleted_data_files_count | partition_summaries |
---|---|---|---|---|---|---|---|
s3://.../metadata/a85f78c5-3222-4b37-b7e4-faf944425d48-m0.avro | 6376 | 0 | 6272782676904868561 | 2 | 0 | 0 | [{false, false, 20210101, 20210101}] |
Note:
- Fields within
partition_summaries
column of the manifests table correspond tofield_summary
structs within manifest list, with the following order:contains_null
contains_nan
lower_bound
upper_bound
contains_nan
could return null, which indicates that this information is not available from the file's metadata. This usually occurs when reading from V1 table, wherecontains_nan
is not populated.
References
To show a table's known snapshot references:
name | type | snapshot_id | max_reference_age_in_ms | min_snapshots_to_keep | max_snapshot_age_in_ms |
---|---|---|---|---|---|
main | BRANCH | 4686954189838128572 | 10 | 20 | 30 |
testTag | TAG | 4686954189838128572 | 10 | null | null |