
[Improvement]: modify the parameter transfer method for flink optimize #1789

Closed
2 of 3 tasks
Tracked by #1847 ...
xujiangfeng001 opened this issue Aug 3, 2023 · 7 comments · Fixed by #1803

Comments

@xujiangfeng001
Contributor

xujiangfeng001 commented Aug 3, 2023

Search before asking

  • I have searched in the issues and found no similar issues.

What would you like to be improved?

At present, when the Flink optimizer is started, only jobmanager.memory and taskmanager.memory can be passed as job parameters. If users want to modify other tuning parameters of the Flink optimizer, they can only do so through flink-conf.yaml.

How should we improve?

The relevant code is:

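  protected String buildOptimizerStartupArgsString(Resource resource) {
    // Both memory values (in MB) must be present in the resource properties;
    // there is no fallback to flink-conf.yaml.
    String taskManagerMemory = PropertyUtil.checkAndGetProperty(resource.getProperties(),
        TASK_MANAGER_MEMORY_PROPERTY);
    String jobManagerMemory = PropertyUtil.checkAndGetProperty(resource.getProperties(), JOB_MANAGER_MEMORY_PROPERTY);
    String jobPath = getAMSHome() + "/plugin/optimize/OptimizeJob.jar";
    // Memory attributed to the optimizer: one JobManager plus one TaskManager per thread.
    long memory = Long.parseLong(jobManagerMemory) + Long.parseLong(taskManagerMemory) * resource.getThreadCount();
    // YARN per-job submission: -ytm/-yjm size the containers, while the "-m <memory>"
    // after the jar path is an argument to the optimizer program, not to the Flink CLI.
    return String.format("%s/bin/flink run -m yarn-cluster -ytm %s -yjm %s -c %s %s -m %s %s",
        getFlinkHome(), taskManagerMemory, jobManagerMemory,
        FlinkOptimizer.class.getName(), jobPath, memory,
        super.buildOptimizerStartupArgsString(resource));
  }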
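To make the current behaviour concrete, here is a standalone reproduction of the string that buildOptimizerStartupArgsString produces. The class and method names, the paths, the main class, and the trailing "<base optimizer args>" placeholder (standing in for whatever super.buildOptimizerStartupArgsString(resource) returns) are illustrative assumptions, not AMS code.

```java
// Hypothetical standalone reproduction of the per-job command string;
// paths, class names and the trailing base arguments are placeholders.
public class OptimizerCommandExample {

  static String buildCommand(String flinkHome, String amsHome, String mainClass,
                             long jobManagerMemory, long taskManagerMemory,
                             int threadCount, String baseArgs) {
    String jobPath = amsHome + "/plugin/optimize/OptimizeJob.jar";
    // Same formula as the original: one JobManager plus one TaskManager per thread (MB).
    long memory = jobManagerMemory + taskManagerMemory * threadCount;
    return String.format("%s/bin/flink run -m yarn-cluster -ytm %s -yjm %s -c %s %s -m %s %s",
        flinkHome, taskManagerMemory, jobManagerMemory, mainClass, jobPath, memory, baseArgs);
  }

  public static void main(String[] args) {
    String cmd = buildCommand("/opt/flink", "/opt/ams", "com.example.FlinkOptimizer",
        1024, 2048, 4, "<base optimizer args>");
    System.out.println(cmd);
  }
}
```

With 1024 MB for the JobManager, 2048 MB per TaskManager and 4 threads, the optimizer is told it uses 1024 + 2048 * 4 = 9216 MB, and none of these numbers can come from flink-conf.yaml.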

First, users must define jobmanager.memory and taskmanager.memory for the Flink optimizer to start properly; the corresponding settings in flink-conf.yaml cannot be used instead.

Second, I think -yD <property=value> should be supported for parameter passing, so that the Flink optimizer can accept more configuration parameters.
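A minimal sketch of the -yD idea, assuming a hypothetical convention in which any resource property prefixed with "flink-conf." is forwarded to Flink with the prefix stripped. The prefix, class and method names are not part of AMS; they only illustrate the mapping from optimizer properties to dynamic properties.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper: forwards resource properties prefixed with "flink-conf."
// to Flink as "-yD key=value" dynamic properties (the prefix is an assumed convention).
public class DynamicPropertyArgs {
  static final String FLINK_CONF_PREFIX = "flink-conf.";

  static List<String> toYarnDynamicProperties(Map<String, String> properties) {
    List<String> args = new ArrayList<>();
    // Iterate in key order so the generated command is deterministic.
    for (Map.Entry<String, String> entry : new TreeMap<>(properties).entrySet()) {
      String key = entry.getKey();
      if (key.startsWith(FLINK_CONF_PREFIX) && key.length() > FLINK_CONF_PREFIX.length()) {
        args.add("-yD");
        args.add(key.substring(FLINK_CONF_PREFIX.length()) + "=" + entry.getValue());
      }
    }
    return args;
  }

  public static void main(String[] args) {
    Map<String, String> properties = new TreeMap<>();
    properties.put("taskmanager.memory", "2048"); // existing AMS property, not forwarded
    properties.put("flink-conf.heartbeat.timeout", "180000");
    properties.put("flink-conf.taskmanager.memory.managed.fraction", "0.1");
    System.out.println(String.join(" ", toYarnDynamicProperties(properties)));
  }
}
```

Keeping the result as a list rather than one formatted string avoids shell-quoting problems if a value ever contains spaces.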

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Subtasks

No response

Code of Conduct


@shidayang
Contributor

Makes sense.

@majin1102
Contributor

Hi xujiangfeng001,

Thank you for taking the time to report the issue you encountered in our project. We appreciate your feedback and would like to invite you to contribute to our project by submitting a pull request to fix the issue.

We believe that your contribution would be very valuable, and we are happy to help you get started with contributing. Please take a look at our contribution guide, and feel free to reach out to us (@shidayang, @zhoujinsong) if you have any questions.

@wangtaohz
Contributor

I agree that we should provide a more generic method, such as -yD <property=value>, to support more configuration parameters when starting the Flink optimizer.

However, the two parameters jobmanager.memory and taskmanager.memory are a little special. They must be configured so that AMS can obtain the accurate memory usage of the optimizer.

If the new parameter passing allows memory to be set in flink-conf.yaml, we have to consider how AMS obtains the memory value.
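One way to handle this concern, sketched under assumptions: prefer the explicit AMS property (in MB, as the current code passes it to -yjm/-ytm) and fall back to the corresponding Flink key from flink-conf.yaml only when it is absent. The class and method names are hypothetical, the property keys are passed in rather than hard-coded, and the parser handles only the b/k/m/g units, treating a bare number as bytes as Flink's MemorySize does. An empty result marks exactly the case in which AMS cannot know the memory up front.

```java
import java.util.Locale;
import java.util.Map;
import java.util.OptionalLong;

// Hypothetical resolver: explicit AMS property first, flink-conf.yaml value second.
public class MemoryResolver {

  /** Parses "1024m", "2g", "512mb" etc. into MB; a bare number means bytes (Flink convention). */
  static long parseToMb(String value) {
    String v = value.trim().toLowerCase(Locale.ROOT).replace(" ", "");
    int i = 0;
    while (i < v.length() && Character.isDigit(v.charAt(i))) {
      i++;
    }
    if (i == 0) {
      throw new IllegalArgumentException("Invalid memory size: " + value);
    }
    long number = Long.parseLong(v.substring(0, i));
    switch (v.substring(i)) {
      case "": case "b": case "bytes": return number / (1024 * 1024);
      case "k": case "kb": return number / 1024;
      case "m": case "mb": return number;
      case "g": case "gb": return number * 1024;
      default: throw new IllegalArgumentException("Unsupported unit in: " + value);
    }
  }

  static OptionalLong resolveMb(Map<String, String> amsProps, String amsKey,
                                Map<String, String> flinkConf, String flinkKey) {
    String explicit = amsProps.get(amsKey);
    if (explicit != null) {
      return OptionalLong.of(Long.parseLong(explicit.trim())); // AMS value is already in MB
    }
    String fromConf = flinkConf.get(flinkKey);
    return fromConf == null ? OptionalLong.empty() : OptionalLong.of(parseToMb(fromConf));
  }

  public static void main(String[] args) {
    Map<String, String> amsProps = Map.of("jobmanager.memory", "1024");
    Map<String, String> flinkConf = Map.of("taskmanager.memory.process.size", "4g");
    System.out.println(resolveMb(amsProps, "jobmanager.memory", flinkConf, "jobmanager.memory.process.size"));
    System.out.println(resolveMb(amsProps, "taskmanager.memory", flinkConf, "taskmanager.memory.process.size"));
  }
}
```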

@zhongqishang
Contributor

zhongqishang commented Aug 4, 2023

+1 for this.


I think the optimizer itself can report its resources to AMS; that way, the resources of external optimizers can be displayed as well.

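import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.TaskManagerOptions;
MemorySize jobMemorySize = env.getConfiguration().get(JobManagerOptions.TOTAL_PROCESS_MEMORY);
MemorySize taskMemorySize = env.getConfiguration().get(TaskManagerOptions.TOTAL_PROCESS_MEMORY);
int parallelism = env.getParallelism();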
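The Flink calls above need a configured environment; the arithmetic the optimizer could report is sketched here without Flink on the classpath. This assumes all operators share slots, so the job needs "parallelism" slots, and that the JobManager and TaskManager sizes are already known in MB; the class and method names are hypothetical. With one slot per TaskManager (Flink's default for taskmanager.numberOfTaskSlots) it reduces to the jm + tm * threadCount formula AMS uses today.

```java
// Hypothetical optimizer-side memory calculation, reported to AMS at registration.
public class OptimizerMemoryReport {

  /** TaskManagers needed when each offers slotsPerTaskManager slots (ceiling division). */
  static int taskManagerCount(int parallelism, int slotsPerTaskManager) {
    if (parallelism <= 0 || slotsPerTaskManager <= 0) {
      throw new IllegalArgumentException("parallelism and slots must be positive");
    }
    return (parallelism + slotsPerTaskManager - 1) / slotsPerTaskManager;
  }

  /** Total memory in MB: one JobManager plus all TaskManagers. */
  static long totalMemoryMb(long jobManagerMb, long taskManagerMb,
                            int parallelism, int slotsPerTaskManager) {
    return jobManagerMb + taskManagerMb * taskManagerCount(parallelism, slotsPerTaskManager);
  }

  public static void main(String[] args) {
    // One slot per TaskManager: same result as the existing AMS formula.
    System.out.println(totalMemoryMb(1024, 2048, 4, 1));
    // Two slots per TaskManager: half as many TaskManagers.
    System.out.println(totalMemoryMb(1024, 2048, 4, 2));
  }
}
```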

BTW, I think the YARN submission mode could be changed from per-job to application mode, for two reasons: per-job mode was deprecated in Flink 1.15, and in application mode all configuration (including jobmanager.memory/taskmanager.memory) is passed via -D, which makes a general solution easier to provide.

To add: as of Flink 1.17.x, per-job mode can still be used normally.
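A sketch of what an application-mode submission could look like, with every Flink option, including memory, passed as -Dkey=value. "flink run-application -t yarn-application" is Flink's CLI entry point for YARN application mode; the class and method names, paths, main class and trailing optimizer arguments are placeholders.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical builder for a YARN application-mode submission; all Flink
// options (memory included) go through -D instead of -yjm/-ytm.
public class ApplicationModeCommand {

  static List<String> build(String flinkHome, String jarPath, String mainClass,
                            Map<String, String> flinkOptions, List<String> programArgs) {
    List<String> cmd = new ArrayList<>();
    cmd.add(flinkHome + "/bin/flink");
    cmd.add("run-application");
    cmd.add("-t");
    cmd.add("yarn-application");
    // Sorted so the command is deterministic regardless of the input map type.
    for (Map.Entry<String, String> e : new TreeMap<>(flinkOptions).entrySet()) {
      cmd.add("-D" + e.getKey() + "=" + e.getValue());
    }
    cmd.add("-c");
    cmd.add(mainClass);
    cmd.add(jarPath);
    cmd.addAll(programArgs); // arguments for the optimizer program itself
    return cmd;
  }

  public static void main(String[] args) {
    Map<String, String> options = new TreeMap<>();
    options.put("jobmanager.memory.process.size", "1024m");
    options.put("taskmanager.memory.process.size", "2048m");
    List<String> cmd = build("/opt/flink", "/opt/ams/plugin/optimize/OptimizeJob.jar",
        "com.example.FlinkOptimizer", options, List.of("<optimizer args>"));
    System.out.println(String.join(" ", cmd));
  }
}
```

Returning a list of arguments rather than one formatted string lets a caller hand it directly to ProcessBuilder without shell quoting.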

@wangtaohz
Contributor


It looks practical. 👍
The memory can be reported to AMS during registration.

@xujiangfeng001
Contributor Author


I will keep the original logic and report memory resources to AMS.

5 participants