[AMORO-1457]Support computed columns and watermark in Flink DDL #2239

Merged
merged 27 commits into apache:master on Nov 27, 2023

Conversation

xieyi888
Contributor

@xieyi888 xieyi888 commented Nov 2, 2023

Why are the changes needed?

Close #1457 .

Brief change log

  1. Support serializing computed columns and watermarks into Arctic table properties.
     The computed columns and watermark are stored in the Arctic table properties (screenshot omitted).

  2. Support deserializing computed columns and watermarks into the Flink TableSchema.

  3. Only physical columns are converted into the Arctic (Iceberg) schema during table creation; computed columns and watermarks are serialized into the Arctic table properties.

  4. When getting the table (for both reads and writes), computed columns and watermarks are deserialized into the Flink TableSchema and combined with the TableSchema converted from the Arctic (Iceberg) schema (see the sketch after this list).

  5. Only one watermark strategy is supported.

  6. Altering the properties of computed columns and the watermark strategy is not supported yet.
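
A minimal sketch (not the actual patch) of the flow in items 3 and 4, written against Flink's TableSchema API. The cal_id computed column, the op_time rowtime column, and the hard-coded expressions are illustrative assumptions only; the real code reads them from the serialized table properties.

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableColumn;
import org.apache.flink.table.api.TableSchema;

public class ComputedColumnSchemaSketch {

  // Re-assembles the Flink schema: physical columns come from the Arctic (Iceberg) table,
  // computed columns and the watermark are re-added from the table properties.
  static TableSchema combine(TableSchema physicalSchema) {
    TableSchema.Builder builder = TableSchema.builder();

    // 1. Keep the physical columns converted from the Arctic (Iceberg) schema.
    for (TableColumn column : physicalSchema.getTableColumns()) {
      builder.field(column.getName(), column.getType());
    }

    // 2. Append the computed columns deserialized from the table properties
    //    (hard-coded here for brevity).
    builder.field("cal_id", DataTypes.INT(), "`id` + 5");

    // 3. Re-apply the single watermark strategy, assuming `op_time` TIMESTAMP(3)
    //    is one of the physical columns.
    builder.watermark("op_time", "`op_time` - INTERVAL '5' SECOND", DataTypes.TIMESTAMP(3));

    return builder.build();
  }
}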

How was this patch tested?

  • Add some test cases that check the changes thoroughly including negative and positive cases if possible

  • Add screenshots for manual tests if appropriate

  • Run test locally before making a pull request

Local tests (Flink 1.15, mix_iceberg) are listed below.

1.1 Create an Arctic table with computed columns and a watermark under the Flink Arctic catalog.

1.2 Read from and write to the table with computed columns.

        tableEnv.executeSql("CREATE CATALOG arctic_test WITH ( " +
                "  'type'='arctic', " +
                "  'metastore.url' = 'thrift://****:1260/mix_iceberg'  " + 
                ")");

       tableEnv.executeSql("CREATE TABLE arctic_test.iceberg_test_db.arctic_catalog_mix_iceberg_column2_orc (  " +
               "                     `id` int,    " +
               "                      `cal_id` as id+5, "  +
               "                       proc as PROCTIME() " +
               ") ");

       tableEnv.executeSql("CREATE TABLE gen_order (  " +
               "id INT " +
               ") " +
               "with( " +
               "'connector' = 'datagen',  " +
               "'rows-per-second'='1',  " +
               "'fields.id.kind'='random',  " +
               "'fields.id.min'='1',  " +
               "'fields.id.max'='1000') ");

       tableEnv.executeSql("INSERT INTO  arctic_test.iceberg_test_db.arctic_catalog_mix_iceberg_column2_orc       " +
               "                      select id          " +
               "                   from gen_order where id<10");

        tableEnv.sqlQuery("select * from arctic_test.iceberg_test_db.arctic_catalog_mix_iceberg_column2_orc")
                .execute().print();

1.3 Read the Arctic table and do a lookup join with a MySQL dimension table.

        tableEnv.executeSql("CREATE CATALOG arctic_test WITH ( " +
                "  'type'='arctic', " +
                "  'metastore.url' = 'thrift://****:1260/mix_iceberg'  " + 
                ")");

        tableEnv.executeSql("CREATE TABLE dim_table(    " +
                "id bigint,    " +
                "name string,    " +
                "PRIMARY KEY(id) NOT ENFORCED    " +
                ") WITH (    " +
                "  'connector' = 'jdbc',    " +
                "  'url' = 'jdbc:mysql://****:3306/sloth_learning',    " +
                "  'table-name' = 'dim_table',    " +
                "  'username' = '****',    " +
                "  'password' = '****'    " +
                ")");

        tableEnv.sqlQuery("select t1.id, t1.cal_id,t1.proc,t2.name from     " +
                "arctic_test.iceberg_test_db.arctic_catalog_mix_iceberg_column2_orc t1    " +
                "JOIN dim_table FOR SYSTEM_TIME AS OF t1.proc as t2 on t1.id=t2.id")
                .execute().print();

2.1 Create an Arctic table with computed columns under the Flink default catalog, then read the computed columns.

        tableEnv.executeSql(" CREATE TABLE flink_catalog_mix_iceberg_column1_orc (         " +
                "                                      id bigint,           " +
                "                                      `cal_id` as id+5,     " +
                "                                      proc as PROCTIME()    " +
                "                                      " +
                "                 ) WITH (         " +
                "                     'connector' = 'arctic',         " +
                "                     'metastore.url' = 'thrift://****:1260/mix_iceberg',         " +
                "                     'arctic.catalog' = 'mix_iceberg',         " +
                "                     'arctic.database' = 'iceberg_test_db',         " +
                "                     'arctic.table' = 'flink_catalog_mix_iceberg_column1_orc'         " +
                "                 )");


        tableEnv.sqlQuery("select * from flink_catalog_mix_iceberg_column1_orc")
                .execute().print();

2.2 Read the Arctic table and do a lookup join with a MySQL dimension table.

        tableEnv.executeSql(" CREATE TABLE flink_catalog_mix_iceberg_column1_orc (        " +
                "                                      id bigint,          " +
                "                                      `cal_id` as id+5,    " +
                "                                      proc as PROCTIME()   " +
                "                                     " +
                "                 ) WITH (        " +
                "                     'connector' = 'arctic',        " +
                "                     'metastore.url' = 'thrift://****:1260/mix_iceberg',        " +
                "                     'arctic.catalog' = 'mix_iceberg',        " +
                "                     'arctic.database' = 'iceberg_test_db',        " +
                "                     'arctic.table' = 'flink_catalog_mix_iceberg_column1_orc'        " +
                "                 )");

        tableEnv.executeSql("CREATE TABLE dim_table(   " +
                "id bigint,   " +
                "name string,   " +
                "PRIMARY KEY(id) NOT ENFORCED   " +
                ") WITH (   " +
                "  'connector' = 'jdbc',   " +
                "  'url' = 'jdbc:mysql://****:3306/****',   " +
                "  'table-name' = 'dim_table',   " +
                "  'username' = '****',   " +
                "  'password' = '****'   " +
                ")");

        tableEnv.sqlQuery("select t1.id, t1.cal_id,t1.proc,t2.name from    " +
                "flink_catalog_mix_iceberg_column1_orc t1   " +
                "JOIN dim_table FOR SYSTEM_TIME AS OF t1.proc as t2 on t1.id=t2.id")
                .execute().print();

Documentation

  • Does this pull request introduce a new feature? (yes / no)
    yes
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
    docs

@github-actions github-actions bot added the labels type:docs (Improvements or additions to documentation) and module:mixed-flink (Flink module for Mixed Format) on Nov 2, 2023
@xieyi888 xieyi888 changed the title from "supoort_colume and watermark for flink1.15" to "[AMORO-1457]Support computed columns in Flink DDL" on Nov 2, 2023
@xieyi888
Contributor Author

xieyi888 commented Nov 3, 2023

Some unit tests failed; I'll try to fix them.

@YesOrNo828
Contributor

YesOrNo828 commented Nov 3, 2023

@xieyi888 there are some failed UTs related to Amoro's schema fetching; could you kindly take a look?

TestJoin.testLookupJoin:221->FlinkTestBase.exec:203->FlinkTestBase.exec:207 » IllegalArgument Field op_time not found in source schema
TestJoin.testLookupJoinWithPartialFields:316->FlinkTestBase.exec:203->FlinkTestBase.exec:207 » IndexOutOfBounds Index: 0, Size: 0
TestJoin.testRightEmptyLookupJoin:130->FlinkTestBase.exec:203->FlinkTestBase.exec:207 » IllegalArgument Field op_time not found in source schema

@xieyi888 xieyi888 force-pushed the AMORO_1457_support_compute branch from 8a41de6 to 579ddac on November 3, 2023 08:12

codecov bot commented Nov 3, 2023

Codecov Report

All modified and coverable lines are covered by tests ✅

Comparison is base (be005ee) 52.65% compared to head (a539a9b) 53.02%.
Report is 1 commit behind head on master.

Additional details and impacted files
@@             Coverage Diff              @@
##             master    #2239      +/-   ##
============================================
+ Coverage     52.65%   53.02%   +0.37%     
+ Complexity     4219     3632     -587     
============================================
  Files           511      465      -46     
  Lines         29358    24650    -4708     
  Branches       2853     2340     -513     
============================================
- Hits          15457    13070    -2387     
+ Misses        12653    10560    -2093     
+ Partials       1248     1020     -228     
Flag Coverage Δ
core 53.02% <ø> (+0.03%) ⬆️
trino ?


@xieyi888
Contributor Author

xieyi888 commented Nov 3, 2023

@xieyi888 there are some failed UTs related to Amoro's schema fetching; could you kindly take a look?

TestJoin.testLookupJoin:221->FlinkTestBase.exec:203->FlinkTestBase.exec:207 » IllegalArgument Field op_time not found in source schema
TestJoin.testLookupJoinWithPartialFields:316->FlinkTestBase.exec:203->FlinkTestBase.exec:207 » IndexOutOfBounds Index: 0, Size: 0
TestJoin.testRightEmptyLookupJoin:130->FlinkTestBase.exec:203->FlinkTestBase.exec:207 » IllegalArgument Field op_time not found in source schema

Thanks. I separated out a getPhysicalSchemaForDimTable function for the case when dim_table.enabled=true, and TestJoin now works normally.
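
A rough sketch of the idea (not the actual patch): when dim_table.enabled=true, build a schema containing only the physical columns so the lookup join is not affected by computed columns. Class and method names below are illustrative.

import org.apache.flink.table.api.TableColumn;
import org.apache.flink.table.api.TableSchema;

public class DimTableSchemaSketch {

  /** Returns a schema containing only the physical columns of the given schema. */
  static TableSchema getPhysicalSchemaForDimTable(TableSchema schema) {
    TableSchema.Builder builder = TableSchema.builder();
    for (TableColumn column : schema.getTableColumns()) {
      if (column.isPhysical()) {
        builder.field(column.getName(), column.getType());
      }
    }
    return builder.build();
  }
}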

@xieyi888 xieyi888 changed the title from "[AMORO-1457]Support computed columns in Flink DDL" to "[AMORO-1457]Support computed columns in Flink DDL and watermarks" on Nov 5, 2023
@xieyi888 xieyi888 changed the title from "[AMORO-1457]Support computed columns in Flink DDL and watermarks" to "[AMORO-1457]Support computed columns and watermark in Flink DDL" on Nov 5, 2023
Yi Xie added 3 commits November 6, 2023 10:27
# Conflicts:
#	flink/v1.15/flink/src/main/java/com/netease/arctic/flink/FlinkSchemaUtil.java
#	flink/v1.15/flink/src/main/java/com/netease/arctic/flink/catalog/ArcticCatalog.java
#	flink/v1.15/flink/src/main/java/com/netease/arctic/flink/table/DynamicTableFactory.java
#	flink/v1.15/flink/src/main/java/com/netease/arctic/flink/table/descriptors/ArcticValidator.java
#	flink/v1.15/flink/src/main/java/com/netease/arctic/flink/write/FlinkSink.java
#	flink/v1.15/flink/src/test/java/com/netease/arctic/flink/catalog/TestCatalog.java
@xieyi888
Contributor Author

xieyi888 commented Nov 6, 2023

@YesOrNo828 @zhoujinsong Could you please review this PR?

Contributor

@YesOrNo828 YesOrNo828 left a comment


@xieyi888 Thanks for the PR. I left some comments; please kindly have a look.

Contributor

@zhoujinsong zhoujinsong left a comment


@xieyi888 Thanks a lot for your work!
I left some comments. Please take another look.

@zhoujinsong
Contributor

zhoujinsong commented Nov 7, 2023

BTW, I'm curious about the property design for computed columns. The index in the property key doesn't seem to have a specific use. Would a design like the following be easier for users to understand, e.g.:

flink.compute-column.cal_id.data_type=INT, 
flink.compute-column.cal_id.expr=`id`+5

HDYT? @xieyi888 @YesOrNo828

Yi Xie added 3 commits November 8, 2023 17:47
# Conflicts:
#	flink/flink-common/src/test/java/com/netease/arctic/flink/catalog/TestCatalog.java
@xieyi888 xieyi888 force-pushed the AMORO_1457_support_compute branch from 4882f10 to 2d8bd55 on November 8, 2023 10:00
@xieyi888
Contributor Author

xieyi888 commented Nov 8, 2023

BTW, I'm curious about the property design for computed columns. The index in the property key doesn't seem to have a specific use. Would a design like the following be easier for users to understand, e.g.:

flink.compute-column.cal_id.data_type=INT, 
flink.compute-column.cal_id.expr=`id`+5

HDYT? @xieyi888 @YesOrNo828

The index follows the Flink TableSchema index; it is only there to distinguish the different columns. Although it may not be very convenient for users to understand the properties, it is safer for serialization/deserialization when a field name contains special characters. HDYT? @YesOrNo828

@zhoujinsong
Contributor

It is safer for serialization/deserialization when a field name contains special characters.

Yes, including column names in property keys can indeed introduce special characters.

So perhaps we can still use an index to differentiate compute columns. The index would only count the number of compute columns, starting from 1, and we would maintain this order when reassembling the schema. This might make it easier for users to understand.

@YesOrNo828
Contributor

I agree with using the index to determine the order of the fields (including physical and computed columns) instead of the field name.

There could be a data-type mismatch exception if we only use an index to differentiate computed columns and do not include the physical columns.
For example:

CREATE TABLE amoro.db.T ( 
  id int,
  id2 as id + 2,
  id3 as id * 2,
  name string);

INSERT INTO R
SELECT * FROM T;

A select * from T would return the schema id, name, id2, id3, which no longer matches the original table R's data types (int, int, int, string).

@xieyi888
Contributor Author

xieyi888 commented Nov 9, 2023

I agree with using the index to determine the order of the fields (including physical and computed columns) instead of the field name.

There could be a data-type mismatch exception if we only use an index to differentiate computed columns and do not include the physical columns. For example:

CREATE TABLE amoro.db.T ( 
  id int,
  id2 as id + 2,
  id3 as id * 2,
  name string);

INSERT INTO R
SELECT * FROM T;

A select * from T would return the schema id, name, id2, id3, which no longer matches the original table R's data types (int, int, int, string).

I agree with it.
So maybe we can design it as below and add some documentation for users.

  1. When creating a table with Flink DDL, we serialize the table schema into the table properties.
  2. We serialize the columns (except physical columns) into table properties as below, where ${index} is the column's index (order) in the table:
flink.schema.column.${index}.name=**
flink.schema.column.${index}.data-type=**

If it is a computed column, we also serialize the expression:

flink.schema.column.${index}.expr=**

If it is a metadata column, we also serialize the metadata key and the virtual flag:

flink.schema.column.${index}.metadata=**
flink.schema.column.${index}.virtual=**
  3. We also serialize the watermark into table properties as below if the table is created with a watermark:
flink.schema.watermark.rowtime=**
flink.schema.watermark.strategy.expr=**
flink.schema.watermark.strategy.data-type=**
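
For illustration only (these values are hypothetical, not taken from the implementation), the computed columns from the test DDL above, plus a hypothetical op_time watermark, might serialize roughly like this:

flink.schema.column.1.name=cal_id
flink.schema.column.1.data-type=INT
flink.schema.column.1.expr=`id` + 5
flink.schema.column.2.name=proc
flink.schema.column.2.data-type=TIMESTAMP_LTZ(3) NOT NULL
flink.schema.column.2.expr=PROCTIME()
flink.schema.watermark.rowtime=op_time
flink.schema.watermark.strategy.expr=`op_time` - INTERVAL '5' SECOND
flink.schema.watermark.strategy.data-type=TIMESTAMP(3)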

HDYT @YesOrNo828 @zhoujinsong

@xieyi888
Contributor Author

It is safer for serialization/deserialization when a field name contains special characters.

Yes, including column names in property keys can indeed introduce special characters.

So perhaps we can still use an index to differentiate compute columns. The index would only count the number of compute columns, starting from 1, and we would maintain this order when reassembling the schema. This might make it easier for users to understand.

If a physical column is deleted by Spark or another engine, the properties for computed columns would be corrupted, leading to inaccurate deserialization of the computed columns. So we decided to require computed columns to be listed after all physical columns and to validate the column order during table creation.
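
A hypothetical sketch of the "computed columns last" validation described above; the class name and error message are illustrative, not the actual implementation.

import org.apache.flink.table.api.TableColumn;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.ValidationException;

public class ComputedColumnOrderValidatorSketch {

  /** Fails if any physical column appears after a computed (non-physical) column. */
  static void validateColumnOrder(TableSchema schema) {
    boolean seenNonPhysical = false;
    for (TableColumn column : schema.getTableColumns()) {
      if (!column.isPhysical()) {
        seenNonPhysical = true;
      } else if (seenNonPhysical) {
        throw new ValidationException(
            "Computed columns must be declared after all physical columns, but physical column '"
                + column.getName() + "' appears after a computed column.");
      }
    }
  }
}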

@xieyi888
Contributor Author

done

Contributor

@zhoujinsong zhoujinsong left a comment


I left some comments; please take another look.

@CLAassistant

CLAassistant commented Nov 22, 2023

CLA assistant check
Thank you for your submission! We really appreciate it. Like many open source projects, we ask that you all sign our Contributor License Agreement before we can accept your contribution.
2 out of 3 committers have signed the CLA.

✅ xieyi888
✅ zhoujinsong
❌ Yi Xie


Yi Xie seems not to be a GitHub user. You need a GitHub account to be able to sign the CLA. If you have already a GitHub account, please add the email address used for this commit to your account.
You have signed the CLA already but the status is still pending? Let us recheck it.

@CLAassistant

CLA assistant check
Thank you for your submission! We really appreciate it. Like many open source projects, we ask that you all sign our Contributor License Agreement before we can accept your contribution.
0 out of 3 committers have signed the CLA.

❌ Yi Xie
❌ xieyi888
❌ zhoujinsong


Yi Xie seems not to be a GitHub user. You need a GitHub account to be able to sign the CLA. If you have already a GitHub account, please add the email address used for this commit to your account.
You have signed the CLA already but the status is still pending? Let us recheck it.

Contributor

@zhoujinsong zhoujinsong left a comment


LGTM.
Thanks a lot for your contribution!

@zhoujinsong zhoujinsong merged commit f11336f into apache:master Nov 27, 2023
5 of 6 checks passed
ShawHee pushed a commit to ShawHee/arctic that referenced this pull request Dec 29, 2023
…he#2239)

* supoort_colume and watermark for flink1.15

* separate getPhysicalSchemaForDimTable when dim_table.enabled=true

* rm ut for flink1.15

* change for review comments

* change for master conflict

* fix checkstyle

* change compute index start from1

* change method name

* change doc for computr column

* fix check style

* fix for comments

---------

Co-authored-by: Yi Xie <xieyi01@rd.netease.com>
Co-authored-by: ZhouJinsong <zhoujinsong0505@163.com>
@zhoujinsong zhoujinsong mentioned this pull request Jun 25, 2024
Labels
module:mixed-flink (Flink module for Mixed Format), type:docs (Improvements or additions to documentation)
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[Improvement]: Support computed columns for mixed-format tables in Flink DDL
4 participants