Flink: Implement Flink InputFormat and integrate it to FlinkCatalog #1275

@JingsongLi

Description

The final use case can look like this:

-- In the Flink SQL client

CREATE CATALOG iceberg WITH('type'='iceberg', 'catalog-type'='hadoop', 'warehouse'='file:/tmp/warehouse_path');
USE CATALOG iceberg;

-- Create a "default.my_table" Iceberg Hadoop table in "/tmp/warehouse_path".
-- CREATE TABLE my_table (a INT, b STRING, c STRING) PARTITIONED BY(c) WITH('type'='iceberg'); -- Not supported now.
-- INSERT INTO my_table VALUES (1, '1', '1'), (1, '2', '2'); -- Not supported now.

-- Supported:
SELECT * FROM my_table; -- simple SELECT
SELECT a, c FROM my_table; -- projection pushdown
SELECT * FROM my_table WHERE c='2'; -- filter pushdown
SELECT * FROM my_table LIMIT 1; -- limit pushdown

The implementation can be split into two parts:

  • Implement FlinkInputFormat: a SplitGenerator that plans the Iceberg scan into input splits, and a RowDataReader that reads each split back as Flink RowData (see the first sketch below).
  • Integrate it into the catalog: implement FlinkTableFactory and FlinkTableSource (see the second sketch below).
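Below is a minimal sketch of what the FlinkInputFormat half could look like, assuming an Iceberg Table handle. The SplitGenerator role plans the scan via planTasks() and wraps each CombinedScanTask in a Flink input split; the per-split reader is left as an abstract hook, since implementing RowDataReader is the other part of this item. FlinkInputSplit, RowDataReader, and openTask are illustrative names, not a final design; everything else uses public Flink and Iceberg APIs.

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
import org.apache.flink.api.common.io.RichInputFormat;
import org.apache.flink.api.common.io.statistics.BaseStatistics;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.CloseableIterable;

/** A Flink input split carrying one Iceberg CombinedScanTask (illustrative name). */
class FlinkInputSplit implements InputSplit {
  private final int splitNumber;
  private final CombinedScanTask task;

  FlinkInputSplit(int splitNumber, CombinedScanTask task) {
    this.splitNumber = splitNumber;
    this.task = task;
  }

  @Override
  public int getSplitNumber() { return splitNumber; }

  CombinedScanTask task() { return task; }
}

/** Hypothetical per-split reader; implementing it is the other half of this issue. */
interface RowDataReader extends Closeable {
  boolean hasNext();
  RowData next();
}

public abstract class FlinkInputFormat extends RichInputFormat<RowData, FlinkInputSplit> {

  // The real implementation would need a serializable way to load the table on
  // task managers rather than a live Table reference; kept simple for the sketch.
  private final Table table;
  private transient RowDataReader reader;

  protected FlinkInputFormat(Table table) {
    this.table = table;
  }

  /** Open a reader over one bundle of Iceberg file scan tasks. */
  protected abstract RowDataReader openTask(CombinedScanTask task);

  @Override
  public void configure(Configuration parameters) { }

  @Override
  public BaseStatistics getStatistics(BaseStatistics cachedStatistics) {
    return cachedStatistics; // statistics are optional for a first version
  }

  // The SplitGenerator role: plan the Iceberg scan and wrap each task bundle as a split.
  @Override
  public FlinkInputSplit[] createInputSplits(int minNumSplits) throws IOException {
    List<FlinkInputSplit> splits = new ArrayList<>();
    try (CloseableIterable<CombinedScanTask> tasks = table.newScan().planTasks()) {
      int i = 0;
      for (CombinedScanTask task : tasks) {
        splits.add(new FlinkInputSplit(i++, task));
      }
    }
    return splits.toArray(new FlinkInputSplit[0]);
  }

  @Override
  public InputSplitAssigner getInputSplitAssigner(FlinkInputSplit[] splits) {
    return new DefaultInputSplitAssigner(splits);
  }

  @Override
  public void open(FlinkInputSplit split) {
    this.reader = openTask(split.task());
  }

  @Override
  public boolean reachedEnd() { return !reader.hasNext(); }

  @Override
  public RowData nextRecord(RowData reuse) { return reader.next(); }

  @Override
  public void close() throws IOException {
    if (reader != null) {
      reader.close();
    }
  }
}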
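For the catalog integration, here is a sketch of how FlinkTableSource could surface the projection, filter, and limit pushdowns from the SQL above, using Flink's legacy planner interfaces ProjectableTableSource, FilterableTableSource, and LimitableTableSource. The StreamTableSource/DataStream wiring over FlinkInputFormat and the translation of the recorded state into an Iceberg TableScan are omitted; field and constructor shapes are illustrative.

import java.util.List;

import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.sources.FilterableTableSource;
import org.apache.flink.table.sources.LimitableTableSource;
import org.apache.flink.table.sources.ProjectableTableSource;
import org.apache.flink.table.sources.TableSource;

public class FlinkTableSource implements TableSource<RowData>,
    ProjectableTableSource<RowData>, FilterableTableSource<RowData>,
    LimitableTableSource<RowData> {

  private final TableSchema schema;
  private final int[] projectedFields;    // null until projectFields() is called
  private final List<Expression> filters; // null until applyPredicate() is called
  private final long limit;               // -1 means no limit pushed down

  public FlinkTableSource(TableSchema schema) {
    this(schema, null, null, -1L);
  }

  private FlinkTableSource(TableSchema schema, int[] projectedFields,
      List<Expression> filters, long limit) {
    this.schema = schema;
    this.projectedFields = projectedFields;
    this.filters = filters;
    this.limit = limit;
  }

  @Override
  public TableSchema getTableSchema() {
    return schema;
  }

  // SELECT a, c FROM my_table: the planner hands over the projected field indexes.
  @Override
  public TableSource<RowData> projectFields(int[] fields) {
    return new FlinkTableSource(schema, fields, filters, limit);
  }

  // WHERE c='2': a real implementation would convert the predicates it can
  // handle into Iceberg expressions and remove them from the list; here we
  // simply record them all.
  @Override
  public TableSource<RowData> applyPredicate(List<Expression> predicates) {
    return new FlinkTableSource(schema, projectedFields, predicates, limit);
  }

  @Override
  public boolean isFilterPushedDown() {
    return filters != null;
  }

  // LIMIT 1: the scan can stop early after `limit` rows.
  @Override
  public TableSource<RowData> applyLimit(long limit) {
    return new FlinkTableSource(schema, projectedFields, filters, limit);
  }

  @Override
  public boolean isLimitPushedDown() {
    return limit >= 0;
  }
}

The planner calls these methods during optimization, so each one returns a new source instance with the narrowed state rather than mutating in place; the accumulated projection, filters, and limit would ultimately configure the Iceberg scan behind FlinkInputFormat.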
