Skip to content

Commit

Permalink
txn: use handle to encode checksum instead of the key (#57139) (#57187)
Browse files Browse the repository at this point in the history
close #57174
  • Loading branch information
ti-chi-bot authored Nov 7, 2024
1 parent 7e87622 commit d39b71b
Show file tree
Hide file tree
Showing 8 changed files with 16 additions and 15 deletions.
2 changes: 1 addition & 1 deletion pkg/ddl/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -779,7 +779,7 @@ func (w *updateColumnWorker) getRowRecord(handle kv.Handle, recordKey []byte, ra
ec := w.exprCtx.GetEvalCtx().ErrCtx()
var checksum rowcodec.Checksum
if w.checksumNeeded {
checksum = rowcodec.RawChecksum{Key: recordKey}
checksum = rowcodec.RawChecksum{Handle: handle}
}
newRowVal, err := tablecodec.EncodeRow(sysTZ, newRow, newColumnIDs, nil, nil, checksum, rd)
err = ec.HandleError(err)
Expand Down
4 changes: 2 additions & 2 deletions pkg/ddl/column_type_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ func TestRowFormatWithChecksums(t *testing.T) {
data, err := h.GetMvccByEncodedKey(encodedKey)
require.NoError(t, err)
// row value with checksums
expected := []byte{0x80, 0x2, 0x3, 0x0, 0x0, 0x0, 0x1, 0x2, 0x3, 0x1, 0x0, 0x4, 0x0, 0x7, 0x0, 0x1, 0x31, 0x32, 0x33, 0x31, 0x32, 0x33, 0x1, 0xa9, 0x7a, 0xf4, 0xc8}
expected := []byte{0x80, 0x2, 0x3, 0x0, 0x0, 0x0, 0x1, 0x2, 0x3, 0x1, 0x0, 0x4, 0x0, 0x7, 0x0, 0x1, 0x31, 0x32, 0x33, 0x31, 0x32, 0x33, 0x1, 0x5b, 0x6a, 0x78, 0x7c}
require.Equal(t, expected, data.Info.Writes[0].ShortValue)
tk.MustExec("drop table if exists t")
}
Expand All @@ -284,7 +284,7 @@ func TestRowLevelChecksumWithMultiSchemaChange(t *testing.T) {
data, err := h.GetMvccByEncodedKey(encodedKey)
require.NoError(t, err)
// checksum skipped and with a null col vv
expected := []byte{0x80, 0x2, 0x3, 0x0, 0x1, 0x0, 0x1, 0x2, 0x4, 0x3, 0x1, 0x0, 0x4, 0x0, 0x7, 0x0, 0x1, 0x31, 0x32, 0x33, 0x31, 0x32, 0x33, 0x1, 0xeb, 0x42, 0xda, 0x20}
expected := []byte{0x80, 0x2, 0x3, 0x0, 0x1, 0x0, 0x1, 0x2, 0x4, 0x3, 0x1, 0x0, 0x4, 0x0, 0x7, 0x0, 0x1, 0x31, 0x32, 0x33, 0x31, 0x32, 0x33, 0x1, 0xc5, 0x73, 0x5f, 0x1f}
require.Equal(t, expected, data.Info.Writes[0].ShortValue)
tk.MustExec("drop table if exists t")
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/table/tables/tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -518,7 +518,7 @@ func (t *TableCommon) updateRecord(sctx table.MutateContext, txn kv.Transaction,

key := t.RecordKey(h)
tc, ec := evalCtx.TypeCtx(), evalCtx.ErrCtx()
err = encodeRowBuffer.WriteMemBufferEncoded(sctx.GetRowEncodingConfig(), tc.Location(), ec, memBuffer, key)
err = encodeRowBuffer.WriteMemBufferEncoded(sctx.GetRowEncodingConfig(), tc.Location(), ec, memBuffer, key, h)
if err != nil {
return err
}
Expand Down Expand Up @@ -886,7 +886,7 @@ func (t *TableCommon) addRecord(sctx table.MutateContext, txn kv.Transaction, r
}
}

err = encodeRowBuffer.WriteMemBufferEncoded(sctx.GetRowEncodingConfig(), tc.Location(), ec, memBuffer, key, flags...)
err = encodeRowBuffer.WriteMemBufferEncoded(sctx.GetRowEncodingConfig(), tc.Location(), ec, memBuffer, key, recordID, flags...)
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/table/tblctx/buffers.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,11 @@ func (b *EncodeRowBuffer) AddColVal(colID int64, val types.Datum) {
// WriteMemBufferEncoded writes the encoded row to the memBuffer.
func (b *EncodeRowBuffer) WriteMemBufferEncoded(
cfg RowEncodingConfig, loc *time.Location, ec errctx.Context,
memBuffer kv.MemBuffer, key kv.Key, flags ...kv.FlagsOp,
memBuffer kv.MemBuffer, key kv.Key, handle kv.Handle, flags ...kv.FlagsOp,
) error {
var checksum rowcodec.Checksum
if cfg.IsRowLevelChecksumEnabled {
checksum = rowcodec.RawChecksum{Key: key}
checksum = rowcodec.RawChecksum{Handle: handle}
}

stmtBufs := b.writeStmtBufs
Expand Down
6 changes: 3 additions & 3 deletions pkg/table/tblctx/buffers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func TestEncodeRow(t *testing.T) {

var checksum rowcodec.Checksum
if cfg.IsRowLevelChecksumEnabled {
checksum = rowcodec.RawChecksum{Key: kv.Key("key1")}
checksum = rowcodec.RawChecksum{Handle: kv.IntHandle(1)}
}

expectedVal, err := tablecodec.EncodeRow(
Expand All @@ -125,7 +125,7 @@ func TestEncodeRow(t *testing.T) {
}
err = buffer.WriteMemBufferEncoded(
cfg, c.loc, errctx.StrictNoWarningContext,
memBuffer, kv.Key("key1"), c.flags...,
memBuffer, kv.Key("key1"), kv.IntHandle(1), c.flags...,
)
require.NoError(t, err)
memBuffer.AssertExpectations(t)
Expand Down Expand Up @@ -166,7 +166,7 @@ func TestEncodeBufferReserve(t *testing.T) {
require.Equal(t, 2, len(buffer.row))
require.NoError(t, buffer.WriteMemBufferEncoded(RowEncodingConfig{
RowEncoder: &rowcodec.Encoder{Enable: true},
}, time.UTC, errctx.StrictNoWarningContext, mb, kv.Key("key1")))
}, time.UTC, errctx.StrictNoWarningContext, mb, kv.Key("key1"), kv.IntHandle(1)))
encodedCap := cap(buffer.writeStmtBufs.RowValBuf)
require.Greater(t, encodedCap, 0)
require.Equal(t, 4, len(buffer.writeStmtBufs.AddRowValues))
Expand Down
5 changes: 3 additions & 2 deletions pkg/util/rowcodec/encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"time"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util/codec"
Expand Down Expand Up @@ -242,7 +243,7 @@ const checksumVersionRaw byte = 1

// RawChecksum indicates encode the raw bytes checksum and append it to the raw bytes.
type RawChecksum struct {
Key []byte
Handle kv.Handle
}

func (c RawChecksum) encode(encoder *Encoder, buf []byte) ([]byte, error) {
Expand All @@ -253,7 +254,7 @@ func (c RawChecksum) encode(encoder *Encoder, buf []byte) ([]byte, error) {
valueBytes := encoder.toBytes(buf)
valueBytes = append(valueBytes, encoder.checksumHeader)
encoder.checksum1 = crc32.Checksum(valueBytes, crc32.IEEETable)
encoder.checksum1 = crc32.Update(encoder.checksum1, crc32.IEEETable, c.Key)
encoder.checksum1 = crc32.Update(encoder.checksum1, crc32.IEEETable, c.Handle.Encoded())
valueBytes = binary.LittleEndian.AppendUint32(valueBytes, encoder.checksum1)
return valueBytes, nil
}
4 changes: 2 additions & 2 deletions pkg/util/rowcodec/row.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ func (r *row) initOffsets32() {
// CalculateRawChecksum calculates the bytes-level checksum by using the given elements.
// this is mainly used by the TiCDC to implement E2E checksum functionality.
func (r *row) CalculateRawChecksum(
loc *time.Location, colIDs []int64, values []*types.Datum, key kv.Key, buf []byte,
loc *time.Location, colIDs []int64, values []*types.Datum, handle kv.Handle, buf []byte,
) (uint32, error) {
r.flags |= rowFlagChecksum
r.checksumHeader &^= checksumFlagExtra // revert extra checksum flag
Expand All @@ -325,6 +325,6 @@ func (r *row) CalculateRawChecksum(
buf = r.toBytes(buf)
buf = append(buf, r.checksumHeader)
rawChecksum := crc32.Checksum(buf, crc32.IEEETable)
rawChecksum = crc32.Update(rawChecksum, crc32.IEEETable, key)
rawChecksum = crc32.Update(rawChecksum, crc32.IEEETable, handle.Encoded())
return rawChecksum, nil
}
2 changes: 1 addition & 1 deletion pkg/util/rowcodec/rowcodec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1209,7 +1209,7 @@ func TestEncodeDecodeRowWithChecksum(t *testing.T) {
require.Zero(t, checksum)

rawChecksum := rowcodec.RawChecksum{
Key: []byte("0x1"),
Handle: kv.IntHandle(1),
}
raw, err = enc.Encode(time.UTC, nil, nil, rawChecksum, nil)
require.NoError(t, err)
Expand Down

0 comments on commit d39b71b

Please sign in to comment.