Skip to content

Commit

Permalink
server: fix decode issue for prefetch point plan index keys (pingcap#…
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Feb 18, 2024
1 parent 3506917 commit e456f72
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 3 deletions.
2 changes: 2 additions & 0 deletions pkg/server/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ go_library(
"//pkg/util/logutil",
"//pkg/util/memory",
"//pkg/util/printer",
"//pkg/util/resourcegrouptag",
"//pkg/util/sqlexec",
"//pkg/util/sys/linux",
"//pkg/util/timeutil",
Expand All @@ -109,6 +110,7 @@ go_library(
"@com_github_soheilhy_cmux//:cmux",
"@com_github_stretchr_testify//require",
"@com_github_tiancaiamao_appdash//traceapp",
"@com_github_tikv_client_go_v2//tikvrpc",
"@com_github_tikv_client_go_v2//util",
"@com_sourcegraph_sourcegraph_appdash_data//:appdash-data",
"@org_golang_google_grpc//:grpc",
Expand Down
32 changes: 29 additions & 3 deletions pkg/server/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,13 @@ import (
"github.com/pingcap/tidb/pkg/util/hack"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/pingcap/tidb/pkg/util/memory"
"github.com/pingcap/tidb/pkg/util/resourcegrouptag"
tlsutil "github.com/pingcap/tidb/pkg/util/tls"
"github.com/pingcap/tidb/pkg/util/topsql"
topsqlstate "github.com/pingcap/tidb/pkg/util/topsql/state"
"github.com/pingcap/tidb/pkg/util/tracing"
"github.com/prometheus/client_golang/prometheus"
"github.com/tikv/client-go/v2/tikvrpc"
"github.com/tikv/client-go/v2/util"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -1768,7 +1771,7 @@ func (cc *clientConn) handleQuery(ctx context.Context, sql string) (err error) {
cc.ctx.GetSessionVars().InMultiStmts = true

// Only pre-build point plans for multi-statement query
pointPlans, err = cc.prefetchPointPlanKeys(ctx, stmts)
pointPlans, err = cc.prefetchPointPlanKeys(ctx, stmts, sql)
if err != nil {
for _, stmt := range stmts {
cc.onExtensionStmtEnd(stmt, false, err)
Expand Down Expand Up @@ -1844,7 +1847,7 @@ func (cc *clientConn) handleQuery(ctx context.Context, sql string) (err error) {
// prefetchPointPlanKeys extracts the point keys in multi-statement query,
// use BatchGet to get the keys, so the values will be cached in the snapshot cache, save RPC call cost.
// For pessimistic transaction, the keys will be batch locked.
func (cc *clientConn) prefetchPointPlanKeys(ctx context.Context, stmts []ast.StmtNode) ([]plannercore.Plan, error) {
func (cc *clientConn) prefetchPointPlanKeys(ctx context.Context, stmts []ast.StmtNode, sqls string) ([]plannercore.Plan, error) {
txn, err := cc.ctx.Txn(false)
if err != nil {
return nil, err
Expand All @@ -1868,6 +1871,7 @@ func (cc *clientConn) prefetchPointPlanKeys(ctx context.Context, stmts []ast.Stm
pointPlans := make([]plannercore.Plan, len(stmts))
var idxKeys []kv.Key //nolint: prealloc
var rowKeys []kv.Key //nolint: prealloc
isCommonHandle := make(map[string]bool, 0)

handlePlan := func(p plannercore.PhysicalPlan, resetStmtCtxFn func()) error {
var tableID int64
Expand All @@ -1885,6 +1889,7 @@ func (cc *clientConn) prefetchPointPlanKeys(ctx context.Context, stmts []ast.Stm
return err1
}
idxKeys = append(idxKeys, idxKey)
isCommonHandle[string(hack.String(idxKey))] = v.TblInfo.IsCommonHandle
} else {
rowKeys = append(rowKeys, tablecodec.EncodeRowKeyWithHandle(tableID, v.Handle))
}
Expand All @@ -1907,6 +1912,7 @@ func (cc *clientConn) prefetchPointPlanKeys(ctx context.Context, stmts []ast.Stm
return err1
}
idxKeys = append(idxKeys, idxKey)
isCommonHandle[string(hack.String(idxKey))] = v.TblInfo.IsCommonHandle
}
} else {
for i, handle := range v.Handles {
Expand Down Expand Up @@ -1971,12 +1977,14 @@ func (cc *clientConn) prefetchPointPlanKeys(ctx context.Context, stmts []ast.Stm
return pointPlans, nil
}
snapshot := txn.GetSnapshot()
setResourceGroupTaggerForMultiStmtPrefetch(snapshot, sqls)
idxVals, err1 := snapshot.BatchGet(ctx, idxKeys)
if err1 != nil {
return nil, err1
}
for idxKey, idxVal := range idxVals {
h, err2 := tablecodec.DecodeHandleInUniqueIndexValue(idxVal, false)
isCommonHd := isCommonHandle[idxKey]
h, err2 := tablecodec.DecodeHandleInUniqueIndexValue(idxVal, isCommonHd)
if err2 != nil {
return nil, err2
}
Expand All @@ -2000,6 +2008,24 @@ func (cc *clientConn) prefetchPointPlanKeys(ctx context.Context, stmts []ast.Stm
return pointPlans, nil
}

func setResourceGroupTaggerForMultiStmtPrefetch(snapshot kv.Snapshot, sqls string) {
if !topsqlstate.TopSQLEnabled() {
return
}
normalized, digest := parser.NormalizeDigest(sqls)
topsql.AttachAndRegisterSQLInfo(context.Background(), normalized, digest, false)
snapshot.SetOption(kv.ResourceGroupTagger, tikvrpc.ResourceGroupTagger(func(req *tikvrpc.Request) {
if req == nil {
return
}
if len(normalized) == 0 {
return
}
req.ResourceGroupTag = resourcegrouptag.EncodeResourceGroupTag(digest, nil,
resourcegrouptag.GetResourceGroupLabelByKey(resourcegrouptag.GetFirstKeyFromRequest(req)))
}))
}

// The first return value indicates whether the call of handleStmt has no side effect and can be retried.
// Currently, the first return value is used to fall back to TiKV when TiFlash is down.
func (cc *clientConn) handleStmt(ctx context.Context, stmt ast.StmtNode, warns []stmtctx.SQLWarn, lastStmt bool) (bool, error) {
Expand Down
12 changes: 12 additions & 0 deletions pkg/server/internal/testserverclient/server_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2075,6 +2075,18 @@ func (cli *TestServerClient) RunTestMultiStatements(t *testing.T) {
// the create table + drop table statements will return errors.
dbt.MustExec("CREATE DATABASE multistmtuse")
dbt.MustExec("use multistmtuse; create table if not exists t1 (id int); drop table t1;")

// Test issue #50012
dbt.MustExec("create database if not exists test;")
dbt.MustExec("use test;")
dbt.MustExec("CREATE TABLE t (a bigint(20), b int(10), PRIMARY KEY (b, a), UNIQUE KEY uk_a (a));")
dbt.MustExec("insert into t values (1, 1);")
dbt.MustExec("begin;")
rs := dbt.MustQuery("delete from t where a = 1; select 1;")
rs.Close()
rs = dbt.MustQuery("update t set b = 2 where a = 1; select 1;")
rs.Close()
dbt.MustExec("commit;")
})
}

Expand Down

0 comments on commit e456f72

Please sign in to comment.