Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[AMORO-1035][Flink] Support customizing mixed-format table source parallelism #1973

Merged
merged 2 commits into from
Jan 22, 2024

Conversation

YesOrNo828
Copy link
Contributor

@YesOrNo828 YesOrNo828 commented Sep 13, 2023

Why are the changes needed?

Support customizing the parallelism of the filestore/logstore source via Flink SQL.

Close #1035 .

Brief change log

  • update flink modules,
  • update the filestore and logstore table source

How was this patch tested?

  • Add some test cases that check the changes thoroughly including negative and positive cases if possible

  • Add screenshots for manual tests if appropriate

  • Run test locally before making a pull request

Documentation

  • Does this pull request introduce a new feature? (yes / no) yes
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) docs

@github-actions github-actions bot added type:docs Improvements or additions to documentation module:mixed-flink Flink moduel for Mixed Format labels Sep 13, 2023
@codecov
Copy link

codecov bot commented Sep 13, 2023

Codecov Report

All modified and coverable lines are covered by tests ✅

Comparison is base (bd58d1f) 32.64% compared to head (75b00ef) 30.76%.

Additional details and impacted files
@@             Coverage Diff              @@
##             master    #1973      +/-   ##
============================================
- Coverage     32.64%   30.76%   -1.88%     
+ Complexity     4482     3896     -586     
============================================
  Files           599      553      -46     
  Lines         50321    45613    -4708     
  Branches       6691     6178     -513     
============================================
- Hits          16426    14035    -2391     
+ Misses        32578    30489    -2089     
+ Partials       1317     1089     -228     
Flag Coverage Δ
core 30.76% <ø> (+<0.01%) ⬆️
trino ?

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

Copy link
Contributor

@lklhdu lklhdu left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM.

@huyuanfeng2018
Copy link
Contributor

Can you show the rendering of flink webui after the change? As far as I know, Flinksql has not yet supported custom source parallelism. In flinksql, all operators use the parallelism of the previous operator as the current parallelism, so After I customized the source.parallelism , I think the parallelism of subsequent tasks will not run according to the default parallelism of the task.

@YesOrNo828
Copy link
Contributor Author

YesOrNo828 commented Sep 14, 2023

Can you show the rendering of flink webui after the change? As far as I know, Flinksql has not yet supported custom source parallelism. In flinksql, all operators use the parallelism of the previous operator as the current parallelism, so After I customized the source.parallelism , I think the parallelism of subsequent tasks will not run according to the default parallelism of the task.

@huyuanfeng2018 Thanks for your comment. Actually we could modify the source operater's parallelism through the Table API. In the ScanTableSource.getScanRuntimeProvider() method, we could modify the DataStreamSource's parallelism.

image
select * from 
log_data /*+OPTIONS('arctic.read.mode'='log','properties.group.id'='......','parallelism'='5' )*/ --The job parallelism is 10.

Specify source parallelism with SQL hint

@huyuanfeng2018
Copy link
Contributor

Can you show the rendering of flink webui after the change? As far as I know, Flinksql has not yet supported custom source parallelism. In flinksql, all operators use the parallelism of the previous operator as the current parallelism, so After I customized the source.parallelism , I think the parallelism of subsequent tasks will not run according to the default parallelism of the task.

@huyuanfeng2018 Thanks for your comment. Actually we could modify the source operater's parallelism through the Table API. In the ScanTableSource.getScanRuntimeProvider() method, we could modify the DataStreamSource's parallelism.

image ```sql select * from log_data /*+OPTIONS('arctic.read.mode'='log','properties.group.id'='......','parallelism'='5' )*/ --The job parallelism is 10. ```

Specify source parallelism with SQL hint

What I am confused about is how the subsequent task changed back to 10 after the scan parallelism was changed? Let me give you an example. From the perspective of flink-table-planner, often a select statement in the source table will trigger Calc. However, the parallelism in Calc is the parallelism of the source Transformation used.:
image
I didn’t see this modification in this PR. I would like to know how this is done. In our internal practice, we need to make certain modifications to Flink-table-planner to achieve this effect.

@YesOrNo828
Copy link
Contributor Author

Can you show the rendering of flink webui after the change? As far as I know, Flinksql has not yet supported custom source parallelism. In flinksql, all operators use the parallelism of the previous operator as the current parallelism, so After I customized the source.parallelism , I think the parallelism of subsequent tasks will not run according to the default parallelism of the task.

@huyuanfeng2018 Thanks for your comment. Actually we could modify the source operater's parallelism through the Table API. In the ScanTableSource.getScanRuntimeProvider() method, we could modify the DataStreamSource's parallelism.
image

select * from 
log_data /*+OPTIONS('arctic.read.mode'='log','properties.group.id'='......','parallelism'='5' )*/ --The job parallelism is 10.

Specify source parallelism with SQL hint

What I am confused about is how the subsequent task changed back to 10 after the scan parallelism was changed? Let me give you an example. From the perspective of flink-table-planner, often a select statement in the source table will trigger Calc. However, the parallelism in Calc is the parallelism of the source Transformation used.: image I didn’t see this modification in this PR. I would like to know how this is done. In our internal practice, we need to make certain modifications to Flink-table-planner to achieve this effect.

Because the above example has a window aggregate operator, it can't be chained with the previous source and calc operators, so it will become 10 parallelism, and the source and calc operators are 5 parallelism. This PR does not change the parallelism of the source operator alone, but also the operators in its chain.

Copy link
Contributor

@huyuanfeng2018 huyuanfeng2018 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM.

@YesOrNo828 YesOrNo828 force-pushed the AMORO-1035 branch 2 times, most recently from 3d9ebba to 2822226 Compare September 27, 2023 02:42
@YesOrNo828
Copy link
Contributor Author

Fixed the conflicts after the spotless tool was added to the Flink module.

@CLAassistant
Copy link

CLAassistant commented Nov 22, 2023

CLA assistant check
All committers have signed the CLA.

@CLAassistant
Copy link

CLA assistant check
Thank you for your submission! We really appreciate it. Like many open source projects, we ask that you sign our Contributor License Agreement before we can accept your contribution.
You have signed the CLA already but the status is still pending? Let us recheck it.

@czy006
Copy link
Contributor

czy006 commented Jan 19, 2024

LGTM

Copy link
Contributor

@zhoujinsong zhoujinsong left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM.

@zhoujinsong zhoujinsong merged commit 962bc2f into apache:master Jan 22, 2024
4 of 5 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
module:mixed-flink Flink moduel for Mixed Format type:docs Improvements or additions to documentation
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[Improvement][Flink]: Support customizing Arctic source parallelism
6 participants