Skip to content

Commit

Permalink
*: add tidb_row_checksum() as a builtin function (#43479)
Browse files Browse the repository at this point in the history
ref #42747
  • Loading branch information
zyguan committed May 5, 2023
1 parent 8728abc commit cb609bb
Show file tree
Hide file tree
Showing 10 changed files with 129 additions and 1 deletion.
2 changes: 1 addition & 1 deletion executor/showtest/show_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1524,7 +1524,7 @@ func TestShowBuiltin(t *testing.T) {
res := tk.MustQuery("show builtins;")
require.NotNil(t, res)
rows := res.Rows()
const builtinFuncNum = 287
const builtinFuncNum = 288
require.Equal(t, builtinFuncNum, len(rows))
require.Equal(t, rows[0][0].(string), "abs")
require.Equal(t, rows[builtinFuncNum-1][0].(string), "yearweek")
Expand Down
1 change: 1 addition & 0 deletions expression/builtin.go
Original file line number Diff line number Diff line change
Expand Up @@ -799,6 +799,7 @@ var funcs = map[string]functionClass{
ast.UUIDToBin: &uuidToBinFunctionClass{baseFunctionClass{ast.UUIDToBin, 1, 2}},
ast.BinToUUID: &binToUUIDFunctionClass{baseFunctionClass{ast.BinToUUID, 1, 2}},
ast.TiDBShard: &tidbShardFunctionClass{baseFunctionClass{ast.TiDBShard, 1, 1}},
ast.TiDBRowChecksum: &tidbRowChecksumFunctionClass{baseFunctionClass{ast.TiDBRowChecksum, 0, 0}},

ast.GetLock: &lockFunctionClass{baseFunctionClass{ast.GetLock, 2, 2}},
ast.ReleaseLock: &releaseLockFunctionClass{baseFunctionClass{ast.ReleaseLock, 1, 1}},
Expand Down
8 changes: 8 additions & 0 deletions expression/builtin_miscellaneous.go
Original file line number Diff line number Diff line change
Expand Up @@ -1467,3 +1467,11 @@ func (b *builtinTidbShardSig) evalInt(row chunk.Row) (int64, bool, error) {
hashed = hashed % tidbShardBucketCount
return int64(hashed), false, nil
}

// tidbRowChecksumFunctionClass is the function class registered for the
// tidb_row_checksum() builtin. It only embeds baseFunctionClass; its
// getFunction method always returns ErrNotSupportedYet instead of building an
// evaluable function.
type tidbRowChecksumFunctionClass struct {
baseFunctionClass
}

// getFunction never builds an evaluable builtin for tidb_row_checksum(). It
// ignores ctx and args and unconditionally returns a nil builtinFunc together
// with an ErrNotSupportedYet error carrying the message below.
func (c *tidbRowChecksumFunctionClass) getFunction(ctx sessionctx.Context, args []Expression) (builtinFunc, error) {
	const reason = "FUNCTION tidb_row_checksum can only be used as a select field in a fast point plan"
	return nil, ErrNotSupportedYet.GenWithStack(reason)
}
1 change: 1 addition & 0 deletions expression/integration_serial_test/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ go_test(
shard_count = 50,
deps = [
"//config",
"//expression",
"//parser/mysql",
"//parser/terror",
"//planner/core",
Expand Down
57 changes: 57 additions & 0 deletions expression/integration_serial_test/integration_serial_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,17 @@ package integration_serial_test

import (
"context"
"encoding/binary"
"fmt"
"hash/crc32"
"math"
"strings"
"testing"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/parser/terror"
plannercore "github.com/pingcap/tidb/planner/core"
Expand Down Expand Up @@ -4414,3 +4417,57 @@ func TestPartitionPruningRelaxOP(t *testing.T) {
tk.MustQuery("SELECT COUNT(*) FROM t1 WHERE d < '2018-01-01'").Check(testkit.Rows("6"))
tk.MustQuery("SELECT COUNT(*) FROM t1 WHERE d > '2018-01-01'").Check(testkit.Rows("12"))
}

// TestTiDBRowChecksumBuiltin checks the output of tidb_row_checksum() for rows
// expected to carry two, one and no checksums when it is the select field of a
// point-get or batch-point-get, and checks that every other usage fails with
// expression.ErrNotSupportedYet.
func TestTiDBRowChecksumBuiltin(t *testing.T) {
	store := testkit.CreateMockStore(t)

	// crcOf builds the checksum value the test expects for a row: each int is
	// appended as 8 little-endian bytes, each string as a 4-byte little-endian
	// length followed by its bytes (other types are ignored), and the CRC-32
	// (IEEE) of the buffer is returned.
	crcOf := func(vals ...any) uint32 {
		data := make([]byte, 0, 64)
		for i := range vals {
			switch v := vals[i].(type) {
			case int:
				data = binary.LittleEndian.AppendUint64(data, uint64(v))
			case string:
				data = binary.LittleEndian.AppendUint32(data, uint32(len(v)))
				data = append(data, v...)
			}
		}
		return crc32.ChecksumIEEE(data)
	}

	tk := testkit.NewTestKit(t, store)
	tk.MustExec("set global tidb_enable_row_level_checksum = 1")
	tk.MustExec("use test")
	tk.MustExec("create table t (id int primary key, c int)")

	// Row 1: expected to report two checksums, one for the value as inserted
	// and one for the value after column c is changed to varchar.
	tk.MustExec("insert into t values (1, 10)")
	tk.MustExec("alter table t change column c c varchar(10)")
	wantTwo := fmt.Sprintf("%d,%d", crcOf(1, 10), crcOf(1, "10"))

	// Row 2: expected to report a single checksum.
	tk.Session().GetSessionVars().EnableRowLevelChecksum = true
	tk.MustExec("insert into t values (2, '20')")
	wantOne := fmt.Sprint(crcOf(2, "20"))

	// Row 3: written with the session switch off, expected to report NULL.
	tk.Session().GetSessionVars().EnableRowLevelChecksum = false
	tk.MustExec("insert into t values (3, '30')")
	const wantNone = "<nil>"

	// Fast point-get.
	tk.MustQuery("select tidb_row_checksum() from t where id = 1").Check(testkit.Rows(wantTwo))
	tk.MustQuery("select tidb_row_checksum() from t where id = 2").Check(testkit.Rows(wantOne))
	tk.MustQuery("select tidb_row_checksum() from t where id = 3").Check(testkit.Rows(wantNone))
	// Fast batch-point-get.
	tk.MustQuery("select tidb_row_checksum() from t where id in (1, 2, 3)").Check(testkit.Rows(wantTwo, wantOne, wantNone))

	// Every remaining statement must be rejected; they run in the listed order.
	rejected := []string{
		// non-fast point-get
		"select length(tidb_row_checksum()) from t where id = 1",
		"select c from t where id = 1 and tidb_row_checksum() is not null",
		// non-fast batch-point-get
		"select length(tidb_row_checksum()) from t where id in (1, 2, 3)",
		"select c from t where id in (1, 2, 3) and tidb_row_checksum() is not null",
		// other plans
		"select tidb_row_checksum() from t",
		"select tidb_row_checksum() from t where id > 0",
	}
	for _, stmt := range rejected {
		tk.MustGetDBError(stmt, expression.ErrNotSupportedYet)
	}
}
1 change: 1 addition & 0 deletions parser/ast/functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,7 @@ const (
BinToUUID = "bin_to_uuid"
VitessHash = "vitess_hash"
TiDBShard = "tidb_shard"
TiDBRowChecksum = "tidb_row_checksum"
GetLock = "get_lock"
ReleaseLock = "release_lock"

Expand Down
3 changes: 3 additions & 0 deletions parser/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,9 @@ const ExtraPidColID = -2
// Must be after ExtraPidColID!
const ExtraPhysTblID = -3

// ExtraRowChecksumID is the column ID of the extra column that holds the row checksum info.
const ExtraRowChecksumID = -4

const (
// TableInfoVersion0 means the table info version is 0.
// Upgrade from v2.1.1 or v2.1.2 to v2.1.3 and later, and then execute a "change/modify column" statement
Expand Down
37 changes: 37 additions & 0 deletions planner/core/point_get_plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -1264,6 +1264,11 @@ func buildSchemaFromFields(
}
continue
}
if name, column, ok := tryExtractRowChecksumColumn(field, len(columns)); ok {
names = append(names, name)
columns = append(columns, column)
continue
}
colNameExpr, ok := field.Expr.(*ast.ColumnNameExpr)
if !ok {
return nil, nil
Expand Down Expand Up @@ -1305,6 +1310,38 @@ func buildSchemaFromFields(
return schema, names
}

// tryExtractRowChecksumColumn recognizes a select field whose expression is a
// zero-argument tidb_row_checksum() call and builds the field name and column
// that represent it.
//
// On a match it returns:
//   - a FieldName whose OrigColName is the function name with "()" appended,
//     and whose ColName is the field's alias when one is set, otherwise the
//     same name as OrigColName;
//   - a Column typed as mysql.TypeString with that type's default charset and
//     collation, flen mysql.MaxBlobWidth and decimal 0, whose ID and UniqueID
//     are both model.ExtraRowChecksumID and whose Index is idx;
//   - true.
//
// For any other field it returns (nil, nil, false).
func tryExtractRowChecksumColumn(field *ast.SelectField, idx int) (*types.FieldName, *expression.Column, bool) {
	call, isCall := field.Expr.(*ast.FuncCallExpr)
	if !isCall {
		return nil, nil, false
	}
	if call.FnName.L != ast.TiDBRowChecksum || len(call.Args) > 0 {
		return nil, nil, false
	}

	// Work on a copy of the parsed name so the AST node is left untouched.
	fnName := call.FnName
	fnName.O += "()"
	fnName.L += "()"

	// An explicit alias wins over the synthesized "name()" form.
	colName := field.AsName
	if colName.L == "" {
		colName = fnName
	}

	charset, collation := types.DefaultCharsetForType(mysql.TypeString)
	retType := ptypes.NewFieldType(mysql.TypeString)
	retType.SetCharset(charset)
	retType.SetCollate(collation)
	retType.SetFlen(mysql.MaxBlobWidth)
	retType.SetDecimal(0)

	fieldName := &types.FieldName{
		OrigColName: fnName,
		ColName:     colName,
	}
	checksumCol := &expression.Column{
		RetType:  retType,
		ID:       model.ExtraRowChecksumID,
		UniqueID: model.ExtraRowChecksumID,
		Index:    idx,
		OrigName: fnName.L,
	}
	return fieldName, checksumCol, true
}

// getSingleTableNameAndAlias return the ast node of queried table name and the alias string.
// `tblName` is `nil` if there are multiple tables in the query.
// `tblAlias` will be the real table name if there is no table alias in the query.
Expand Down
8 changes: 8 additions & 0 deletions util/rowcodec/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,14 @@ func (decoder *ChunkDecoder) DecodeToChunk(rowData []byte, handle kv.Handle, chk
chk.AppendNull(colIdx)
continue
}
if col.ID == model.ExtraRowChecksumID {
if v := decoder.row.getChecksumInfo(); len(v) > 0 {
chk.AppendString(colIdx, v)
} else {
chk.AppendNull(colIdx)
}
continue
}

idx, isNil, notFound := decoder.row.findColID(col.ID)
if !notFound && !isNil {
Expand Down
12 changes: 12 additions & 0 deletions util/rowcodec/row.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package rowcodec

import (
"encoding/binary"
"strconv"
)

const (
Expand Down Expand Up @@ -100,6 +101,17 @@ func (r *row) setChecksums(checksums ...uint32) {
}
}

// getChecksumInfo renders the row's checksums as decimal text.
//
// It returns "" when hasChecksum reports false, checksum1 alone when
// hasExtraChecksum reports false, and "checksum1,checksum2" otherwise.
func (r *row) getChecksumInfo() string {
	if !r.hasChecksum() {
		return ""
	}
	info := strconv.FormatUint(uint64(r.checksum1), 10)
	if !r.hasExtraChecksum() {
		return info
	}
	return info + "," + strconv.FormatUint(uint64(r.checksum2), 10)
}

func (r *row) getData(i int) []byte {
var start, end uint32
if r.large() {
Expand Down

0 comments on commit cb609bb

Please sign in to comment.