Skip to content

Commit

Permalink
use primary keys
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangyangyu committed May 17, 2022
1 parent 756553f commit d3b9f09
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 19 deletions.
11 changes: 4 additions & 7 deletions cdc/model/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,8 +329,8 @@ func (r *RowChangedEvent) HandleKeyColumns() []*Column {
return pkeyCols
}

// HandleKeyColInfos returns the column(s) and colInfo(s) corresponding to the handle key(s)
func (r *RowChangedEvent) HandleKeyColInfos() ([]*Column, []rowcodec.ColInfo) {
// PrimaryKeyColInfos returns the column(s) and colInfo(s) corresponding to the primary key(s)
func (r *RowChangedEvent) PrimaryKeyColInfos() ([]*Column, []rowcodec.ColInfo) {
pkeyCols := make([]*Column, 0)
pkeyColInfos := make([]rowcodec.ColInfo, 0)

Expand All @@ -342,16 +342,13 @@ func (r *RowChangedEvent) HandleKeyColInfos() ([]*Column, []rowcodec.ColInfo) {
}

for i, col := range cols {
if col != nil && col.Flag.IsHandleKey() {
if col != nil && col.Flag.IsPrimaryKey() {
pkeyCols = append(pkeyCols, col)
pkeyColInfos = append(pkeyColInfos, r.ColInfos[i])
}
}

if len(pkeyCols) == 0 {
log.Panic("Cannot find handle key columns.", zap.Any("event", r))
}

// It is okay not to have primary keys, so the empty array is an acceptable result
return pkeyCols, pkeyColInfos
}

Expand Down
25 changes: 16 additions & 9 deletions cdc/sink/codec/avro.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,13 +96,16 @@ func (a *AvroEventBatchEncoder) AppendRowChangedEvent(
return errors.Trace(err)
}

evlp, err := res.toEnvelope()
if err != nil {
log.Error("AppendRowChangedEvent: could not construct Avro envelope", zap.Error(err))
return errors.Trace(err)
if res != nil {
evlp, err := res.toEnvelope()
if err != nil {
log.Error("AppendRowChangedEvent: could not construct Avro envelope", zap.Error(err))
return errors.Trace(err)
}
mqMessage.Key = evlp
} else {
mqMessage.Key = nil
}

mqMessage.Key = evlp
mqMessage.IncRowsCount()
a.resultBuf = append(a.resultBuf, mqMessage)

Expand Down Expand Up @@ -148,7 +151,7 @@ func (a *AvroEventBatchEncoder) avroEncode(
ctx context.Context,
e *model.RowChangedEvent,
topic string,
hasKey bool,
isKey bool,
) (*avroEncodeResult, error) {
var (
cols []*model.Column
Expand All @@ -157,8 +160,8 @@ func (a *AvroEventBatchEncoder) avroEncode(
schemaManager *AvroSchemaManager
operation string
)
if hasKey {
cols, colInfos = e.HandleKeyColInfos()
if isKey {
cols, colInfos = e.PrimaryKeyColInfos()
enableTiDBExtension = false
schemaManager = a.keySchemaManager
} else {
Expand All @@ -176,6 +179,10 @@ func (a *AvroEventBatchEncoder) avroEncode(
}
}

if len(cols) == 0 {
return nil, nil
}

namespace := getAvroNamespace(a.namespace, e.Table)

schemaGen := func() (string, error) {
Expand Down
6 changes: 3 additions & 3 deletions cdc/sink/codec/avro_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -727,13 +727,13 @@ func TestAvroEncode(t *testing.T) {

cols = append(
cols,
&model.Column{Name: "id", Value: int64(1), Type: mysql.TypeLong, Flag: model.HandleKeyFlag},
&model.Column{Name: "id", Value: int64(1), Type: mysql.TypeLong, Flag: model.PrimaryKeyFlag},
)
colInfos = append(
colInfos,
rowcodec.ColInfo{
ID: 1000,
IsPKHandle: true,
IsPKHandle: false,
VirtualGenCol: false,
Ft: types.NewFieldType(mysql.TypeLong),
},
Expand Down Expand Up @@ -763,7 +763,7 @@ func TestAvroEncode(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

keyCols, keyColInfos := event.HandleKeyColInfos()
keyCols, keyColInfos := event.PrimaryKeyColInfos()
namespace := getAvroNamespace(encoder.namespace, event.Table)

keySchema, err := rowToAvroSchema(
Expand Down

0 comments on commit d3b9f09

Please sign in to comment.