From 15947c1cf2f7facae3829fe7e172e5b43c7b554f Mon Sep 17 00:00:00 2001 From: xhe Date: Tue, 12 Mar 2024 17:06:39 +0800 Subject: [PATCH 1/5] logutil: adapt new redact functions (#51642) close pingcap/tidb#51310, close pingcap/tidb#51641 --- DEPS.bzl | 12 +-- br/pkg/redact/redact.go | 9 +- go.mod | 2 +- go.sum | 4 +- pkg/executor/set_test.go | 4 +- pkg/executor/test/admintest/BUILD.bazel | 1 + pkg/executor/test/admintest/admin_test.go | 17 ++-- pkg/server/conn.go | 2 +- pkg/server/conn_stmt.go | 2 +- pkg/session/nontransactional.go | 2 +- pkg/session/session.go | 10 +- pkg/sessionctx/variable/sysvar.go | 2 +- pkg/store/driver/txn/driver_test.go | 2 +- pkg/util/dbterror/terror_test.go | 2 +- pkg/util/logutil/consistency/BUILD.bazel | 1 + pkg/util/logutil/consistency/reporter.go | 109 +++++++++------------- pkg/util/redact/redact.go | 23 ++++- pkg/util/redact/redact_test.go | 11 ++- 18 files changed, 117 insertions(+), 98 deletions(-) diff --git a/DEPS.bzl b/DEPS.bzl index 90b2bebe419d3..98aa97dde172c 100644 --- a/DEPS.bzl +++ b/DEPS.bzl @@ -5806,13 +5806,13 @@ def go_deps(): name = "com_github_pingcap_errors", build_file_proto_mode = "disable_global", importpath = "github.com/pingcap/errors", - sha256 = "b4db3d3c222d9039c84baacbbd9c46aa0346f3f04d2577a77475a64ecfefebf9", - strip_prefix = "github.com/pingcap/errors@v0.11.5-0.20231212100244-799fae176cfb", + sha256 = "0edb07dbd73a90f97e06e11e54b270d64d5cabe6142025682d840fe302087b23", + strip_prefix = "github.com/pingcap/errors@v0.11.5-0.20240311024730-e056997136bb", urls = [ - "http://bazel-cache.pingcap.net:8080/gomod/github.com/pingcap/errors/com_github_pingcap_errors-v0.11.5-0.20231212100244-799fae176cfb.zip", - "http://ats.apps.svc/gomod/github.com/pingcap/errors/com_github_pingcap_errors-v0.11.5-0.20231212100244-799fae176cfb.zip", - "https://cache.hawkingrei.com/gomod/github.com/pingcap/errors/com_github_pingcap_errors-v0.11.5-0.20231212100244-799fae176cfb.zip", - "https://storage.googleapis.com/pingcapmirror/gomod/github.com/pingcap/errors/com_github_pingcap_errors-v0.11.5-0.20231212100244-799fae176cfb.zip", + "http://bazel-cache.pingcap.net:8080/gomod/github.com/pingcap/errors/com_github_pingcap_errors-v0.11.5-0.20240311024730-e056997136bb.zip", + "http://ats.apps.svc/gomod/github.com/pingcap/errors/com_github_pingcap_errors-v0.11.5-0.20240311024730-e056997136bb.zip", + "https://cache.hawkingrei.com/gomod/github.com/pingcap/errors/com_github_pingcap_errors-v0.11.5-0.20240311024730-e056997136bb.zip", + "https://storage.googleapis.com/pingcapmirror/gomod/github.com/pingcap/errors/com_github_pingcap_errors-v0.11.5-0.20240311024730-e056997136bb.zip", ], ) go_repository( diff --git a/br/pkg/redact/redact.go b/br/pkg/redact/redact.go index 495bceb08c44d..4b89c9c7c6ea9 100644 --- a/br/pkg/redact/redact.go +++ b/br/pkg/redact/redact.go @@ -11,12 +11,17 @@ import ( // InitRedact inits the enableRedactLog func InitRedact(redactLog bool) { - errors.RedactLogEnabled.Store(redactLog) + mode := errors.RedactLogDisable + if redactLog { + mode = errors.RedactLogEnable + } + errors.RedactLogEnabled.Store(mode) } // NeedRedact returns whether to redact log func NeedRedact() bool { - return errors.RedactLogEnabled.Load() + mode := errors.RedactLogEnabled.Load() + return mode != errors.RedactLogDisable && mode != "" } // String receives string argument and return omitted information if redact log enabled diff --git a/go.mod b/go.mod index fbe4c5c6d983d..d3e5679749026 100644 --- a/go.mod +++ b/go.mod @@ -81,7 +81,7 @@ require ( github.com/otiai10/copy v1.2.0 github.com/phayes/freeport v0.0.0-20180830031419-95f893ade6f2 github.com/pingcap/badger v1.5.1-0.20230103063557-828f39b09b6d - github.com/pingcap/errors v0.11.5-0.20231212100244-799fae176cfb + github.com/pingcap/errors v0.11.5-0.20240311024730-e056997136bb github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c github.com/pingcap/fn v1.0.0 github.com/pingcap/kvproto v0.0.0-20240208102409-a554af8ee11f diff --git a/go.sum b/go.sum index 9fc9ab33941d8..6d9c66b155935 100644 --- a/go.sum +++ b/go.sum @@ -706,8 +706,8 @@ github.com/pingcap/badger v1.5.1-0.20230103063557-828f39b09b6d/go.mod h1:p8QnkZn github.com/pingcap/errors v0.11.0/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8= github.com/pingcap/errors v0.11.4/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8= github.com/pingcap/errors v0.11.5-0.20190809092503-95897b64e011/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8= -github.com/pingcap/errors v0.11.5-0.20231212100244-799fae176cfb h1:yqyP+k0mgRPpXJQDOCrtaG2YZym0ZDD+vt5JzlBUkrw= -github.com/pingcap/errors v0.11.5-0.20231212100244-799fae176cfb/go.mod h1:X2r9ueLEUZgtx2cIogM0v4Zj5uvvzhuuiu7Pn8HzMPg= +github.com/pingcap/errors v0.11.5-0.20240311024730-e056997136bb h1:3pSi4EDG6hg0orE1ndHkXvX6Qdq2cZn8gAPir8ymKZk= +github.com/pingcap/errors v0.11.5-0.20240311024730-e056997136bb/go.mod h1:X2r9ueLEUZgtx2cIogM0v4Zj5uvvzhuuiu7Pn8HzMPg= github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c h1:CgbKAHto5CQgWM9fSBIvaxsJHuGP0uM74HXtv3MyyGQ= github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew= github.com/pingcap/fn v1.0.0 h1:CyA6AxcOZkQh52wIqYlAmaVmF6EvrcqFywP463pjA8g= diff --git a/pkg/executor/set_test.go b/pkg/executor/set_test.go index 69c8ee7b1399d..8532ddc776566 100644 --- a/pkg/executor/set_test.go +++ b/pkg/executor/set_test.go @@ -464,10 +464,10 @@ func TestSetVar(t *testing.T) { tk.MustQuery(`select @@session.tidb_redact_log;`).Check(testkit.Rows("ON")) tk.MustExec("set session tidb_redact_log = oFf") tk.MustQuery(`select @@session.tidb_redact_log;`).Check(testkit.Rows("OFF")) - tk.MustExec("set session tidb_redact_log = On") - tk.MustQuery(`select @@session.tidb_redact_log;`).Check(testkit.Rows("ON")) tk.MustExec("set session tidb_redact_log = marker") tk.MustQuery(`select @@session.tidb_redact_log;`).Check(testkit.Rows("MARKER")) + tk.MustExec("set session tidb_redact_log = On") + tk.MustQuery(`select @@session.tidb_redact_log;`).Check(testkit.Rows("ON")) tk.MustQuery("select @@tidb_dml_batch_size;").Check(testkit.Rows("0")) tk.MustExec("set @@session.tidb_dml_batch_size = 120") diff --git a/pkg/executor/test/admintest/BUILD.bazel b/pkg/executor/test/admintest/BUILD.bazel index fca843b147323..40cdf88fe1826 100644 --- a/pkg/executor/test/admintest/BUILD.bazel +++ b/pkg/executor/test/admintest/BUILD.bazel @@ -30,6 +30,7 @@ go_test( "//pkg/util/logutil", "//pkg/util/logutil/consistency", "//pkg/util/mock", + "//pkg/util/redact", "@com_github_pingcap_errors//:errors", "@com_github_stretchr_testify//require", "@com_github_tikv_client_go_v2//tikv", diff --git a/pkg/executor/test/admintest/admin_test.go b/pkg/executor/test/admintest/admin_test.go index 52cac62fb5e5d..70bac0122b983 100644 --- a/pkg/executor/test/admintest/admin_test.go +++ b/pkg/executor/test/admintest/admin_test.go @@ -41,6 +41,7 @@ import ( "github.com/pingcap/tidb/pkg/util/logutil" "github.com/pingcap/tidb/pkg/util/logutil/consistency" "github.com/pingcap/tidb/pkg/util/mock" + "github.com/pingcap/tidb/pkg/util/redact" "github.com/stretchr/testify/require" "go.uber.org/zap" ) @@ -1055,6 +1056,8 @@ func TestCheckFailReport(t *testing.T) { store := testkit.CreateMockStore(t) tk := newInconsistencyKit(t, testkit.NewAsyncTestKit(t, store), newDefaultOpt()) + rmode := tk.sctx.GetSessionVars().EnableRedactNew + // row more than unique index func() { defer tk.rebuild() @@ -1072,7 +1075,7 @@ func TestCheckFailReport(t *testing.T) { hook.Logs[0].CheckField(t, zap.String("table_name", "admin_test"), zap.String("index_name", "uk1"), - zap.Stringer("row_id", kv.IntHandle(1)), + zap.Stringer("row_id", redact.Stringer(rmode, kv.IntHandle(1))), ) hook.Logs[0].CheckFieldNotEmpty(t, "row_mvcc") }() @@ -1094,7 +1097,7 @@ func TestCheckFailReport(t *testing.T) { hook.Logs[0].CheckField(t, zap.String("table_name", "admin_test"), zap.String("index_name", "k2"), - zap.Stringer("row_id", kv.IntHandle(1)), + zap.Stringer("row_id", redact.Stringer(rmode, kv.IntHandle(1))), ) hook.Logs[0].CheckFieldNotEmpty(t, "row_mvcc") }() @@ -1118,7 +1121,7 @@ func TestCheckFailReport(t *testing.T) { logEntry.CheckField(t, zap.String("table_name", "admin_test"), zap.String("index_name", "k2"), - zap.Stringer("row_id", kv.IntHandle(1)), + zap.Stringer("row_id", redact.Stringer(rmode, kv.IntHandle(1))), ) logEntry.CheckFieldNotEmpty(t, "row_mvcc") logEntry.CheckFieldNotEmpty(t, "index_mvcc") @@ -1162,7 +1165,7 @@ func TestCheckFailReport(t *testing.T) { logEntry.CheckField(t, zap.String("table_name", "admin_test"), zap.String("index_name", "uk1"), - zap.Stringer("row_id", kv.IntHandle(1)), + zap.Stringer("row_id", redact.Stringer(rmode, kv.IntHandle(1))), ) logEntry.CheckFieldNotEmpty(t, "row_mvcc") logEntry.CheckFieldNotEmpty(t, "index_mvcc") @@ -1205,7 +1208,7 @@ func TestCheckFailReport(t *testing.T) { logEntry.CheckField(t, zap.String("table_name", "admin_test"), zap.String("index_name", "uk1"), - zap.Stringer("row_id", kv.IntHandle(1)), + zap.Stringer("row_id", redact.Stringer(rmode, kv.IntHandle(1))), ) logEntry.CheckFieldNotEmpty(t, "row_mvcc") logEntry.CheckFieldNotEmpty(t, "index_mvcc") @@ -1231,7 +1234,7 @@ func TestCheckFailReport(t *testing.T) { logEntry.CheckField(t, zap.String("table_name", "admin_test"), zap.String("index_name", "k2"), - zap.Stringer("row_id", kv.IntHandle(1)), + zap.Stringer("row_id", redact.Stringer(rmode, kv.IntHandle(1))), ) logEntry.CheckFieldNotEmpty(t, "row_mvcc") logEntry.CheckFieldNotEmpty(t, "index_mvcc") @@ -1271,7 +1274,7 @@ func TestCheckFailReport(t *testing.T) { logEntry.CheckField(t, zap.String("table_name", "admin_test"), zap.String("index_name", "uk1"), - zap.Stringer("row_id", kv.IntHandle(282574488403969)), + zap.Stringer("row_id", redact.Stringer(rmode, kv.IntHandle(282574488403969))), ) logEntry.CheckFieldNotEmpty(t, "row_mvcc") logEntry.CheckFieldNotEmpty(t, "index_mvcc") diff --git a/pkg/server/conn.go b/pkg/server/conn.go index 976246bc23d11..01b01e7d85699 100644 --- a/pkg/server/conn.go +++ b/pkg/server/conn.go @@ -1187,7 +1187,7 @@ func errStrForLog(err error, redactMode string) string { } else { ret = errors.ErrorStack(err) } - return redact.Redact(redactMode, ret) + return redact.String(redactMode, ret) } func (cc *clientConn) addMetrics(cmd byte, startTime time.Time, err error) { diff --git a/pkg/server/conn_stmt.go b/pkg/server/conn_stmt.go index 85682bd8d52ad..4b234a0594e45 100644 --- a/pkg/server/conn_stmt.go +++ b/pkg/server/conn_stmt.go @@ -583,7 +583,7 @@ func (cc *clientConn) preparedStmt2String(stmtID uint32) string { } sql := parser.Normalize(cc.preparedStmt2StringNoArgs(stmtID), sv.EnableRedactNew) if m := sv.EnableRedactNew; m != "ON" { - sql += redact.Redact(sv.EnableRedactNew, sv.PlanCacheParams.String()) + sql += redact.String(sv.EnableRedactNew, sv.PlanCacheParams.String()) } return sql } diff --git a/pkg/session/nontransactional.go b/pkg/session/nontransactional.go index d8f1fcec190e1..c1a5787c18b56 100644 --- a/pkg/session/nontransactional.go +++ b/pkg/session/nontransactional.go @@ -69,7 +69,7 @@ type statementBuildInfo struct { } func (j job) String(redacted string) string { - return fmt.Sprintf("job id: %d, estimated size: %d, sql: %s", j.jobID, j.jobSize, redact.Redact(redacted, j.sql)) + return fmt.Sprintf("job id: %d, estimated size: %d, sql: %s", j.jobID, j.jobSize, redact.String(redacted, j.sql)) } // HandleNonTransactionalDML is the entry point for a non-transactional DML statement diff --git a/pkg/session/session.go b/pkg/session/session.go index 8298a93c05f33..7b63c4a1666cb 100644 --- a/pkg/session/session.go +++ b/pkg/session/session.go @@ -933,7 +933,7 @@ func (s *session) tryReplaceWriteConflictError(oldErr error) (newErr error) { if !kv.ErrWriteConflict.Equal(oldErr) { return nil } - if errors.RedactLogEnabled.Load() { + if errors.RedactLogEnabled.Load() == errors.RedactLogEnable { return nil } originErr := errors.Cause(oldErr) @@ -1174,7 +1174,7 @@ func (s *session) retry(ctx context.Context, maxCnt uint) (err error) { // We print the queries at the first try only. sql := sqlForLog(st.GetTextToLog(false)) if sessVars.EnableRedactNew != "ON" { - sql += redact.Redact(sessVars.EnableRedactNew, sessVars.PlanCacheParams.String()) + sql += redact.String(sessVars.EnableRedactNew, sessVars.PlanCacheParams.String()) } logutil.Logger(ctx).Warn("retrying", zap.Int64("schemaVersion", schemaVersion), @@ -1669,7 +1669,7 @@ func (s *session) Parse(ctx context.Context, sql string) ([]ast.StmtNode, error) // Only print log message when this SQL is from the user. // Mute the warning for internal SQLs. if !s.sessionVars.InRestrictedSQL { - logutil.Logger(ctx).Warn("parse SQL failed", zap.Error(err), zap.String("SQL", redact.Redact(s.sessionVars.EnableRedactNew, sql))) + logutil.Logger(ctx).Warn("parse SQL failed", zap.Error(err), zap.String("SQL", redact.String(s.sessionVars.EnableRedactNew, sql))) s.sessionVars.StmtCtx.AppendError(err) } return nil, err @@ -1719,7 +1719,7 @@ func (s *session) ParseWithParams(ctx context.Context, sql string, args ...any) if err != nil { s.rollbackOnError(ctx) logSQL := sql[:min(500, len(sql))] - logutil.Logger(ctx).Warn("parse SQL failed", zap.Error(err), zap.String("SQL", redact.Redact(s.sessionVars.EnableRedactNew, logSQL))) + logutil.Logger(ctx).Warn("parse SQL failed", zap.Error(err), zap.String("SQL", redact.String(s.sessionVars.EnableRedactNew, logSQL))) return nil, util.SyntaxError(err) } durParse := time.Since(parseStartTime) @@ -3935,7 +3935,7 @@ func logGeneralQuery(execStmt *executor.ExecStmt, s *session, isPrepared bool) { query = executor.QueryReplacer.Replace(query) if vars.EnableRedactNew != "ON" { - query += redact.Redact(vars.EnableRedactNew, vars.PlanCacheParams.String()) + query += redact.String(vars.EnableRedactNew, vars.PlanCacheParams.String()) } logutil.BgLogger().Info("GENERAL_LOG", zap.Uint64("conn", vars.ConnectionID), diff --git a/pkg/sessionctx/variable/sysvar.go b/pkg/sessionctx/variable/sysvar.go index a44ba536f3edf..090324c1d9cc3 100644 --- a/pkg/sessionctx/variable/sysvar.go +++ b/pkg/sessionctx/variable/sysvar.go @@ -2154,7 +2154,7 @@ var defaultSysVars = []*SysVar{ {Scope: ScopeGlobal | ScopeSession, Name: TiDBRedactLog, Value: DefTiDBRedactLog, Type: TypeEnum, PossibleValues: []string{Off, On, Marker}, SetSession: func(s *SessionVars, val string) error { s.EnableRedactLog = val != Off s.EnableRedactNew = val - errors.RedactLogEnabled.Store(s.EnableRedactLog) + errors.RedactLogEnabled.Store(val) return nil }}, {Scope: ScopeGlobal | ScopeSession, Name: TiDBShardAllocateStep, Value: strconv.Itoa(DefTiDBShardAllocateStep), Type: TypeInt, MinValue: 1, MaxValue: uint64(math.MaxInt64), SetSession: func(s *SessionVars, val string) error { diff --git a/pkg/store/driver/txn/driver_test.go b/pkg/store/driver/txn/driver_test.go index 9933ad9ffb6e2..fe2e55833f577 100644 --- a/pkg/store/driver/txn/driver_test.go +++ b/pkg/store/driver/txn/driver_test.go @@ -70,7 +70,7 @@ func TestWriteConflictPrettyFormat(t *testing.T) { // test log redaction original := errors.RedactLogEnabled.Load() - errors.RedactLogEnabled.Store(true) + errors.RedactLogEnabled.Store(errors.RedactLogEnable) defer func() { errors.RedactLogEnabled.Store(original) }() expectedStr = "[kv:9007]Write conflict, " + "txnStartTS=399402937522847774, conflictStartTS=399402937719455772, conflictCommitTS=399402937719455773, " + diff --git a/pkg/util/dbterror/terror_test.go b/pkg/util/dbterror/terror_test.go index 534fe84002090..101d1f4c44d66 100644 --- a/pkg/util/dbterror/terror_test.go +++ b/pkg/util/dbterror/terror_test.go @@ -29,7 +29,7 @@ func genErrMsg(pattern string, a ...any) string { func TestErrorRedact(t *testing.T) { original := errors.RedactLogEnabled.Load() - errors.RedactLogEnabled.Store(true) + errors.RedactLogEnabled.Store(errors.RedactLogEnable) defer func() { errors.RedactLogEnabled.Store(original) }() class := ErrClass{} diff --git a/pkg/util/logutil/consistency/BUILD.bazel b/pkg/util/logutil/consistency/BUILD.bazel index 94b90c118d775..a45516fa0762c 100644 --- a/pkg/util/logutil/consistency/BUILD.bazel +++ b/pkg/util/logutil/consistency/BUILD.bazel @@ -15,6 +15,7 @@ go_library( "//pkg/types", "//pkg/util/dbterror", "//pkg/util/logutil", + "//pkg/util/redact", "@com_github_pingcap_kvproto//pkg/kvrpcpb", "@com_github_tikv_client_go_v2//tikv", "@org_uber_go_zap//:zap", diff --git a/pkg/util/logutil/consistency/reporter.go b/pkg/util/logutil/consistency/reporter.go index 44a3cb7015ef0..a69cfc3485c92 100644 --- a/pkg/util/logutil/consistency/reporter.go +++ b/pkg/util/logutil/consistency/reporter.go @@ -33,6 +33,7 @@ import ( "github.com/pingcap/tidb/pkg/types" "github.com/pingcap/tidb/pkg/util/dbterror" "github.com/pingcap/tidb/pkg/util/logutil" + "github.com/pingcap/tidb/pkg/util/redact" "github.com/tikv/client-go/v2/tikv" "go.uber.org/zap" ) @@ -185,69 +186,54 @@ func decodeMvccRecordValue(bs []byte, colMap map[int64]*types.FieldType, tb *mod // ReportLookupInconsistent reports inconsistent when index rows is more than record rows. func (r *Reporter) ReportLookupInconsistent(ctx context.Context, idxCnt, tblCnt int, missHd, fullHd []kv.Handle, missRowIdx []RecordData) error { - if r.Sctx.GetSessionVars().EnableRedactLog { - logutil.Logger(ctx).Error("indexLookup found data inconsistency", - zap.String("table_name", r.Tbl.Name.O), - zap.String("index_name", r.Idx.Name.O), - zap.Int("index_cnt", idxCnt), - zap.Int("table_cnt", tblCnt), - zap.Stack("stack")) - } else { - const maxFullHandleCnt = 50 - displayFullHdCnt := len(fullHd) - if displayFullHdCnt > maxFullHandleCnt { - displayFullHdCnt = maxFullHandleCnt - } - fs := []zap.Field{ - zap.String("table_name", r.Tbl.Name.O), - zap.String("index_name", r.Idx.Name.O), - zap.Int("index_cnt", idxCnt), zap.Int("table_cnt", tblCnt), - zap.String("missing_handles", fmt.Sprint(missHd)), - zap.String("total_handles", fmt.Sprint(fullHd[:displayFullHdCnt])), - } + rmode := r.Sctx.GetSessionVars().EnableRedactNew + + const maxFullHandleCnt = 50 + displayFullHdCnt := min(len(fullHd), maxFullHandleCnt) + fs := []zap.Field{ + zap.String("table_name", r.Tbl.Name.O), + zap.String("index_name", r.Idx.Name.O), + zap.Int("index_cnt", idxCnt), zap.Int("table_cnt", tblCnt), + zap.String("missing_handles", redact.String(rmode, fmt.Sprint(missHd))), + zap.String("total_handles", redact.String(rmode, fmt.Sprint(fullHd[:displayFullHdCnt]))), + } + if rmode != "ON" { store, ok := r.Sctx.GetStore().(helper.Storage) if ok { for i, hd := range missHd { - fs = append(fs, zap.String("row_mvcc_"+strconv.Itoa(i), GetMvccByKey(store, r.HandleEncode(hd), DecodeRowMvccData(r.Tbl)))) + fs = append(fs, zap.String("row_mvcc_"+strconv.Itoa(i), redact.String(rmode, GetMvccByKey(store, r.HandleEncode(hd), DecodeRowMvccData(r.Tbl))))) } for i := range missRowIdx { - fs = append(fs, zap.String("index_mvcc_"+strconv.Itoa(i), GetMvccByKey(store, r.IndexEncode(&missRowIdx[i]), DecodeIndexMvccData(r.Idx)))) + fs = append(fs, zap.String("index_mvcc_"+strconv.Itoa(i), redact.String(rmode, GetMvccByKey(store, r.IndexEncode(&missRowIdx[i]), DecodeIndexMvccData(r.Idx))))) } } - - logutil.Logger(ctx).Error("indexLookup found data inconsistency", fs...) } + fs = append(fs, zap.Stack("stack")) + logutil.Logger(ctx).Error("indexLookup found data inconsistency", fs...) return ErrLookupInconsistent.GenWithStackByArgs(r.Tbl.Name.O, r.Idx.Name.O, idxCnt, tblCnt) } // ReportAdminCheckInconsistentWithColInfo reports inconsistent when the value of index row is different from record row. func (r *Reporter) ReportAdminCheckInconsistentWithColInfo(ctx context.Context, handle kv.Handle, colName string, idxDat, tblDat fmt.Stringer, err error, idxRow *RecordData) error { - if r.Sctx.GetSessionVars().EnableRedactLog { - logutil.Logger(ctx).Error("admin check found data inconsistency", - zap.String("table_name", r.Tbl.Name.O), - zap.String("index", r.Idx.Name.O), - zap.String("col", colName), - zap.Error(err), - zap.Stack("stack"), - ) - } else { - fs := []zap.Field{ - zap.String("table_name", r.Tbl.Name.O), - zap.String("index_name", r.Idx.Name.O), - zap.String("col", colName), - zap.Stringer("row_id", handle), - zap.Stringer("idxDatum", idxDat), - zap.Stringer("rowDatum", tblDat), - } + rmode := r.Sctx.GetSessionVars().EnableRedactNew + fs := []zap.Field{ + zap.String("table_name", r.Tbl.Name.O), + zap.String("index_name", r.Idx.Name.O), + zap.String("col", colName), + zap.Stringer("row_id", redact.Stringer(rmode, handle)), + zap.Stringer("idxDatum", redact.Stringer(rmode, idxDat)), + zap.Stringer("rowDatum", redact.Stringer(rmode, tblDat)), + } + if rmode != "ON" { store, ok := r.Sctx.GetStore().(helper.Storage) if ok { - fs = append(fs, zap.String("row_mvcc", GetMvccByKey(store, r.HandleEncode(handle), DecodeRowMvccData(r.Tbl)))) - fs = append(fs, zap.String("index_mvcc", GetMvccByKey(store, r.IndexEncode(idxRow), DecodeIndexMvccData(r.Idx)))) + fs = append(fs, zap.String("row_mvcc", redact.String(rmode, GetMvccByKey(store, r.HandleEncode(handle), DecodeRowMvccData(r.Tbl))))) + fs = append(fs, zap.String("index_mvcc", redact.String(rmode, GetMvccByKey(store, r.IndexEncode(idxRow), DecodeIndexMvccData(r.Idx))))) } - fs = append(fs, zap.Error(err)) - fs = append(fs, zap.Stack("stack")) - logutil.Logger(ctx).Error("admin check found data inconsistency", fs...) } + fs = append(fs, zap.Error(err)) + fs = append(fs, zap.Stack("stack")) + logutil.Logger(ctx).Error("admin check found data inconsistency", fs...) return ErrAdminCheckInconsistentWithColInfo.GenWithStackByArgs(r.Tbl.Name.O, r.Idx.Name.O, colName, fmt.Sprint(handle), fmt.Sprint(idxDat), fmt.Sprint(tblDat), err) } @@ -266,29 +252,24 @@ func (r *RecordData) String() string { // ReportAdminCheckInconsistent reports inconsistent when single index row not found in record rows. func (r *Reporter) ReportAdminCheckInconsistent(ctx context.Context, handle kv.Handle, idxRow, tblRow *RecordData) error { - if r.Sctx.GetSessionVars().EnableRedactLog { - logutil.Logger(ctx).Error("admin check found data inconsistency", - zap.String("table_name", r.Tbl.Name.O), - zap.String("index", r.Idx.Name.O), - zap.Stack("stack"), - ) - } else { - fs := []zap.Field{ - zap.String("table_name", r.Tbl.Name.O), - zap.String("index_name", r.Idx.Name.O), - zap.Stringer("row_id", handle), - zap.Stringer("index", idxRow), - zap.Stringer("row", tblRow), - } + rmode := r.Sctx.GetSessionVars().EnableRedactNew + fs := []zap.Field{ + zap.String("table_name", r.Tbl.Name.O), + zap.String("index_name", r.Idx.Name.O), + zap.Stringer("row_id", redact.Stringer(rmode, handle)), + zap.Stringer("index", redact.Stringer(rmode, idxRow)), + zap.Stringer("row", redact.Stringer(rmode, tblRow)), + } + if rmode != "ON" { store, ok := r.Sctx.GetStore().(helper.Storage) if ok { - fs = append(fs, zap.String("row_mvcc", GetMvccByKey(store, r.HandleEncode(handle), DecodeRowMvccData(r.Tbl)))) + fs = append(fs, zap.String("row_mvcc", redact.String(rmode, GetMvccByKey(store, r.HandleEncode(handle), DecodeRowMvccData(r.Tbl))))) if idxRow != nil { - fs = append(fs, zap.String("index_mvcc", GetMvccByKey(store, r.IndexEncode(idxRow), DecodeIndexMvccData(r.Idx)))) + fs = append(fs, zap.String("index_mvcc", redact.String(rmode, GetMvccByKey(store, r.IndexEncode(idxRow), DecodeIndexMvccData(r.Idx))))) } } - fs = append(fs, zap.Stack("stack")) - logutil.Logger(ctx).Error("admin check found data inconsistency", fs...) } + fs = append(fs, zap.Stack("stack")) + logutil.Logger(ctx).Error("admin check found data inconsistency", fs...) return ErrAdminCheckInconsistent.GenWithStackByArgs(r.Tbl.Name.O, r.Idx.Name.O, fmt.Sprint(handle), fmt.Sprint(idxRow), fmt.Sprint(tblRow)) } diff --git a/pkg/util/redact/redact.go b/pkg/util/redact/redact.go index 8f42c64e6caa1..2bae8a8991f6c 100644 --- a/pkg/util/redact/redact.go +++ b/pkg/util/redact/redact.go @@ -15,11 +15,16 @@ package redact import ( + "fmt" "strings" ) -// Redact will redact the input string according to 'mode'. Check 'tidb_redact_log': https://github.com/pingcap/tidb/blob/acf9e3128693a5a13f31027f05f4de41edf8d7b2/pkg/sessionctx/variable/sysvar.go#L2154. -func Redact(mode string, input string) string { +var ( + _ fmt.Stringer = redactStringer{} +) + +// String will redact the input string according to 'mode'. Check 'tidb_redact_log': https://github.com/pingcap/tidb/blob/acf9e3128693a5a13f31027f05f4de41edf8d7b2/pkg/sessionctx/variable/sysvar.go#L2154. +func String(mode string, input string) string { switch mode { case "MARKER": b := &strings.Builder{} @@ -41,3 +46,17 @@ func Redact(mode string, input string) string { return "" } } + +type redactStringer struct { + mode string + stringer fmt.Stringer +} + +func (s redactStringer) String() string { + return String(s.mode, s.stringer.String()) +} + +// Stringer will redact the input stringer according to 'mode', similar to String(). +func Stringer(mode string, input fmt.Stringer) redactStringer { + return redactStringer{mode, input} +} diff --git a/pkg/util/redact/redact_test.go b/pkg/util/redact/redact_test.go index 4214b618fdf0e..4f1afeb7e3faf 100644 --- a/pkg/util/redact/redact_test.go +++ b/pkg/util/redact/redact_test.go @@ -20,6 +20,14 @@ import ( "github.com/stretchr/testify/require" ) +type testStringer struct { + str string +} + +func (s *testStringer) String() string { + return s.str +} + func TestRedact(t *testing.T) { for _, c := range []struct { mode string @@ -32,6 +40,7 @@ func TestRedact(t *testing.T) { {"MARKER", "f‹xcv", "‹f‹‹xcv›"}, {"MARKER", "f›xcv", "‹f››xcv›"}, } { - require.Equal(t, c.output, Redact(c.mode, c.input)) + require.Equal(t, c.output, String(c.mode, c.input)) + require.Equal(t, c.output, Stringer(c.mode, &testStringer{c.input}).String()) } } From 5745d3df9de4afa98909eb972c8456f8135d65f5 Mon Sep 17 00:00:00 2001 From: jiyfhust Date: Tue, 12 Mar 2024 18:12:40 +0800 Subject: [PATCH 2/5] ddl: make prefix index compatible with mysql 8.0 when prefix length equal to column length (#48296) close pingcap/tidb#48295 --- pkg/ddl/column.go | 2 +- pkg/ddl/index.go | 7 +++- tests/integrationtest/r/table/index.result | 47 ++++++++++++++++++++++ tests/integrationtest/t/table/index.test | 12 ++++++ 4 files changed, 66 insertions(+), 2 deletions(-) diff --git a/pkg/ddl/column.go b/pkg/ddl/column.go index 0ce3042037c69..bf3b0a1ffad03 100644 --- a/pkg/ddl/column.go +++ b/pkg/ddl/column.go @@ -631,7 +631,7 @@ func SetIdxColNameOffset(idxCol *model.IndexColumn, changingCol *model.ColumnInf idxCol.Name = changingCol.Name idxCol.Offset = changingCol.Offset canPrefix := types.IsTypePrefixable(changingCol.GetType()) - if !canPrefix || (changingCol.GetFlen() < idxCol.Length) { + if !canPrefix || (changingCol.GetFlen() <= idxCol.Length) { idxCol.Length = types.UnspecifiedLength } } diff --git a/pkg/ddl/index.go b/pkg/ddl/index.go index ee068bf657f9e..ca7e11b46ce31 100644 --- a/pkg/ddl/index.go +++ b/pkg/ddl/index.go @@ -113,7 +113,12 @@ func buildIndexColumns(ctx sessionctx.Context, columns []*model.ColumnInfo, inde mvIndex = true } indexColLen := ip.Length - indexColumnLength, err := getIndexColumnLength(col, ip.Length) + if indexColLen != types.UnspecifiedLength && + types.IsTypeChar(col.FieldType.GetType()) && + indexColLen == col.FieldType.GetFlen() { + indexColLen = types.UnspecifiedLength + } + indexColumnLength, err := getIndexColumnLength(col, indexColLen) if err != nil { return nil, false, err } diff --git a/tests/integrationtest/r/table/index.result b/tests/integrationtest/r/table/index.result index 7cec18d6bd831..fa9a834826d9e 100644 --- a/tests/integrationtest/r/table/index.result +++ b/tests/integrationtest/r/table/index.result @@ -31,3 +31,50 @@ Error 1062 (23000): Duplicate entry 'q' for key 't.idx_a' update ignore t set a = 'qcc' where a = 'rcc'; Level Code Message Warning 1062 Duplicate entry 'q' for key 't.idx_a' +drop table if exists t; +create table t (id int, a varchar(64), b varchar(64), c varchar(64), index idx_a(a(64))); +show create table t; +Table Create Table +t CREATE TABLE `t` ( + `id` int(11) DEFAULT NULL, + `a` varchar(64) DEFAULT NULL, + `b` varchar(64) DEFAULT NULL, + `c` varchar(64) DEFAULT NULL, + KEY `idx_a` (`a`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin +alter table t add index idx_b(b(64)); +show create table t; +Table Create Table +t CREATE TABLE `t` ( + `id` int(11) DEFAULT NULL, + `a` varchar(64) DEFAULT NULL, + `b` varchar(64) DEFAULT NULL, + `c` varchar(64) DEFAULT NULL, + KEY `idx_a` (`a`), + KEY `idx_b` (`b`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin +alter table t add index idx_c(c(32)); +show create table t; +Table Create Table +t CREATE TABLE `t` ( + `id` int(11) DEFAULT NULL, + `a` varchar(64) DEFAULT NULL, + `b` varchar(64) DEFAULT NULL, + `c` varchar(64) DEFAULT NULL, + KEY `idx_a` (`a`), + KEY `idx_b` (`b`), + KEY `idx_c` (`c`(32)) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin +alter table t modify column c varchar(32); +show create table t; +Table Create Table +t CREATE TABLE `t` ( + `id` int(11) DEFAULT NULL, + `a` varchar(64) DEFAULT NULL, + `b` varchar(64) DEFAULT NULL, + `c` varchar(32) DEFAULT NULL, + KEY `idx_a` (`a`), + KEY `idx_b` (`b`), + KEY `idx_c` (`c`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin +drop table t; diff --git a/tests/integrationtest/t/table/index.test b/tests/integrationtest/t/table/index.test index 3d760a3a7dbf6..0eb979f46998b 100644 --- a/tests/integrationtest/t/table/index.test +++ b/tests/integrationtest/t/table/index.test @@ -33,3 +33,15 @@ update t set a = 'qcc' where a = 'rcc'; --enable_warnings; update ignore t set a = 'qcc' where a = 'rcc'; --disable_warnings; + +# Test Issue 48295. +drop table if exists t; +create table t (id int, a varchar(64), b varchar(64), c varchar(64), index idx_a(a(64))); +show create table t; +alter table t add index idx_b(b(64)); +show create table t; +alter table t add index idx_c(c(32)); +show create table t; +alter table t modify column c varchar(32); +show create table t; +drop table t; From 661f6d61aeeac5036c227dff7dcc9b179dec5381 Mon Sep 17 00:00:00 2001 From: YangKeao Date: Tue, 12 Mar 2024 18:48:39 +0800 Subject: [PATCH 3/5] json: don't resize slice to get extra zero when the length is unspecified (#51586) close pingcap/tidb#51547 --- pkg/expression/builtin_cast.go | 2 +- pkg/expression/builtin_cast_vec.go | 2 +- .../integrationtest/r/expression/json.result | 21 +++++++++++++++++++ tests/integrationtest/t/expression/json.test | 13 ++++++++++++ 4 files changed, 36 insertions(+), 2 deletions(-) diff --git a/pkg/expression/builtin_cast.go b/pkg/expression/builtin_cast.go index e87109bea088d..36a5a7ce9b625 100644 --- a/pkg/expression/builtin_cast.go +++ b/pkg/expression/builtin_cast.go @@ -878,7 +878,7 @@ func (b *builtinCastStringAsJSONSig) evalJSON(ctx EvalContext, row chunk.Row) (r typ := b.args[0].GetType() if types.IsBinaryStr(typ) { buf := []byte(val) - if typ.GetType() == mysql.TypeString { + if typ.GetType() == mysql.TypeString && typ.GetFlen() > 0 { // the tailing zero should also be in the opaque json buf = make([]byte, typ.GetFlen()) copy(buf, val) diff --git a/pkg/expression/builtin_cast_vec.go b/pkg/expression/builtin_cast_vec.go index 56c60c8b2158c..bce2383171338 100644 --- a/pkg/expression/builtin_cast_vec.go +++ b/pkg/expression/builtin_cast_vec.go @@ -853,7 +853,7 @@ func (b *builtinCastStringAsJSONSig) vecEvalJSON(ctx EvalContext, input *chunk.C val := buf.GetBytes(i) resultBuf := val - if typ.GetType() == mysql.TypeString { + if typ.GetType() == mysql.TypeString && typ.GetFlen() > 0 { // only for BINARY: the tailing zero should also be in the opaque json resultBuf = make([]byte, typ.GetFlen()) copy(resultBuf, val) diff --git a/tests/integrationtest/r/expression/json.result b/tests/integrationtest/r/expression/json.result index a2092ffe22376..2f1cfdece1660 100644 --- a/tests/integrationtest/r/expression/json.result +++ b/tests/integrationtest/r/expression/json.result @@ -628,3 +628,24 @@ a cast(a as signed) "-1" -1 "18446744073709551615" -1 "18446744073709552000" -1 +select cast(binary 'aa' as json); +cast(binary 'aa' as json) +"base64:type254:YWE=" +drop table if exists t; +create table t (vb VARBINARY(10), b BINARY(10), vc VARCHAR(10), c CHAR(10)); +insert into t values ('1', '1', '1', '1'); +select cast(vb as json), cast(b as json), cast(vc as json), cast(c as json) from t; +cast(vb as json) cast(b as json) cast(vc as json) cast(c as json) +"base64:type15:MQ==" "base64:type254:MQAAAAAAAAAAAA==" 1 1 +select 1 from t where cast(vb as json) = '1'; +1 +select 1 from t where cast(b as json) = '1'; +1 +select 1 from t where cast(vc as json) = '1'; +1 +select 1 from t where cast(c as json) = '1'; +1 +select 1 from t where cast(BINARY vc as json) = '1'; +1 +select 1 from t where cast(BINARY c as json) = '1'; +1 diff --git a/tests/integrationtest/t/expression/json.test b/tests/integrationtest/t/expression/json.test index 005b3754d2553..714aab87f2edd 100644 --- a/tests/integrationtest/t/expression/json.test +++ b/tests/integrationtest/t/expression/json.test @@ -380,3 +380,16 @@ insert into t values ('"18446744073709552000"'); select a, cast(a as unsigned) from t; -- sorted_result select a, cast(a as signed) from t; + +# TestCastBinaryStringToJSON +select cast(binary 'aa' as json); +drop table if exists t; +create table t (vb VARBINARY(10), b BINARY(10), vc VARCHAR(10), c CHAR(10)); +insert into t values ('1', '1', '1', '1'); +select cast(vb as json), cast(b as json), cast(vc as json), cast(c as json) from t; +select 1 from t where cast(vb as json) = '1'; +select 1 from t where cast(b as json) = '1'; +select 1 from t where cast(vc as json) = '1'; +select 1 from t where cast(c as json) = '1'; +select 1 from t where cast(BINARY vc as json) = '1'; +select 1 from t where cast(BINARY c as json) = '1'; From 0639290a96ad9530405e3be155f488c422ddf326 Mon Sep 17 00:00:00 2001 From: 3pointer Date: Tue, 12 Mar 2024 19:39:39 +0800 Subject: [PATCH 4/5] restore: shift rewrite ranges before split ranges (#51653) close pingcap/tidb#51685 --- br/pkg/restore/batcher.go | 26 ++++---- br/pkg/restore/batcher_test.go | 4 +- br/pkg/restore/client.go | 7 +-- br/pkg/restore/merge.go | 63 +++++++++++++++++-- br/pkg/restore/merge_test.go | 102 +++++++++++++++++++++++++++++-- br/pkg/restore/pipeline_items.go | 11 ++-- br/pkg/restore/range.go | 41 ++----------- br/pkg/restore/range_test.go | 20 ++++-- br/pkg/restore/split.go | 5 +- br/pkg/restore/split_test.go | 20 +++--- br/pkg/restore/util.go | 7 +-- br/pkg/task/restore_raw.go | 6 +- br/pkg/task/restore_txn.go | 6 +- 13 files changed, 226 insertions(+), 92 deletions(-) diff --git a/br/pkg/restore/batcher.go b/br/pkg/restore/batcher.go index 57bd477822875..033ef13f457f0 100644 --- a/br/pkg/restore/batcher.go +++ b/br/pkg/restore/batcher.go @@ -36,7 +36,6 @@ const ( type Batcher struct { cachedTables []TableWithRange cachedTablesMu *sync.Mutex - rewriteRules *RewriteRules // autoCommitJoiner is for joining the background batch sender. autoCommitJoiner chan<- struct{} @@ -114,7 +113,6 @@ func NewBatcher( outCh := DefaultOutputTableChan() sendChan := make(chan SendType, 2) b := &Batcher{ - rewriteRules: EmptyRewriteRule(), sendErr: errCh, outCh: outCh, sender: sender, @@ -227,8 +225,10 @@ type DrainResult struct { TablesToSend []CreatedTable // BlankTablesAfterSend are tables that will be full-restored after this batch send. BlankTablesAfterSend []CreatedTable - RewriteRules *RewriteRules - Ranges []rtree.Range + // RewriteRules are the rewrite rules for the tables. + // the key is the table id after rewritten. + RewriteRulesMap map[int64]*RewriteRules + Ranges []rtree.Range // Record which part of ranges belongs to the table TableEndOffsetInRanges []int } @@ -240,14 +240,19 @@ func (result DrainResult) Files() []TableIDWithFiles { for i, endOffset := range result.TableEndOffsetInRanges { tableID := result.TablesToSend[i].Table.ID ranges := result.Ranges[startOffset:endOffset] - files := make([]*backuppb.File, 0, len(result.Ranges)*2) + // each range has at least a default file + a write file + files := make([]*backuppb.File, 0, len(ranges)*2) for _, rg := range ranges { files = append(files, rg.Files...) } - + var rules *RewriteRules + if r, ok := result.RewriteRulesMap[tableID]; ok { + rules = r + } tableIDWithFiles = append(tableIDWithFiles, TableIDWithFiles{ - TableID: tableID, - Files: files, + TableID: tableID, + Files: files, + RewriteRules: rules, }) // update start offset @@ -261,7 +266,7 @@ func newDrainResult() DrainResult { return DrainResult{ TablesToSend: make([]CreatedTable, 0), BlankTablesAfterSend: make([]CreatedTable, 0), - RewriteRules: EmptyRewriteRule(), + RewriteRulesMap: EmptyRewriteRulesMap(), Ranges: make([]rtree.Range, 0), TableEndOffsetInRanges: make([]int, 0), } @@ -329,7 +334,7 @@ func (b *Batcher) drainRanges() DrainResult { thisTableLen := len(thisTable.Range) collected := len(result.Ranges) - result.RewriteRules.Append(*thisTable.RewriteRule) + result.RewriteRulesMap[thisTable.Table.ID] = thisTable.RewriteRule result.TablesToSend = append(result.TablesToSend, thisTable.CreatedTable) // the batch is full, we should stop here! @@ -423,7 +428,6 @@ func (b *Batcher) Add(tbs TableWithRange) { zap.Int("batch size", b.Len()), ) b.cachedTables = append(b.cachedTables, tbs) - b.rewriteRules.Append(*tbs.RewriteRule) atomic.AddInt32(&b.size, int32(len(tbs.Range))) b.cachedTablesMu.Unlock() diff --git a/br/pkg/restore/batcher_test.go b/br/pkg/restore/batcher_test.go index d6d7cab7e90c7..dad4634becf73 100644 --- a/br/pkg/restore/batcher_test.go +++ b/br/pkg/restore/batcher_test.go @@ -39,7 +39,9 @@ func (sender *drySender) RestoreBatch(ranges restore.DrainResult) { defer sender.mu.Unlock() log.Info("fake restore range", rtree.ZapRanges(ranges.Ranges)) sender.nBatch++ - sender.rewriteRules.Append(*ranges.RewriteRules) + for _, r := range ranges.RewriteRulesMap { + sender.rewriteRules.Append(*r) + } sender.ranges = append(sender.ranges, ranges.Ranges...) sender.sink.EmitTables(ranges.BlankTablesAfterSend...) } diff --git a/br/pkg/restore/client.go b/br/pkg/restore/client.go index 1fe97552743f5..d043c41887047 100644 --- a/br/pkg/restore/client.go +++ b/br/pkg/restore/client.go @@ -1407,10 +1407,9 @@ func getGroupFiles(files []*backuppb.File, supportMulti bool) [][]*backuppb.File // SplitRanges implements TiKVRestorer. func (rc *Client) SplitRanges(ctx context.Context, ranges []rtree.Range, - rewriteRules *RewriteRules, updateCh glue.Progress, isRawKv bool) error { - return SplitRanges(ctx, rc, ranges, rewriteRules, updateCh, isRawKv) + return SplitRanges(ctx, rc, ranges, updateCh, isRawKv) } func (rc *Client) WrapLogFilesIterWithSplitHelper(logIter LogIter, rules map[int64]*RewriteRules, g glue.Glue, store kv.Storage) (LogIter, error) { @@ -1470,7 +1469,6 @@ func (rc *Client) WrapLogFilesIterWithCheckpoint( func (rc *Client) RestoreSSTFiles( ctx context.Context, tableIDWithFiles []TableIDWithFiles, - rewriteRules *RewriteRules, updateCh glue.Progress, ) (err error) { start := time.Now() @@ -1505,6 +1503,7 @@ LOOPFORTABLE: for _, tableIDWithFile := range tableIDWithFiles { tableID := tableIDWithFile.TableID files := tableIDWithFile.Files + rules := tableIDWithFile.RewriteRules fileCount += len(files) for rangeFiles, leftFiles = drainFilesByRange(files); len(rangeFiles) != 0; rangeFiles, leftFiles = drainFilesByRange(leftFiles) { filesReplica := rangeFiles @@ -1529,7 +1528,7 @@ LOOPFORTABLE: updateCh.Inc() } }() - return rc.fileImporter.ImportSSTFiles(ectx, fs, rewriteRules, rc.cipher, rc.dom.Store().GetCodec().GetAPIVersion()) + return rc.fileImporter.ImportSSTFiles(ectx, fs, rules, rc.cipher, rc.dom.Store().GetCodec().GetAPIVersion()) }(filesGroup); importErr != nil { return errors.Trace(importErr) } diff --git a/br/pkg/restore/merge.go b/br/pkg/restore/merge.go index deaa1d1c48bd8..7f588d1483804 100644 --- a/br/pkg/restore/merge.go +++ b/br/pkg/restore/merge.go @@ -7,8 +7,13 @@ import ( "github.com/pingcap/errors" backuppb "github.com/pingcap/kvproto/pkg/brpb" + "github.com/pingcap/kvproto/pkg/import_sstpb" + "github.com/pingcap/log" berrors "github.com/pingcap/tidb/br/pkg/errors" + "github.com/pingcap/tidb/br/pkg/logutil" "github.com/pingcap/tidb/br/pkg/rtree" + "github.com/pingcap/tidb/pkg/tablecodec" + "go.uber.org/zap" ) const ( @@ -29,13 +34,16 @@ type MergeRangesStat struct { MergedRegionBytesAvg int } -// MergeFileRanges returns ranges of the files are merged based on +// MergeAndRewriteFileRanges returns ranges of the files are merged based on // splitSizeBytes and splitKeyCount. // // By merging small ranges, it speeds up restoring a backup that contains many // small ranges (regions) as it reduces split region and scatter region. -func MergeFileRanges( - files []*backuppb.File, splitSizeBytes, splitKeyCount uint64, +func MergeAndRewriteFileRanges( + files []*backuppb.File, + rewriteRules *RewriteRules, + splitSizeBytes, + splitKeyCount uint64, ) ([]rtree.Range, *MergeRangesStat, error) { if len(files) == 0 { return []rtree.Range{}, &MergeRangesStat{}, nil @@ -78,12 +86,20 @@ func MergeFileRanges( for _, f := range filesMap[key] { rangeSize += f.Size_ } - if out := rangeTree.InsertRange(rtree.Range{ + rg := &rtree.Range{ StartKey: files[0].GetStartKey(), EndKey: files[0].GetEndKey(), Files: files, Size: rangeSize, - }); out != nil { + } + // rewrite Range for split. + // so that splitRanges no need to handle rewrite rules any more. + tmpRng, err := RewriteRange(rg, rewriteRules) + if err != nil { + return nil, nil, errors.Annotatef(berrors.ErrRestoreInvalidRange, + "unable to rewrite range files %+v", files) + } + if out := rangeTree.InsertRange(*tmpRng); out != nil { return nil, nil, errors.Annotatef(berrors.ErrRestoreInvalidRange, "duplicate range %s files %+v", out, files) } @@ -107,3 +123,40 @@ func MergeFileRanges( MergedRegionBytesAvg: int(mergedRegionBytesAvg), }, nil } + +func RewriteRange(rg *rtree.Range, rewriteRules *RewriteRules) (*rtree.Range, error) { + if rewriteRules == nil { + return rg, nil + } + startID := tablecodec.DecodeTableID(rg.StartKey) + endID := tablecodec.DecodeTableID(rg.EndKey) + var rule *import_sstpb.RewriteRule + if startID != endID { + log.Warn("table id does not match", + logutil.Key("startKey", rg.StartKey), + logutil.Key("endKey", rg.EndKey), + zap.Int64("startID", startID), + zap.Int64("endID", endID)) + return nil, errors.Annotate(berrors.ErrRestoreTableIDMismatch, "table id mismatch") + } + rg.StartKey, rule = replacePrefix(rg.StartKey, rewriteRules) + if rule == nil { + log.Warn("cannot find rewrite rule", logutil.Key("key", rg.StartKey)) + } else { + log.Debug( + "rewrite start key", + logutil.Key("key", rg.StartKey), logutil.RewriteRule(rule)) + } + oldKey := rg.EndKey + rg.EndKey, rule = replacePrefix(rg.EndKey, rewriteRules) + if rule == nil { + log.Warn("cannot find rewrite rule", logutil.Key("key", rg.EndKey)) + } else { + log.Debug( + "rewrite end key", + logutil.Key("origin-key", oldKey), + logutil.Key("key", rg.EndKey), + logutil.RewriteRule(rule)) + } + return rg, nil +} diff --git a/br/pkg/restore/merge_test.go b/br/pkg/restore/merge_test.go index 404d187e1a59b..a9c185070e2a4 100644 --- a/br/pkg/restore/merge_test.go +++ b/br/pkg/restore/merge_test.go @@ -12,9 +12,11 @@ import ( "github.com/pingcap/errors" backuppb "github.com/pingcap/kvproto/pkg/brpb" + "github.com/pingcap/kvproto/pkg/import_sstpb" "github.com/pingcap/tidb/br/pkg/conn" berrors "github.com/pingcap/tidb/br/pkg/errors" "github.com/pingcap/tidb/br/pkg/restore" + "github.com/pingcap/tidb/br/pkg/rtree" "github.com/pingcap/tidb/pkg/sessionctx/stmtctx" "github.com/pingcap/tidb/pkg/tablecodec" "github.com/pingcap/tidb/pkg/types" @@ -208,7 +210,7 @@ func TestMergeRanges(t *testing.T) { for _, f := range cs.files { files = append(files, fb.build(f[0], f[1], f[2], f[3], f[4])...) } - rngs, stat, err := restore.MergeFileRanges(files, conn.DefaultMergeRegionSizeBytes, conn.DefaultMergeRegionKeyCount) + rngs, stat, err := restore.MergeAndRewriteFileRanges(files, nil, conn.DefaultMergeRegionSizeBytes, conn.DefaultMergeRegionKeyCount) require.NoErrorf(t, err, "%+v", cs) require.Equalf(t, cs.stat.TotalRegions, stat.TotalRegions, "%+v", cs) require.Equalf(t, cs.stat.MergedRegions, stat.MergedRegions, "%+v", cs) @@ -230,8 +232,8 @@ func TestMergeRawKVRanges(t *testing.T) { files = append(files, fb.build(1, 0, 2, 1, 1)...) // RawKV does not have write cf files = files[1:] - _, stat, err := restore.MergeFileRanges( - files, conn.DefaultMergeRegionSizeBytes, conn.DefaultMergeRegionKeyCount) + _, stat, err := restore.MergeAndRewriteFileRanges( + files, nil, conn.DefaultMergeRegionSizeBytes, conn.DefaultMergeRegionKeyCount) require.NoError(t, err) require.Equal(t, 1, stat.TotalRegions) require.Equal(t, 1, stat.MergedRegions) @@ -243,8 +245,8 @@ func TestInvalidRanges(t *testing.T) { files = append(files, fb.build(1, 0, 1, 1, 1)...) files[0].Name = "invalid.sst" files[0].Cf = "invalid" - _, _, err := restore.MergeFileRanges( - files, conn.DefaultMergeRegionSizeBytes, conn.DefaultMergeRegionKeyCount) + _, _, err := restore.MergeAndRewriteFileRanges( + files, nil, conn.DefaultMergeRegionSizeBytes, conn.DefaultMergeRegionKeyCount) require.Error(t, err) require.Equal(t, berrors.ErrRestoreInvalidBackup, errors.Cause(err)) } @@ -265,7 +267,7 @@ func benchmarkMergeRanges(b *testing.B, filesCount int) { } var err error for i := 0; i < b.N; i++ { - _, _, err = restore.MergeFileRanges(files, conn.DefaultMergeRegionSizeBytes, conn.DefaultMergeRegionKeyCount) + _, _, err = restore.MergeAndRewriteFileRanges(files, nil, conn.DefaultMergeRegionSizeBytes, conn.DefaultMergeRegionKeyCount) if err != nil { b.Error(err) } @@ -291,3 +293,91 @@ func BenchmarkMergeRanges50k(b *testing.B) { func BenchmarkMergeRanges100k(b *testing.B) { benchmarkMergeRanges(b, 100000) } +func TestRewriteRange(t *testing.T) { + // Define test cases + cases := []struct { + rg *rtree.Range + rewriteRules *restore.RewriteRules + expectedRange *rtree.Range + expectedError error + }{ + // Test case 1: No rewrite rules + { + rg: &rtree.Range{ + StartKey: []byte("startKey"), + EndKey: []byte("endKey"), + }, + rewriteRules: nil, + expectedRange: &rtree.Range{StartKey: []byte("startKey"), EndKey: []byte("endKey")}, + expectedError: nil, + }, + // Test case 2: Rewrite rule found for both start key and end key + { + rg: &rtree.Range{ + StartKey: append(tablecodec.GenTableIndexPrefix(1), []byte("startKey")...), + EndKey: append(tablecodec.GenTableIndexPrefix(1), []byte("endKey")...), + }, + rewriteRules: &restore.RewriteRules{ + Data: []*import_sstpb.RewriteRule{ + { + OldKeyPrefix: tablecodec.GenTableIndexPrefix(1), + NewKeyPrefix: tablecodec.GenTableIndexPrefix(2), + }, + }, + }, + expectedRange: &rtree.Range{ + StartKey: append(tablecodec.GenTableIndexPrefix(2), []byte("startKey")...), + EndKey: append(tablecodec.GenTableIndexPrefix(2), []byte("endKey")...), + }, + expectedError: nil, + }, + // Test case 3: Rewrite rule found for end key + { + rg: &rtree.Range{ + StartKey: append(tablecodec.GenTableIndexPrefix(1), []byte("startKey")...), + EndKey: append(tablecodec.GenTableIndexPrefix(1), []byte("endKey")...), + }, + rewriteRules: &restore.RewriteRules{ + Data: []*import_sstpb.RewriteRule{ + { + OldKeyPrefix: append(tablecodec.GenTableIndexPrefix(1), []byte("endKey")...), + NewKeyPrefix: append(tablecodec.GenTableIndexPrefix(2), []byte("newEndKey")...), + }, + }, + }, + expectedRange: &rtree.Range{ + StartKey: append(tablecodec.GenTableIndexPrefix(1), []byte("startKey")...), + EndKey: append(tablecodec.GenTableIndexPrefix(2), []byte("newEndKey")...), + }, + expectedError: nil, + }, + // Test case 4: Table ID mismatch + { + rg: &rtree.Range{ + StartKey: []byte("t1_startKey"), + EndKey: []byte("t2_endKey"), + }, + rewriteRules: &restore.RewriteRules{ + Data: []*import_sstpb.RewriteRule{ + { + OldKeyPrefix: []byte("t1_startKey"), + NewKeyPrefix: []byte("t2_newStartKey"), + }, + }, + }, + expectedRange: nil, + expectedError: errors.Annotate(berrors.ErrRestoreTableIDMismatch, "table id mismatch"), + }, + } + + // Run test cases + for _, tc := range cases { + actualRange, actualError := restore.RewriteRange(tc.rg, tc.rewriteRules) + if tc.expectedError != nil { + require.EqualError(t, tc.expectedError, actualError.Error()) + } else { + require.NoError(t, actualError) + } + require.Equal(t, tc.expectedRange, actualRange) + } +} diff --git a/br/pkg/restore/pipeline_items.go b/br/pkg/restore/pipeline_items.go index 4eaea8cc98dbf..594e571925de0 100644 --- a/br/pkg/restore/pipeline_items.go +++ b/br/pkg/restore/pipeline_items.go @@ -161,6 +161,7 @@ func DefaultOutputTableChan() chan *CreatedTable { type TableWithRange struct { CreatedTable + // Range has been rewrited by rewrite rules. Range []rtree.Range } @@ -168,6 +169,10 @@ type TableIDWithFiles struct { TableID int64 Files []*backuppb.File + // RewriteRules is the rewrite rules for the specify table. + // because these rules belongs to the *one table*. + // we can hold them here. + RewriteRules *RewriteRules } // Exhaust drains all remaining errors in the channel, into a slice of errors. @@ -203,13 +208,11 @@ type TiKVRestorer interface { // After spliting, it also scatters the fresh regions. SplitRanges(ctx context.Context, ranges []rtree.Range, - rewriteRules *RewriteRules, updateCh glue.Progress, isRawKv bool) error // RestoreSSTFiles import the files to the TiKV. RestoreSSTFiles(ctx context.Context, tableIDWithFiles []TableIDWithFiles, - rewriteRules *RewriteRules, updateCh glue.Progress) error } @@ -351,7 +354,7 @@ func (b *tikvSender) splitWorker(ctx context.Context, // hence the checksum would fail. done := b.registerTableIsRestoring(result.TablesToSend) pool.ApplyOnErrorGroup(eg, func() error { - err := b.client.SplitRanges(ectx, result.Ranges, result.RewriteRules, b.updateCh, false) + err := b.client.SplitRanges(ectx, result.Ranges, b.updateCh, false) if err != nil { log.Error("failed on split range", rtree.ZapRanges(result.Ranges), zap.Error(err)) return err @@ -421,7 +424,7 @@ func (b *tikvSender) restoreWorker(ctx context.Context, ranges <-chan drainResul // There has been a worker in the `RestoreSSTFiles` procedure. // Spawning a raw goroutine won't make too many requests to TiKV. eg.Go(func() error { - e := b.client.RestoreSSTFiles(ectx, files, r.result.RewriteRules, b.updateCh) + e := b.client.RestoreSSTFiles(ectx, files, b.updateCh) if e != nil { log.Error("restore batch meet error", logutil.ShortError(e), zapTableIDWithFiles(files)) r.done() diff --git a/br/pkg/restore/range.go b/br/pkg/restore/range.go index 874398f1174e4..c36b1a82b7536 100644 --- a/br/pkg/restore/range.go +++ b/br/pkg/restore/range.go @@ -9,8 +9,6 @@ import ( berrors "github.com/pingcap/tidb/br/pkg/errors" "github.com/pingcap/tidb/br/pkg/logutil" "github.com/pingcap/tidb/br/pkg/rtree" - "github.com/pingcap/tidb/pkg/tablecodec" - "go.uber.org/zap" ) // Range record start and end key for localStoreDir.DB @@ -21,41 +19,9 @@ type Range struct { } // SortRanges checks if the range overlapped and sort them. -func SortRanges(ranges []rtree.Range, rewriteRules *RewriteRules) ([]rtree.Range, error) { +func SortRanges(ranges []rtree.Range) ([]rtree.Range, error) { rangeTree := rtree.NewRangeTree() for _, rg := range ranges { - if rewriteRules != nil { - startID := tablecodec.DecodeTableID(rg.StartKey) - endID := tablecodec.DecodeTableID(rg.EndKey) - var rule *import_sstpb.RewriteRule - if startID != endID { - log.Warn("table id does not match", - logutil.Key("startKey", rg.StartKey), - logutil.Key("endKey", rg.EndKey), - zap.Int64("startID", startID), - zap.Int64("endID", endID)) - return nil, errors.Annotate(berrors.ErrRestoreTableIDMismatch, "table id mismatch") - } - rg.StartKey, rule = replacePrefix(rg.StartKey, rewriteRules) - if rule == nil { - log.Warn("cannot find rewrite rule", logutil.Key("key", rg.StartKey)) - } else { - log.Debug( - "rewrite start key", - logutil.Key("key", rg.StartKey), logutil.RewriteRule(rule)) - } - oldKey := rg.EndKey - rg.EndKey, rule = replacePrefix(rg.EndKey, rewriteRules) - if rule == nil { - log.Warn("cannot find rewrite rule", logutil.Key("key", rg.EndKey)) - } else { - log.Debug( - "rewrite end key", - logutil.Key("origin-key", oldKey), - logutil.Key("key", rg.EndKey), - logutil.RewriteRule(rule)) - } - } if out := rangeTree.InsertRange(rg); out != nil { log.Error("insert ranges overlapped", logutil.Key("startKeyOut", out.StartKey), @@ -81,6 +47,11 @@ func (r *RewriteRules) Append(other RewriteRules) { r.Data = append(r.Data, other.Data...) } +// EmptyRewriteRule make a map of new, empty rewrite rules. +func EmptyRewriteRulesMap() map[int64]*RewriteRules { + return make(map[int64]*RewriteRules) +} + // EmptyRewriteRule make a new, empty rewrite rule. func EmptyRewriteRule() *RewriteRules { return &RewriteRules{ diff --git a/br/pkg/restore/range_test.go b/br/pkg/restore/range_test.go index 322789ec023c1..2c84e2b7f0d72 100644 --- a/br/pkg/restore/range_test.go +++ b/br/pkg/restore/range_test.go @@ -33,7 +33,11 @@ func TestSortRange(t *testing.T) { EndKey: append(tablecodec.GenTableRecordPrefix(1), []byte("bbb")...), Files: nil, }, } - rs1, err := SortRanges(ranges1, rewriteRules) + for i, rg := range ranges1 { + tmp, _ := RewriteRange(&rg, rewriteRules) + ranges1[i] = *tmp + } + rs1, err := SortRanges(ranges1) require.NoErrorf(t, err, "sort range1 failed: %v", err) rangeEquals(t, rs1, []rtree.Range{ { @@ -48,13 +52,19 @@ func TestSortRange(t *testing.T) { EndKey: append(tablecodec.GenTableRecordPrefix(2), []byte("bbb")...), Files: nil, }, } - _, err = SortRanges(ranges2, rewriteRules) - require.Error(t, err) - require.Regexp(t, "table id mismatch.*", err.Error()) + for _, rg := range ranges2 { + _, err := RewriteRange(&rg, rewriteRules) + require.Error(t, err) + require.Regexp(t, "table id mismatch.*", err.Error()) + } ranges3 := initRanges() rewriteRules1 := initRewriteRules() - rs3, err := SortRanges(ranges3, rewriteRules1) + for i, rg := range ranges3 { + tmp, _ := RewriteRange(&rg, rewriteRules1) + ranges3[i] = *tmp + } + rs3, err := SortRanges(ranges3) require.NoErrorf(t, err, "sort range1 failed: %v", err) rangeEquals(t, rs3, []rtree.Range{ {StartKey: []byte("bbd"), EndKey: []byte("bbf"), Files: nil}, diff --git a/br/pkg/restore/split.go b/br/pkg/restore/split.go index 43052e8ef6588..bc5691ed0b285 100644 --- a/br/pkg/restore/split.go +++ b/br/pkg/restore/split.go @@ -64,7 +64,6 @@ type OnSplitFunc func(key [][]byte) func (rs *RegionSplitter) ExecuteSplit( ctx context.Context, ranges []rtree.Range, - rewriteRules *RewriteRules, storeCount int, isRawKv bool, onSplit OnSplitFunc, @@ -82,7 +81,7 @@ func (rs *RegionSplitter) ExecuteSplit( // Sort the range for getting the min and max key of the ranges // TODO: this sort may not needed if we sort tables after creatation outside. - sortedRanges, errSplit := SortRanges(ranges, rewriteRules) + sortedRanges, errSplit := SortRanges(ranges) if errSplit != nil { return errors.Trace(errSplit) } @@ -625,7 +624,7 @@ func (helper *LogSplitHelper) splitRegionByPoints( startKey = point } - return regionSplitter.ExecuteSplit(ctx, ranges, nil, 3, false, func([][]byte) {}) + return regionSplitter.ExecuteSplit(ctx, ranges, 3, false, func([][]byte) {}) } select { case <-ctx.Done(): diff --git a/br/pkg/restore/split_test.go b/br/pkg/restore/split_test.go index a2a266bf51bdc..b46629a7235e3 100644 --- a/br/pkg/restore/split_test.go +++ b/br/pkg/restore/split_test.go @@ -249,11 +249,10 @@ func TestScanEmptyRegion(t *testing.T) { ranges := initRanges() // make ranges has only one ranges = ranges[0:1] - rewriteRules := initRewriteRules() regionSplitter := NewRegionSplitter(client) ctx := context.Background() - err := regionSplitter.ExecuteSplit(ctx, ranges, rewriteRules, 1, false, func(key [][]byte) {}) + err := regionSplitter.ExecuteSplit(ctx, ranges, 1, false, func(key [][]byte) {}) // should not return error with only one range entry require.NoError(t, err) } @@ -375,11 +374,16 @@ func runWaitScatter(t *testing.T, client *TestClient) { func runTestSplitAndScatterWith(t *testing.T, client *TestClient) { ranges := initRanges() - rewriteRules := initRewriteRules() regionSplitter := NewRegionSplitter(client) - ctx := context.Background() - err := regionSplitter.ExecuteSplit(ctx, ranges, rewriteRules, 1, false, func(key [][]byte) {}) + + rules := initRewriteRules() + for i, rg := range ranges { + tmp, err := RewriteRange(&rg, rules) + require.NoError(t, err) + ranges[i] = *tmp + } + err := regionSplitter.ExecuteSplit(ctx, ranges, 1, false, func(key [][]byte) {}) require.NoError(t, err) regions := client.GetAllRegions() if !validateRegions(regions) { @@ -426,7 +430,7 @@ func TestRawSplit(t *testing.T) { ctx := context.Background() regionSplitter := NewRegionSplitter(client) - err := regionSplitter.ExecuteSplit(ctx, ranges, nil, 1, true, func(key [][]byte) {}) + err := regionSplitter.ExecuteSplit(ctx, ranges, 1, true, func(key [][]byte) {}) require.NoError(t, err) regions := client.GetAllRegions() expectedKeys := []string{"", "aay", "bba", "bbh", "cca", ""} @@ -627,7 +631,7 @@ type fakeRestorer struct { tableIDIsInsequence bool } -func (f *fakeRestorer) SplitRanges(ctx context.Context, ranges []rtree.Range, rewriteRules *RewriteRules, updateCh glue.Progress, isRawKv bool) error { +func (f *fakeRestorer) SplitRanges(ctx context.Context, ranges []rtree.Range, updateCh glue.Progress, isRawKv bool) error { f.mu.Lock() defer f.mu.Unlock() @@ -644,7 +648,7 @@ func (f *fakeRestorer) SplitRanges(ctx context.Context, ranges []rtree.Range, re return nil } -func (f *fakeRestorer) RestoreSSTFiles(ctx context.Context, tableIDWithFiles []TableIDWithFiles, rewriteRules *RewriteRules, updateCh glue.Progress) error { +func (f *fakeRestorer) RestoreSSTFiles(ctx context.Context, tableIDWithFiles []TableIDWithFiles, updateCh glue.Progress) error { f.mu.Lock() defer f.mu.Unlock() diff --git a/br/pkg/restore/util.go b/br/pkg/restore/util.go index d5dfd0ffddb3e..e8d0ad3cee8e4 100644 --- a/br/pkg/restore/util.go +++ b/br/pkg/restore/util.go @@ -366,8 +366,8 @@ func GoValidateFileRanges( } } // Merge small ranges to reduce split and scatter regions. - ranges, stat, err := MergeFileRanges( - files, splitSizeBytes, splitKeyCount) + ranges, stat, err := MergeAndRewriteFileRanges( + files, t.RewriteRule, splitSizeBytes, splitKeyCount) if err != nil { errCh <- err return @@ -507,7 +507,6 @@ func SplitRanges( ctx context.Context, client *Client, ranges []rtree.Range, - rewriteRules *RewriteRules, updateCh glue.Progress, isRawKv bool, ) error { @@ -518,7 +517,7 @@ func SplitRanges( isRawKv, )) - return splitter.ExecuteSplit(ctx, ranges, rewriteRules, client.GetStoreCount(), isRawKv, func(keys [][]byte) { + return splitter.ExecuteSplit(ctx, ranges, client.GetStoreCount(), isRawKv, func(keys [][]byte) { for range keys { updateCh.Inc() } diff --git a/br/pkg/task/restore_raw.go b/br/pkg/task/restore_raw.go index 0e4a89a99fe5e..3e79525e6a1fb 100644 --- a/br/pkg/task/restore_raw.go +++ b/br/pkg/task/restore_raw.go @@ -137,8 +137,8 @@ func RunRestoreRaw(c context.Context, g glue.Glue, cmdName string, cfg *RestoreR } summary.CollectInt("restore files", len(files)) - ranges, _, err := restore.MergeFileRanges( - files, kvConfigs.MergeRegionSize.Value, kvConfigs.MergeRegionKeyCount.Value) + ranges, _, err := restore.MergeAndRewriteFileRanges( + files, nil, kvConfigs.MergeRegionSize.Value, kvConfigs.MergeRegionKeyCount.Value) if err != nil { return errors.Trace(err) } @@ -153,7 +153,7 @@ func RunRestoreRaw(c context.Context, g glue.Glue, cmdName string, cfg *RestoreR !cfg.LogProgress) // RawKV restore does not need to rewrite keys. - err = restore.SplitRanges(ctx, client, ranges, nil, updateCh, true) + err = restore.SplitRanges(ctx, client, ranges, updateCh, true) if err != nil { return errors.Trace(err) } diff --git a/br/pkg/task/restore_txn.go b/br/pkg/task/restore_txn.go index 15086e7ca72b5..56c221a877105 100644 --- a/br/pkg/task/restore_txn.go +++ b/br/pkg/task/restore_txn.go @@ -78,8 +78,8 @@ func RunRestoreTxn(c context.Context, g glue.Glue, cmdName string, cfg *Config) } summary.CollectInt("restore files", len(files)) - ranges, _, err := restore.MergeFileRanges( - files, conn.DefaultMergeRegionSizeBytes, conn.DefaultMergeRegionKeyCount) + ranges, _, err := restore.MergeAndRewriteFileRanges( + files, nil, conn.DefaultMergeRegionSizeBytes, conn.DefaultMergeRegionKeyCount) if err != nil { return errors.Trace(err) } @@ -93,7 +93,7 @@ func RunRestoreTxn(c context.Context, g glue.Glue, cmdName string, cfg *Config) !cfg.LogProgress) // RawKV restore does not need to rewrite keys. - err = restore.SplitRanges(ctx, client, ranges, nil, updateCh, false) + err = restore.SplitRanges(ctx, client, ranges, updateCh, false) if err != nil { return errors.Trace(err) } From a1d5a5a79e429b1d11e05f752b4678c4ac19b149 Mon Sep 17 00:00:00 2001 From: EasonBall <592838129@qq.com> Date: Tue, 12 Mar 2024 19:39:46 +0800 Subject: [PATCH 5/5] disttask: fix flaky pause test (#51689) close pingcap/tidb#51684 --- .../integrationtests/framework_pause_and_resume_test.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/pkg/disttask/framework/integrationtests/framework_pause_and_resume_test.go b/pkg/disttask/framework/integrationtests/framework_pause_and_resume_test.go index 5889c86deb363..fde32079a0df7 100644 --- a/pkg/disttask/framework/integrationtests/framework_pause_and_resume_test.go +++ b/pkg/disttask/framework/integrationtests/framework_pause_and_resume_test.go @@ -37,7 +37,12 @@ func CheckSubtasksState(ctx context.Context, t *testing.T, taskID int64, state p require.NoError(t, err) historySubTasksCnt, err := testutil.GetSubtasksFromHistoryByTaskID(ctx, mgr, taskID) require.NoError(t, err) - require.Equal(t, expectedCnt, cntByStatesStepOne[state]+cntByStatesStepTwo[state]+int64(historySubTasksCnt)) + // all subtasks moved to history. + if historySubTasksCnt != 0 { + require.Equal(t, expectedCnt, int64(historySubTasksCnt)) + } else { + require.Equal(t, expectedCnt, cntByStatesStepOne[state]+cntByStatesStepTwo[state]) + } } func TestFrameworkPauseAndResume(t *testing.T) {