[ISSUE-1827][Mysql] Mysql connector support parallel snapshot when th… #2046
---- changed file: SnapshotSplitReader ----

@@ -17,6 +17,7 @@
 package com.ververica.cdc.connectors.mysql.debezium.reader;

 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.util.CollectionUtil;
 import org.apache.flink.util.FlinkRuntimeException;

 import org.apache.flink.shaded.guava30.com.google.common.util.concurrent.ThreadFactoryBuilder;

@@ -49,8 +50,10 @@
 import javax.annotation.Nullable;

 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
@@ -248,7 +251,11 @@ public Iterator<SourceRecords> pollSplitRecords() throws InterruptedException {
             boolean reachBinlogEnd = false;
             SourceRecord lowWatermark = null;
             SourceRecord highWatermark = null;
-            Map<Struct, SourceRecord> snapshotRecords = new LinkedHashMap<>();
+
+            SnapshotRecords snapshotRecords =
+                    containsPrimaryKey()
+                            ? new PrimaryKeySnapshotRecords()
+                            : new NoPrimaryKeySnapshotRecords();
             while (!reachBinlogEnd) {
                 checkReadException();
                 List<DataChangeEvent> batch = queue.poll();
@@ -273,22 +280,15 @@ public Iterator<SourceRecords> pollSplitRecords() throws InterruptedException { | |
break; | ||
} | ||
|
||
if (!reachBinlogStart) { | ||
snapshotRecords.put((Struct) record.key(), record); | ||
} else { | ||
if (isRequiredBinlogRecord(record)) { | ||
// upsert binlog events through the record key | ||
upsertBinlog(snapshotRecords, record); | ||
} | ||
} | ||
snapshotRecords.collect(record, reachBinlogStart); | ||
} | ||
} | ||
// snapshot split return its data once | ||
hasNextElement.set(false); | ||
|
||
final List<SourceRecord> normalizedRecords = new ArrayList<>(); | ||
normalizedRecords.add(lowWatermark); | ||
normalizedRecords.addAll(formatMessageTimestamp(snapshotRecords.values())); | ||
normalizedRecords.addAll(formatMessageTimestamp(snapshotRecords.getRecords())); | ||
normalizedRecords.add(highWatermark); | ||
|
||
final List<SourceRecords> sourceRecordsSet = new ArrayList<>(); | ||
|
@@ -403,4 +403,58 @@ public boolean isRunning() {
             return currentTaskRunning;
         }
     }
+
+    private boolean containsPrimaryKey() {
+        return !CollectionUtil.isNullOrEmpty(currentSnapshotSplit.getSplitKeyType().getFields());
+    }
+
+    /** Collect the records that need to be sent, except the low/high watermarks. */
+    interface SnapshotRecords {
+
+        void collect(SourceRecord record, boolean reachBinlogStart);
+
+        Collection<SourceRecord> getRecords();
+    }

Review comment (on the interface name): SnapshotRecords -> SnapshotRecordCollector
+
+    /** Collect records with a primary key. May upsert binlog events. */
+    class PrimaryKeySnapshotRecords implements SnapshotRecords {
+        private final Map<Struct, SourceRecord> snapshotRecords = new LinkedHashMap<>();
+
+        @Override
+        public void collect(SourceRecord record, boolean reachBinlogStart) {
+            if (!reachBinlogStart) {
+                snapshotRecords.put((Struct) record.key(), record);
+            } else {
+                if (isRequiredBinlogRecord(record)) {
+                    // upsert binlog events through the record key
+                    upsertBinlog(snapshotRecords, record);
+                }
+            }
+        }
+
+        @Override
+        public Collection<SourceRecord> getRecords() {
+            return snapshotRecords.values();
+        }
+    }
+
+    /** Collect records without a primary key. There are no binlog events. */
+    static class NoPrimaryKeySnapshotRecords implements SnapshotRecords {
+        private final List<SourceRecord> snapshotRecords = new LinkedList<>();
+
+        @Override
+        public void collect(SourceRecord record, boolean reachBinlogStart) {
+            if (!reachBinlogStart) {
+                snapshotRecords.add(record);
+            } else {
+                throw new IllegalStateException(
+                        "a table without a primary key cannot have binlog splits"
+                                + " in the initialization stage");
+            }
+        }
+
+        @Override
+        public Collection<SourceRecord> getRecords() {
+            return snapshotRecords;
+        }
+    }
 }

Review thread (on NoPrimaryKeySnapshotRecords.collect):
Reviewer: How to handle the delete and update events?
Author: IMHO, the snapshot stage for a table without a primary key only reads snapshot data; binlog data would be read only in the incremental stage. So there are no delete or update events in SnapshotSplitReader if the table has no primary key. Please correct me.
Reviewer: If I understand right, you want to skip the snapshot backfill task.
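For context, the `upsertBinlog` call in `PrimaryKeySnapshotRecords` is referenced but not shown in this diff. Below is a minimal, self-contained sketch of the key-based upsert idea it implements, using toy types rather than the real Kafka Connect/Debezium structures; the actual method in this class also normalizes the Debezium envelope, which is omitted here.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Toy model of key-based binlog upsert during snapshot backfill:
// inserts/updates overwrite the snapshot row with the same key, deletes
// drop it. All names and types here are illustrative only.
class UpsertSketch {
    enum Op { INSERT, UPDATE, DELETE }

    record Event(String key, Op op, String row) {}

    static void upsert(Map<String, String> snapshot, Event e) {
        switch (e.op()) {
            case INSERT, UPDATE -> snapshot.put(e.key(), e.row()); // after-image wins
            case DELETE -> snapshot.remove(e.key()); // row deleted before high watermark
        }
    }

    public static void main(String[] args) {
        Map<String, String> snapshot = new LinkedHashMap<>();
        snapshot.put("k1", "v1@snapshot");
        upsert(snapshot, new Event("k1", Op.UPDATE, "v1@binlog"));
        upsert(snapshot, new Event("k2", Op.INSERT, "v2@binlog"));
        upsert(snapshot, new Event("k2", Op.DELETE, null));
        System.out.println(snapshot); // prints {k1=v1@binlog}
    }
}
```

This only works when records carry a key, which is why the collector above is chosen based on `containsPrimaryKey()`.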
---- changed file: the chunk splitter ----
@@ -148,7 +148,12 @@ private void analyzeTable(TableId tableId) {
             ChunkUtils.getChunkKeyColumn(
                     currentSplittingTable, sourceConfig.getChunkKeyColumn());
             splitType = ChunkUtils.getChunkKeyColumnType(splitColumn);
-            minMaxOfSplitColumn = queryMinMax(jdbcConnection, tableId, splitColumn.name());
+            // the split column would be null if the table has no primary key
+            if (splitColumn != null) {
+                minMaxOfSplitColumn = queryMinMax(jdbcConnection, tableId, splitColumn.name());
+            } else {
+                minMaxOfSplitColumn = new Object[2];
+            }
             approximateRowCnt = queryApproximateRowCnt(jdbcConnection, tableId);
         } catch (Exception e) {
             throw new RuntimeException("Fail to analyze table in chunk splitter.", e);

Review comment (on the null branch): The chunk splitter should immediately return a single split covering the whole table when the table does not have a primary key. This part should not be changed.
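A self-contained toy sketch of the reviewer's suggestion above; the names are hypothetical, not the actual flink-cdc splitter API. The idea is to short-circuit with one open-ended split instead of running min/max analysis when there is no split column.

```java
import java.util.Collections;
import java.util.List;

// Toy model: a split whose start/end bounds are null covers the whole table.
class SingleSplitSketch {
    record Split(String table, Object[] start, Object[] end) {}

    static List<Split> splitTable(String table, String splitColumn) {
        if (splitColumn == null) {
            // No primary key and no configured chunk key: return a single
            // unbounded split immediately and skip the min/max query.
            return Collections.singletonList(new Split(table, null, null));
        }
        // Otherwise split into range chunks using min/max of the split
        // column (the existing code path, not shown here).
        throw new UnsupportedOperationException("range chunking not shown");
    }
}
```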
---- changed file: ChunkUtils ----
@@ -42,20 +42,20 @@ public static RowType getChunkKeyColumnType(Table table, @Nullable String chunkKeyColumn) {
         return getChunkKeyColumnType(getChunkKeyColumn(table, chunkKeyColumn));
     }

-    public static RowType getChunkKeyColumnType(Column chunkKeyColumn) {
+    public static RowType getChunkKeyColumnType(@Nullable Column chunkKeyColumn) {
+        if (chunkKeyColumn == null) {
+            return (RowType) ROW().getLogicalType();
+        }
         return (RowType)
                 ROW(FIELD(chunkKeyColumn.name(), MySqlTypeUtils.fromDbzColumn(chunkKeyColumn)))
                         .getLogicalType();
     }

Review thread (on the null check):
Reviewer: This should not be changed like this. I think we could use the first column as the chunk key if the chunk key is not set in the table options.
Author: You mean using the first column as the chunk key to split the table if it has no primary key?
Reviewer: I replied about this part in the first comment.
|
||
@Nullable | ||
public static Column getChunkKeyColumn(Table table, @Nullable String chunkKeyColumn) { | ||
List<Column> primaryKeys = table.primaryKeyColumns(); | ||
if (primaryKeys.isEmpty()) { | ||
throw new ValidationException( | ||
String.format( | ||
"Incremental snapshot for tables requires primary key," | ||
+ " but table %s doesn't have primary key.", | ||
table.id())); | ||
return null; | ||
} | ||
|
||
if (chunkKeyColumn != null) { | ||
|
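For reference, a hedged sketch of the first-column fallback floated in the thread above. This is not what the PR implements; it is written as if it sat inside ChunkUtils (so `Table`, `Column`, and `@Nullable` are the same types imported by the file above), with the rest of the chunk-key resolution elided.

```java
// Sketch only: fall back to the first physical column instead of
// returning null when the table has no primary key.
@Nullable
public static Column getChunkKeyColumn(Table table, @Nullable String chunkKeyColumn) {
    List<Column> primaryKeys = table.primaryKeyColumns();
    if (primaryKeys.isEmpty()) {
        // Use the first column as the chunk key. If that column is not
        // unique, chunk boundaries can overlap, which is why the thread
        // above frames this as an at-least-once trade-off.
        return table.columns().isEmpty() ? null : table.columns().get(0);
    }
    // ... existing resolution against chunkKeyColumn elided; by default
    // the first primary-key column is used ...
    return primaryKeys.get(0);
}
```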
Review thread (on getChunkKeyColumn returning null):
Reviewer: It is better not to judge it this way. The chunk key will be set when we have to handle a big table in some cases to provide the at-least-once guarantee.
Reviewer: We could init the SnapshotRecordCollector when record.key() returns null.
Author: Thanks for your review. You mean that the chunk key may be set even though the table has no primary keys?
Reviewer: Yes. If the table contains too much data, there is a performance problem with this solution. In this case, we may provide at-least-once semantics instead of exactly-once semantics.
Author: Is it possible that exactly-once semantics are required in some scenarios? How about providing an option so that users can choose between better performance and exactly-once semantics?
Reviewer: I think we should control this with the experimental option scan.incremental.snapshot.chunk.key-column. By default, we will return a single split. If users set scan.incremental.snapshot.chunk.key-column, we will split the table into multiple splits.
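For illustration, here is how a user might set that option through the Flink Table API. Only the option name `scan.incremental.snapshot.chunk.key-column` comes from the thread; the table schema and the remaining connector options are a typical mysql-cdc example and are illustrative, not taken from this PR.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ChunkKeyOptionExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // A primary-key-less table: without the chunk key option it would be
        // read as a single split; setting the option enables parallel chunks.
        tEnv.executeSql(
                "CREATE TABLE orders (\n"
                        + "  order_id INT,\n"
                        + "  order_date TIMESTAMP(3)\n"
                        + ") WITH (\n"
                        + "  'connector' = 'mysql-cdc',\n"
                        + "  'hostname' = 'localhost',\n"
                        + "  'port' = '3306',\n"
                        + "  'username' = 'flinkuser',\n"
                        + "  'password' = 'flinkpw',\n"
                        + "  'database-name' = 'mydb',\n"
                        + "  'table-name' = 'orders',\n"
                        + "  'scan.incremental.snapshot.chunk.key-column' = 'order_id'\n"
                        + ")");
    }
}
```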