Skip to content

Commit

Permalink
Merge branch 'master' into avro-design
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangyangyu authored May 17, 2022
2 parents ae30b8c + 90e26a2 commit 9033520
Show file tree
Hide file tree
Showing 50 changed files with 3,782 additions and 1,135 deletions.
4 changes: 2 additions & 2 deletions cdc/model/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,8 @@ func TestFillV1(t *testing.T) {
},
Sink: &config.SinkConfig{
DispatchRules: []*config.DispatchRule{
{Matcher: []string{"test.tbl3"}, PartitionRule: "ts"},
{Matcher: []string{"test.tbl4"}, PartitionRule: "rowid"},
{Matcher: []string{"test.tbl3"}, DispatcherRule: "ts"},
{Matcher: []string{"test.tbl4"}, DispatcherRule: "rowid"},
},
},
Cyclic: &config.CyclicConfig{
Expand Down
26 changes: 24 additions & 2 deletions cdc/model/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,13 +326,35 @@ func (r *RowChangedEvent) HandleKeyColumns() []*Column {
}

if len(pkeyCols) == 0 {
// TODO redact the message
log.Panic("Cannot find handle key columns, bug?", zap.Reflect("event", r))
log.Panic("Cannot find handle key columns.", zap.Any("event", r))
}

return pkeyCols
}

// PrimaryKeyColInfos returns the column(s) and colInfo(s) corresponding to the primary key(s)
func (r *RowChangedEvent) PrimaryKeyColInfos() ([]*Column, []rowcodec.ColInfo) {
pkeyCols := make([]*Column, 0)
pkeyColInfos := make([]rowcodec.ColInfo, 0)

var cols []*Column
if r.IsDelete() {
cols = r.PreColumns
} else {
cols = r.Columns
}

for i, col := range cols {
if col != nil && col.Flag.IsPrimaryKey() {
pkeyCols = append(pkeyCols, col)
pkeyColInfos = append(pkeyColInfos, r.ColInfos[i])
}
}

// It is okay not to have primary keys, so the empty array is an acceptable result
return pkeyCols, pkeyColInfos
}

// WithHandlePrimaryFlag set `HandleKeyFlag` and `PrimaryKeyFlag`
func (r *RowChangedEvent) WithHandlePrimaryFlag(colNames map[string]struct{}) {
for _, col := range r.Columns {
Expand Down
Loading

0 comments on commit 9033520

Please sign in to comment.