Skip to content

Commit

Permalink
[mysql] Snapshot split the chunks asynchronously (ververica#915)
Browse files Browse the repository at this point in the history
  • Loading branch information
fuyun2024 committed Mar 15, 2022
1 parent 2ff83fd commit 73e8520
Showing 1 changed file with 67 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;

import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ThreadFactoryBuilder;

import com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils;
import com.ververica.cdc.connectors.mysql.schema.MySqlSchema;
import com.ververica.cdc.connectors.mysql.source.assigners.state.SnapshotPendingSplitsState;
Expand All @@ -46,6 +48,9 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.stream.Collectors;

import static com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils.discoverCapturedTables;
Expand Down Expand Up @@ -75,6 +80,9 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner {
private ChunkSplitter chunkSplitter;
private boolean isTableIdCaseSensitive;

private ExecutorService executor;
private Object lock;

@Nullable private Long checkpointIdToFinish;

public MySqlSnapshotSplitAssigner(
Expand Down Expand Up @@ -137,6 +145,7 @@ private MySqlSnapshotSplitAssigner(

@Override
public void open() {
lock = new Object();
chunkSplitter = createChunkSplitter(sourceConfig, isTableIdCaseSensitive);

// the legacy state didn't snapshot remaining tables, discovery remaining table here
Expand All @@ -152,6 +161,7 @@ public void open() {
}
}
captureNewlyAddedTables();
asynchronouslySplitIfNeed();
}

private void captureNewlyAddedTables() {
Expand Down Expand Up @@ -180,27 +190,62 @@ private void captureNewlyAddedTables() {
}
}

private void asynchronouslySplitIfNeed() {
if (!remainingTables.isEmpty()) {
if (executor == null) {
ThreadFactory threadFactory =
new ThreadFactoryBuilder().setNameFormat("snapshot-split").build();
this.executor = Executors.newSingleThreadExecutor(threadFactory);
}

executor.submit(
() -> {
Iterator<TableId> iterator = remainingTables.iterator();
while (iterator.hasNext()) {
TableId nextTable = iterator.next();
// split the given table into chunks (snapshot splits)
Collection<MySqlSnapshotSplit> splits =
chunkSplitter.generateSplits(nextTable);
synchronized (lock) {
remainingSplits.addAll(splits);
iterator.remove();
lock.notify();
}
}
});
}
}

@Override
public Optional<MySqlSplit> getNext() {
if (!remainingSplits.isEmpty()) {
// return remaining splits firstly
Iterator<MySqlSnapshotSplit> iterator = remainingSplits.iterator();
MySqlSnapshotSplit split = iterator.next();
iterator.remove();
assignedSplits.put(split.splitId(), split);
return Optional.of(split);
} else {
// it's turn for next table
TableId nextTable = remainingTables.pollFirst();
if (nextTable != null) {
// split the given table into chunks (snapshot splits)
Collection<MySqlSnapshotSplit> splits = chunkSplitter.generateSplits(nextTable);
remainingSplits.addAll(splits);
alreadyProcessedTables.add(nextTable);
return getNext();
} else {
return Optional.empty();
synchronized (lock) {
// return remaining splits firstly
Iterator<MySqlSnapshotSplit> iterator = remainingSplits.iterator();
MySqlSnapshotSplit split = iterator.next();
remainingSplits.remove(split);
assignedSplits.put(split.splitId(), split);
addAlreadyProcessedTablesIfNotExists(split.getTableId());
return Optional.of(split);
}
} else if (!remainingTables.isEmpty()) {
// wait for the asynchronous split to complete
synchronized (lock) {
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return getNext();
} else {
return Optional.empty();
}
}

private void addAlreadyProcessedTablesIfNotExists(TableId tableId) {
if (!alreadyProcessedTables.contains(tableId)) {
alreadyProcessedTables.add(tableId);
}
}

Expand Down Expand Up @@ -319,7 +364,11 @@ public void wakeup() {
}

@Override
public void close() {}
public void close() {
if (executor != null) {
executor.shutdown();
}
}

/** Indicates there is no more splits available in this assigner. */
public boolean noMoreSplits() {
Expand Down

0 comments on commit 73e8520

Please sign in to comment.