-
Notifications
You must be signed in to change notification settings - Fork 13.8k
[FLINK-12284][Network,Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode #8455
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
Hi @zhijiangW could you help take a look on this PR when you are free, thanks. |
|
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community Review Progress
Please see the Pull Request Review Guide for a full explanation of the review process. DetailsThe Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commandsThe @flinkbot bot supports the following commands:
|
zhijiangW
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for concerning this issue and opening the PR @Aitozi !
Actually I also considered this issue before, but have not confirmed the proper way to fix this. I could share some my thoughts:
-
The current
inputBufferUsageis only reflecting the usage of floating buffers, and it could give an indicator whether the setting offloating-buffers-per-gateis enough for performance tuning. -
We could always assume if the exclusive buffers are enough for one channel, it will not request floating buffers. In other words, if we see the floating buffers are used, that means the exclusive buffers should also be used.
-
The semantics of
inputBufferUsageis also changed after credit-based mode. The previous usage indicates the buffer is actually filled with network data. Now the usage is only indicating the floating buffer is removed fromLocalBufferPooltoRemoteInputChannel, but the buffer might still be available, not filled with data. -
The metric purpose should be guidable for performance tuning I think. If we mixed the usages of exclusive and floating buffers together, it might not bring any help for performance tuning. E.g. if the usage is 100% after merging, we could easy to estimate exclusive/floating are both used. But now we could also think so based on my second point. If the usage is 50% or other, we still need to further calculate how many floating are used in all.
It is indeed loss of some metrics to help performance tuning better in credit-based mode. Such as
-
how to measure the exclusive usages
-
how to measure the floating distribution among channels which could give feedback whether the current mechanism of floating buffers is proper.
-
how to measure whether the sender in network is blocked by insufficient credit feedback which could help further improve the setting and internal rule.
I plan to focus on these issues future, maybe not covered in next release. For the issue of this PR I would confirm with other guys and feedback to you later.
FYI, the commit title should cover [network, metrics] for better recognization, and exists netowork typo. :)
|
Thanks @zhijiangW for sharing your thoughts on this issue. |
|
@pnowojski What do you think of this issue? |
I agree, there is a value of the current semantic. I would even build upon that. If you want to detect bottleneck, current semantic of tracking just floating buffers is perfect for that. If current operator (reading from this input) is causing a bottleneck it well can be that only a single one input channel is back-pressured. At the moment we could detect this situation with "all floating buffers are exhausted/used".
This is not true for even slight data skew. I think that there is a value of tracking separately floating and exclusive buffers. Maybe we should make |
|
Hi @pnowojski , what is the reason for the second point ?
And +1 for adding the |
|
Thanks for the confirmation and good suggestions. @pnowojski
Maybe my previous expression is not very clear. If floating buffers are used, I only mean the exclusive buffers for one/some For example, if the producer's backlog is 1, we would always request another 1 floating buffer even though the 2 exclusive buffers for this channel are available atm. Because we want to feedback some overhead credits beforehand in order not to block the network transport after producing more backlog soon. So it is not strong consistent for exclusive buffers used in time, might be eventual consistent within our expectation. If the backlog is becoming 0 from 1, the previous requested floating buffer would also be released by this channel if the 2 exclusive buffers are still available. So from the aspect of one input channel, it would not occupy extra floating buffers if its available exclusive buffers are enough. I also agree with the above suggestions for distinguishing the total |
|
Hi, @zhijiangW thanks for your detail explanation, I make a conclusion below
Credit-based mode:
I think this looks better to distinguish the buffer usage between floating and exclusive in credit-based mode and also give a view of the overall perspective with inPoolUsage metric. WDYT @pnowojski @zhijiangW |
|
@Aitozi : I think your proposal makes sense. Having 3 metrics for credit based flow control sounds as the best, the most explicit solution for me.
Which second point did you mean @Aitozi ? Or did @zhijiangW answer your question?
👍 I meant that from the perspective of such data skew, just knowledge of If you see that |
|
OK, I will open another issue additionally to track and will update this PR accordingly, thanks. |
|
Have added the three metric for the credit based mode, please take a look when you are free @zhijiangW @pnowojski |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks a lot @Aitozi for the update :) Generally speaking code looks conceptually OK. Left couple of comments.
| import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate; | ||
|
|
||
| /** | ||
| * Gauge metric measuring the input buffer pool usage gauge for {@link SingleInputGate}s under credit based mode. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
typo: metrics?
| private static final String METRIC_OUTPUT_POOL_USAGE = "outPoolUsage"; | ||
| private static final String METRIC_INPUT_QUEUE_LENGTH = "inputQueueLength"; | ||
| private static final String METRIC_INPUT_POOL_USAGE = "inPoolUsage"; | ||
| private static final String METRIC_CREDIT_BASED_FLOATING_BUFFER_POOL_USAGE = "floatingBufferUsage"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe a nit:
I'm not a native english speaker so I might be wrong, but shouldn't this be floatingBuffersUsage? And ditto in other places? It seems to me like the correct spelling are the following forms/contexts:
- buffer pool usage (singular since its a single "buffer pool")
- red buffers usage (multiple buffers)
| /** | ||
| * Gauge metric measuring the floating buffer usage for {@link SingleInputGate}. | ||
| */ | ||
| public static class FloatingBufferPoolUsage implements Gauge<Float> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we deduplicate the code here a bit? For example we could introduce InputPoolUsageGauge that would contain the following code/loop:
protected abstract int calculateUsedBuffers(InputGate);
protected abstract int calculateBufferPoolSize(InputGate);
@Override
public Float getValue() {
int usedBuffers = 0;
int bufferPoolSize = 0;
for (SingleInputGate inputGate : inputGates) {
usedBuffers += calculateUsedBuffers(inputGate)
bufferPoolSize += calculateBufferPoolSize(inputGate);
}
if (bufferPoolSize != 0) {
return ((float) usedBuffers) / bufferPoolSize;
} else {
return 0.0f;
}
}
That could be extended/implemented by InputBufferPoolUsageGauge and the classes that you introduced.
Also it would be nice to express somehow the total usage as FloatingBufferPoolUsage + ExclusiveBufferPoolUsage instead of duplicating the logic.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's a good suggestion, I will follow it.
| return initialCredit; | ||
| } | ||
|
|
||
| public int unsafeGetExclusiveBufferUsed() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
unsynchronized instead of unsafe to match existing unsynchronizedGetNumberOfQueuedBuffers?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ExclusiveBuffers
| /** | ||
| * Gauge metric measuring the input buffer pool usage gauge for {@link SingleInputGate}s under credit based mode. | ||
| */ | ||
| public class CreditBasedInputBufferPoolMetric { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you add tests for those classes?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok
| return Math.max(0, initialCredit - bufferQueue.exclusiveBuffers.size()); | ||
| } | ||
|
|
||
| public int unsafeGetFloatingBufferLeft() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
FloatingBuffers
Left->Available?
| if (requestedFloatingBuffer == 0) { | ||
| return 0.0f; | ||
| } else { | ||
| return 1 - ((float) leftFloatingBuffer) / requestedFloatingBuffer; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it is not very accurate here.
It should be (requestedFloatingBuffer - leftFloatingBuffer) / LocalBufferPool.getNumBuffers
| } | ||
|
|
||
| @Override | ||
| public Float getValue() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We might extract the logics of above exclusiveBuffersUsage and floatingBuffersUsage into separate methods, then these methods could be reused here to get final results directly, to avoid below detail logics again.
| buffersGroup.gauge(METRIC_INPUT_QUEUE_LENGTH, new InputBuffersGauge(inputGates)); | ||
| buffersGroup.gauge(METRIC_INPUT_POOL_USAGE, new InputBufferPoolUsageGauge(inputGates)); | ||
|
|
||
| if (config.isCreditBased()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For exclusive case there is no buffer pool actually. So maybe we could break down the new class names into ExclusiveBuffersUsageGauge, FloatingBuffersUsageGauge, InputBuffersUsageGauge instead?
But considering the metric name compatibility with before, we might still use METRIC_INPUT_POOL_USAGE in credit-based mode to correspond with InputBuffersUsageGauge.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't have a strong feelings, but maybe keeping the name of the gauge in sync with the metric name is a better idea? (I'm fine either way)
| private static final String METRIC_OUTPUT_POOL_USAGE = "outPoolUsage"; | ||
| private static final String METRIC_INPUT_QUEUE_LENGTH = "inputQueueLength"; | ||
| private static final String METRIC_INPUT_POOL_USAGE = "inPoolUsage"; | ||
| private static final String METRIC_CREDIT_BASED_FLOATING_BUFFER_POOL_USAGE = "floatingBufferUsage"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it is better to not add the keywords CREDIT-BASED here, because after we remove the deprecated network codes finally, it is no need to emphasize this term any more. The terms of exclusive/floating already indicate that.
Also it should be consistent between variable name and value, like METRIC_FLOATING_BUFFERS_USAGE = "floatingBuffersUsage". ditto for the below case of exclusive.
|
Thanks for the updates @Aitozi ! I left some inline comments. |
|
Hi @zhijiangW @pnowojski I have addressed your comments. About the |
pnowojski
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Couple of more comments from my side :)
| buffersGroup.gauge(METRIC_INPUT_QUEUE_LENGTH, new InputBuffersGauge(inputGates)); | ||
| buffersGroup.gauge(METRIC_INPUT_POOL_USAGE, new InputBufferPoolUsageGauge(inputGates)); | ||
|
|
||
| if (config.isCreditBased()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't have a strong feelings, but maybe keeping the name of the gauge in sync with the metric name is a better idea? (I'm fine either way)
...est/java/org/apache/flink/runtime/io/network/partition/consumer/InputBuffersMetricsTest.java
Outdated
Show resolved
Hide resolved
...est/java/org/apache/flink/runtime/io/network/partition/consumer/InputBuffersMetricsTest.java
Outdated
Show resolved
Hide resolved
...est/java/org/apache/flink/runtime/io/network/partition/consumer/InputBuffersMetricsTest.java
Outdated
Show resolved
Hide resolved
|
Sorry for the late response @pnowojski , I will solve the comments you mentioned. |
|
Hi @pnowojski , I have adjust the test case according to your suggestion, please take a look when you have time, Thanks. |
220d01e to
a43de92
Compare
|
Ping @zhijiangW |
...est/java/org/apache/flink/runtime/io/network/partition/consumer/InputBuffersMetricsTest.java
Outdated
Show resolved
Hide resolved
...est/java/org/apache/flink/runtime/io/network/partition/consumer/InputBuffersMetricsTest.java
Outdated
Show resolved
Hide resolved
...est/java/org/apache/flink/runtime/io/network/partition/consumer/InputBuffersMetricsTest.java
Outdated
Show resolved
Hide resolved
...est/java/org/apache/flink/runtime/io/network/partition/consumer/InputBuffersMetricsTest.java
Outdated
Show resolved
Hide resolved
...est/java/org/apache/flink/runtime/io/network/partition/consumer/InputBuffersMetricsTest.java
Outdated
Show resolved
Hide resolved
...est/java/org/apache/flink/runtime/io/network/partition/consumer/InputBuffersMetricsTest.java
Outdated
Show resolved
Hide resolved
...est/java/org/apache/flink/runtime/io/network/partition/consumer/InputBuffersMetricsTest.java
Outdated
Show resolved
Hide resolved
...est/java/org/apache/flink/runtime/io/network/partition/consumer/InputBuffersMetricsTest.java
Outdated
Show resolved
Hide resolved
...est/java/org/apache/flink/runtime/io/network/partition/consumer/InputBuffersMetricsTest.java
Outdated
Show resolved
Hide resolved
...est/java/org/apache/flink/runtime/io/network/partition/consumer/InputBuffersMetricsTest.java
Outdated
Show resolved
Hide resolved
...est/java/org/apache/flink/runtime/io/network/partition/consumer/InputBuffersMetricsTest.java
Outdated
Show resolved
Hide resolved
...est/java/org/apache/flink/runtime/io/network/partition/consumer/InputBuffersMetricsTest.java
Show resolved
Hide resolved
...est/java/org/apache/flink/runtime/io/network/partition/consumer/InputBuffersMetricsTest.java
Outdated
Show resolved
Hide resolved
...est/java/org/apache/flink/runtime/io/network/partition/consumer/InputBuffersMetricsTest.java
Outdated
Show resolved
Hide resolved
|
Thanks for the updates @Aitozi ! I left some other comments for tests. |
|
Very thanks for @zhijiangW 's carefully review, please take a look again, hoping this is the last look :) |
| } | ||
| } | ||
|
|
||
| private Tuple3<SingleInputGate, List<RemoteInputChannel>, List<LocalInputChannel>> buildInputGate( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we could only return Tuple2<SingleInputGate, List<RemoteInputChannel>> instead because the list of local channels would not be used any more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes
| .buildRemoteAndSetToGate(inputGate); | ||
| } | ||
|
|
||
| private LocalInputChannel buildLocalChannel( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
make it void method after adjusting tuple2 above.
|
Thanks for the patience and addressing my comments @Aitozi ! I think you might miss one previous comment for using tuple2 instead of tuple3. After fixing this I have no other concerns. 👍 LGTM! |
…ic in credit-based network mode
|
Ping @pnowojski |
|
Thanks for the fix @Aitozi and your patience with our reviews (especially that the scope of this ticket has grown significantly from your initial version). Merging as I can see that on your private travis this is already green. |
What is the purpose of the change
This PR is to fix the bug in the calculation of inputBufferUsage. It does't now take the exclusive buffer which is assigned from global buffer pool into account.
Brief change log
(for example:)
Verifying this change
(Please pick either of the following options)
This change is a trivial rework / code cleanup without any test coverage.
(or)
This change is already covered by existing tests, such as (please describe tests).
(or)
This change added tests and can be verified as follows:
(example:)
Does this pull request potentially affect one of the following parts:
@Public(Evolving): (yes / no)Documentation