Skip to content

Commit

Permalink
codec(ticdc): use handlekey instead of primary key for avro (#5601)
Browse files Browse the repository at this point in the history
close #5597
  • Loading branch information
zhangyangyu authored May 26, 2022
1 parent e87ea0f commit 4ed4693
Show file tree
Hide file tree
Showing 6 changed files with 98 additions and 12 deletions.
13 changes: 5 additions & 8 deletions cdc/model/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,15 +325,12 @@ func (r *RowChangedEvent) HandleKeyColumns() []*Column {
}
}

if len(pkeyCols) == 0 {
log.Panic("Cannot find handle key columns.", zap.Any("event", r))
}

// It is okay not to have handle keys, so the empty array is an acceptable result
return pkeyCols
}

// PrimaryKeyColInfos returns the column(s) and colInfo(s) corresponding to the primary key(s)
func (r *RowChangedEvent) PrimaryKeyColInfos() ([]*Column, []rowcodec.ColInfo) {
// HandleKeyColInfos returns the column(s) and colInfo(s) corresponding to the handle key(s)
func (r *RowChangedEvent) HandleKeyColInfos() ([]*Column, []rowcodec.ColInfo) {
pkeyCols := make([]*Column, 0)
pkeyColInfos := make([]rowcodec.ColInfo, 0)

Expand All @@ -345,13 +342,13 @@ func (r *RowChangedEvent) PrimaryKeyColInfos() ([]*Column, []rowcodec.ColInfo) {
}

for i, col := range cols {
if col != nil && col.Flag.IsPrimaryKey() {
if col != nil && col.Flag.IsHandleKey() {
pkeyCols = append(pkeyCols, col)
pkeyColInfos = append(pkeyColInfos, r.ColInfos[i])
}
}

// It is okay not to have primary keys, so the empty array is an acceptable result
// It is okay not to have handle keys, so the empty array is an acceptable result
return pkeyCols, pkeyColInfos
}

Expand Down
20 changes: 20 additions & 0 deletions cdc/model/sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,26 @@ func TestRowChangedEventFuncs(t *testing.T) {
require.False(t, insertRow.IsDelete())
require.Equal(t, expectedPrimaryKeyCols, insertRow.PrimaryKeyColumns())
require.Equal(t, expectedHandleKeyCols, insertRow.HandleKeyColumns())

forceReplicaRow := &RowChangedEvent{
Table: &TableName{
Schema: "test",
Table: "t1",
},
Columns: []*Column{
{
Name: "a",
Value: 1,
Flag: 0,
}, {
Name: "b",
Value: 2,
Flag: 0,
},
},
}
require.Empty(t, forceReplicaRow.PrimaryKeyColumns())
require.Empty(t, forceReplicaRow.HandleKeyColumns())
}

func TestColumnValueString(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion cdc/sink/mq/codec/avro.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ func (a *AvroEventBatchEncoder) avroEncode(
operation string
)
if isKey {
cols, colInfos = e.PrimaryKeyColInfos()
cols, colInfos = e.HandleKeyColInfos()
enableTiDBExtension = false
schemaManager = a.keySchemaManager
} else {
Expand Down
6 changes: 3 additions & 3 deletions cdc/sink/mq/codec/avro_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -733,14 +733,14 @@ func TestAvroEncode(t *testing.T) {
Name: "id",
Value: int64(1),
Type: mysql.TypeLong,
Flag: model.PrimaryKeyFlag,
Flag: model.HandleKeyFlag,
},
)
colInfos = append(
colInfos,
rowcodec.ColInfo{
ID: 1000,
IsPKHandle: false,
IsPKHandle: true,
VirtualGenCol: false,
Ft: types.NewFieldType(mysql.TypeLong),
},
Expand Down Expand Up @@ -770,7 +770,7 @@ func TestAvroEncode(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

keyCols, keyColInfos := event.PrimaryKeyColInfos()
keyCols, keyColInfos := event.HandleKeyColInfos()
namespace := getAvroNamespace(encoder.namespace, event.Table)

keySchema, err := rowToAvroSchema(
Expand Down
68 changes: 68 additions & 0 deletions tests/mq_protocol_tests/cases/case_handle_key.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// Copyright 2020 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package cases

import (
"github.com/pingcap/errors"
"github.com/pingcap/tiflow/tests/mq_protocol_tests/framework"
)

// HandleKeyCase is base impl of test case for non primary handle keys
type HandleKeyCase struct {
framework.Task
}

// NewHandleKeyCase create a test case which have non primary handle keys
func NewHandleKeyCase(task framework.Task) *HandleKeyCase {
return &HandleKeyCase{
Task: task,
}
}

// Name impl framework.Task interface
func (s *HandleKeyCase) Name() string {
return "Handle Key"
}

// Run impl framework.Task interface
func (s *HandleKeyCase) Run(ctx *framework.TaskContext) error {
_, err := ctx.Upstream.ExecContext(ctx.Ctx, "create table test (id int not null unique, value int)")
if err != nil {
return err
}

// Get a handle of an existing table
table := ctx.SQLHelper().GetTable("test")
// Create an SQL request, send it to the upstream, wait for completion and check the correctness of replication
err = table.Insert(map[string]interface{}{
"id": 0,
"value": 0,
}).Send().Wait().Check()
if err != nil {
return errors.AddStack(err)
}

err = table.Upsert(map[string]interface{}{
"id": 0,
"value": 1,
}).Send().Wait().Check()
if err != nil {
return err
}

err = table.Delete(map[string]interface{}{
"id": 0,
}).Send().Wait().Check()
return err
}
1 change: 1 addition & 0 deletions tests/mq_protocol_tests/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ func testAvro() {
cases.NewManyTypesCase(task),
cases.NewUnsignedCase(task),
cases.NewCompositePKeyCase(task),
cases.NewHandleKeyCase(task),
}

runTests(testCases, env)
Expand Down

0 comments on commit 4ed4693

Please sign in to comment.