diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/metrics/SourceReaderMetrics.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/metrics/SourceReaderMetrics.java index 092b9363277..a1ead2f77c0 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/metrics/SourceReaderMetrics.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/metrics/SourceReaderMetrics.java @@ -26,13 +26,15 @@ /** A collection class for handling metrics in {@link IncrementalSourceReader}. */ public class SourceReaderMetrics { + public static final long UNDEFINED = -1; + private final SourceReaderMetricGroup metricGroup; /** * currentFetchEventTimeLag = FetchTime - messageTimestamp, where the FetchTime is the time the * record fetched into the source operator. */ - private volatile long fetchDelay = 0L; + private volatile long fetchDelay = UNDEFINED; /** The total number of record that failed to consume, process or emit. 
*/ private final Counter numRecordsInErrorsCounter; diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/IncrementalSourceRecordEmitter.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/IncrementalSourceRecordEmitter.java index 2970f1b6799..0ca034bd1dd 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/IncrementalSourceRecordEmitter.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/IncrementalSourceRecordEmitter.java @@ -153,6 +153,7 @@ public Offset getOffsetPosition(Map offset) { protected void emitElement(SourceRecord element, SourceOutput output) throws Exception { outputCollector.output = output; + outputCollector.currentMessageTimestamp = getMessageTimestamp(element); debeziumDeserializationSchema.deserialize(element, outputCollector); } @@ -170,10 +171,21 @@ protected void reportMetrics(SourceRecord element) { private static class OutputCollector implements Collector { private SourceOutput output; + private Long currentMessageTimestamp; @Override public void collect(T record) { - output.collect(record); + if (currentMessageTimestamp != null && currentMessageTimestamp > 0) { + // Only binlog events contain a valid timestamp. We use the output with timestamp to + // report the event time and let the source operator report + // "currentEmitEventTimeLag" correctly. + output.collect(record, currentMessageTimestamp); + } else { + // Records in snapshot mode have a zero timestamp in the message. We use the output + // without timestamp to collect the record. Metric "currentEmitEventTimeLag" will + // not be updated in the source operator in this case. 
+ output.collect(record); + } } @Override diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/metrics/MySqlSourceReaderMetrics.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/metrics/MySqlSourceReaderMetrics.java index d435e93935b..76ef2ed66f4 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/metrics/MySqlSourceReaderMetrics.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/metrics/MySqlSourceReaderMetrics.java @@ -25,13 +25,15 @@ /** A collection class for handling metrics in {@link MySqlSourceReader}. */ public class MySqlSourceReaderMetrics { + public static final long UNDEFINED = -1; + private final MetricGroup metricGroup; /** * currentFetchEventTimeLag = FetchTime - messageTimestamp, where the FetchTime is the time the * record fetched into the source operator. 
*/ - private volatile long fetchDelay = 0L; + private volatile long fetchDelay = UNDEFINED; public MySqlSourceReaderMetrics(MetricGroup metricGroup) { this.metricGroup = metricGroup; diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitter.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitter.java index cb05d85f5ea..e3c504113ca 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitter.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitter.java @@ -116,6 +116,7 @@ private void updateStartingOffsetForSplit(MySqlSplitState splitState, SourceReco private void emitElement(SourceRecord element, SourceOutput output) throws Exception { outputCollector.output = output; + outputCollector.currentMessageTimestamp = RecordUtils.getMessageTimestamp(element); debeziumDeserializationSchema.deserialize(element, outputCollector); } @@ -135,10 +136,21 @@ private void reportMetrics(SourceRecord element) { private static class OutputCollector implements Collector { private SourceOutput output; + private Long currentMessageTimestamp; @Override public void collect(T record) { - output.collect(record); + if (currentMessageTimestamp != null && currentMessageTimestamp > 0) { + // Only binlog events contain a valid timestamp. We use the output with timestamp to + // report the event time and let the source operator report + // "currentEmitEventTimeLag" correctly. + output.collect(record, currentMessageTimestamp); + } else { + // Records in snapshot mode have a zero timestamp in the message. 
We use the output + // without timestamp to collect the record. Metric "currentEmitEventTimeLag" will + // not be updated in the source operator in this case. + output.collect(record); + } } @Override diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceITCase.java index adba04e2bf6..c93c1dc644b 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceITCase.java @@ -23,6 +23,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils; +import org.apache.flink.cdc.connectors.mysql.source.metrics.MySqlSourceReaderMetrics; import org.apache.flink.cdc.connectors.mysql.source.utils.hooks.SnapshotPhaseHook; import org.apache.flink.cdc.connectors.mysql.source.utils.hooks.SnapshotPhaseHooks; import org.apache.flink.cdc.connectors.mysql.table.MySqlDeserializationConverterFactory; @@ -31,9 +32,16 @@ import org.apache.flink.cdc.connectors.mysql.testutils.TestTableSchemas; import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase; import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema; +import org.apache.flink.cdc.debezium.StringDebeziumDeserializationSchema; import org.apache.flink.cdc.debezium.table.MetadataConverter; import org.apache.flink.cdc.debezium.table.RowDataDebeziumDeserializeSchema; import org.apache.flink.core.execution.JobClient; +import org.apache.flink.metrics.Gauge; +import 
org.apache.flink.metrics.Metric; +import org.apache.flink.metrics.groups.OperatorMetricGroup; +import org.apache.flink.runtime.metrics.MetricNames; +import org.apache.flink.runtime.metrics.groups.InternalSourceReaderMetricGroup; +import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.operators.collect.CollectResultIterator; @@ -95,13 +103,16 @@ import static java.lang.String.format; import static org.apache.flink.api.common.JobStatus.RUNNING; import static org.apache.flink.util.Preconditions.checkState; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; /** IT tests for {@link MySqlSource}. */ @RunWith(Parameterized.class) public class MySqlSourceITCase extends MySqlSourceTestBase { - @Rule public final Timeout timeoutPerTest = Timeout.seconds(300); + public static final Duration TIMEOUT = Duration.ofSeconds(300); + + @Rule public final Timeout timeoutPerTest = Timeout.seconds(TIMEOUT.getSeconds()); private static final String DEFAULT_SCAN_STARTUP_MODE = "initial"; private final UniqueDatabase customDatabase = @@ -686,6 +697,123 @@ private void testStartingOffset( } } + @SuppressWarnings("unchecked") + @Test + public void testSourceMetrics() throws Exception { + customDatabase.createAndInitialize(); + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(1); + MySqlSource source = + MySqlSource.builder() + .hostname(MYSQL_CONTAINER.getHost()) + .port(MYSQL_CONTAINER.getDatabasePort()) + .databaseList(customDatabase.getDatabaseName()) + .tableList(customDatabase.getDatabaseName() + ".customers") + .username(customDatabase.getUsername()) + .password(customDatabase.getPassword()) + .deserializer(new StringDebeziumDeserializationSchema()) + .serverId(getServerId()) + .build(); + 
DataStreamSource stream = + env.fromSource(source, WatermarkStrategy.noWatermarks(), "MySQL CDC Source"); + CollectResultIterator iterator = addCollector(env, stream); + JobClient jobClient = env.executeAsync(); + iterator.setJobClient(jobClient); + + // ---------------------------- Snapshot phase ------------------------------ + // Wait until we receive all 21 snapshot records + int numSnapshotRecordsExpected = 21; + int numSnapshotRecordsReceived = 0; + while (numSnapshotRecordsReceived < numSnapshotRecordsExpected && iterator.hasNext()) { + iterator.next(); + numSnapshotRecordsReceived++; + } + + // Check metrics + List metricGroups = + metricReporter.findOperatorMetricGroups(jobClient.getJobID(), "MySQL CDC Source"); + // There should be only 1 parallelism of source, so it's safe to get the only group + OperatorMetricGroup group = metricGroups.get(0); + Map metrics = metricReporter.getMetricsByGroup(group); + + // numRecordsOut + assertEquals( + numSnapshotRecordsExpected, + group.getIOMetricGroup().getNumRecordsOutCounter().getCount()); + + // currentEmitEventTimeLag should be UNDEFINED during snapshot phase + assertTrue(metrics.containsKey(MetricNames.CURRENT_EMIT_EVENT_TIME_LAG)); + Gauge currentEmitEventTimeLag = + (Gauge) metrics.get(MetricNames.CURRENT_EMIT_EVENT_TIME_LAG); + assertEquals( + InternalSourceReaderMetricGroup.UNDEFINED, + (long) currentEmitEventTimeLag.getValue()); + + // currentFetchEventTimeLag should be UNDEFINED during snapshot phase + assertTrue(metrics.containsKey(MetricNames.CURRENT_FETCH_EVENT_TIME_LAG)); + Gauge currentFetchEventTimeLag = + (Gauge) metrics.get(MetricNames.CURRENT_FETCH_EVENT_TIME_LAG); + assertEquals( + MySqlSourceReaderMetrics.UNDEFINED, (long) currentFetchEventTimeLag.getValue()); + + // sourceIdleTime should be positive (we can't know the exact value) + assertTrue(metrics.containsKey(MetricNames.SOURCE_IDLE_TIME)); + Gauge sourceIdleTime = (Gauge) metrics.get(MetricNames.SOURCE_IDLE_TIME); + 
assertTrue(sourceIdleTime.getValue() > 0); + assertTrue(sourceIdleTime.getValue() < TIMEOUT.toMillis()); + + // --------------------------------- Binlog phase ----------------------------- + makeFirstPartBinlogEvents(getConnection(), customDatabase.qualifiedTableName("customers")); + // Wait until we receive 4 changes made above + int numBinlogRecordsExpected = 4; + int numBinlogRecordsReceived = 0; + while (numBinlogRecordsReceived < numBinlogRecordsExpected && iterator.hasNext()) { + iterator.next(); + numBinlogRecordsReceived++; + } + + // Check metrics + // numRecordsOut + assertEquals( + numSnapshotRecordsExpected + numBinlogRecordsExpected, + group.getIOMetricGroup().getNumRecordsOutCounter().getCount()); + + // currentEmitEventTimeLag should be reasonably positive (we can't know the exact value) + assertTrue(currentEmitEventTimeLag.getValue() > 0); + assertTrue(currentEmitEventTimeLag.getValue() < TIMEOUT.toMillis()); + + // currentFetchEventTimeLag should be reasonably positive (we can't know the exact value) + assertTrue(currentFetchEventTimeLag.getValue() > 0); + assertTrue(currentFetchEventTimeLag.getValue() < TIMEOUT.toMillis()); + + // sourceIdleTime should be reasonably positive (we can't know the exact value) + assertTrue(sourceIdleTime.getValue() > 0); + assertTrue(sourceIdleTime.getValue() < TIMEOUT.toMillis()); + + jobClient.cancel().get(); + iterator.close(); + } + + private CollectResultIterator addCollector( + StreamExecutionEnvironment env, DataStream stream) { + TypeSerializer serializer = + stream.getTransformation().getOutputType().createSerializer(env.getConfig()); + String accumulatorName = "dataStreamCollect_" + UUID.randomUUID(); + CollectSinkOperatorFactory factory = + new CollectSinkOperatorFactory<>(serializer, accumulatorName); + CollectSinkOperator operator = (CollectSinkOperator) factory.getOperator(); + CollectResultIterator iterator = + new CollectResultIterator<>( + operator.getOperatorIdFuture(), + serializer, + 
accumulatorName, + env.getCheckpointConfig()); + CollectStreamSink sink = new CollectStreamSink<>(stream, factory); + sink.name("Data stream collect sink"); + env.addOperator(sink.getTransformation()); + return iterator; + } + private MySqlSource buildSleepingSource() { ResolvedSchema physicalSchema = new ResolvedSchema( diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceTestBase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceTestBase.java index 0ca776bebf9..4ca1e26a44b 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceTestBase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceTestBase.java @@ -20,9 +20,11 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer; import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion; +import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.highavailability.nonha.embedded.HaLeadershipControl; import org.apache.flink.runtime.minicluster.MiniCluster; import org.apache.flink.runtime.minicluster.RpcServiceSharing; +import org.apache.flink.runtime.testutils.InMemoryReporter; import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.table.planner.factories.TestValuesTableFactory; import org.apache.flink.test.util.MiniClusterWithClientResource; @@ -52,6 +54,7 @@ public abstract class MySqlSourceTestBase extends TestLogger { protected static final int DEFAULT_PARALLELISM = 4; protected static final MySqlContainer MYSQL_CONTAINER = createMySqlContainer(MySqlVersion.V5_7); + protected 
InMemoryReporter metricReporter = InMemoryReporter.createWithRetainedMetrics(); @Rule public final MiniClusterWithClientResource miniClusterResource = @@ -61,6 +64,8 @@ public abstract class MySqlSourceTestBase extends TestLogger { .setNumberSlotsPerTaskManager(DEFAULT_PARALLELISM) .setRpcServiceSharing(RpcServiceSharing.DEDICATED) .withHaLeadershipControl() + .setConfiguration( + metricReporter.addToConfiguration(new Configuration())) .build()); @BeforeClass