diff --git a/cdc/model/sink.go b/cdc/model/sink.go index 9b4f9620315..2165eaf919c 100644 --- a/cdc/model/sink.go +++ b/cdc/model/sink.go @@ -325,15 +325,12 @@ func (r *RowChangedEvent) HandleKeyColumns() []*Column { } } - if len(pkeyCols) == 0 { - log.Panic("Cannot find handle key columns.", zap.Any("event", r)) - } - + // It is okay not to have handle keys, so the empty array is an acceptable result return pkeyCols } -// PrimaryKeyColInfos returns the column(s) and colInfo(s) corresponding to the primary key(s) -func (r *RowChangedEvent) PrimaryKeyColInfos() ([]*Column, []rowcodec.ColInfo) { +// HandleKeyColInfos returns the column(s) and colInfo(s) corresponding to the handle key(s) +func (r *RowChangedEvent) HandleKeyColInfos() ([]*Column, []rowcodec.ColInfo) { pkeyCols := make([]*Column, 0) pkeyColInfos := make([]rowcodec.ColInfo, 0) @@ -345,13 +342,13 @@ func (r *RowChangedEvent) PrimaryKeyColInfos() ([]*Column, []rowcodec.ColInfo) { } for i, col := range cols { - if col != nil && col.Flag.IsPrimaryKey() { + if col != nil && col.Flag.IsHandleKey() { pkeyCols = append(pkeyCols, col) pkeyColInfos = append(pkeyColInfos, r.ColInfos[i]) } } - // It is okay not to have primary keys, so the empty array is an acceptable result + // It is okay not to have handle keys, so the empty array is an acceptable result return pkeyCols, pkeyColInfos } diff --git a/cdc/model/sink_test.go b/cdc/model/sink_test.go index e97d9fec46b..4fdb88902ea 100644 --- a/cdc/model/sink_test.go +++ b/cdc/model/sink_test.go @@ -141,6 +141,26 @@ func TestRowChangedEventFuncs(t *testing.T) { require.False(t, insertRow.IsDelete()) require.Equal(t, expectedPrimaryKeyCols, insertRow.PrimaryKeyColumns()) require.Equal(t, expectedHandleKeyCols, insertRow.HandleKeyColumns()) + + forceReplicaRow := &RowChangedEvent{ + Table: &TableName{ + Schema: "test", + Table: "t1", + }, + Columns: []*Column{ + { + Name: "a", + Value: 1, + Flag: 0, + }, { + Name: "b", + Value: 2, + Flag: 0, + }, + }, + } + require.Empty(t, forceReplicaRow.PrimaryKeyColumns()) + require.Empty(t, forceReplicaRow.HandleKeyColumns()) } func TestColumnValueString(t *testing.T) { diff --git a/cdc/sink/mq/codec/avro.go b/cdc/sink/mq/codec/avro.go index a0d0e0042b9..a7e18db618a 100644 --- a/cdc/sink/mq/codec/avro.go +++ b/cdc/sink/mq/codec/avro.go @@ -163,7 +163,7 @@ func (a *AvroEventBatchEncoder) avroEncode( operation string ) if isKey { - cols, colInfos = e.PrimaryKeyColInfos() + cols, colInfos = e.HandleKeyColInfos() enableTiDBExtension = false schemaManager = a.keySchemaManager } else { diff --git a/cdc/sink/mq/codec/avro_test.go b/cdc/sink/mq/codec/avro_test.go index 87e99de4ec1..f235f95b109 100644 --- a/cdc/sink/mq/codec/avro_test.go +++ b/cdc/sink/mq/codec/avro_test.go @@ -733,14 +733,14 @@ func TestAvroEncode(t *testing.T) { Name: "id", Value: int64(1), Type: mysql.TypeLong, - Flag: model.PrimaryKeyFlag, + Flag: model.HandleKeyFlag, }, ) colInfos = append( colInfos, rowcodec.ColInfo{ ID: 1000, - IsPKHandle: false, + IsPKHandle: true, VirtualGenCol: false, Ft: types.NewFieldType(mysql.TypeLong), }, @@ -770,7 +770,7 @@ func TestAvroEncode(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - keyCols, keyColInfos := event.PrimaryKeyColInfos() + keyCols, keyColInfos := event.HandleKeyColInfos() namespace := getAvroNamespace(encoder.namespace, event.Table) keySchema, err := rowToAvroSchema( diff --git a/tests/mq_protocol_tests/cases/case_handle_key.go b/tests/mq_protocol_tests/cases/case_handle_key.go new file mode 100644 index 00000000000..50f50a13e0e --- /dev/null +++ b/tests/mq_protocol_tests/cases/case_handle_key.go @@ -0,0 +1,68 @@ +// Copyright 2020 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package cases + +import ( + "github.com/pingcap/errors" + "github.com/pingcap/tiflow/tests/mq_protocol_tests/framework" +) + +// HandleKeyCase is base impl of test case for non primary handle keys +type HandleKeyCase struct { + framework.Task +} + +// NewHandleKeyCase create a test case which have non primary handle keys +func NewHandleKeyCase(task framework.Task) *HandleKeyCase { + return &HandleKeyCase{ + Task: task, + } +} + +// Name impl framework.Task interface +func (s *HandleKeyCase) Name() string { + return "Handle Key" +} + +// Run impl framework.Task interface +func (s *HandleKeyCase) Run(ctx *framework.TaskContext) error { + _, err := ctx.Upstream.ExecContext(ctx.Ctx, "create table test (id int not null unique, value int)") + if err != nil { + return err + } + + // Get a handle of an existing table + table := ctx.SQLHelper().GetTable("test") + // Create an SQL request, send it to the upstream, wait for completion and check the correctness of replication + err = table.Insert(map[string]interface{}{ + "id": 0, + "value": 0, + }).Send().Wait().Check() + if err != nil { + return errors.AddStack(err) + } + + err = table.Upsert(map[string]interface{}{ + "id": 0, + "value": 1, + }).Send().Wait().Check() + if err != nil { + return err + } + + err = table.Delete(map[string]interface{}{ + "id": 0, + }).Send().Wait().Check() + return err +} diff --git a/tests/mq_protocol_tests/main.go b/tests/mq_protocol_tests/main.go index de2f19e2569..7c48a609f59 100644 --- a/tests/mq_protocol_tests/main.go +++ b/tests/mq_protocol_tests/main.go @@ -42,6 +42,7 @@ func testAvro() { cases.NewManyTypesCase(task), cases.NewUnsignedCase(task), cases.NewCompositePKeyCase(task), + cases.NewHandleKeyCase(task), } runTests(testCases, env)