Skip to content

Commit 63ee608

Browse files
Arvid HeiseAHeise
Arvid Heise
authored andcommitted
[FLINK-23652][core/metrics] Extract Operator(IO)MetricGroup interfaces and expose them in RuntimeContext
1 parent 5d5e39b commit 63ee608

File tree

71 files changed

+332
-174
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

71 files changed

+332
-174
lines changed

flink-connectors/flink-connector-gcp-pubsub/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/PubSubSourceTest.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
import org.apache.flink.api.common.io.ratelimiting.FlinkConnectorRateLimiter;
2121
import org.apache.flink.api.common.serialization.DeserializationSchema;
2222
import org.apache.flink.api.common.typeinfo.TypeInformation;
23-
import org.apache.flink.metrics.MetricGroup;
23+
import org.apache.flink.metrics.groups.OperatorMetricGroup;
2424
import org.apache.flink.streaming.api.functions.source.SourceFunction;
2525
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
2626
import org.apache.flink.streaming.connectors.gcp.pubsub.common.AcknowledgeIdsForCheckpoint;
@@ -58,7 +58,7 @@ public class PubSubSourceTest {
5858
@Mock private PubSubSource.AcknowledgeOnCheckpointFactory acknowledgeOnCheckpointFactory;
5959
@Mock private AcknowledgeOnCheckpoint<String> acknowledgeOnCheckpoint;
6060
@Mock private StreamingRuntimeContext streamingRuntimeContext;
61-
@Mock private MetricGroup metricGroup;
61+
@Mock private OperatorMetricGroup metricGroup;
6262
@Mock private PubSubSubscriberFactory pubSubSubscriberFactory;
6363
@Mock private Credentials credentials;
6464
@Mock private PubSubSubscriber pubsubSubscriber;

flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkTestBase.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@
4646
import org.apache.flink.connector.jdbc.internal.JdbcOutputFormat;
4747
import org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor;
4848
import org.apache.flink.core.fs.CloseableRegistry;
49-
import org.apache.flink.metrics.MetricGroup;
49+
import org.apache.flink.metrics.groups.OperatorMetricGroup;
5050
import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
5151
import org.apache.flink.runtime.state.FunctionInitializationContext;
5252
import org.apache.flink.runtime.state.StateInitializationContextImpl;
@@ -186,7 +186,7 @@ public String getTaskName() {
186186
}
187187

188188
@Override
189-
public MetricGroup getMetricGroup() {
189+
public OperatorMetricGroup getMetricGroup() {
190190
return null;
191191
}
192192

flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/DynamoDBStreamsDataFetcherTest.java

+1-3
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919

2020
import org.apache.flink.api.common.functions.RuntimeContext;
2121
import org.apache.flink.api.common.serialization.SimpleStringSchema;
22-
import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
2322
import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
2423
import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
2524
import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchemaWrapper;
@@ -62,8 +61,7 @@ public void testCreateRecordPublisherRespectsShardIteratorTypeLatest() throws Ex
6261
fetcher.createRecordPublisher(
6362
SENTINEL_LATEST_SEQUENCE_NUM.get(),
6463
new Properties(),
65-
createFakeShardConsumerMetricGroup(
66-
(OperatorMetricGroup) runtimeContext.getMetricGroup()),
64+
createFakeShardConsumerMetricGroup(runtimeContext.getMetricGroup()),
6765
dummyStreamShardHandle);
6866

6967
verify(kinesis).getShardIterator(dummyStreamShardHandle, LATEST.toString(), null);

flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTestUtils.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@
1818
package org.apache.flink.streaming.connectors.kinesis.internals;
1919

2020
import org.apache.flink.api.common.serialization.SimpleStringSchema;
21+
import org.apache.flink.metrics.groups.OperatorMetricGroup;
2122
import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
22-
import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
2323
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
2424
import org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher;
2525
import org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisherFactory;

flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestRuntimeContext.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
package org.apache.flink.streaming.connectors.kinesis.testutils;
1919

2020
import org.apache.flink.api.common.ExecutionConfig;
21-
import org.apache.flink.metrics.MetricGroup;
21+
import org.apache.flink.metrics.groups.OperatorMetricGroup;
2222
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
2323
import org.apache.flink.runtime.jobgraph.OperatorID;
2424
import org.apache.flink.runtime.memory.MemoryManager;
@@ -53,8 +53,8 @@ public TestRuntimeContext(
5353
}
5454

5555
@Override
56-
public MetricGroup getMetricGroup() {
57-
return new UnregisteredMetricsGroup();
56+
public OperatorMetricGroup getMetricGroup() {
57+
return UnregisteredMetricsGroup.createOperatorMetricGroup();
5858
}
5959

6060
@Override

flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@
3939
import org.apache.flink.api.common.state.ReducingStateDescriptor;
4040
import org.apache.flink.api.common.state.ValueState;
4141
import org.apache.flink.api.common.state.ValueStateDescriptor;
42-
import org.apache.flink.metrics.MetricGroup;
42+
import org.apache.flink.metrics.groups.OperatorMetricGroup;
4343

4444
import java.io.Serializable;
4545
import java.util.List;
@@ -76,7 +76,7 @@ public interface RuntimeContext {
7676
* @return The metric group for this parallel subtask.
7777
*/
7878
@PublicEvolving
79-
MetricGroup getMetricGroup();
79+
OperatorMetricGroup getMetricGroup();
8080

8181
/**
8282
* Gets the parallelism with which the parallel task runs.

flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@
4242
import org.apache.flink.api.common.state.ValueState;
4343
import org.apache.flink.api.common.state.ValueStateDescriptor;
4444
import org.apache.flink.core.fs.Path;
45-
import org.apache.flink.metrics.MetricGroup;
45+
import org.apache.flink.metrics.groups.OperatorMetricGroup;
4646
import org.apache.flink.util.UserCodeClassLoader;
4747

4848
import java.io.Serializable;
@@ -65,15 +65,15 @@ public abstract class AbstractRuntimeUDFContext implements RuntimeContext {
6565

6666
private final DistributedCache distributedCache;
6767

68-
private final MetricGroup metrics;
68+
private final OperatorMetricGroup metrics;
6969

7070
public AbstractRuntimeUDFContext(
7171
TaskInfo taskInfo,
7272
UserCodeClassLoader userCodeClassLoader,
7373
ExecutionConfig executionConfig,
7474
Map<String, Accumulator<?, ?>> accumulators,
7575
Map<String, Future<Path>> cpTasks,
76-
MetricGroup metrics) {
76+
OperatorMetricGroup metrics) {
7777
this.taskInfo = checkNotNull(taskInfo);
7878
this.userCodeClassLoader = userCodeClassLoader;
7979
this.executionConfig = executionConfig;
@@ -108,7 +108,7 @@ public int getIndexOfThisSubtask() {
108108
}
109109

110110
@Override
111-
public MetricGroup getMetricGroup() {
111+
public OperatorMetricGroup getMetricGroup() {
112112
return metrics;
113113
}
114114

flink-core/src/main/java/org/apache/flink/api/common/functions/util/RuntimeUDFContext.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
2929
import org.apache.flink.api.common.functions.RuntimeContext;
3030
import org.apache.flink.core.fs.Path;
31-
import org.apache.flink.metrics.MetricGroup;
31+
import org.apache.flink.metrics.groups.OperatorMetricGroup;
3232
import org.apache.flink.util.SimpleUserCodeClassLoader;
3333

3434
import java.util.HashMap;
@@ -56,7 +56,7 @@ public RuntimeUDFContext(
5656
ExecutionConfig executionConfig,
5757
Map<String, Future<Path>> cpTasks,
5858
Map<String, Accumulator<?, ?>> accumulators,
59-
MetricGroup metrics) {
59+
OperatorMetricGroup metrics) {
6060
this(
6161
taskInfo,
6262
userCodeClassLoader,
@@ -73,7 +73,7 @@ public RuntimeUDFContext(
7373
ExecutionConfig executionConfig,
7474
Map<String, Future<Path>> cpTasks,
7575
Map<String, Accumulator<?, ?>> accumulators,
76-
MetricGroup metrics,
76+
OperatorMetricGroup metrics,
7777
JobID jobID) {
7878
super(
7979
taskInfo,

flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@
4848
import org.apache.flink.core.fs.FileSystem;
4949
import org.apache.flink.core.fs.Path;
5050
import org.apache.flink.core.fs.local.LocalFileSystem;
51-
import org.apache.flink.metrics.MetricGroup;
51+
import org.apache.flink.metrics.groups.OperatorMetricGroup;
5252
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
5353
import org.apache.flink.types.Value;
5454
import org.apache.flink.util.OptionalFailure;
@@ -197,7 +197,7 @@ private <IN> void executeDataSink(GenericDataSinkBase<?> sink, int superStep, Jo
197197
}
198198

199199
private RuntimeUDFContext createContext(int superStep, TaskInfo taskInfo, JobID jobID) {
200-
MetricGroup metrics = new UnregisteredMetricsGroup();
200+
OperatorMetricGroup metrics = UnregisteredMetricsGroup.createOperatorMetricGroup();
201201
return superStep == 0
202202
? new RuntimeUDFContext(
203203
taskInfo,
@@ -588,7 +588,7 @@ public IterationRuntimeUDFContext(
588588
ExecutionConfig executionConfig,
589589
Map<String, Future<Path>> cpTasks,
590590
Map<String, Accumulator<?, ?>> accumulators,
591-
MetricGroup metrics,
591+
OperatorMetricGroup metrics,
592592
JobID jobID) {
593593
super(taskInfo, classloader, executionConfig, cpTasks, accumulators, metrics, jobID);
594594
}

flink-core/src/test/java/org/apache/flink/api/common/functions/util/RuntimeUDFContextTest.java

+5-5
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ public void testBroadcastVariableNotFound() {
5050
new ExecutionConfig(),
5151
new HashMap<>(),
5252
new HashMap<>(),
53-
new UnregisteredMetricsGroup());
53+
UnregisteredMetricsGroup.createOperatorMetricGroup());
5454

5555
assertFalse(ctx.hasBroadcastVariable("some name"));
5656

@@ -90,7 +90,7 @@ public void testBroadcastVariableSimple() {
9090
new ExecutionConfig(),
9191
new HashMap<>(),
9292
new HashMap<>(),
93-
new UnregisteredMetricsGroup());
93+
UnregisteredMetricsGroup.createOperatorMetricGroup());
9494

9595
ctx.setBroadcastVariable("name1", Arrays.asList(1, 2, 3, 4));
9696
ctx.setBroadcastVariable("name2", Arrays.asList(1.0, 2.0, 3.0, 4.0));
@@ -133,7 +133,7 @@ public void testBroadcastVariableWithInitializer() {
133133
new ExecutionConfig(),
134134
new HashMap<>(),
135135
new HashMap<>(),
136-
new UnregisteredMetricsGroup());
136+
UnregisteredMetricsGroup.createOperatorMetricGroup());
137137

138138
ctx.setBroadcastVariable("name", Arrays.asList(1, 2, 3, 4));
139139

@@ -167,7 +167,7 @@ public void testResetBroadcastVariableWithInitializer() {
167167
new ExecutionConfig(),
168168
new HashMap<>(),
169169
new HashMap<>(),
170-
new UnregisteredMetricsGroup());
170+
UnregisteredMetricsGroup.createOperatorMetricGroup());
171171

172172
ctx.setBroadcastVariable("name", Arrays.asList(1, 2, 3, 4));
173173

@@ -198,7 +198,7 @@ public void testBroadcastVariableWithInitializerAndMismatch() {
198198
new ExecutionConfig(),
199199
new HashMap<>(),
200200
new HashMap<>(),
201-
new UnregisteredMetricsGroup());
201+
UnregisteredMetricsGroup.createOperatorMetricGroup());
202202

203203
ctx.setBroadcastVariable("name", Arrays.asList(1, 2, 3, 4));
204204

flink-core/src/test/java/org/apache/flink/api/common/io/RichInputFormatTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ public void testCheckRuntimeContextAccess() {
4747
new ExecutionConfig(),
4848
new HashMap<String, Future<Path>>(),
4949
new HashMap<String, Accumulator<?, ?>>(),
50-
new UnregisteredMetricsGroup()));
50+
UnregisteredMetricsGroup.createOperatorMetricGroup()));
5151

5252
assertEquals(inputFormat.getRuntimeContext().getIndexOfThisSubtask(), 1);
5353
assertEquals(inputFormat.getRuntimeContext().getNumberOfParallelSubtasks(), 3);

flink-core/src/test/java/org/apache/flink/api/common/io/RichOutputFormatTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ public void testCheckRuntimeContextAccess() {
4848
new ExecutionConfig(),
4949
new HashMap<String, Future<Path>>(),
5050
new HashMap<String, Accumulator<?, ?>>(),
51-
new UnregisteredMetricsGroup()));
51+
UnregisteredMetricsGroup.createOperatorMetricGroup()));
5252

5353
assertEquals(inputFormat.getRuntimeContext().getIndexOfThisSubtask(), 1);
5454
assertEquals(inputFormat.getRuntimeContext().getNumberOfParallelSubtasks(), 3);

flink-core/src/test/java/org/apache/flink/api/common/operators/GenericDataSinkBaseTest.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ public void testDataSourceWithRuntimeContext() {
110110
executionConfig,
111111
cpTasks,
112112
accumulatorMap,
113-
new UnregisteredMetricsGroup()),
113+
UnregisteredMetricsGroup.createOperatorMetricGroup()),
114114
executionConfig);
115115

116116
assertEquals(out.output, asList(TestIOData.RICH_NAMES));
@@ -127,7 +127,7 @@ public void testDataSourceWithRuntimeContext() {
127127
executionConfig,
128128
cpTasks,
129129
accumulatorMap,
130-
new UnregisteredMetricsGroup()),
130+
UnregisteredMetricsGroup.createOperatorMetricGroup()),
131131
executionConfig);
132132
assertEquals(out.output, asList(TestIOData.RICH_NAMES));
133133
} catch (Exception e) {

flink-core/src/test/java/org/apache/flink/api/common/operators/GenericDataSourceBaseTest.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ public void testDataSourceWithRuntimeContext() {
9696
executionConfig,
9797
cpTasks,
9898
accumulatorMap,
99-
new UnregisteredMetricsGroup()),
99+
UnregisteredMetricsGroup.createOperatorMetricGroup()),
100100
executionConfig);
101101

102102
assertEquals(true, in.hasBeenClosed());
@@ -115,7 +115,7 @@ public void testDataSourceWithRuntimeContext() {
115115
executionConfig,
116116
cpTasks,
117117
accumulatorMap,
118-
new UnregisteredMetricsGroup()),
118+
UnregisteredMetricsGroup.createOperatorMetricGroup()),
119119
executionConfig);
120120

121121
assertEquals(true, in.hasBeenClosed());

flink-core/src/test/java/org/apache/flink/api/common/operators/base/FlatMapOperatorCollectionTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ private void testExecuteOnCollection(
9090
executionConfig,
9191
new HashMap<String, Future<Path>>(),
9292
new HashMap<String, Accumulator<?, ?>>(),
93-
new UnregisteredMetricsGroup()),
93+
UnregisteredMetricsGroup.createOperatorMetricGroup()),
9494
executionConfig);
9595

9696
Assert.assertEquals(input.size(), result.size());

flink-core/src/test/java/org/apache/flink/api/common/operators/base/InnerJoinOperatorBaseTest.java

+5-3
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,9 @@
4141
import java.util.concurrent.Future;
4242
import java.util.concurrent.atomic.AtomicBoolean;
4343

44-
import static org.junit.Assert.*;
44+
import static org.junit.Assert.assertEquals;
45+
import static org.junit.Assert.assertTrue;
46+
import static org.junit.Assert.fail;
4547

4648
@SuppressWarnings("serial")
4749
public class InnerJoinOperatorBaseTest implements Serializable {
@@ -162,7 +164,7 @@ public void join(String first, String second, Collector<Integer> out)
162164
executionConfig,
163165
cpTasks,
164166
accumulatorMap,
165-
new UnregisteredMetricsGroup()),
167+
UnregisteredMetricsGroup.createOperatorMetricGroup()),
166168
executionConfig);
167169

168170
executionConfig.enableObjectReuse();
@@ -176,7 +178,7 @@ public void join(String first, String second, Collector<Integer> out)
176178
executionConfig,
177179
cpTasks,
178180
accumulatorMap,
179-
new UnregisteredMetricsGroup()),
181+
UnregisteredMetricsGroup.createOperatorMetricGroup()),
180182
executionConfig);
181183

182184
assertEquals(expected, resultSafe);

flink-core/src/test/java/org/apache/flink/api/common/operators/base/MapOperatorTest.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ public void close() throws Exception {
135135
executionConfig,
136136
cpTasks,
137137
accumulatorMap,
138-
new UnregisteredMetricsGroup()),
138+
UnregisteredMetricsGroup.createOperatorMetricGroup()),
139139
executionConfig);
140140

141141
executionConfig.enableObjectReuse();
@@ -148,7 +148,7 @@ public void close() throws Exception {
148148
executionConfig,
149149
cpTasks,
150150
accumulatorMap,
151-
new UnregisteredMetricsGroup()),
151+
UnregisteredMetricsGroup.createOperatorMetricGroup()),
152152
executionConfig);
153153

154154
assertEquals(asList(1, 2, 3, 4, 5, 6), resultMutableSafe);

flink-core/src/test/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBaseTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ public void setup() {
8989
executionConfig,
9090
cpTasks,
9191
accumulatorMap,
92-
new UnregisteredMetricsGroup());
92+
UnregisteredMetricsGroup.createOperatorMetricGroup());
9393
}
9494

9595
@Test

flink-core/src/test/java/org/apache/flink/api/common/operators/base/PartitionMapOperatorTest.java

+5-3
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,9 @@
4141
import java.util.concurrent.atomic.AtomicBoolean;
4242

4343
import static java.util.Arrays.asList;
44-
import static org.junit.Assert.*;
44+
import static org.junit.Assert.assertEquals;
45+
import static org.junit.Assert.assertTrue;
46+
import static org.junit.Assert.fail;
4547

4648
@SuppressWarnings("serial")
4749
public class PartitionMapOperatorTest implements java.io.Serializable {
@@ -102,7 +104,7 @@ public void close() throws Exception {
102104
executionConfig,
103105
new HashMap<String, Future<Path>>(),
104106
new HashMap<String, Accumulator<?, ?>>(),
105-
new UnregisteredMetricsGroup()),
107+
UnregisteredMetricsGroup.createOperatorMetricGroup()),
106108
executionConfig);
107109

108110
executionConfig.enableObjectReuse();
@@ -115,7 +117,7 @@ public void close() throws Exception {
115117
executionConfig,
116118
new HashMap<String, Future<Path>>(),
117119
new HashMap<String, Accumulator<?, ?>>(),
118-
new UnregisteredMetricsGroup()),
120+
UnregisteredMetricsGroup.createOperatorMetricGroup()),
119121
executionConfig);
120122

121123
assertEquals(asList(1, 2, 3, 4, 5, 6), resultMutableSafe);

flink-java/src/test/java/org/apache/flink/api/common/operators/base/CoGroupOperatorCollectionTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ public void testExecuteOnCollection() {
8888
executionConfig,
8989
cpTasks,
9090
accumulators,
91-
new UnregisteredMetricsGroup());
91+
UnregisteredMetricsGroup.createOperatorMetricGroup());
9292

9393
{
9494
SumCoGroup udf1 = new SumCoGroup();

0 commit comments

Comments
 (0)