-
Notifications
You must be signed in to change notification settings - Fork 2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[mysql] Snapshot split the chunks asynchronously (ververica#915) #931
Changes from 1 commit
00b97c6
a17db12
2ad9f42
c8836ba
3290014
c0edc17
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -21,6 +21,8 @@ | |||||
import org.apache.flink.util.FlinkRuntimeException; | ||||||
import org.apache.flink.util.Preconditions; | ||||||
|
||||||
import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ThreadFactoryBuilder; | ||||||
|
||||||
import com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils; | ||||||
import com.ververica.cdc.connectors.mysql.schema.MySqlSchema; | ||||||
import com.ververica.cdc.connectors.mysql.source.assigners.state.SnapshotPendingSplitsState; | ||||||
|
@@ -46,6 +48,10 @@ | |||||
import java.util.List; | ||||||
import java.util.Map; | ||||||
import java.util.Optional; | ||||||
import java.util.concurrent.CopyOnWriteArrayList; | ||||||
import java.util.concurrent.ExecutorService; | ||||||
import java.util.concurrent.Executors; | ||||||
import java.util.concurrent.ThreadFactory; | ||||||
import java.util.stream.Collectors; | ||||||
|
||||||
import static com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils.discoverCapturedTables; | ||||||
|
@@ -75,6 +81,9 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner { | |||||
private ChunkSplitter chunkSplitter; | ||||||
private boolean isTableIdCaseSensitive; | ||||||
|
||||||
private ExecutorService executor; | ||||||
private Object lock; | ||||||
|
||||||
@Nullable private Long checkpointIdToFinish; | ||||||
|
||||||
public MySqlSnapshotSplitAssigner( | ||||||
|
@@ -126,7 +135,7 @@ private MySqlSnapshotSplitAssigner( | |||||
this.sourceConfig = sourceConfig; | ||||||
this.currentParallelism = currentParallelism; | ||||||
this.alreadyProcessedTables = alreadyProcessedTables; | ||||||
this.remainingSplits = remainingSplits; | ||||||
this.remainingSplits = new CopyOnWriteArrayList<>(remainingSplits); | ||||||
this.assignedSplits = assignedSplits; | ||||||
this.splitFinishedOffsets = splitFinishedOffsets; | ||||||
this.assignerStatus = assignerStatus; | ||||||
|
@@ -137,6 +146,7 @@ private MySqlSnapshotSplitAssigner( | |||||
|
||||||
@Override | ||||||
public void open() { | ||||||
lock = new Object(); | ||||||
chunkSplitter = createChunkSplitter(sourceConfig, isTableIdCaseSensitive); | ||||||
|
||||||
// the legacy state didn't snapshot remaining tables, discovery remaining table here | ||||||
|
@@ -152,6 +162,7 @@ public void open() { | |||||
} | ||||||
} | ||||||
captureNewlyAddedTables(); | ||||||
asynchronouslySplitIfNeed(); | ||||||
} | ||||||
|
||||||
private void captureNewlyAddedTables() { | ||||||
|
@@ -180,27 +191,60 @@ private void captureNewlyAddedTables() { | |||||
} | ||||||
} | ||||||
|
||||||
private void asynchronouslySplitIfNeed() { | ||||||
if (!remainingTables.isEmpty()) { | ||||||
if (executor == null) { | ||||||
ThreadFactory threadFactory = | ||||||
new ThreadFactoryBuilder().setNameFormat("snapshot-split").build(); | ||||||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
this.executor = Executors.newSingleThreadExecutor(threadFactory); | ||||||
} | ||||||
|
||||||
executor.submit( | ||||||
() -> { | ||||||
Iterator<TableId> iterator = remainingTables.iterator(); | ||||||
while (iterator.hasNext()) { | ||||||
TableId nextTable = iterator.next(); | ||||||
// split the given table into chunks (snapshot splits) | ||||||
Collection<MySqlSnapshotSplit> splits = | ||||||
chunkSplitter.generateSplits(nextTable); | ||||||
synchronized (lock) { | ||||||
remainingSplits.addAll(splits); | ||||||
iterator.remove(); | ||||||
lock.notify(); | ||||||
} | ||||||
} | ||||||
}); | ||||||
} | ||||||
} | ||||||
|
||||||
@Override | ||||||
public Optional<MySqlSplit> getNext() { | ||||||
if (!remainingSplits.isEmpty()) { | ||||||
// return remaining splits firstly | ||||||
Iterator<MySqlSnapshotSplit> iterator = remainingSplits.iterator(); | ||||||
MySqlSnapshotSplit split = iterator.next(); | ||||||
iterator.remove(); | ||||||
remainingSplits.remove(split); | ||||||
assignedSplits.put(split.splitId(), split); | ||||||
addAlreadyProcessedTablesIfNotExists(split.getTableId()); | ||||||
return Optional.of(split); | ||||||
} else { | ||||||
// it's turn for next table | ||||||
TableId nextTable = remainingTables.pollFirst(); | ||||||
if (nextTable != null) { | ||||||
// split the given table into chunks (snapshot splits) | ||||||
Collection<MySqlSnapshotSplit> splits = chunkSplitter.generateSplits(nextTable); | ||||||
remainingSplits.addAll(splits); | ||||||
alreadyProcessedTables.add(nextTable); | ||||||
return getNext(); | ||||||
} else { | ||||||
return Optional.empty(); | ||||||
} else if (!remainingTables.isEmpty()) { | ||||||
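There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. There's a race condition here: the asynchronous split thread may remove the last table and call `lock.notify()` after this `remainingTables.isEmpty()` check but before `lock.wait()` is reached, in which case the notification is lost and `getNext()` waits forever. |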
||||||
// wait for the asynchronous split to complete | ||||||
synchronized (lock) { | ||||||
try { | ||||||
lock.wait(); | ||||||
} catch (InterruptedException e) { | ||||||
e.printStackTrace(); | ||||||
} | ||||||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Please throw a `FlinkRuntimeException` here instead of calling `e.printStackTrace()`. |
||||||
} | ||||||
return getNext(); | ||||||
} else { | ||||||
return Optional.empty(); | ||||||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Could we release the executor resource after the splitting has finished? |
||||||
} | ||||||
} | ||||||
|
||||||
private void addAlreadyProcessedTablesIfNotExists(TableId tableId) { | ||||||
if (!alreadyProcessedTables.contains(tableId)) { | ||||||
alreadyProcessedTables.add(tableId); | ||||||
} | ||||||
} | ||||||
|
||||||
|
@@ -319,7 +363,11 @@ public void wakeup() { | |||||
} | ||||||
|
||||||
@Override | ||||||
public void close() {} | ||||||
public void close() { | ||||||
if (executor != null) { | ||||||
executor.shutdown(); | ||||||
} | ||||||
} | ||||||
|
||||||
/** Indicates there is no more splits available in this assigner. */ | ||||||
public boolean noMoreSplits() { | ||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.