Skip to content

Commit

Permalink
util: reimplement row level checksum utilities (#43141)
Browse files Browse the repository at this point in the history
ref #42747
  • Loading branch information
zyguan committed Apr 18, 2023
1 parent 6d6c607 commit 47e7432
Show file tree
Hide file tree
Showing 6 changed files with 535 additions and 175 deletions.
1 change: 1 addition & 0 deletions util/rowcodec/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ go_test(
flaky = True,
deps = [
"//kv",
"//parser/model",
"//parser/mysql",
"//sessionctx/stmtctx",
"//tablecodec",
Expand Down
21 changes: 21 additions & 0 deletions util/rowcodec/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"time"

"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/tablecodec"
Expand All @@ -28,6 +29,26 @@ import (
"github.com/pingcap/tidb/util/rowcodec"
)

func BenchmarkChecksum(b *testing.B) {
b.ReportAllocs()
datums := types.MakeDatums(1, "abc", 1.1)
tp1 := types.NewFieldType(mysql.TypeLong)
tp2 := types.NewFieldType(mysql.TypeVarchar)
tp3 := types.NewFieldType(mysql.TypeDouble)
cols := []rowcodec.ColData{
{&model.ColumnInfo{ID: 1, FieldType: *tp1}, &datums[0]},
{&model.ColumnInfo{ID: 2, FieldType: *tp2}, &datums[1]},
{&model.ColumnInfo{ID: 3, FieldType: *tp3}, &datums[2]},
}
row := rowcodec.RowData{Cols: cols}
for i := 0; i < b.N; i++ {
_, err := row.Checksum()
if err != nil {
b.Fatal(err)
}
}
}

func BenchmarkEncode(b *testing.B) {
b.ReportAllocs()
oldRow := types.MakeDatums(1, "abc", 1.1)
Expand Down
114 changes: 114 additions & 0 deletions util/rowcodec/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,16 @@ package rowcodec

import (
"encoding/binary"
"hash/crc32"
"math"
"reflect"
"unsafe"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/parser/types"
data "github.com/pingcap/tidb/types"
)

// CodecVer is the constant number that represent the new row format.
Expand All @@ -30,6 +34,7 @@ const CodecVer = 128
var (
errInvalidCodecVer = errors.New("invalid codec version")
errInvalidChecksumVer = errors.New("invalid checksum version")
errInvalidChecksumTyp = errors.New("invalid type for checksum")
)

// First byte in the encoded value which specifies the encoding type.
Expand Down Expand Up @@ -240,3 +245,112 @@ func IsNewFormat(rowData []byte) bool {
func FieldTypeFromModelColumn(col *model.ColumnInfo) *types.FieldType {
return col.FieldType.Clone()
}

// ColData combines the column info as well as its datum. It's used to calculate checksum.
type ColData struct {
*model.ColumnInfo
Datum *data.Datum
}

// Encode encodes the column datum into bytes for checksum. If buf provided, append encoded data to it.
func (c ColData) Encode(buf []byte) ([]byte, error) {
return appendDatumForChecksum(buf, c.Datum, c.GetType())
}

// RowData is a list of ColData for row checksum calculation.
type RowData struct {
// Cols is a list of ColData which is expected to be sorted by id before calling Encode/Checksum.
Cols []ColData
// Data stores the result of Encode. However, it mostly acts as a buffer for encoding columns on checksum
// calculation.
Data []byte
}

// Len implements sort.Interface for RowData.
func (r RowData) Len() int { return len(r.Cols) }

// Less implements sort.Interface for RowData.
func (r RowData) Less(i int, j int) bool { return r.Cols[i].ID < r.Cols[j].ID }

// Swap implements sort.Interface for RowData.
func (r RowData) Swap(i int, j int) { r.Cols[i], r.Cols[j] = r.Cols[j], r.Cols[i] }

// Encode encodes all columns into bytes (for test purpose).
func (r *RowData) Encode() ([]byte, error) {
var err error
if len(r.Data) > 0 {
r.Data = r.Data[:0]
}
for _, col := range r.Cols {
r.Data, err = col.Encode(r.Data)
if err != nil {
return nil, err
}
}
return r.Data, nil
}

// Checksum calculates the checksum of columns. Callers should make sure columns are sorted by id.
func (r *RowData) Checksum() (checksum uint32, err error) {
for _, col := range r.Cols {
if len(r.Data) > 0 {
r.Data = r.Data[:0]
}
r.Data, err = col.Encode(r.Data)
if err != nil {
return 0, err
}
checksum = crc32.Update(checksum, crc32.IEEETable, r.Data)
}
return checksum, nil
}

func appendDatumForChecksum(buf []byte, dat *data.Datum, typ byte) (out []byte, err error) {
defer func() {
if x := recover(); x != nil {
// catch panic when datum and type mismatch
err = errors.Annotate(x.(error), "encode datum for checksum")
}
}()
if dat.IsNull() {
return buf, nil
}
switch typ {
case mysql.TypeTiny, mysql.TypeShort, mysql.TypeLong, mysql.TypeLonglong, mysql.TypeInt24, mysql.TypeYear:
out = binary.LittleEndian.AppendUint64(buf, dat.GetUint64())
case mysql.TypeVarchar, mysql.TypeVarString, mysql.TypeString, mysql.TypeTinyBlob, mysql.TypeMediumBlob, mysql.TypeLongBlob, mysql.TypeBlob:
out = appendLengthValue(buf, dat.GetBytes())
case mysql.TypeTimestamp, mysql.TypeDatetime, mysql.TypeDate, mysql.TypeNewDate:
out = appendLengthValue(buf, []byte(dat.GetMysqlTime().String()))
case mysql.TypeDuration:
out = appendLengthValue(buf, []byte(dat.GetMysqlDuration().String()))
case mysql.TypeFloat, mysql.TypeDouble:
v := dat.GetFloat64()
if math.IsInf(v, 0) || math.IsNaN(v) {
v = 0 // because ticdc has such a transform
}
out = binary.LittleEndian.AppendUint64(buf, math.Float64bits(v))
case mysql.TypeNewDecimal:
out = appendLengthValue(buf, []byte(dat.GetMysqlDecimal().String()))
case mysql.TypeEnum:
out = binary.LittleEndian.AppendUint64(buf, dat.GetMysqlEnum().Value)
case mysql.TypeSet:
out = binary.LittleEndian.AppendUint64(buf, dat.GetMysqlSet().Value)
case mysql.TypeBit:
// ticdc transforms a bit value as the following way, no need to handle truncate error here.
v, _ := dat.GetBinaryLiteral().ToInt(nil)
out = binary.LittleEndian.AppendUint64(buf, v)
case mysql.TypeJSON:
out = appendLengthValue(buf, []byte(dat.GetMysqlJSON().String()))
case mysql.TypeNull, mysql.TypeGeometry:
out = buf
default:
return buf, errInvalidChecksumTyp
}
return
}

func appendLengthValue(buf []byte, val []byte) []byte {
buf = binary.LittleEndian.AppendUint32(buf, uint32(len(val)))
return append(buf, val...)
}
47 changes: 4 additions & 43 deletions util/rowcodec/encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,52 +34,19 @@ type Encoder struct {
values []*types.Datum
// Enable indicates whether this encoder should be use.
Enable bool
// WithChecksum indicates whether to append checksum to the encoded row data.
WithChecksum bool
}

// Encode encodes a row from a datums slice.
func (encoder *Encoder) Encode(sc *stmtctx.StatementContext, colIDs []int64, values []types.Datum, buf []byte) ([]byte, error) {
func (encoder *Encoder) Encode(sc *stmtctx.StatementContext, colIDs []int64, values []types.Datum, buf []byte, checksums ...uint32) ([]byte, error) {
encoder.reset()
err := encoder.encodeDatums(sc, colIDs, values)
if err != nil {
return nil, err
}
return encoder.row.toBytes(buf[:0]), nil
}

// EncodeWithExtraChecksum likes Encode but also appends an extra checksum if checksum is enabled.
func (encoder *Encoder) EncodeWithExtraChecksum(sc *stmtctx.StatementContext, colIDs []int64, values []types.Datum, checksum uint32, buf []byte) ([]byte, error) {
encoder.reset()
if encoder.hasChecksum() {
encoder.setExtraChecksum(checksum)
}
err := encoder.encodeDatums(sc, colIDs, values)
if err != nil {
return nil, err
}
return encoder.row.toBytes(buf[:0]), nil
}

// Checksum caclulates the checksum of datumns.
func (encoder *Encoder) Checksum(sc *stmtctx.StatementContext, colIDs []int64, values []types.Datum) (uint32, error) {
encoder.reset()
encoder.flags |= rowFlagChecksum
err := encoder.encodeDatums(sc, colIDs, values)
if err != nil {
return 0, err
}
return encoder.checksum1, nil
}

func (encoder *Encoder) encodeDatums(sc *stmtctx.StatementContext, colIDs []int64, values []types.Datum) error {
encoder.appendColVals(colIDs, values)
numCols, notNullIdx := encoder.reformatCols()
err := encoder.encodeRowCols(sc, numCols, notNullIdx)
if err != nil {
return err
return nil, err
}
return nil
encoder.setChecksums(checksums...)
return encoder.row.toBytes(buf[:0]), nil
}

func (encoder *Encoder) reset() {
Expand All @@ -94,9 +61,6 @@ func (encoder *Encoder) reset() {
encoder.checksumHeader = 0
encoder.checksum1 = 0
encoder.checksum2 = 0
if encoder.WithChecksum {
encoder.flags |= rowFlagChecksum
}
}

func (encoder *Encoder) appendColVals(colIDs []int64, values []types.Datum) {
Expand Down Expand Up @@ -193,9 +157,6 @@ func (encoder *Encoder) encodeRowCols(sc *stmtctx.StatementContext, numCols, not
r.offsets[i] = uint16(len(r.data))
}
}
if r.hasChecksum() {
return r.calcChecksum()
}
return nil
}

Expand Down
27 changes: 13 additions & 14 deletions util/rowcodec/row.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package rowcodec

import (
"encoding/binary"
"hash/crc32"
)

const (
Expand Down Expand Up @@ -90,19 +89,15 @@ func (r *row) hasChecksum() bool { return r.flags&rowFlagChecksum > 0 }

func (r *row) hasExtraChecksum() bool { return r.checksumHeader&checksumFlagExtra > 0 }

func (r *row) checksumVersion() int { return int(r.checksumHeader & checksumMaskVersion) }

func (r *row) calcChecksum() error {
if r.checksumVersion() != 0 {
return errInvalidChecksumVer
func (r *row) setChecksums(checksums ...uint32) {
if len(checksums) > 0 {
r.flags |= rowFlagChecksum
r.checksum1 = checksums[0]
if len(checksums) > 1 {
r.checksumHeader |= checksumFlagExtra
r.checksum2 = checksums[1]
}
}
r.checksum1 = crc32.ChecksumIEEE(r.data)
return nil
}

func (r *row) setExtraChecksum(v uint32) {
r.checksumHeader |= checksumFlagExtra
r.checksum2 = v
}

func (r *row) getData(i int) []byte {
Expand Down Expand Up @@ -156,7 +151,7 @@ func (r *row) fromBytes(rowData []byte) error {

if r.hasChecksum() {
r.checksumHeader = rowData[cursor]
if r.checksumVersion() != 0 {
if r.ChecksumVersion() != 0 {
return errInvalidChecksumVer
}
cursor++
Expand Down Expand Up @@ -244,6 +239,10 @@ func (r *row) findColID(colID int64) (idx int, isNil, notFound bool) {
return
}

// ChecksumVersion returns the version of checksum. Note that it's valid only if checksum has been encoded in the row
// value (callers can check it by `GetChecksum`).
func (r *row) ChecksumVersion() int { return int(r.checksumHeader & checksumMaskVersion) }

// GetChecksum returns the checksum of row data (not null columns).
func (r *row) GetChecksum() (uint32, bool) {
if !r.hasChecksum() {
Expand Down
Loading

0 comments on commit 47e7432

Please sign in to comment.