Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,15 @@
/** A collection class for handling metrics in {@link IncrementalSourceReader}. */
public class SourceReaderMetrics {

public static final long UNDEFINED = -1;

private final SourceReaderMetricGroup metricGroup;

/**
* currentFetchEventTimeLag = FetchTime - messageTimestamp, where the FetchTime is the time the
* record fetched into the source operator.
*/
private volatile long fetchDelay = 0L;
private volatile long fetchDelay = UNDEFINED;

/** The total number of record that failed to consume, process or emit. */
private final Counter numRecordsInErrorsCounter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ public Offset getOffsetPosition(Map<String, ?> offset) {

/**
 * Deserializes one Debezium {@link SourceRecord} and forwards the result to the given
 * {@link SourceOutput} via the shared {@code outputCollector}.
 *
 * <p>The record's message timestamp is captured on the collector <em>before</em>
 * deserialization so that {@code OutputCollector#collect} can attach event time to the
 * emitted record when the timestamp is valid.
 *
 * @param element the raw Debezium change record to deserialize
 * @param output the Flink source output the deserialized record(s) are emitted to
 * @throws Exception if deserialization fails
 */
protected void emitElement(SourceRecord element, SourceOutput<T> output) throws Exception {
outputCollector.output = output;
// Binlog events carry a valid timestamp; snapshot records do not (handled in collect()).
outputCollector.currentMessageTimestamp = getMessageTimestamp(element);
debeziumDeserializationSchema.deserialize(element, outputCollector);
}

Expand All @@ -170,10 +171,21 @@ protected void reportMetrics(SourceRecord element) {

private static class OutputCollector<T> implements Collector<T> {
private SourceOutput<T> output;
private Long currentMessageTimestamp;

@Override
// Emits a deserialized record, attaching the captured message timestamp when it is a
// valid (positive) event time so the source operator can report currentEmitEventTimeLag.
public void collect(T record) {
// NOTE(review): this unconditional collect line appears to be the *removed* side of a
// rendered diff pasted together with the added if/else below — applied literally it would
// emit every record twice. Confirm against the merged file; only the branch should remain.
output.collect(record);
if (currentMessageTimestamp != null && currentMessageTimestamp > 0) {
// Only binlog event contains a valid timestamp. We use the output with timestamp to
// report the event time and let the source operator to report
// "currentEmitEventTimeLag" correctly.
output.collect(record, currentMessageTimestamp);
} else {
// Records in snapshot mode have a zero timestamp in the message. We use the output
// without timestamp to collect the record. Metric "currentEmitEventTimeLag" will
// not be updated in the source operator in this case.
output.collect(record);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,15 @@
/** A collection class for handling metrics in {@link MySqlSourceReader}. */
public class MySqlSourceReaderMetrics {

public static final long UNDEFINED = -1;

private final MetricGroup metricGroup;

/**
* currentFetchEventTimeLag = FetchTime - messageTimestamp, where the FetchTime is the time the
* record fetched into the source operator.
*/
private volatile long fetchDelay = 0L;
private volatile long fetchDelay = UNDEFINED;

public MySqlSourceReaderMetrics(MetricGroup metricGroup) {
this.metricGroup = metricGroup;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ private void updateStartingOffsetForSplit(MySqlSplitState splitState, SourceReco

/**
 * Deserializes one Debezium {@link SourceRecord} and forwards the result to the given
 * {@link SourceOutput} via the shared {@code outputCollector}.
 *
 * <p>The record's message timestamp is captured on the collector <em>before</em>
 * deserialization so that {@code OutputCollector#collect} can attach event time to the
 * emitted record when the timestamp is valid.
 *
 * @param element the raw Debezium change record to deserialize
 * @param output the Flink source output the deserialized record(s) are emitted to
 * @throws Exception if deserialization fails
 */
private void emitElement(SourceRecord element, SourceOutput<T> output) throws Exception {
outputCollector.output = output;
// Binlog events carry a valid timestamp; snapshot records do not (handled in collect()).
outputCollector.currentMessageTimestamp = RecordUtils.getMessageTimestamp(element);
debeziumDeserializationSchema.deserialize(element, outputCollector);
}

Expand All @@ -135,10 +136,21 @@ private void reportMetrics(SourceRecord element) {

private static class OutputCollector<T> implements Collector<T> {
private SourceOutput<T> output;
private Long currentMessageTimestamp;

@Override
// Emits a deserialized record, attaching the captured message timestamp when it is a
// valid (positive) event time so the source operator can report currentEmitEventTimeLag.
public void collect(T record) {
// NOTE(review): this unconditional collect line appears to be the *removed* side of a
// rendered diff pasted together with the added if/else below — applied literally it would
// emit every record twice. Confirm against the merged file; only the branch should remain.
output.collect(record);
if (currentMessageTimestamp != null && currentMessageTimestamp > 0) {
// Only binlog event contains a valid timestamp. We use the output with timestamp to
// report the event time and let the source operator to report
// "currentEmitEventTimeLag" correctly.
output.collect(record, currentMessageTimestamp);
} else {
// Records in snapshot mode have a zero timestamp in the message. We use the output
// without timestamp to collect the record. Metric "currentEmitEventTimeLag" will
// not be updated in the source operator in this case.
output.collect(record);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils;
import org.apache.flink.cdc.connectors.mysql.source.metrics.MySqlSourceReaderMetrics;
import org.apache.flink.cdc.connectors.mysql.source.utils.hooks.SnapshotPhaseHook;
import org.apache.flink.cdc.connectors.mysql.source.utils.hooks.SnapshotPhaseHooks;
import org.apache.flink.cdc.connectors.mysql.table.MySqlDeserializationConverterFactory;
Expand All @@ -31,9 +32,16 @@
import org.apache.flink.cdc.connectors.mysql.testutils.TestTableSchemas;
import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema;
import org.apache.flink.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.cdc.debezium.table.MetadataConverter;
import org.apache.flink.cdc.debezium.table.RowDataDebeziumDeserializeSchema;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.groups.OperatorMetricGroup;
import org.apache.flink.runtime.metrics.MetricNames;
import org.apache.flink.runtime.metrics.groups.InternalSourceReaderMetricGroup;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.operators.collect.CollectResultIterator;
Expand Down Expand Up @@ -95,13 +103,16 @@
import static java.lang.String.format;
import static org.apache.flink.api.common.JobStatus.RUNNING;
import static org.apache.flink.util.Preconditions.checkState;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

/** IT tests for {@link MySqlSource}. */
@RunWith(Parameterized.class)
public class MySqlSourceITCase extends MySqlSourceTestBase {

@Rule public final Timeout timeoutPerTest = Timeout.seconds(300);
public static final Duration TIMEOUT = Duration.ofSeconds(300);

@Rule public final Timeout timeoutPerTest = Timeout.seconds(TIMEOUT.getSeconds());

private static final String DEFAULT_SCAN_STARTUP_MODE = "initial";
private final UniqueDatabase customDatabase =
Expand Down Expand Up @@ -686,6 +697,123 @@ private void testStartingOffset(
}
}

/**
 * Verifies source-level metrics across both phases of the MySQL CDC source:
 *
 * <ul>
 *   <li>Snapshot phase: {@code numRecordsOut} counts the snapshot records, while
 *       {@code currentEmitEventTimeLag} and {@code currentFetchEventTimeLag} stay UNDEFINED
 *       (snapshot records carry no valid event timestamp).
 *   <li>Binlog phase: after real change events, both lag metrics become positive and
 *       {@code numRecordsOut} includes the binlog records.
 * </ul>
 */
@SuppressWarnings("unchecked")
@Test
public void testSourceMetrics() throws Exception {
customDatabase.createAndInitialize();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Single parallelism so exactly one operator metric group exists for the source.
env.setParallelism(1);
MySqlSource<String> source =
MySqlSource.<String>builder()
.hostname(MYSQL_CONTAINER.getHost())
.port(MYSQL_CONTAINER.getDatabasePort())
.databaseList(customDatabase.getDatabaseName())
.tableList(customDatabase.getDatabaseName() + ".customers")
.username(customDatabase.getUsername())
.password(customDatabase.getPassword())
.deserializer(new StringDebeziumDeserializationSchema())
.serverId(getServerId())
.build();
DataStreamSource<String> stream =
env.fromSource(source, WatermarkStrategy.noWatermarks(), "MySQL CDC Source");
CollectResultIterator<String> iterator = addCollector(env, stream);
JobClient jobClient = env.executeAsync();
iterator.setJobClient(jobClient);

// ---------------------------- Snapshot phase ------------------------------
// Wait until we receive all 21 snapshot records
int numSnapshotRecordsExpected = 21;
int numSnapshotRecordsReceived = 0;
while (numSnapshotRecordsReceived < numSnapshotRecordsExpected && iterator.hasNext()) {
iterator.next();
numSnapshotRecordsReceived++;
}

// Check metrics
List<OperatorMetricGroup> metricGroups =
metricReporter.findOperatorMetricGroups(jobClient.getJobID(), "MySQL CDC Source");
// There should be only 1 parallelism of source, so it's safe to get the only group
OperatorMetricGroup group = metricGroups.get(0);
Map<String, Metric> metrics = metricReporter.getMetricsByGroup(group);

// numRecordsOut
assertEquals(
numSnapshotRecordsExpected,
group.getIOMetricGroup().getNumRecordsOutCounter().getCount());

// currentEmitEventTimeLag should be UNDEFINED during snapshot phase
assertTrue(metrics.containsKey(MetricNames.CURRENT_EMIT_EVENT_TIME_LAG));
Gauge<Long> currentEmitEventTimeLag =
(Gauge<Long>) metrics.get(MetricNames.CURRENT_EMIT_EVENT_TIME_LAG);
assertEquals(
InternalSourceReaderMetricGroup.UNDEFINED,
(long) currentEmitEventTimeLag.getValue());

// currentFetchEventTimeLag should be UNDEFINED during snapshot phase
assertTrue(metrics.containsKey(MetricNames.CURRENT_FETCH_EVENT_TIME_LAG));
Gauge<Long> currentFetchEventTimeLag =
(Gauge<Long>) metrics.get(MetricNames.CURRENT_FETCH_EVENT_TIME_LAG);
assertEquals(
MySqlSourceReaderMetrics.UNDEFINED, (long) currentFetchEventTimeLag.getValue());

// sourceIdleTime should be positive (we can't know the exact value)
assertTrue(metrics.containsKey(MetricNames.SOURCE_IDLE_TIME));
Gauge<Long> sourceIdleTime = (Gauge<Long>) metrics.get(MetricNames.SOURCE_IDLE_TIME);
assertTrue(sourceIdleTime.getValue() > 0);
assertTrue(sourceIdleTime.getValue() < TIMEOUT.toMillis());

// --------------------------------- Binlog phase -----------------------------
makeFirstPartBinlogEvents(getConnection(), customDatabase.qualifiedTableName("customers"));
// Wait until we receive 4 changes made above
int numBinlogRecordsExpected = 4;
int numBinlogRecordsReceived = 0;
while (numBinlogRecordsReceived < numBinlogRecordsExpected && iterator.hasNext()) {
iterator.next();
numBinlogRecordsReceived++;
}

// Check metrics
// numRecordsOut
assertEquals(
numSnapshotRecordsExpected + numBinlogRecordsExpected,
group.getIOMetricGroup().getNumRecordsOutCounter().getCount());

// currentEmitEventTimeLag should be reasonably positive (we can't know the exact value)
assertTrue(currentEmitEventTimeLag.getValue() > 0);
assertTrue(currentEmitEventTimeLag.getValue() < TIMEOUT.toMillis());

// currentFetchEventTimeLag should be reasonably positive (we can't know the exact value)
assertTrue(currentFetchEventTimeLag.getValue() > 0);
assertTrue(currentFetchEventTimeLag.getValue() < TIMEOUT.toMillis());

// sourceIdleTime should be reasonably positive (we can't know the exact value)
assertTrue(sourceIdleTime.getValue() > 0);
assertTrue(sourceIdleTime.getValue() < TIMEOUT.toMillis());

jobClient.cancel().get();
iterator.close();
}

/**
 * Attaches a collect sink to the given stream and returns an iterator over its results.
 *
 * <p>Mirrors what {@code DataStream#executeAndCollect} builds internally, but keeps the
 * iterator detached from job submission so the caller can bind it to a {@link JobClient}
 * obtained from {@code env.executeAsync()} (needed here to inspect metrics mid-job).
 *
 * @param env the execution environment the sink operator is registered with
 * @param stream the stream whose elements should be collected
 * @return an iterator over the stream's results; caller must set the job client and close it
 */
private <T> CollectResultIterator<T> addCollector(
StreamExecutionEnvironment env, DataStream<T> stream) {
TypeSerializer<T> serializer =
stream.getTransformation().getOutputType().createSerializer(env.getConfig());
// Unique accumulator name so concurrent collectors in one job cannot collide.
String accumulatorName = "dataStreamCollect_" + UUID.randomUUID();
CollectSinkOperatorFactory<T> factory =
new CollectSinkOperatorFactory<>(serializer, accumulatorName);
CollectSinkOperator<T> operator = (CollectSinkOperator<T>) factory.getOperator();
CollectResultIterator<T> iterator =
new CollectResultIterator<>(
operator.getOperatorIdFuture(),
serializer,
accumulatorName,
env.getCheckpointConfig());
CollectStreamSink<T> sink = new CollectStreamSink<>(stream, factory);
sink.name("Data stream collect sink");
env.addOperator(sink.getTransformation());
return iterator;
}

private MySqlSource<RowData> buildSleepingSource() {
ResolvedSchema physicalSchema =
new ResolvedSchema(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@
import org.apache.flink.api.common.JobID;
import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer;
import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.highavailability.nonha.embedded.HaLeadershipControl;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.RpcServiceSharing;
import org.apache.flink.runtime.testutils.InMemoryReporter;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.test.util.MiniClusterWithClientResource;
Expand Down Expand Up @@ -52,6 +54,7 @@ public abstract class MySqlSourceTestBase extends TestLogger {

protected static final int DEFAULT_PARALLELISM = 4;
protected static final MySqlContainer MYSQL_CONTAINER = createMySqlContainer(MySqlVersion.V5_7);
protected InMemoryReporter metricReporter = InMemoryReporter.createWithRetainedMetrics();

@Rule
public final MiniClusterWithClientResource miniClusterResource =
Expand All @@ -61,6 +64,8 @@ public abstract class MySqlSourceTestBase extends TestLogger {
.setNumberSlotsPerTaskManager(DEFAULT_PARALLELISM)
.setRpcServiceSharing(RpcServiceSharing.DEDICATED)
.withHaLeadershipControl()
.setConfiguration(
metricReporter.addToConfiguration(new Configuration()))
.build());

@BeforeClass
Expand Down