Skip to content

Commit

Permalink
mounter(ticdc): calculate raw bytes checksum by using handle (#11720)
Browse files Browse the repository at this point in the history
close #11713
  • Loading branch information
3AceShowHand authored Nov 7, 2024
1 parent 1da37a2 commit 64decf6
Show file tree
Hide file tree
Showing 6 changed files with 71 additions and 83 deletions.
80 changes: 41 additions & 39 deletions cdc/entry/mounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,14 +196,20 @@ func (m *mounter) unmarshalAndMountRowChanged(ctx context.Context, raw *model.Ra
return nil, cerror.ErrSnapshotTableNotFound.GenWithStackByArgs(physicalTableID)
}
if bytes.HasPrefix(key, recordPrefix) {
rowKV, err := m.unmarshalRowKVEntry(tableInfo, raw.Key, raw.Value, raw.OldValue, baseInfo)
recordID, err := tablecodec.DecodeRowKey(raw.Key)
if err != nil {
return nil, errors.Trace(err)
}
baseInfo.RecordID = recordID

rowKV, err := m.unmarshalRowKVEntry(tableInfo, raw.Value, raw.OldValue, baseInfo)
if err != nil {
return nil, errors.Trace(err)
}
if rowKV == nil {
return nil, nil
}
row, rawRow, err := m.mountRowKVEntry(tableInfo, rowKV, checksumKey, raw.ApproximateDataSize())
row, rawRow, err := m.mountRowKVEntry(tableInfo, rowKV, recordID, checksumKey, raw.ApproximateDataSize())
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -231,28 +237,21 @@ func (m *mounter) unmarshalAndMountRowChanged(ctx context.Context, raw *model.Ra

func (m *mounter) unmarshalRowKVEntry(
tableInfo *model.TableInfo,
rawKey []byte,
rawValue []byte,
rawOldValue []byte,
base baseKVEntry,
) (*rowKVEntry, error) {
recordID, err := tablecodec.DecodeRowKey(rawKey)
if err != nil {
return nil, errors.Trace(err)
}
base.RecordID = recordID

var (
row, preRow map[int64]types.Datum
rowExist, preRowExist bool
)

row, rowExist, err = m.decodeRow(rawValue, recordID, tableInfo, false)
row, rowExist, err := m.decodeRow(rawValue, base.RecordID, tableInfo, false)
if err != nil {
return nil, errors.Trace(err)
}

preRow, preRowExist, err = m.decodeRow(rawOldValue, recordID, tableInfo, true)
preRow, preRowExist, err = m.decodeRow(rawOldValue, base.RecordID, tableInfo, true)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -490,33 +489,34 @@ func (m *mounter) verifyColumnChecksum(

checksum, err := calculateColumnChecksum(columnInfos, rawColumns, m.tz)
if err != nil {
log.Error("failed to calculate the checksum", zap.Uint32("first", first), zap.Error(err))
log.Error("failed to calculate the checksum",
zap.Uint32("first", first), zap.Any("columnInfos", columnInfos),
zap.Any("rawColumns", rawColumns), zap.Error(err))
return 0, false, err
}

// the first checksum matched, it hits in the most case.
if checksum == first {
log.Debug("checksum matched", zap.Uint32("checksum", checksum), zap.Uint32("first", first))
return checksum, true, nil
}

extra, ok := decoder.GetExtraChecksum()
if ok && checksum == extra {
log.Debug("extra checksum matched, this may happen the upstream TiDB is during the DDL execution phase",
zap.Uint32("checksum", checksum), zap.Uint32("extra", extra))
return checksum, true, nil
}

if !skipFail {
log.Error("cannot found the extra checksum, the first checksum mismatched",
zap.Uint32("checksum", checksum), zap.Uint32("first", first), zap.Uint32("extra", extra))
zap.Uint32("checksum", checksum), zap.Uint32("first", first), zap.Uint32("extra", extra),
zap.Any("columnInfos", columnInfos), zap.Any("rawColumns", rawColumns), zap.Any("tz", m.tz))
return checksum, false, nil
}

if time.Since(m.lastSkipOldValueTime) > time.Minute {
log.Warn("checksum mismatch on the old value, "+
"this may caused by Add Column / Drop Column executed, skip verification",
zap.Uint32("checksum", checksum), zap.Uint32("first", first), zap.Uint32("extra", extra))
zap.Uint32("checksum", checksum), zap.Uint32("first", first), zap.Uint32("extra", extra),
zap.Any("columnInfos", columnInfos), zap.Any("rawColumns", rawColumns), zap.Any("tz", m.tz))
m.lastSkipOldValueTime = time.Now()
}
return checksum, true, nil
Expand Down Expand Up @@ -602,7 +602,7 @@ func newDatum(value interface{}, ft types.FieldType) (types.Datum, error) {

func verifyRawBytesChecksum(
tableInfo *model.TableInfo, columns []*model.ColumnData, decoder *rowcodec.DatumMapDecoder,
key kv.Key, tz *time.Location,
handle kv.Handle, key kv.Key, tz *time.Location,
) (uint32, bool, error) {
expected, ok := decoder.GetChecksum()
if !ok {
Expand All @@ -621,12 +621,14 @@ func verifyRawBytesChecksum(
columnInfo := tableInfo.ForceGetColumnInfo(columnID)
datum, err := newDatum(col.Value, columnInfo.FieldType)
if err != nil {
log.Error("build datum for raw checksum calculation failed",
zap.Any("col", col), zap.Any("columnInfo", columnInfo), zap.Error(err))
return 0, false, errors.Trace(err)
}
datums = append(datums, &datum)
columnIDs = append(columnIDs, columnID)
}
obtained, err := decoder.CalculateRawChecksum(tz, columnIDs, datums, key, nil)
obtained, err := decoder.CalculateRawChecksum(tz, columnIDs, datums, key, handle, nil)
if err != nil {
return 0, false, errors.Trace(err)
}
Expand All @@ -635,7 +637,10 @@ func verifyRawBytesChecksum(
}

log.Error("raw bytes checksum mismatch",
zap.Uint32("expected", expected), zap.Uint32("obtained", obtained))
zap.Int("version", decoder.ChecksumVersion()),
zap.Uint32("expected", expected), zap.Uint32("obtained", obtained),
zap.Any("tableInfo", tableInfo), zap.Any("columns", columns),
zap.Any("handle", handle.String()), zap.Any("tz", tz))

return expected, false, nil
}
Expand All @@ -645,7 +650,7 @@ func verifyRawBytesChecksum(
func (m *mounter) verifyChecksum(
tableInfo *model.TableInfo, columnInfos []*timodel.ColumnInfo,
columns []*model.ColumnData, rawColumns []types.Datum,
key kv.Key, isPreRow bool,
handle kv.Handle, key kv.Key, isPreRow bool,
) (uint32, bool, error) {
if !m.integrity.Enabled() {
return 0, true, nil
Expand All @@ -665,17 +670,22 @@ func (m *mounter) verifyChecksum(
// Update / Delete event correctly, after Add Column / Drop column DDL,
// since the table schema does not contain complete column information.
return m.verifyColumnChecksum(columnInfos, rawColumns, decoder, isPreRow)
case 1:
expected, matched, err := verifyRawBytesChecksum(tableInfo, columns, decoder, key, m.tz)
case 1, 2:
expected, matched, err := verifyRawBytesChecksum(tableInfo, columns, decoder, handle, key, m.tz)
if err != nil {
log.Error("calculate raw checksum failed",
zap.Int("version", version), zap.Any("tz", m.tz), zap.Any("handle", handle.String()),
zap.Any("key", key), zap.Any("columns", columns), zap.Error(err))
return 0, false, errors.Trace(err)
}
if !matched {
return expected, matched, err
}
columnChecksum, err := calculateColumnChecksum(columnInfos, rawColumns, m.tz)
if err != nil {
log.Error("failed to calculate column-level checksum, after raw checksum verification passed", zap.Error(err))
log.Error("failed to calculate column-level checksum, after raw checksum verification passed",
zap.Any("columnsInfo", columnInfos), zap.Any("rawColumns", rawColumns),
zap.Any("tz", m.tz), zap.Error(err))
return 0, false, errors.Trace(err)
}
return columnChecksum, true, nil
Expand All @@ -685,7 +695,7 @@ func (m *mounter) verifyChecksum(
}

func (m *mounter) mountRowKVEntry(
tableInfo *model.TableInfo, row *rowKVEntry, key kv.Key, dataSize int64,
tableInfo *model.TableInfo, row *rowKVEntry, handle kv.Handle, key kv.Key, dataSize int64,
) (*model.RowChangedEvent, model.RowChangedDatums, error) {
var (
rawRow model.RowChangedDatums
Expand Down Expand Up @@ -719,19 +729,15 @@ func (m *mounter) mountRowKVEntry(
return nil, rawRow, errors.Trace(err)
}

preChecksum, matched, err = m.verifyChecksum(tableInfo, columnInfos, preCols, preRawCols, key, true)
preChecksum, matched, err = m.verifyChecksum(tableInfo, columnInfos, preCols, preRawCols, handle, key, true)
if err != nil {
log.Error("calculate the previous columns checksum failed",
zap.Any("tableInfo", tableInfo),
zap.Any("rawCols", preRawCols))
return nil, rawRow, errors.Trace(err)
}

if !matched {
log.Error("previous columns checksum mismatch",
zap.Uint32("checksum", preChecksum),
zap.Any("tableInfo", tableInfo),
zap.Any("rawCols", preRawCols))
zap.Uint32("checksum", preChecksum), zap.Any("tableInfo", tableInfo),
zap.Any("preCols", preCols), zap.Any("rawCols", preRawCols))
if m.integrity.ErrorHandle() {
return nil, rawRow, cerror.ErrCorruptedDataMutation.
GenWithStackByArgs(m.changefeedID.Namespace, m.changefeedID.ID)
Expand All @@ -751,18 +757,14 @@ func (m *mounter) mountRowKVEntry(
return nil, rawRow, errors.Trace(err)
}

currentChecksum, matched, err = m.verifyChecksum(tableInfo, columnInfos, cols, rawCols, key, false)
currentChecksum, matched, err = m.verifyChecksum(tableInfo, columnInfos, cols, rawCols, handle, key, false)
if err != nil {
log.Error("calculate the current columns checksum failed",
zap.Any("tableInfo", tableInfo),
zap.Any("rawCols", rawCols))
return nil, rawRow, errors.Trace(err)
}
if !matched {
log.Error("current columns checksum mismatch",
zap.Uint32("checksum", currentChecksum),
zap.Any("tableInfo", tableInfo),
zap.Any("rawCols", rawCols))
zap.Uint32("checksum", currentChecksum), zap.Any("tableInfo", tableInfo),
zap.Any("cols", cols), zap.Any("rawCols", rawCols))
if m.integrity.ErrorHandle() {
return nil, rawRow, cerror.ErrCorruptedDataMutation.
GenWithStackByArgs(m.changefeedID.Namespace, m.changefeedID.ID)
Expand Down
11 changes: 0 additions & 11 deletions cdc/puller/ddl_puller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/log"
timodel "github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/util/codec"
"github.com/pingcap/tiflow/cdc/entry"
"github.com/pingcap/tiflow/cdc/model"
Expand Down Expand Up @@ -531,16 +530,6 @@ func TestHandleJob(t *testing.T) {
job := &timodel.Job{
Type: timodel.ActionFlashbackCluster,
BinlogInfo: &timodel.HistoryInfo{},
Args: []interface{}{
998,
map[string]interface{}{},
true, /* tidb_gc_enable */
variable.On, /* tidb_enable_auto_analyze */
variable.Off, /* tidb_super_read_only */
0, /* totalRegions */
0, /* startTS */
0, /* commitTS */
},
}
skip, err := ddlJobPullerImpl.handleJob(job)
require.NoError(t, err)
Expand Down
17 changes: 9 additions & 8 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,10 @@ require (
github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86
github.com/pingcap/kvproto v0.0.0-20240924080114-4a3e17f5e62d
github.com/pingcap/log v1.1.1-0.20240314023424-862ccc32f18d
github.com/pingcap/tidb v1.1.0-beta.0.20241014034929-94b2ac04a0c4
github.com/pingcap/tidb v1.1.0-beta.0.20241107131230-e2505e95a03c
github.com/pingcap/tidb-dashboard v0.0.0-20240326110213-9768844ff5d7
github.com/pingcap/tidb/pkg/parser v0.0.0-20241014034929-94b2ac04a0c4
github.com/prometheus/client_golang v1.20.4
github.com/prometheus/client_golang v1.20.5
github.com/prometheus/client_model v0.6.1
github.com/r3labs/diff v1.1.0
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475
Expand All @@ -89,9 +89,9 @@ require (
github.com/swaggo/swag v1.16.3
github.com/syndtr/goleveldb v1.0.1-0.20210305035536-64b5b1c73954
github.com/thanhpk/randstr v1.0.6
github.com/tikv/client-go/v2 v2.0.8-0.20241008085809-c3e10ae7c8fc
github.com/tikv/client-go/v2 v2.0.8-0.20241023023120-691e80ae0ea9
github.com/tikv/pd v1.1.0-beta.0.20240407022249-7179657d129b
github.com/tikv/pd/client v0.0.0-20240926021936-642f0e919b0d
github.com/tikv/pd/client v0.0.0-20241016064947-b70107ec31e6
github.com/tinylib/msgp v1.1.6
github.com/uber-go/atomic v1.4.0
github.com/vmihailenco/msgpack/v5 v5.3.5
Expand All @@ -116,7 +116,7 @@ require (
golang.org/x/sync v0.8.0
golang.org/x/sys v0.26.0
golang.org/x/text v0.19.0
golang.org/x/time v0.5.0
golang.org/x/time v0.7.0
google.golang.org/genproto/googleapis/api v0.0.0-20240401170217-c3f982113cda
google.golang.org/genproto/googleapis/rpc v0.0.0-20240515191416-fc5f0ca64291
google.golang.org/grpc v1.64.0
Expand Down Expand Up @@ -159,7 +159,7 @@ require (
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-resty/resty/v2 v2.11.0 // indirect
github.com/goccy/go-reflect v1.2.0 // indirect
github.com/golang-jwt/jwt/v4 v4.5.0 // indirect
github.com/golang-jwt/jwt/v4 v4.5.1 // indirect
github.com/golang-jwt/jwt/v5 v5.2.1 // indirect
github.com/google/flatbuffers v2.0.8+incompatible // indirect
github.com/google/gofuzz v1.2.0 // indirect
Expand All @@ -173,6 +173,7 @@ require (
github.com/klauspost/cpuid/v2 v2.2.4 // indirect
github.com/ks3sdklib/aws-sdk-go v1.2.9 // indirect
github.com/kylelemons/godebug v1.1.0 // indirect
github.com/lance6716/pebble v0.0.0-20241104073946-6f55c09bd183 // indirect
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 // indirect
github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
Expand Down Expand Up @@ -251,7 +252,7 @@ require (
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect
github.com/desertbit/timer v0.0.0-20180107155436-c41aec40b27f // indirect
github.com/dgraph-io/ristretto v0.1.1 // indirect
github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13 // indirect
github.com/dgryski/go-farm v0.0.0-20240924180020-3414d57e47da // indirect
github.com/dvsekhvalnov/jose2go v1.5.0 // indirect
github.com/eapache/go-resiliency v1.4.0 // indirect
github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect
Expand Down Expand Up @@ -325,7 +326,7 @@ require (
github.com/petermattis/goid v0.0.0-20240813172612-4fcff4a6cae7 // indirect
github.com/philhofer/fwd v1.1.1 // indirect
github.com/pierrec/lz4 v2.6.1+incompatible // indirect
github.com/pingcap/badger v1.5.1-0.20230103063557-828f39b09b6d // indirect
github.com/pingcap/badger v1.5.1-0.20241015064302-38533b6cbf8d // indirect
github.com/pingcap/fn v1.0.0 // indirect
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 // indirect
github.com/pingcap/sysutil v1.0.1-0.20240311050922-ae81ee01f3a5
Expand Down
Loading

0 comments on commit 64decf6

Please sign in to comment.