diff --git a/docs/monitoring/metrics.zh.md b/docs/monitoring/metrics.zh.md
index 1f14347364e39..cc285542b4652 100644
--- a/docs/monitoring/metrics.zh.md
+++ b/docs/monitoring/metrics.zh.md
@@ -998,7 +998,7 @@ Thus, in order to infer the metric identifier:
-### Network
+### Network (Deprecated: use [Default shuffle service metrics]({{ site.baseurl }}/zh/monitoring/metrics.html#default-shuffle-service))
@@ -1045,7 +1045,7 @@ Thus, in order to infer the metric identifier:
Gauge
-
Network.<Input|Output>.<gate>
+
Network.<Input|Output>.<gate|partition> (only available if taskmanager.net.detailed-metrics config option is set)
totalQueueLen
Total number of queued buffers in all input/output channels.
@@ -1069,6 +1069,123 @@ Thus, in order to infer the metric identifier:
+### Default shuffle service
+
+Metrics related to data exchange between task executors using netty network communication.
+
+
+
+
+
Scope
+
Infix
+
Metrics
+
Description
+
Type
+
+
+
+
+
TaskManager
+
Status.Shuffle.Netty
+
AvailableMemorySegments
+
The number of unused memory segments.
+
Gauge
+
+
+
TotalMemorySegments
+
The number of allocated memory segments.
+
Gauge
+
+
+
Task
+
Shuffle.Netty.Input.Buffers
+
inputQueueLength
+
The number of queued input buffers.
+
Gauge
+
+
+
inPoolUsage
+
An estimate of the input buffers usage.
+
Gauge
+
+
+
Shuffle.Netty.Output.Buffers
+
outputQueueLength
+
The number of queued output buffers.
+
Gauge
+
+
+
outPoolUsage
+
An estimate of the output buffers usage.
+
Gauge
+
+
+
Shuffle.Netty.<Input|Output>.<gate|partition>
+ (only available if taskmanager.net.detailed-metrics config option is set)
+
totalQueueLen
+
Total number of queued buffers in all input/output channels.
+
Gauge
+
+
+
minQueueLen
+
Minimum number of queued buffers in all input/output channels.
+
Gauge
+
+
+
maxQueueLen
+
Maximum number of queued buffers in all input/output channels.
+
Gauge
+
+
+
avgQueueLen
+
Average number of queued buffers in all input/output channels.
+
Gauge
+
+
+
Task
+
Shuffle.Netty.Input
+
numBytesInLocal
+
The total number of bytes this task has read from a local source.
+
Counter
+
+
+
numBytesInLocalPerSecond
+
The number of bytes this task reads from a local source per second.
+
Meter
+
+
+
numBytesInRemote
+
The total number of bytes this task has read from a remote source.
+
Counter
+
+
+
numBytesInRemotePerSecond
+
The number of bytes this task reads from a remote source per second.
+
Meter
+
+
+
numBuffersInLocal
+
The total number of network buffers this task has read from a local source.
+
Counter
+
+
+
numBuffersInLocalPerSecond
+
The number of network buffers this task reads from a local source per second.
+
Meter
+
+
+
numBuffersInRemote
+
The total number of network buffers this task has read from a remote source.
+
Counter
+
+
+
numBuffersInRemotePerSecond
+
The number of network buffers this task reads from a remote source per second.
+
Meter
+
+
+
+
### Cluster
@@ -1234,42 +1351,42 @@ Certain RocksDB native metrics are available but disabled by default, you can fi
Task
numBytesInLocal
-
The total number of bytes this task has read from a local source.
Registers legacy metric groups if shuffle service implementation is original default one.
+ *
+ * @deprecated should be removed in future
+ */
+ @SuppressWarnings("DeprecatedIsStillUsed")
+ @Deprecated
+ public void registerLegacyNetworkMetrics(
+ MetricGroup metricGroup,
+ ResultPartitionWriter[] producedPartitions,
+ InputGate[] inputGates) {
+ NettyShuffleMetricFactory.registerLegacyNetworkMetrics(
+ config.isNetworkDetailedMetrics(),
+ metricGroup,
+ producedPartitions,
+ inputGates);
}
@Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java
index 8a4e04701bffa..15bdfbc0c933d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java
@@ -20,7 +20,6 @@
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
@@ -38,6 +37,7 @@
import org.apache.flink.runtime.shuffle.ShuffleServiceFactory;
import org.apache.flink.runtime.taskmanager.NettyShuffleEnvironmentConfiguration;
+import static org.apache.flink.runtime.io.network.metrics.NettyShuffleMetricFactory.registerShuffleMetrics;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
@@ -45,10 +45,6 @@
*/
public class NettyShuffleServiceFactory implements ShuffleServiceFactory {
- private static final String METRIC_GROUP_NETWORK = "Network";
- private static final String METRIC_TOTAL_MEMORY_SEGMENT = "TotalMemorySegments";
- private static final String METRIC_AVAILABLE_MEMORY_SEGMENT = "AvailableMemorySegments";
-
@Override
public NettyShuffleMaster createShuffleMaster(Configuration configuration) {
return NettyShuffleMaster.INSTANCE;
@@ -96,7 +92,7 @@ static NettyShuffleEnvironment createNettyShuffleEnvironment(
config.networkBufferSize(),
config.networkBuffersPerChannel());
- registerNetworkMetrics(metricGroup, networkBufferPool);
+ registerShuffleMetrics(metricGroup, networkBufferPool);
ResultPartitionFactory resultPartitionFactory = new ResultPartitionFactory(
resultPartitionManager,
@@ -123,12 +119,4 @@ static NettyShuffleEnvironment createNettyShuffleEnvironment(
resultPartitionFactory,
singleInputGateFactory);
}
-
- private static void registerNetworkMetrics(MetricGroup metricGroup, NetworkBufferPool networkBufferPool) {
- MetricGroup networkGroup = metricGroup.addGroup(METRIC_GROUP_NETWORK);
- networkGroup.>gauge(METRIC_TOTAL_MEMORY_SEGMENT,
- networkBufferPool::getTotalNumberOfMemorySegments);
- networkGroup.>gauge(METRIC_AVAILABLE_MEMORY_SEGMENT,
- networkBufferPool::getNumberOfAvailableMemorySegments);
- }
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/InputChannelMetrics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/InputChannelMetrics.java
index 51afd1d3f5810..b48a6649160ef 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/InputChannelMetrics.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/InputChannelMetrics.java
@@ -24,6 +24,7 @@
import org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
import org.apache.flink.runtime.metrics.MetricNames;
+import org.apache.flink.util.Preconditions;
/**
* Collects metrics for {@link RemoteInputChannel} and {@link LocalInputChannel}.
@@ -32,29 +33,28 @@ public class InputChannelMetrics {
private static final String IO_NUM_BYTES_IN_LOCAL = MetricNames.IO_NUM_BYTES_IN + "Local";
private static final String IO_NUM_BYTES_IN_REMOTE = MetricNames.IO_NUM_BYTES_IN + "Remote";
- private static final String IO_NUM_BYTES_IN_LOCAL_RATE = IO_NUM_BYTES_IN_LOCAL + MetricNames.SUFFIX_RATE;
- private static final String IO_NUM_BYTES_IN_REMOTE_RATE = IO_NUM_BYTES_IN_REMOTE + MetricNames.SUFFIX_RATE;
-
private static final String IO_NUM_BUFFERS_IN_LOCAL = MetricNames.IO_NUM_BUFFERS_IN + "Local";
private static final String IO_NUM_BUFFERS_IN_REMOTE = MetricNames.IO_NUM_BUFFERS_IN + "Remote";
- private static final String IO_NUM_BUFFERS_IN_LOCAL_RATE = IO_NUM_BUFFERS_IN_LOCAL + MetricNames.SUFFIX_RATE;
- private static final String IO_NUM_BUFFERS_IN_REMOTE_RATE = IO_NUM_BUFFERS_IN_REMOTE + MetricNames.SUFFIX_RATE;
private final Counter numBytesInLocal;
private final Counter numBytesInRemote;
private final Counter numBuffersInLocal;
private final Counter numBuffersInRemote;
- public InputChannelMetrics(MetricGroup parent) {
- this.numBytesInLocal = parent.counter(IO_NUM_BYTES_IN_LOCAL);
- this.numBytesInRemote = parent.counter(IO_NUM_BYTES_IN_REMOTE);
- parent.meter(IO_NUM_BYTES_IN_LOCAL_RATE, new MeterView(numBytesInLocal, 60));
- parent.meter(IO_NUM_BYTES_IN_REMOTE_RATE, new MeterView(numBytesInRemote, 60));
+ public InputChannelMetrics(MetricGroup ... parents) {
+ this.numBytesInLocal = createCounter(IO_NUM_BYTES_IN_LOCAL, parents);
+ this.numBytesInRemote = createCounter(IO_NUM_BYTES_IN_REMOTE, parents);
+ this.numBuffersInLocal = createCounter(IO_NUM_BUFFERS_IN_LOCAL, parents);
+ this.numBuffersInRemote = createCounter(IO_NUM_BUFFERS_IN_REMOTE, parents);
+ }
- this.numBuffersInLocal = parent.counter(IO_NUM_BUFFERS_IN_LOCAL);
- this.numBuffersInRemote = parent.counter(IO_NUM_BUFFERS_IN_REMOTE);
- parent.meter(IO_NUM_BUFFERS_IN_LOCAL_RATE, new MeterView(numBuffersInLocal, 60));
- parent.meter(IO_NUM_BUFFERS_IN_REMOTE_RATE, new MeterView(numBuffersInRemote, 60));
+ private static Counter createCounter(String name, MetricGroup ... parents) {
+ Counter[] counters = new Counter[parents.length];
+ for (int i = 0; i < parents.length; i++) {
+ counters[i] = parents[i].counter(name);
+ parents[i].meter(name + MetricNames.SUFFIX_RATE, new MeterView(counters[i], 60));
+ }
+ return new MultiCounterWrapper(counters);
}
public Counter getNumBytesInLocalCounter() {
@@ -72,4 +72,47 @@ public Counter getNumBuffersInLocalCounter() {
public Counter getNumBuffersInRemoteCounter() {
return numBuffersInRemote;
}
+
+ private static class MultiCounterWrapper implements Counter {
+ private final Counter[] counters;
+
+ private MultiCounterWrapper(Counter ... counters) {
+ Preconditions.checkArgument(counters.length > 0);
+ this.counters = counters;
+ }
+
+ @Override
+ public void inc() {
+ for (Counter c : counters) {
+ c.inc();
+ }
+ }
+
+ @Override
+ public void inc(long n) {
+ for (Counter c : counters) {
+ c.inc(n);
+ }
+ }
+
+ @Override
+ public void dec() {
+ for (Counter c : counters) {
+ c.dec();
+ }
+ }
+
+ @Override
+ public void dec(long n) {
+ for (Counter c : counters) {
+ c.dec(n);
+ }
+ }
+
+ @Override
+ public long getCount() {
+ // assume that the counters are not accessed directly elsewhere
+ return counters[0].getCount();
+ }
+ }
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/NettyShuffleMetricFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/NettyShuffleMetricFactory.java
new file mode 100644
index 0000000000000..6bcb027a6e855
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/NettyShuffleMetricFactory.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.metrics;
+
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+import org.apache.flink.runtime.io.network.partition.ResultPartition;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
+
+import java.util.Arrays;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Factory for netty shuffle service metrics.
+ */
+public class NettyShuffleMetricFactory {
+
+ // deprecated metric groups
+
+ @SuppressWarnings("DeprecatedIsStillUsed")
+ @Deprecated
+ private static final String METRIC_GROUP_NETWORK_DEPRECATED = "Network";
+ @SuppressWarnings("DeprecatedIsStillUsed")
+ @Deprecated
+ private static final String METRIC_GROUP_BUFFERS_DEPRECATED = "buffers";
+
+ // shuffle environment level metrics: Shuffle.Netty.*
+
+ private static final String METRIC_TOTAL_MEMORY_SEGMENT = "TotalMemorySegments";
+ private static final String METRIC_AVAILABLE_MEMORY_SEGMENT = "AvailableMemorySegments";
+
+ // task level metric group structure: Shuffle.Netty..Buffers
+
+ private static final String METRIC_GROUP_SHUFFLE = "Shuffle";
+ private static final String METRIC_GROUP_NETTY = "Netty";
+ public static final String METRIC_GROUP_OUTPUT = "Output";
+ public static final String METRIC_GROUP_INPUT = "Input";
+ private static final String METRIC_GROUP_BUFFERS = "Buffers";
+
+ // task level output metrics: Shuffle.Netty.Output.*
+
+ private static final String METRIC_OUTPUT_QUEUE_LENGTH = "outputQueueLength";
+ private static final String METRIC_OUTPUT_POOL_USAGE = "outPoolUsage";
+
+ // task level input metrics: Shuffle.Netty.Input.*
+
+ private static final String METRIC_INPUT_QUEUE_LENGTH = "inputQueueLength";
+ private static final String METRIC_INPUT_POOL_USAGE = "inPoolUsage";
+
+ private NettyShuffleMetricFactory() {
+ }
+
+ public static void registerShuffleMetrics(
+ MetricGroup metricGroup,
+ NetworkBufferPool networkBufferPool) {
+ checkNotNull(metricGroup);
+ checkNotNull(networkBufferPool);
+
+ //noinspection deprecation
+ registerShuffleMetrics(METRIC_GROUP_NETWORK_DEPRECATED, metricGroup, networkBufferPool);
+ registerShuffleMetrics(METRIC_GROUP_NETTY, metricGroup.addGroup(METRIC_GROUP_SHUFFLE), networkBufferPool);
+ }
+
+ private static void registerShuffleMetrics(
+ String groupName,
+ MetricGroup metricGroup,
+ NetworkBufferPool networkBufferPool) {
+ MetricGroup networkGroup = metricGroup.addGroup(groupName);
+ networkGroup.>gauge(METRIC_TOTAL_MEMORY_SEGMENT,
+ networkBufferPool::getTotalNumberOfMemorySegments);
+ networkGroup.>gauge(METRIC_AVAILABLE_MEMORY_SEGMENT,
+ networkBufferPool::getNumberOfAvailableMemorySegments);
+ }
+
+ public static MetricGroup createShuffleIOOwnerMetricGroup(MetricGroup parentGroup) {
+ return parentGroup.addGroup(METRIC_GROUP_SHUFFLE).addGroup(METRIC_GROUP_NETTY);
+ }
+
+ /**
+ * Registers legacy network metric groups before shuffle service refactoring.
+ *
+ *
Registers legacy metric groups if shuffle service implementation is original default one.
+ *
+ * @deprecated should be removed in future
+ */
+ @SuppressWarnings("DeprecatedIsStillUsed")
+ @Deprecated
+ public static void registerLegacyNetworkMetrics(
+ boolean isDetailedMetrics,
+ MetricGroup metricGroup,
+ ResultPartitionWriter[] producedPartitions,
+ InputGate[] inputGates) {
+ checkNotNull(metricGroup);
+ checkNotNull(producedPartitions);
+ checkNotNull(inputGates);
+
+ // add metrics for buffers
+ final MetricGroup buffersGroup = metricGroup.addGroup(METRIC_GROUP_BUFFERS_DEPRECATED);
+
+ // similar to MetricUtils.instantiateNetworkMetrics() but inside this IOMetricGroup (metricGroup)
+ final MetricGroup networkGroup = metricGroup.addGroup(METRIC_GROUP_NETWORK_DEPRECATED);
+ final MetricGroup outputGroup = networkGroup.addGroup(METRIC_GROUP_OUTPUT);
+ final MetricGroup inputGroup = networkGroup.addGroup(METRIC_GROUP_INPUT);
+
+ ResultPartition[] resultPartitions = Arrays.copyOf(producedPartitions, producedPartitions.length, ResultPartition[].class);
+ registerOutputMetrics(isDetailedMetrics, outputGroup, buffersGroup, resultPartitions);
+
+ SingleInputGate[] singleInputGates = Arrays.copyOf(inputGates, inputGates.length, SingleInputGate[].class);
+ registerInputMetrics(isDetailedMetrics, inputGroup, buffersGroup, singleInputGates);
+ }
+
+ public static void registerOutputMetrics(
+ boolean isDetailedMetrics,
+ MetricGroup outputGroup,
+ ResultPartition[] resultPartitions) {
+ registerOutputMetrics(
+ isDetailedMetrics,
+ outputGroup,
+ outputGroup.addGroup(METRIC_GROUP_BUFFERS),
+ resultPartitions);
+ }
+
+ private static void registerOutputMetrics(
+ boolean isDetailedMetrics,
+ MetricGroup outputGroup,
+ MetricGroup buffersGroup,
+ ResultPartition[] resultPartitions) {
+ if (isDetailedMetrics) {
+ ResultPartitionMetrics.registerQueueLengthMetrics(outputGroup, resultPartitions);
+ }
+ buffersGroup.gauge(METRIC_OUTPUT_QUEUE_LENGTH, new OutputBuffersGauge(resultPartitions));
+ buffersGroup.gauge(METRIC_OUTPUT_POOL_USAGE, new OutputBufferPoolUsageGauge(resultPartitions));
+ }
+
+ public static void registerInputMetrics(
+ boolean isDetailedMetrics,
+ MetricGroup inputGroup,
+ SingleInputGate[] inputGates) {
+ registerInputMetrics(
+ isDetailedMetrics,
+ inputGroup,
+ inputGroup.addGroup(METRIC_GROUP_BUFFERS),
+ inputGates);
+ }
+
+ private static void registerInputMetrics(
+ boolean isDetailedMetrics,
+ MetricGroup inputGroup,
+ MetricGroup buffersGroup,
+ SingleInputGate[] inputGates) {
+ if (isDetailedMetrics) {
+ InputGateMetrics.registerQueueLengthMetrics(inputGroup, inputGates);
+ }
+ buffersGroup.gauge(METRIC_INPUT_QUEUE_LENGTH, new InputBuffersGauge(inputGates));
+ buffersGroup.gauge(METRIC_INPUT_POOL_USAGE, new InputBufferPoolUsageGauge(inputGates));
+ }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleEnvironment.java
index 882e52cc7bddd..ac00643df2bae 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleEnvironment.java
@@ -92,25 +92,34 @@ public interface ShuffleEnvironment
This method has to be called only once to avoid duplicated internal metric group registration.
+ *
+ * @param ownerName the owner name, used for logs
+ * @param executionAttemptID execution attempt id of the producer or consumer
+ * @param parentGroup parent of shuffle specific metric group
+ * @return context of the shuffle input/output owner used to create partitions or gates belonging to the owner
+ */
+ ShuffleIOOwnerContext createShuffleIOOwnerContext(
+ String ownerName,
+ ExecutionAttemptID executionAttemptID,
+ MetricGroup parentGroup);
+
/**
* Factory method for the {@link ResultPartitionWriter ResultPartitionWriters} to produce result partitions.
*
*
The order of the {@link ResultPartitionWriter ResultPartitionWriters} in the returned collection
* should be the same as the iteration order of the passed {@code resultPartitionDeploymentDescriptors}.
*
- * @param ownerName the owner name, used for logs
- * @param executionAttemptID execution attempt id of the producer
+ * @param ownerContext the owner context relevant for partition creation
* @param resultPartitionDeploymentDescriptors descriptors of the partition, produced by the owner
- * @param outputGroup shuffle specific group for output metrics
- * @param buffersGroup shuffle specific group for buffer metrics
* @return collection of the {@link ResultPartitionWriter ResultPartitionWriters}
*/
Collection
createResultPartitionWriters(
- String ownerName,
- ExecutionAttemptID executionAttemptID,
- Collection resultPartitionDeploymentDescriptors,
- MetricGroup outputGroup,
- MetricGroup buffersGroup);
+ ShuffleIOOwnerContext ownerContext,
+ Collection resultPartitionDeploymentDescriptors);
/**
* Release local resources occupied by the given partitions.
@@ -133,23 +142,15 @@ Collection
createResultPartitionWriters(
*
The order of the {@link InputGate InputGates} in the returned collection should be the same as the iteration order
* of the passed {@code inputGateDeploymentDescriptors}.
*
- * @param ownerName the owner name, used for logs
- * @param executionAttemptID execution attempt id of the consumer
+ * @param ownerContext the owner context relevant for gate creation
* @param partitionProducerStateProvider producer state provider to query whether the producer is ready for consumption
* @param inputGateDeploymentDescriptors descriptors of the input gates to consume
- * @param parentGroup parent of shuffle specific metric group
- * @param inputGroup shuffle specific group for input metrics
- * @param buffersGroup shuffle specific group for buffer metrics
* @return collection of the {@link InputGate InputGates}
*/
Collection createInputGates(
- String ownerName,
- ExecutionAttemptID executionAttemptID,
+ ShuffleIOOwnerContext ownerContext,
PartitionProducerStateProvider partitionProducerStateProvider,
- Collection inputGateDeploymentDescriptors,
- MetricGroup parentGroup,
- MetricGroup inputGroup,
- MetricGroup buffersGroup);
+ Collection inputGateDeploymentDescriptors);
/**
* Update a gate with the newly available partition information, previously unknown.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleIOOwnerContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleIOOwnerContext.java
new file mode 100644
index 0000000000000..3579cd1ceac9f
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleIOOwnerContext.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.shuffle;
+
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Context of shuffle input/output owner used to create partitions or gates belonging to the owner.
+ */
+public class ShuffleIOOwnerContext {
+ private final String ownerName;
+ private final ExecutionAttemptID executionAttemptID;
+ private final MetricGroup parentGroup;
+ private final MetricGroup outputGroup;
+ private final MetricGroup inputGroup;
+
+ public ShuffleIOOwnerContext(
+ String ownerName,
+ ExecutionAttemptID executionAttemptID,
+ MetricGroup parentGroup,
+ MetricGroup outputGroup,
+ MetricGroup inputGroup) {
+ this.ownerName = checkNotNull(ownerName);
+ this.executionAttemptID = checkNotNull(executionAttemptID);
+ this.parentGroup = checkNotNull(parentGroup);
+ this.outputGroup = checkNotNull(outputGroup);
+ this.inputGroup = checkNotNull(inputGroup);
+ }
+
+ public String getOwnerName() {
+ return ownerName;
+ }
+
+ public ExecutionAttemptID getExecutionAttemptID() {
+ return executionAttemptID;
+ }
+
+ public MetricGroup getParentGroup() {
+ return parentGroup;
+ }
+
+ public MetricGroup getOutputGroup() {
+ return outputGroup;
+ }
+
+ public MetricGroup getInputGroup() {
+ return inputGroup;
+ }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 2084819920b2e..36d40d1faca60 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -28,7 +28,6 @@
import org.apache.flink.core.fs.FileSystemSafetyNet;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.SafetyNetCloseableRegistry;
-import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.blob.BlobCacheService;
import org.apache.flink.runtime.blob.PermanentBlobKey;
@@ -50,6 +49,7 @@
import org.apache.flink.runtime.executiongraph.TaskInformation;
import org.apache.flink.runtime.filecache.FileCache;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.partition.PartitionProducerStateProvider;
@@ -64,6 +64,7 @@
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
+import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.TaskStateManager;
import org.apache.flink.runtime.taskexecutor.GlobalAggregateManager;
@@ -360,21 +361,13 @@ public Task(
final String taskNameWithSubtaskAndId = taskNameWithSubtask + " (" + executionId + ')';
- // add metrics for buffers
- final MetricGroup buffersGroup = metrics.getIOMetricGroup().addGroup("buffers");
-
- // similar to MetricUtils.instantiateNetworkMetrics() but inside this IOMetricGroup
- final MetricGroup networkGroup = metrics.getIOMetricGroup().addGroup("Network");
- final MetricGroup outputGroup = networkGroup.addGroup("Output");
- final MetricGroup inputGroup = networkGroup.addGroup("Input");
+ final ShuffleIOOwnerContext taskShuffleContext = shuffleEnvironment
+ .createShuffleIOOwnerContext(taskNameWithSubtaskAndId, executionId, metrics.getIOMetricGroup());
// produced intermediate result partitions
final ResultPartitionWriter[] resultPartitionWriters = shuffleEnvironment.createResultPartitionWriters(
- taskNameWithSubtaskAndId,
- executionId,
- resultPartitionDeploymentDescriptors,
- outputGroup,
- buffersGroup).toArray(new ResultPartitionWriter[] {});
+ taskShuffleContext,
+ resultPartitionDeploymentDescriptors).toArray(new ResultPartitionWriter[] {});
this.consumableNotifyingPartitionWriters = ConsumableNotifyingResultPartitionWriterDecorator.decorate(
resultPartitionDeploymentDescriptors,
@@ -385,13 +378,9 @@ public Task(
// consumed intermediate result partitions
final InputGate[] gates = shuffleEnvironment.createInputGates(
- taskNameWithSubtaskAndId,
- executionId,
+ taskShuffleContext,
this,
- inputGateDeploymentDescriptors,
- metrics.getIOMetricGroup(),
- inputGroup,
- buffersGroup).toArray(new InputGate[] {});
+ inputGateDeploymentDescriptors).toArray(new InputGate[] {});
this.inputGates = new InputGate[gates.length];
int counter = 0;
@@ -399,6 +388,12 @@ public Task(
inputGates[counter++] = new InputGateWithMetrics(gate, metrics.getIOMetricGroup().getNumBytesInCounter());
}
+ if (shuffleEnvironment instanceof NettyShuffleEnvironment) {
+ //noinspection deprecation
+ ((NettyShuffleEnvironment) shuffleEnvironment)
+ .registerLegacyNetworkMetrics(metrics.getIOMetricGroup(), resultPartitionWriters, gates);
+ }
+
invokableHasBeenCanceled = new AtomicBoolean(false);
// finally, create the executing thread, but do not start it
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
index ebc9888165df0..39ec9e2815313 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
@@ -608,13 +608,9 @@ private static Map createInputGateWithLocalChannel
ExecutionAttemptID consumerID = new ExecutionAttemptID();
SingleInputGate[] gates = network.createInputGates(
- "",
- consumerID,
+ network.createShuffleIOOwnerContext("", consumerID, new UnregisteredMetricsGroup()),
SingleInputGateBuilder.NO_OP_PRODUCER_CHECKER,
- Arrays.asList(gateDescs),
- new UnregisteredMetricsGroup(),
- new UnregisteredMetricsGroup(),
- new UnregisteredMetricsGroup()).toArray(new SingleInputGate[] {});
+ Arrays.asList(gateDescs)).toArray(new SingleInputGate[] {});
Map inputGatesById = new HashMap<>();
for (int i = 0; i < numberOfGates; i++) {
inputGatesById.put(new InputGateID(ids[i], consumerID), gates[i]);