Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[AMORO-1812] Support spark-based external optimizer #2421

Merged
merged 58 commits into from
Feb 22, 2024
Merged

[AMORO-1812] Support spark-based external optimizer #2421

merged 58 commits into from
Feb 22, 2024

Conversation

tcodehuber
Copy link
Contributor

Why are the changes needed?

Close #1812 .

Brief change log

How was this patch tested?

  • Add some test cases that check the changes thoroughly including negative and positive cases if possible

  • Add screenshots for manual tests if appropriate

  • Run test locally before making a pull request

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

@tcodehuber
Copy link
Contributor Author

@wangtaohz @baiyangtx Can you give me some suggestions about this PR, thx?

@tcodehuber tcodehuber closed this Dec 10, 2023
@tcodehuber tcodehuber reopened this Dec 10, 2023
Copy link

codecov bot commented Dec 10, 2023

Codecov Report

Attention: 5 lines in your changes are missing coverage. Please review.

Comparison is base (c2bdb64) 32.23% compared to head (2918ca9) 32.37%.
Report is 4 commits behind head on master.

Files Patch % Lines
...ase/arctic/optimizer/common/OptimizerExecutor.java 81.48% 4 Missing and 1 partial ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##             master    #2421      +/-   ##
============================================
+ Coverage     32.23%   32.37%   +0.14%     
- Complexity     4383     4403      +20     
============================================
  Files           589      590       +1     
  Lines         49892    50041     +149     
  Branches       6618     6619       +1     
============================================
+ Hits          16081    16200     +119     
- Misses        32539    32579      +40     
+ Partials       1272     1262      -10     
Flag Coverage Δ
core 30.50% <87.50%> (+0.22%) ⬆️
trino 50.38% <ø> (-0.56%) ⬇️

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

majin1102 and others added 20 commits December 21, 2023 18:59
)

* [AMORO-1951] Support parallelized planning in one optimizer group

* [AMORO-1951] add unit test for OptimizingQueue and DefaultOptimizingService

* [AMORO-1951] optimize default parameters

* fix bugs

* fix warnings and spotless issues

* merge from #2290

* add apache license and fix spotless

* fix config error

* Update ams/server/src/main/java/com/netease/arctic/server/DefaultOptimizingService.java

Co-authored-by: ZhouJinsong <zhoujinsong0505@163.com>

* add annotations

* fix compile errors

* fix import problem

* remove isDebugEnabled()

* spotless apply

* Update ArcticManagementConf.java

* fix reboot bug and supply document content

* use MoreObjects.toStringHelper for OptimizerThread.java

* Merged from [AMORO-2376] Print right log info after calculating and sorting tables

* fix import problem

* remove unused codes

* spotless

* remove incorrect comments

* add max-planning-parallelism to config

---------

Co-authored-by: majin1102 <majin1102@163.com>
Co-authored-by: ZhouJinsong <zhoujinsong0505@163.com>
…oading the optimizer-job.jar (#2379)

* load optimizer jar first

* fix code style

* change config name

* add config taskmanager.memory.managed.fraction

* fix
…r Iceberg V1 table (#2361)

* [AMORO-2222] [Improvement]: Skip cleaning up dangling delete files for Iceberg V1 table

* Update IcebergTableMaintainer.java

The `total-delete-files` could be 0.

---------

Co-authored-by: wangtaohz <103108928+wangtaohz@users.noreply.github.com>
…g expiring snapshots (#2405)

get hive locations return the uri path
…able (#2408)

* fix null partition

* fix listing files of non-partitioned iceberg table
…ith optimized sequence (#2394)

* should not expire the latest snapshot contains optimized sequence

* add visible for testing

* add fetchLatestNonOptimizedSnapshotTime for base store

* get hive locations return the uri path

* refactor codes and fix comments

* improve for exclude files is empty for expring snapshots

---------

Co-authored-by: ZhouJinsong <zhoujinsong0505@163.com>
…g.yaml (#2393)

* [AMORO-2386][AMS] Configure `iceberg.worker.num-threads` in the config.yaml

* Fix

* [AMORO-2386][AMS] reuse config `table-manifest-io.thread-count` and reuse thread pool

* Add comment
…2362)

* improve: sort the table list returned by server

* optimize: sort tables by format

* optimize: optimiz tables sorting

* style: udpate comment

---------

Co-authored-by: chenyuzhi <chenyuzhi@corp.netease.com>
Co-authored-by: ZhouJinsong <zhoujinsong0505@163.com>
* re add table filter

* implement in external catalog

* add ut case

* fix comment

* fix comment

* fix comment

* fix ut

* fix update properties

* roll back the engine side's filter

* resolve conflicts

* add ut

---------

Co-authored-by: baiyangtx <xiangnebula@163.com>
Co-authored-by: ZhouJinsong <zhoujinsong0505@163.com>
…ole (#2297)

* cancel the running opimizing process from ams web console

* refact code to avoid NPE

* add o comment for com.netease.arctic.server.table.TableService#getServerTableIdentifier

* change the cancel post api to be more restful style
* Add UnifiedSparkCatalog under spark common module
* Extract MixedSparkCatalogBase and MixedSparkSessionCatalogBase to spark common module
* Refactor spark unit test framework to adapt unifed catalog tests and mixed format tests.
…phan files (#2403)

* [Improvement]: Extract the deleting dangling files from the cleaning orphan files

* [Improvement]: Extract the deleting dangling files from the cleaning orphan files

* [Improvement]: Extract the deleting dangling files from the cleaning orphan files
…table in Flink Engine (#2370)

* [AMORO-1341] [Flink]: Support UnifiedCatalog to contain Mixed format table in Flink Engine
…atalog (#2419)

* fix: If the current catalog is not the one in the query, the first db is selected by default.

* build dashboard frontend

---------

Co-authored-by: wangtao <wangtao3@corp.netease.com>
…ixed Format KeyedTable (#2430)

fix load target change snapshot id
[AMORO-2260] Show the format version of Iceberg Table

Signed-off-by: tcodehuber <tcodehuber@gmail.com>
* dashboard: rename optimized to optimizing

* dashboard: support optimizing taskes

* add optimizer token

* dashboard: modify column width

* dashboard: build

* sort the metrics field and change record cnt to long

* modify MetricsSummary Compatibility

* dashbard: build

* Update ams/server/src/main/java/com/netease/arctic/server/optimizing/TaskRuntime.java

Co-authored-by: Qishang Zhong <zhongqishang@gmail.com>

* fix

* support input metrics and output metrics for optimizing process

* dashboard: support optimizing metrics

* dashbard: build

* dashboard:rebuild

* support MetricsSummary to map

* optimizing task supports input output

* dashboard: optimizing tasks support input and output

* dashboard: not display seconds when longer than 1 hour

* dashboard: optimizing process show summary

* remove useless import

* dashboard: build

* as head

* dashbard: build

* change process status to CLOSED after cancel process

* remove useless log

* dashboard: refresh after cancelled

* support cancel optimizing tasks

* dashboard: handle exception when can't cancel optimizing process

* throw exception when can't cancel optimizing process

* dashboard: build

* dashboard: refresh optimizing process when exist optimizing detail page

* dashboard: build

* fix cost time is 0ms

* change metrics name

* fix task startTime and endTime

* fix costTime

* using Preconditions.checkArgument

* fix task reset

* add comments

* cancel tasks before closing optimizing process

* fix unit test

* fix cancel task

* as head

* Revert "as head"

This reverts commit e469e71.

* dashboard: build

---------

Co-authored-by: Qishang Zhong <zhongqishang@gmail.com>
@tcodehuber
Copy link
Contributor Author

@baiyangtx Can you review it again?

Copy link
Contributor

@baiyangtx baiyangtx left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Recently, I discussed this PR with @zhoujinsong and overall agree with its implementation approach. However, I hope we can reuse the code in the Optimizer/Common package as much as possible. The refactoring should follow the logic below:

  1. Have SparkOptimizer extend the common.Optimizer class to reuse its OptimizerToucher and OptimizerExecutor management capabilities.

  2. Modify the part of the Optimizer constructor that initializes the executor array to use a protected newOptimizerExecutor() method instead of directly creating a new OptimizerExecutor object.

  3. Implement the SparkOptimizerExecutor class under the Spark module, which extends the common.OptimizerExecutor class type and overrides the private OptimizingTaskResult executeTask(OptimizingTask task) method.

  4. In the overridden OptimizingTaskResult executeTask(OptimizingTask task) method of SparkOptimizerExecutor, implement the logic currently found in SparkOptimizingTaskSubmitter.submitOptimizingTask.

In this way, we can reuse the code in the Optimizer/Common module as much as possible, and in the future, we can also implement an optimizer based on Flink batch in the Flink session using this framework.

@tcodehuber
Copy link
Contributor Author

Recently, I discussed this PR with @zhoujinsong and overall agree with its implementation approach. However, I hope we can reuse the code in the Optimizer/Common package as much as possible. The refactoring should follow the logic below:

  1. Have SparkOptimizer extend the common.Optimizer class to reuse its OptimizerToucher and OptimizerExecutor management capabilities.
  2. Modify the part of the Optimizer constructor that initializes the executor array to use a protected newOptimizerExecutor() method instead of directly creating a new OptimizerExecutor object.
  3. Implement the SparkOptimizerExecutor class under the Spark module, which extends the common.OptimizerExecutor class type and overrides the private OptimizingTaskResult executeTask(OptimizingTask task) method.
  4. In the overridden OptimizingTaskResult executeTask(OptimizingTask task) method of SparkOptimizerExecutor, implement the logic currently found in SparkOptimizingTaskSubmitter.submitOptimizingTask.

In this way, we can reuse the code in the Optimizer/Common module as much as possible, and in the future, we can also implement an optimizer based on Flink batch in the Flink session using this framework.

OK, I will try to work on it later.

@github-actions github-actions bot removed the module:ams-dashboard Ams dashboard module label Jan 14, 2024
Copy link
Contributor

@baiyangtx baiyangtx left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no need to develop separate logic for the active exit of the Spark Optimizer. Therefore, the related code in this area should be unnecessary. I have already commented it out, and the other parts should be fine.

@github-actions github-actions bot added the module:core Core module label Feb 5, 2024
Copy link
Contributor

@zhoujinsong zhoujinsong left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tcodehuber All codes look fine to me expect a small issue should be fixed.

Copy link
Contributor

@zhoujinsong zhoujinsong left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM.
Validated in my local environment.

@zhoujinsong zhoujinsong merged commit 0330b53 into apache:master Feb 22, 2024
7 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[Feature]: Support Spark-based optimizer