[FLINK-36112][Connector/Filesystem] Add Support for CreateFlag.NO_LOCAL_WRITE in FLINK on YARN's File Creation to Manage Disk Space and Network Load in Labeled YARN Nodes #25226
Conversation
@xuyangzhong left review comments on ...p-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java (resolved).
xintongsong left a comment:
Hi @liangyu-1,
Sorry for the late reply. I must have overlooked the email notification.
Thanks for working on this PR. I've left some comments. PTAL.
Review comments (resolved) on:
- ...ctors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/FileSink.java
- flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
- .../flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableWriter.java
- ...nk-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableWriterTest.java
But in this PR I have to use Mockito, as I explained in the code reviews: I didn't find a way to make sure that the MiniCluster returns exactly one DataNode to be the local node each time it attempts to write a new block.
xintongsong left a comment:
@liangyu-1
Thanks for addressing my comments. I think the PR is very close to a mergeable state; I've left only one minor inline comment. In addition, the CI is failing due to the usage of Mockito. I would personally be fine with using Mockito in this case. To fix the CI, I think we need to add the file that imports Mockito to the suppression list for the IllegalImport check in tools/maven/suppressions.xml.
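For reference, entries in a Checkstyle suppressions file generally take the following shape. The `files` pattern below is an assumption for illustration (it names the Mockito-using test touched in this PR), not the entry that was actually committed:

```xml
<!-- Hypothetical suppression entry: exempt the Mockito-based test from the
     IllegalImport check. The files pattern here is an assumption. -->
<suppress
    files="HadoopRecoverableWriterTest.java"
    checks="IllegalImport"/>
```

Such an element would be added inside the existing `<suppressions>` root of tools/maven/suppressions.xml.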
```java
public T disableLocalWriting() {
    this.writerConfig.put("fs.hdfs.no-local-write", "true");
    // ...
```
It might be nicer to change this string literal into a static final constant of FileSink, and use the same constant in HadoopFileSystem.
Thanks for your suggestion; I have modified this field.
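The suggested refactoring might look like the sketch below. The class and field names here are hypothetical stand-ins, not the actual Flink sources; only the config key string comes from this PR:

```java
// Illustrative sketch of the reviewer's suggestion: hoist the config key into
// a shared static final constant instead of repeating the string literal.
// Class and field names are hypothetical, not the actual Flink code.
final class FileSinkConstants {
    /** Config key (from this PR) that disables writing the first HDFS replica locally. */
    static final String NO_LOCAL_WRITE_KEY = "fs.hdfs.no-local-write";

    private FileSinkConstants() {}
}

final class WriterBuilder {
    private final java.util.Map<String, String> writerConfig = new java.util.HashMap<>();

    /** Enables NO_LOCAL_WRITE behavior via the shared constant. */
    WriterBuilder disableLocalWriting() {
        writerConfig.put(FileSinkConstants.NO_LOCAL_WRITE_KEY, "true");
        return this;
    }

    java.util.Map<String, String> config() {
        return writerConfig;
    }
}
```

With a shared constant, the sink builder and the Hadoop file system implementation cannot drift apart if the key is ever renamed.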
@xintongsong
@flinkbot run azure
xintongsong left a comment:
Thanks @liangyu-1. LGTM. Taking over from here.
Description
I am currently using Apache Flink on YARN to write files into Hadoop. The Flink application runs on a labeled YARN queue.
During operation, we observed that the local disks on these labeled nodes fill up quickly and the network load is significantly high. This happens because HDFS prioritizes writing the first replica to the local node, and the number of these labeled nodes is quite limited.
Problem
The current behavior leads to inefficient disk space utilization and high network traffic on these few labeled nodes, which can affect the performance and reliability of the application. As shown in the picture, the circled host has an average net_bytes_sent rate of 1.2 GB/s while the others are at just 50 MB/s; this imbalance in network and disk usage nearly brought down the whole cluster.
This patch solves FLINK-36112.
This modification has also been discussed in Hadoop's pull request.
What is the purpose of the change
This pull request lets a FileSink that writes to a Hadoop FileSystem choose whether to write the first replica on the local machine or to write it to other DataNodes of the cluster at random. This avoids a host that runs both a TaskManager process and a DataNode process having an extremely high average network load (especially when Flink runs on a labeled YARN queue).
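The core mechanism can be sketched as follows. This is a simplified stand-in, not the actual Flink or Hadoop code: `CreateFlag` below is a local enum mirroring Hadoop's `org.apache.hadoop.fs.CreateFlag`, and only the config key comes from this PR:

```java
// Simplified sketch of how the new writer-config key could translate into
// HDFS create flags. CreateFlag is a local stand-in for Hadoop's
// org.apache.hadoop.fs.CreateFlag; only the flags relevant here are modeled.
final class NoLocalWriteSketch {
    enum CreateFlag { CREATE, OVERWRITE, NO_LOCAL_WRITE }

    /** Builds the flag set for a new file, honoring fs.hdfs.no-local-write. */
    static java.util.EnumSet<CreateFlag> createFlags(java.util.Map<String, String> writerConfig) {
        java.util.EnumSet<CreateFlag> flags =
                java.util.EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE);
        if (Boolean.parseBoolean(writerConfig.getOrDefault("fs.hdfs.no-local-write", "false"))) {
            // Ask the NameNode to place the first replica on a remote DataNode,
            // sparing the local disk and NIC of the labeled YARN node.
            flags.add(CreateFlag.NO_LOCAL_WRITE);
        }
        return flags;
    }
}
```

With the real Hadoop API, such an `EnumSet` would presumably be passed to the `create` overload on `DistributedFileSystem` that accepts `EnumSet<CreateFlag>`, so the NameNode skips the local node when choosing the first replica's location.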
Brief change log
Verifying this change
Please make sure both new and modified tests in this PR follow the conventions for tests defined in our code quality guide.
This change added tests and can be verified as follows:
Does this pull request potentially affect one of the following parts:
@Public(Evolving): (no)
Documentation