Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

util: reimplement row level checksum utilities #43141

Merged
merged 3 commits into from
Apr 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions util/rowcodec/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ go_test(
flaky = True,
deps = [
"//kv",
"//parser/model",
"//parser/mysql",
"//sessionctx/stmtctx",
"//tablecodec",
Expand Down
21 changes: 21 additions & 0 deletions util/rowcodec/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"time"

"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/tablecodec"
Expand All @@ -28,6 +29,26 @@ import (
"github.com/pingcap/tidb/util/rowcodec"
)

func BenchmarkChecksum(b *testing.B) {
b.ReportAllocs()
datums := types.MakeDatums(1, "abc", 1.1)
tp1 := types.NewFieldType(mysql.TypeLong)
tp2 := types.NewFieldType(mysql.TypeVarchar)
tp3 := types.NewFieldType(mysql.TypeDouble)
cols := []rowcodec.ColData{
{&model.ColumnInfo{ID: 1, FieldType: *tp1}, &datums[0]},
{&model.ColumnInfo{ID: 2, FieldType: *tp2}, &datums[1]},
{&model.ColumnInfo{ID: 3, FieldType: *tp3}, &datums[2]},
}
row := rowcodec.RowData{Cols: cols}
for i := 0; i < b.N; i++ {
_, err := row.Checksum()
if err != nil {
b.Fatal(err)
}
}
}

func BenchmarkEncode(b *testing.B) {
b.ReportAllocs()
oldRow := types.MakeDatums(1, "abc", 1.1)
Expand Down
114 changes: 114 additions & 0 deletions util/rowcodec/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,16 @@ package rowcodec

import (
"encoding/binary"
"hash/crc32"
"math"
"reflect"
"unsafe"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/parser/types"
data "github.com/pingcap/tidb/types"
)

// CodecVer is the constant number that represent the new row format.
Expand All @@ -30,6 +34,7 @@ const CodecVer = 128
var (
errInvalidCodecVer = errors.New("invalid codec version")
errInvalidChecksumVer = errors.New("invalid checksum version")
errInvalidChecksumTyp = errors.New("invalid type for checksum")
)

// First byte in the encoded value which specifies the encoding type.
Expand Down Expand Up @@ -240,3 +245,112 @@ func IsNewFormat(rowData []byte) bool {
func FieldTypeFromModelColumn(col *model.ColumnInfo) *types.FieldType {
return col.FieldType.Clone()
}

// ColData combines the column info as well as its datum. It's used to calculate checksum.
type ColData struct {
*model.ColumnInfo
Datum *data.Datum
}

// Encode encodes the column datum into bytes for checksum. If buf provided, append encoded data to it.
func (c ColData) Encode(buf []byte) ([]byte, error) {
return appendDatumForChecksum(buf, c.Datum, c.GetType())
}

// RowData is a list of ColData for row checksum calculation.
type RowData struct {
// Cols is a list of ColData which is expected to be sorted by id before calling Encode/Checksum.
Cols []ColData
// Data stores the result of Encode. However, it mostly acts as a buffer for encoding columns on checksum
// calculation.
Data []byte
}

// Len implements sort.Interface for RowData.
func (r RowData) Len() int { return len(r.Cols) }

// Less implements sort.Interface for RowData.
func (r RowData) Less(i int, j int) bool { return r.Cols[i].ID < r.Cols[j].ID }

// Swap implements sort.Interface for RowData.
func (r RowData) Swap(i int, j int) { r.Cols[i], r.Cols[j] = r.Cols[j], r.Cols[i] }
3AceShowHand marked this conversation as resolved.
Show resolved Hide resolved

// Encode encodes all columns into bytes (for test purpose).
3AceShowHand marked this conversation as resolved.
Show resolved Hide resolved
func (r *RowData) Encode() ([]byte, error) {
var err error
if len(r.Data) > 0 {
r.Data = r.Data[:0]
}
for _, col := range r.Cols {
r.Data, err = col.Encode(r.Data)
if err != nil {
return nil, err
}
}
return r.Data, nil
}

// Checksum calculates the checksum of columns. Callers should make sure columns are sorted by id.
func (r *RowData) Checksum() (checksum uint32, err error) {
for _, col := range r.Cols {
if len(r.Data) > 0 {
r.Data = r.Data[:0]
}
r.Data, err = col.Encode(r.Data)
if err != nil {
return 0, err
}
checksum = crc32.Update(checksum, crc32.IEEETable, r.Data)
}
return checksum, nil
}

func appendDatumForChecksum(buf []byte, dat *data.Datum, typ byte) (out []byte, err error) {
3AceShowHand marked this conversation as resolved.
Show resolved Hide resolved
defer func() {
if x := recover(); x != nil {
// catch panic when datum and type mismatch
err = errors.Annotate(x.(error), "encode datum for checksum")
}
}()
if dat.IsNull() {
return buf, nil
}
switch typ {
case mysql.TypeTiny, mysql.TypeShort, mysql.TypeLong, mysql.TypeLonglong, mysql.TypeInt24, mysql.TypeYear:
out = binary.LittleEndian.AppendUint64(buf, dat.GetUint64())
case mysql.TypeVarchar, mysql.TypeVarString, mysql.TypeString, mysql.TypeTinyBlob, mysql.TypeMediumBlob, mysql.TypeLongBlob, mysql.TypeBlob:
out = appendLengthValue(buf, dat.GetBytes())
case mysql.TypeTimestamp, mysql.TypeDatetime, mysql.TypeDate, mysql.TypeNewDate:
out = appendLengthValue(buf, []byte(dat.GetMysqlTime().String()))
case mysql.TypeDuration:
out = appendLengthValue(buf, []byte(dat.GetMysqlDuration().String()))
case mysql.TypeFloat, mysql.TypeDouble:
v := dat.GetFloat64()
if math.IsInf(v, 0) || math.IsNaN(v) {
v = 0 // because ticdc has such a transform
}
out = binary.LittleEndian.AppendUint64(buf, math.Float64bits(v))
case mysql.TypeNewDecimal:
out = appendLengthValue(buf, []byte(dat.GetMysqlDecimal().String()))
case mysql.TypeEnum:
out = binary.LittleEndian.AppendUint64(buf, dat.GetMysqlEnum().Value)
case mysql.TypeSet:
out = binary.LittleEndian.AppendUint64(buf, dat.GetMysqlSet().Value)
case mysql.TypeBit:
// ticdc transforms a bit value as the following way, no need to handle truncate error here.
v, _ := dat.GetBinaryLiteral().ToInt(nil)
out = binary.LittleEndian.AppendUint64(buf, v)
case mysql.TypeJSON:
out = appendLengthValue(buf, []byte(dat.GetMysqlJSON().String()))
case mysql.TypeNull, mysql.TypeGeometry:
out = buf
default:
return buf, errInvalidChecksumTyp
}
return
}

func appendLengthValue(buf []byte, val []byte) []byte {
buf = binary.LittleEndian.AppendUint32(buf, uint32(len(val)))
return append(buf, val...)
}
47 changes: 4 additions & 43 deletions util/rowcodec/encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,52 +34,19 @@ type Encoder struct {
values []*types.Datum
// Enable indicates whether this encoder should be use.
Enable bool
// WithChecksum indicates whether to append checksum to the encoded row data.
WithChecksum bool
}

// Encode encodes a row from a datums slice.
func (encoder *Encoder) Encode(sc *stmtctx.StatementContext, colIDs []int64, values []types.Datum, buf []byte) ([]byte, error) {
func (encoder *Encoder) Encode(sc *stmtctx.StatementContext, colIDs []int64, values []types.Datum, buf []byte, checksums ...uint32) ([]byte, error) {
encoder.reset()
err := encoder.encodeDatums(sc, colIDs, values)
if err != nil {
return nil, err
}
return encoder.row.toBytes(buf[:0]), nil
}

// EncodeWithExtraChecksum likes Encode but also appends an extra checksum if checksum is enabled.
func (encoder *Encoder) EncodeWithExtraChecksum(sc *stmtctx.StatementContext, colIDs []int64, values []types.Datum, checksum uint32, buf []byte) ([]byte, error) {
encoder.reset()
if encoder.hasChecksum() {
encoder.setExtraChecksum(checksum)
}
err := encoder.encodeDatums(sc, colIDs, values)
if err != nil {
return nil, err
}
return encoder.row.toBytes(buf[:0]), nil
}

// Checksum caclulates the checksum of datumns.
func (encoder *Encoder) Checksum(sc *stmtctx.StatementContext, colIDs []int64, values []types.Datum) (uint32, error) {
encoder.reset()
encoder.flags |= rowFlagChecksum
err := encoder.encodeDatums(sc, colIDs, values)
if err != nil {
return 0, err
}
return encoder.checksum1, nil
}

func (encoder *Encoder) encodeDatums(sc *stmtctx.StatementContext, colIDs []int64, values []types.Datum) error {
encoder.appendColVals(colIDs, values)
numCols, notNullIdx := encoder.reformatCols()
err := encoder.encodeRowCols(sc, numCols, notNullIdx)
if err != nil {
return err
return nil, err
}
return nil
encoder.setChecksums(checksums...)
return encoder.row.toBytes(buf[:0]), nil
}

func (encoder *Encoder) reset() {
Expand All @@ -94,9 +61,6 @@ func (encoder *Encoder) reset() {
encoder.checksumHeader = 0
encoder.checksum1 = 0
encoder.checksum2 = 0
if encoder.WithChecksum {
encoder.flags |= rowFlagChecksum
}
}

func (encoder *Encoder) appendColVals(colIDs []int64, values []types.Datum) {
Expand Down Expand Up @@ -193,9 +157,6 @@ func (encoder *Encoder) encodeRowCols(sc *stmtctx.StatementContext, numCols, not
r.offsets[i] = uint16(len(r.data))
}
}
if r.hasChecksum() {
return r.calcChecksum()
}
return nil
}

Expand Down
27 changes: 13 additions & 14 deletions util/rowcodec/row.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package rowcodec

import (
"encoding/binary"
"hash/crc32"
)

const (
Expand Down Expand Up @@ -90,19 +89,15 @@ func (r *row) hasChecksum() bool { return r.flags&rowFlagChecksum > 0 }

func (r *row) hasExtraChecksum() bool { return r.checksumHeader&checksumFlagExtra > 0 }

func (r *row) checksumVersion() int { return int(r.checksumHeader & checksumMaskVersion) }

func (r *row) calcChecksum() error {
if r.checksumVersion() != 0 {
return errInvalidChecksumVer
func (r *row) setChecksums(checksums ...uint32) {
if len(checksums) > 0 {
r.flags |= rowFlagChecksum
r.checksum1 = checksums[0]
if len(checksums) > 1 {
r.checksumHeader |= checksumFlagExtra
r.checksum2 = checksums[1]
}
}
r.checksum1 = crc32.ChecksumIEEE(r.data)
return nil
}

func (r *row) setExtraChecksum(v uint32) {
r.checksumHeader |= checksumFlagExtra
r.checksum2 = v
}

func (r *row) getData(i int) []byte {
Expand Down Expand Up @@ -156,7 +151,7 @@ func (r *row) fromBytes(rowData []byte) error {

if r.hasChecksum() {
r.checksumHeader = rowData[cursor]
if r.checksumVersion() != 0 {
if r.ChecksumVersion() != 0 {
return errInvalidChecksumVer
}
cursor++
Expand Down Expand Up @@ -244,6 +239,10 @@ func (r *row) findColID(colID int64) (idx int, isNil, notFound bool) {
return
}

// ChecksumVersion returns the version of checksum. Note that it's valid only if checksum has been encoded in the row
// value (callers can check it by `GetChecksum`).
func (r *row) ChecksumVersion() int { return int(r.checksumHeader & checksumMaskVersion) }

// GetChecksum returns the checksum of row data (not null columns).
func (r *row) GetChecksum() (uint32, bool) {
if !r.hasChecksum() {
Expand Down
Loading