Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,10 @@

import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -92,7 +92,7 @@ public SnapshotSplitAssigner(
currentParallelism,
new ArrayList<>(),
new ArrayList<>(),
new HashMap<>(),
new LinkedHashMap<>(),
new HashMap<>(),
new HashMap<>(),
INITIAL_ASSIGNING,
Expand Down Expand Up @@ -143,7 +143,17 @@ private SnapshotSplitAssigner(
this.currentParallelism = currentParallelism;
this.alreadyProcessedTables = alreadyProcessedTables;
this.remainingSplits = remainingSplits;
this.assignedSplits = assignedSplits;
// When the job is restored from a savepoint, sort the existing tables and the newly added
// tables so that the enumerator only sends StreamSplitMetaEvents for the newly added tables
this.assignedSplits =
assignedSplits.entrySet().stream()
.sorted(Map.Entry.comparingByKey())
.collect(
Collectors.toMap(
Map.Entry::getKey,
Map.Entry::getValue,
(o, o2) -> o,
LinkedHashMap::new));
this.tableSchemas = tableSchemas;
this.splitFinishedOffsets = splitFinishedOffsets;
this.assignerStatus = assignerStatus;
Expand Down Expand Up @@ -230,6 +240,7 @@ private void captureNewlyAddedTables() {
tableSchemas
.entrySet()
.removeIf(schema -> tablesToRemove.contains(schema.getKey()));
LOG.info("Enumerator remove tables after restart: {}", tablesToRemove);
remainingSplits.removeIf(split -> tablesToRemove.contains(split.getTableId()));
remainingTables.removeAll(tablesToRemove);
alreadyProcessedTables.removeIf(tableId -> tablesToRemove.contains(tableId));
Expand Down Expand Up @@ -303,9 +314,7 @@ public List<FinishedSnapshotSplitInfo> getFinishedSplitInfos() {
"The assigner is not ready to offer finished split information, this should not be called");
}
final List<SchemalessSnapshotSplit> assignedSnapshotSplit =
assignedSplits.values().stream()
.sorted(Comparator.comparing(SourceSplitBase::splitId))
.collect(Collectors.toList());
new ArrayList<>(assignedSplits.values());
List<FinishedSnapshotSplitInfo> finishedSnapshotSplitInfos = new ArrayList<>();
for (SchemalessSnapshotSplit split : assignedSnapshotSplit) {
Offset finishedOffset = splitFinishedOffsets.get(split.splitId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -307,8 +307,22 @@ private void sendStreamMetaRequestEvent(int subTask, StreamSplitMetaRequestEvent
finishedSnapshotSplitInfos, sourceConfig.getSplitMetaGroupSize());
}
final int requestMetaGroupId = requestEvent.getRequestMetaGroupId();

if (finishedSnapshotSplitMeta.size() > requestMetaGroupId) {
final int totalFinishedSplitSizeOfReader = requestEvent.getTotalFinishedSplitSize();
final int totalFinishedSplitSizeOfEnumerator = splitAssigner.getFinishedSplitInfos().size();
if (totalFinishedSplitSizeOfReader > totalFinishedSplitSizeOfEnumerator) {
LOG.warn(
"Total finished split size of subtask {} is {}, while total finished split size of enumerator is only {}. Try to truncate it",
subTask,
totalFinishedSplitSizeOfReader,
totalFinishedSplitSizeOfEnumerator);
StreamSplitMetaEvent metadataEvent =
new StreamSplitMetaEvent(
requestEvent.getSplitId(),
requestMetaGroupId,
null,
totalFinishedSplitSizeOfEnumerator);
context.sendEventToSourceReader(subTask, metadataEvent);
} else if (finishedSnapshotSplitMeta.size() > requestMetaGroupId) {
List<FinishedSnapshotSplitInfo> metaToSend =
finishedSnapshotSplitMeta.get(requestMetaGroupId);
StreamSplitMetaEvent metadataEvent =
Expand All @@ -317,13 +331,17 @@ private void sendStreamMetaRequestEvent(int subTask, StreamSplitMetaRequestEvent
requestMetaGroupId,
metaToSend.stream()
.map(FinishedSnapshotSplitInfo::serialize)
.collect(Collectors.toList()));
.collect(Collectors.toList()),
totalFinishedSplitSizeOfEnumerator);
context.sendEventToSourceReader(subTask, metadataEvent);
} else {
LOG.error(
"Received invalid request meta group id {}, the invalid meta group id range is [0, {}]",
requestMetaGroupId,
finishedSnapshotSplitMeta.size() - 1);
throw new FlinkRuntimeException(
String.format(
"The enumerator received invalid request meta group id %s, the valid meta group id range is [0, %s]. Total finished split size of reader is %s, while the total finished split size of enumerator is %s.",
requestMetaGroupId,
finishedSnapshotSplitMeta.size() - 1,
totalFinishedSplitSizeOfReader,
totalFinishedSplitSizeOfEnumerator));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.apache.flink.cdc.connectors.base.source.meta.split.FinishedSnapshotSplitInfo;
import org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceReader;

import javax.annotation.Nullable;

import java.util.List;

/**
Expand All @@ -43,10 +45,17 @@ public class StreamSplitMetaEvent implements SourceEvent {
*/
private final List<byte[]> metaGroup;

public StreamSplitMetaEvent(String splitId, int metaGroupId, List<byte[]> metaGroup) {
private final int totalFinishedSplitSize;

public StreamSplitMetaEvent(
String splitId,
int metaGroupId,
@Nullable List<byte[]> metaGroup,
int totalFinishedSplitSize) {
this.splitId = splitId;
this.metaGroupId = metaGroupId;
this.metaGroup = metaGroup;
this.totalFinishedSplitSize = totalFinishedSplitSize;
}

public String getSplitId() {
Expand All @@ -60,4 +69,8 @@ public int getMetaGroupId() {
public List<byte[]> getMetaGroup() {
return metaGroup;
}

public int getTotalFinishedSplitSize() {
return totalFinishedSplitSize;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,13 @@ public class StreamSplitMetaRequestEvent implements SourceEvent {
private final String splitId;
private final int requestMetaGroupId;

public StreamSplitMetaRequestEvent(String splitId, int requestMetaGroupId) {
private final int totalFinishedSplitSize;

public StreamSplitMetaRequestEvent(
String splitId, int requestMetaGroupId, int totalFinishedSplitSize) {
this.splitId = splitId;
this.requestMetaGroupId = requestMetaGroupId;
this.totalFinishedSplitSize = totalFinishedSplitSize;
}

public String getSplitId() {
Expand All @@ -45,4 +49,8 @@ public String getSplitId() {
public int getRequestMetaGroupId() {
return requestMetaGroupId;
}

public int getTotalFinishedSplitSize() {
return totalFinishedSplitSize;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@

import io.debezium.relational.TableId;
import io.debezium.relational.history.TableChanges.TableChange;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

Expand All @@ -29,11 +31,13 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;

/** The split to describe the change log of database table(s). */
public class StreamSplit extends SourceSplitBase {
private static final Logger LOG = LoggerFactory.getLogger(StreamSplit.class);
public static final String STREAM_SPLIT_ID = "stream-split";

private final Offset startingOffset;
Expand Down Expand Up @@ -179,9 +183,20 @@ public static StreamSplit appendFinishedSplitInfos(
*/
public static StreamSplit filterOutdatedSplitInfos(
StreamSplit streamSplit, Predicate<TableId> currentTableFilter) {

Set<TableId> tablesToRemove =
streamSplit.getFinishedSnapshotSplitInfos().stream()
.filter(i -> !currentTableFilter.test(i.getTableId()))
.map(split -> split.getTableId())
.collect(Collectors.toSet());
if (tablesToRemove.isEmpty()) {
return streamSplit;
}

LOG.info("Reader remove tables after restart: {}", tablesToRemove);
List<FinishedSnapshotSplitInfo> allFinishedSnapshotSplitInfos =
streamSplit.getFinishedSnapshotSplitInfos().stream()
.filter(i -> currentTableFilter.test(i.getTableId()))
.filter(i -> !tablesToRemove.contains(i.getTableId()))
.collect(Collectors.toList());
Map<TableId, TableChange> previousTableSchemas = streamSplit.getTableSchemas();
Map<TableId, TableChange> newTableSchemas = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -418,11 +418,20 @@ private void fillMetaDataForStreamSplit(StreamSplitMetaEvent metadataEvent) {
StreamSplit streamSplit = uncompletedStreamSplits.get(metadataEvent.getSplitId());
if (streamSplit != null) {
final int receivedMetaGroupId = metadataEvent.getMetaGroupId();
final int receivedTotalFinishedSplitSize = metadataEvent.getTotalFinishedSplitSize();
final int expectedMetaGroupId =
getNextMetaGroupId(
streamSplit.getFinishedSnapshotSplitInfos().size(),
sourceConfig.getSplitMetaGroupSize());
if (receivedMetaGroupId == expectedMetaGroupId) {
if (receivedTotalFinishedSplitSize < streamSplit.getTotalFinishedSplitSize()) {
LOG.warn(
"Source reader {} receives out of bound finished split size. The received finished split size is {}, but expected is {}, truncate it",
subtaskId,
receivedTotalFinishedSplitSize,
streamSplit.getTotalFinishedSplitSize());
streamSplit = toNormalStreamSplit(streamSplit, receivedTotalFinishedSplitSize);
uncompletedStreamSplits.put(streamSplit.splitId(), streamSplit);
} else if (receivedMetaGroupId == expectedMetaGroupId) {
Set<String> existedSplitsOfLastGroup =
getExistedSplitsOfLastGroup(
streamSplit.getFinishedSnapshotSplitInfos(),
Expand Down Expand Up @@ -461,7 +470,8 @@ private void requestStreamSplitMetaIfNeeded(StreamSplit streamSplit) {
streamSplit.getFinishedSnapshotSplitInfos().size(),
sourceConfig.getSplitMetaGroupSize());
StreamSplitMetaRequestEvent splitMetaRequestEvent =
new StreamSplitMetaRequestEvent(splitId, nextMetaGroupId);
new StreamSplitMetaRequestEvent(
splitId, nextMetaGroupId, streamSplit.getTotalFinishedSplitSize());
context.sendSourceEventToCoordinator(splitMetaRequestEvent);
} else {
LOG.info("The meta of stream split {} has been collected success", splitId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import org.junit.rules.TemporaryFolder;
import org.junit.rules.Timeout;

import java.lang.reflect.Field;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -948,20 +947,11 @@ private StreamExecutionEnvironment getStreamExecutionEnvironmentFromSavePoint(
// Close sink upsert materialize to show more clear test output.
Configuration tableConfig = new Configuration();
tableConfig.setString("table.exec.sink.upsert-materialize", "none");
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment(tableConfig);
if (finishedSavePointPath != null) {
// restore from savepoint
// hack for test to visit protected TestStreamEnvironment#getConfiguration() method
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
Class<?> clazz =
classLoader.loadClass(
"org.apache.flink.streaming.api.environment.StreamExecutionEnvironment");
Field field = clazz.getDeclaredField("configuration");
field.setAccessible(true);
Configuration configuration = (Configuration) field.get(env);
configuration.setString(SavepointConfigOptions.SAVEPOINT_PATH, finishedSavePointPath);
tableConfig.setString(SavepointConfigOptions.SAVEPOINT_PATH, finishedSavePointPath);
}
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment(tableConfig);
env.setParallelism(parallelism);
env.enableCheckpointing(200L);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 100L));
Expand All @@ -987,6 +977,7 @@ private String getCreateTableStatement(
+ " 'password' = '%s',"
+ " 'database' = '%s',"
+ " 'collection' = '%s',"
+ " 'chunk-meta.group.size' = '2',"
+ " 'heartbeat.interval.ms' = '100',"
+ " 'scan.full-changelog' = 'true',"
+ " 'scan.newly-added-table.enabled' = 'true'"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,7 @@ private void captureNewlyAddedTables() {
.entrySet()
.removeIf(schema -> tablesToRemove.contains(schema.getKey()));
remainingSplits.removeIf(split -> tablesToRemove.contains(split.getTableId()));
LOG.info("Enumerator remove tables after restart: {}", tablesToRemove);
remainingTables.removeAll(tablesToRemove);
alreadyProcessedTables.removeIf(tableId -> tablesToRemove.contains(tableId));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -298,22 +298,40 @@ private void sendBinlogMeta(int subTask, BinlogSplitMetaRequestEvent requestEven
finishedSnapshotSplitInfos, sourceConfig.getSplitMetaGroupSize());
}
final int requestMetaGroupId = requestEvent.getRequestMetaGroupId();

if (binlogSplitMeta.size() > requestMetaGroupId) {
final int totalFinishedSplitSizeOfReader = requestEvent.getTotalFinishedSplitSize();
final int totalFinishedSplitSizeOfEnumerator = splitAssigner.getFinishedSplitInfos().size();
if (totalFinishedSplitSizeOfReader > totalFinishedSplitSizeOfEnumerator) {
LOG.warn(
"Total finished split size of subtask {} is {}, while total finished split size of Enumerator is only {}. Try to truncate it",
subTask,
totalFinishedSplitSizeOfReader,
totalFinishedSplitSizeOfEnumerator);
BinlogSplitMetaEvent metadataEvent =
new BinlogSplitMetaEvent(
requestEvent.getSplitId(),
requestMetaGroupId,
null,
totalFinishedSplitSizeOfEnumerator);
context.sendEventToSourceReader(subTask, metadataEvent);
} else if (binlogSplitMeta.size() > requestMetaGroupId) {
List<FinishedSnapshotSplitInfo> metaToSend = binlogSplitMeta.get(requestMetaGroupId);
BinlogSplitMetaEvent metadataEvent =
new BinlogSplitMetaEvent(
requestEvent.getSplitId(),
requestMetaGroupId,
metaToSend.stream()
.map(FinishedSnapshotSplitInfo::serialize)
.collect(Collectors.toList()));
.collect(Collectors.toList()),
totalFinishedSplitSizeOfEnumerator);
context.sendEventToSourceReader(subTask, metadataEvent);
} else {
LOG.error(
"The enumerator received invalid request meta group id {}, the valid meta group id range is [0, {}]",
requestMetaGroupId,
binlogSplitMeta.size() - 1);
throw new FlinkRuntimeException(
String.format(
"The enumerator received invalid request meta group id %s, the valid meta group id range is [0, %s]. Total finished split size of reader is %s, while the total finished split size of enumerator is %s.",
requestMetaGroupId,
binlogSplitMeta.size() - 1,
totalFinishedSplitSizeOfReader,
totalFinishedSplitSizeOfEnumerator));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.apache.flink.cdc.connectors.mysql.source.reader.MySqlSourceReader;
import org.apache.flink.cdc.connectors.mysql.source.split.FinishedSnapshotSplitInfo;

import javax.annotation.Nullable;

import java.util.List;

/**
Expand All @@ -43,10 +45,17 @@ public class BinlogSplitMetaEvent implements SourceEvent {
*/
private final List<byte[]> metaGroup;

public BinlogSplitMetaEvent(String splitId, int metaGroupId, List<byte[]> metaGroup) {
private final int totalFinishedSplitSize;

public BinlogSplitMetaEvent(
String splitId,
int metaGroupId,
@Nullable List<byte[]> metaGroup,
int totalFinishedSplitSize) {
this.splitId = splitId;
this.metaGroupId = metaGroupId;
this.metaGroup = metaGroup;
this.totalFinishedSplitSize = totalFinishedSplitSize;
}

public String getSplitId() {
Expand All @@ -60,4 +69,8 @@ public int getMetaGroupId() {
public List<byte[]> getMetaGroup() {
return metaGroup;
}

public int getTotalFinishedSplitSize() {
return totalFinishedSplitSize;
}
}
Loading