From d3b9f099e5b3342c105998b000b6f876b8284960 Mon Sep 17 00:00:00 2001 From: zhangyangyu Date: Tue, 17 May 2022 14:16:02 +0800 Subject: [PATCH] use primary keys --- cdc/model/sink.go | 11 ++++------- cdc/sink/codec/avro.go | 25 ++++++++++++++++--------- cdc/sink/codec/avro_test.go | 6 +++--- 3 files changed, 23 insertions(+), 19 deletions(-) diff --git a/cdc/model/sink.go b/cdc/model/sink.go index 49f1dc872b5..1131ddc317d 100644 --- a/cdc/model/sink.go +++ b/cdc/model/sink.go @@ -329,8 +329,8 @@ func (r *RowChangedEvent) HandleKeyColumns() []*Column { return pkeyCols } -// HandleKeyColInfos returns the column(s) and colInfo(s) corresponding to the handle key(s) -func (r *RowChangedEvent) HandleKeyColInfos() ([]*Column, []rowcodec.ColInfo) { +// PrimaryKeyColInfos returns the column(s) and colInfo(s) corresponding to the primary key(s) +func (r *RowChangedEvent) PrimaryKeyColInfos() ([]*Column, []rowcodec.ColInfo) { pkeyCols := make([]*Column, 0) pkeyColInfos := make([]rowcodec.ColInfo, 0) @@ -342,16 +342,13 @@ func (r *RowChangedEvent) HandleKeyColInfos() ([]*Column, []rowcodec.ColInfo) { } for i, col := range cols { - if col != nil && col.Flag.IsHandleKey() { + if col != nil && col.Flag.IsPrimaryKey() { pkeyCols = append(pkeyCols, col) pkeyColInfos = append(pkeyColInfos, r.ColInfos[i]) } } - if len(pkeyCols) == 0 { - log.Panic("Cannot find handle key columns.", zap.Any("event", r)) - } - + // It is okay not to have primary keys, so the empty array is an acceptable result return pkeyCols, pkeyColInfos } diff --git a/cdc/sink/codec/avro.go b/cdc/sink/codec/avro.go index ada62b2e152..116847c0714 100644 --- a/cdc/sink/codec/avro.go +++ b/cdc/sink/codec/avro.go @@ -96,13 +96,16 @@ func (a *AvroEventBatchEncoder) AppendRowChangedEvent( return errors.Trace(err) } - evlp, err := res.toEnvelope() - if err != nil { - log.Error("AppendRowChangedEvent: could not construct Avro envelope", zap.Error(err)) - return errors.Trace(err) + if res != nil { + evlp, err := res.toEnvelope() + if err != nil { + log.Error("AppendRowChangedEvent: could not construct Avro envelope", zap.Error(err)) + return errors.Trace(err) + } + mqMessage.Key = evlp + } else { + mqMessage.Key = nil } - - mqMessage.Key = evlp mqMessage.IncRowsCount() a.resultBuf = append(a.resultBuf, mqMessage) @@ -148,7 +151,7 @@ func (a *AvroEventBatchEncoder) avroEncode( ctx context.Context, e *model.RowChangedEvent, topic string, - hasKey bool, + isKey bool, ) (*avroEncodeResult, error) { var ( cols []*model.Column @@ -157,8 +160,8 @@ func (a *AvroEventBatchEncoder) avroEncode( schemaManager *AvroSchemaManager operation string ) - if hasKey { - cols, colInfos = e.HandleKeyColInfos() + if isKey { + cols, colInfos = e.PrimaryKeyColInfos() enableTiDBExtension = false schemaManager = a.keySchemaManager } else { @@ -176,6 +179,10 @@ func (a *AvroEventBatchEncoder) avroEncode( } } + if len(cols) == 0 { + return nil, nil + } + namespace := getAvroNamespace(a.namespace, e.Table) schemaGen := func() (string, error) { diff --git a/cdc/sink/codec/avro_test.go b/cdc/sink/codec/avro_test.go index 0b97f1f328c..faf371936e2 100644 --- a/cdc/sink/codec/avro_test.go +++ b/cdc/sink/codec/avro_test.go @@ -727,13 +727,13 @@ func TestAvroEncode(t *testing.T) { cols = append( cols, - &model.Column{Name: "id", Value: int64(1), Type: mysql.TypeLong, Flag: model.HandleKeyFlag}, + &model.Column{Name: "id", Value: int64(1), Type: mysql.TypeLong, Flag: model.PrimaryKeyFlag}, ) colInfos = append( colInfos, rowcodec.ColInfo{ ID: 1000, - IsPKHandle: true, + IsPKHandle: false, VirtualGenCol: false, Ft: types.NewFieldType(mysql.TypeLong), }, @@ -763,7 +763,7 @@ func TestAvroEncode(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - keyCols, keyColInfos := event.HandleKeyColInfos() + keyCols, keyColInfos := event.PrimaryKeyColInfos() namespace := getAvroNamespace(encoder.namespace, event.Table) keySchema, err := rowToAvroSchema(