-
Notifications
You must be signed in to change notification settings - Fork 2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-34688][cdc-connector] CDC framework split snapshot chunks asynchronously #3510
Conversation
@leonardBang , @ruanhang1993 , @Jiabao-Sun , @GOODBOY008 , CC |
Thanks @loserwang1024 for the improvement, @GOODBOY008 would you like to help review this PR when you have time>? |
public interface ChunkSplitter { | ||
|
||
/** | ||
* Called to open the chunk splitter to acquire any resources, like threads or jdbc connections. | ||
*/ | ||
void open(); | ||
|
||
/** Generates all snapshot splits (chunks) for the give data collection. */ | ||
Collection<SnapshotSplit> generateSplits(TableId tableId); | ||
Collection<SnapshotSplit> generateSplits(TableId tableId) throws Exception; | ||
|
||
/** Get whether the splitter has more chunks for current table. */ | ||
boolean hasNextChunk(); | ||
|
||
/** | ||
* Creates a snapshot of the state of this chunk splitter, to be stored in a checkpoint. | ||
* | ||
* <p>This method takes the ID of the checkpoint for which the state is snapshotted. Most | ||
* implementations should be able to ignore this parameter, because for the contents of the | ||
* snapshot, it doesn't matter for which checkpoint it gets created. This parameter can be | ||
* interesting for source connectors with external systems where those systems are themselves | ||
* aware of checkpoints; for example in cases where the enumerator notifies that system about a | ||
* specific checkpoint being triggered. | ||
* | ||
* @param checkpointId The ID of the checkpoint for which the snapshot is created. | ||
* @return an object containing the state of the split enumerator. | ||
*/ | ||
ChunkSplitterState snapshotState(long checkpointId); | ||
|
||
TableId getCurrentSplittingTableId(); | ||
|
||
/** | ||
* Called to open the chunk splitter to release any resources, like threads or jdbc connections. | ||
*/ | ||
void close() throws Exception; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @loserwang1024 for this great work.
I have a small suggestion for the ChunkSplitter
interface.
The methods open, hasNextChunk, close, getCurrentSplittingTableId, close
are more like those of an Iterator
or Cursor
. Perhaps ChunkSplitter.generateSplits
could be designed to open a Cursor
, which can unify the iteration logic and support both one-time splitting and partial splitting. By using cursors, we might be able to support simultaneous spliting of multiple tables.
However, maintaining the state might become complex, as we need to keep track of the state of all open cursors.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, It seem more flexible to support simultaneous splitting of multiple tables..However, we should maintain each table's spitter progress information in state. It may be rather than heavy.
@loserwang1024 Please rebase to master and solove confilcts. |
7589535
to
eb0ecc0
Compare
cb94500
to
39265b8
Compare
39265b8
to
0e7edd5
Compare
@GOODBOY008 done it, and all the tests are passed. |
@loserwang1024 Could you kindly rebase your PR to latest master branch to resolve potential conflicts? |
8424716
to
2a6c946
Compare
done it |
2a6c946
to
5d44e11
Compare
5d44e11
to
dba2648
Compare
...st/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresScanFetchTaskTest.java
Outdated
Show resolved
Hide resolved
...org/apache/flink/cdc/connectors/base/source/assigner/state/PendingSplitsStateSerializer.java
Show resolved
Hide resolved
...a/org/apache/flink/cdc/connectors/base/source/assigner/splitter/JdbcSourceChunkSplitter.java
Outdated
Show resolved
Hide resolved
...rc/main/java/org/apache/flink/cdc/connectors/base/source/assigner/SnapshotSplitAssigner.java
Outdated
Show resolved
Hide resolved
...rc/main/java/org/apache/flink/cdc/connectors/base/source/assigner/SnapshotSplitAssigner.java
Outdated
Show resolved
Hide resolved
...rc/main/java/org/apache/flink/cdc/connectors/base/source/assigner/SnapshotSplitAssigner.java
Show resolved
Hide resolved
...rc/main/java/org/apache/flink/cdc/connectors/base/source/assigner/SnapshotSplitAssigner.java
Outdated
Show resolved
Hide resolved
...rc/main/java/org/apache/flink/cdc/connectors/base/source/assigner/SnapshotSplitAssigner.java
Outdated
Show resolved
Hide resolved
} else if (!remainingTables.isEmpty()) { | ||
try { | ||
// wait for the asynchronous split to complete | ||
lock.wait(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This lock
has already used the synchronized
keyword,wait/notify
doesn't seem necessary
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need to use wait
to release this lock. Then the lock can be gotten by other thread.
.../org/apache/flink/cdc/connectors/sqlserver/source/read/fetch/SqlServerScanFetchTaskTest.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
…s asynchronously (apache#3510)
As shown in https://issues.apache.org/jira/browse/FLINK-34688 :
In Mysql CDC, MysqlSnapshotSplitAssigner splits snapshot chunks asynchronously(#931). But CDC framework lacks it.
If table is too big to split, the enumerator will be stuck, and checkpoint will be influenced( sometime will checkpoint timeout occurs).