Skip to content

Commit

Permalink
sink/codec(cdc): refactor Avro codec (#5339)
Browse files Browse the repository at this point in the history
ref #5338
  • Loading branch information
zhangyangyu committed May 17, 2022
1 parent 119dbe0 commit c8e7fff
Show file tree
Hide file tree
Showing 39 changed files with 3,521 additions and 1,000 deletions.
26 changes: 24 additions & 2 deletions cdc/model/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,13 +326,35 @@ func (r *RowChangedEvent) HandleKeyColumns() []*Column {
}

if len(pkeyCols) == 0 {
// TODO redact the message
log.Panic("Cannot find handle key columns, bug?", zap.Reflect("event", r))
log.Panic("Cannot find handle key columns.", zap.Any("event", r))
}

return pkeyCols
}

// PrimaryKeyColInfos returns the column(s) and colInfo(s) corresponding to the primary key(s)
func (r *RowChangedEvent) PrimaryKeyColInfos() ([]*Column, []rowcodec.ColInfo) {
pkeyCols := make([]*Column, 0)
pkeyColInfos := make([]rowcodec.ColInfo, 0)

var cols []*Column
if r.IsDelete() {
cols = r.PreColumns
} else {
cols = r.Columns
}

for i, col := range cols {
if col != nil && col.Flag.IsPrimaryKey() {
pkeyCols = append(pkeyCols, col)
pkeyColInfos = append(pkeyColInfos, r.ColInfos[i])
}
}

// It is okay not to have primary keys, so the empty array is an acceptable result
return pkeyCols, pkeyColInfos
}

// WithHandlePrimaryFlag set `HandleKeyFlag` and `PrimaryKeyFlag`
func (r *RowChangedEvent) WithHandlePrimaryFlag(colNames map[string]struct{}) {
for _, col := range r.Columns {
Expand Down
Loading

0 comments on commit c8e7fff

Please sign in to comment.