Merge branch 'master' into fix_validate_password
CbcWestwolf authored Dec 1, 2022
2 parents 79501fc + 78011c5 commit de2769d
Showing 97 changed files with 11,802 additions and 9,960 deletions.
4 changes: 2 additions & 2 deletions DEPS.bzl
@@ -2915,8 +2915,8 @@ def go_deps():
name = "com_github_pingcap_kvproto",
build_file_proto_mode = "disable_global",
importpath = "github.com/pingcap/kvproto",
sum = "h1:LzIZsQpXQlj8yF7+yvyOg680OaPq7bmPuDuszgXfHsw=",
version = "v0.0.0-20221129023506-621ec37aac7a",
sum = "h1:46ZD6xzQWJ8Jkeal/U7SqkX030Mgs8DAn6QV/9zbqOQ=",
version = "v0.0.0-20221130022225-6c56ac56fe5f",
)
go_repository(
name = "com_github_pingcap_log",
20 changes: 20 additions & 0 deletions WORKSPACE
@@ -58,3 +58,23 @@ http_archive(
load("@com_google_protobuf//:protobuf_deps.bzl", "protobuf_deps")

protobuf_deps()

http_archive(
name = "remote_java_tools",
sha256 = "5cd59ea6bf938a1efc1e11ea562d37b39c82f76781211b7cd941a2346ea8484d",
urls = [
"http://ats.apps.svc/bazel_java_tools/releases/java/v11.9/java_tools-v11.9.zip",
"https://mirror.bazel.build/bazel_java_tools/releases/java/v11.9/java_tools-v11.9.zip",
"https://github.com/bazelbuild/java_tools/releases/download/java_v11.9/java_tools-v11.9.zip",
],
)

http_archive(
name = "remote_java_tools_linux",
sha256 = "512582cac5b7ea7974a77b0da4581b21f546c9478f206eedf54687eeac035989",
urls = [
"http://ats.apps.svc/bazel_java_tools/releases/java/v11.9/java_tools_linux-v11.9.zip",
"https://mirror.bazel.build/bazel_java_tools/releases/java/v11.9/java_tools_linux-v11.9.zip",
"https://github.com/bazelbuild/java_tools/releases/download/java_v11.9/java_tools_linux-v11.9.zip",
],
)
1 change: 1 addition & 0 deletions autoid_service/BUILD.bazel
@@ -15,6 +15,7 @@ go_library(
"//util/logutil",
"//util/mathutil",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_kvproto//pkg/autoid",
"@io_etcd_go_etcd_client_v3//:client",
"@org_golang_google_grpc//:grpc",
26 changes: 22 additions & 4 deletions autoid_service/autoid.go
@@ -22,6 +22,7 @@ import (
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/autoid"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/kv"
@@ -401,9 +402,16 @@ func (s *Service) getAlloc(dbID, tblID int64, isUnsigned bool) *autoIDValue {

func (s *Service) allocAutoID(ctx context.Context, req *autoid.AutoIDRequest) (*autoid.AutoIDResponse, error) {
if s.leaderShip != nil && !s.leaderShip.IsOwner() {
logutil.BgLogger().Info("[autoid service] Alloc AutoID fail, not leader")
return nil, errors.New("not leader")
}

failpoint.Inject("mockErr", func(val failpoint.Value) {
if val.(bool) {
failpoint.Return(nil, errors.New("mock reload failed"))
}
})

val := s.getAlloc(req.DbID, req.TblID, req.IsUnsigned)

if req.N == 0 {
@@ -426,10 +434,13 @@ func (s *Service) allocAutoID(ctx context.Context, req *autoid.AutoIDRequest) (*
val.end = currentEnd
return nil
})
if err != nil {
return &autoid.AutoIDResponse{Errmsg: []byte(err.Error())}, nil
}
return &autoid.AutoIDResponse{
Min: currentEnd,
Max: currentEnd,
}, err
}, nil
}

val.Lock()
@@ -443,10 +454,13 @@ func (s *Service) allocAutoID(ctx context.Context, req *autoid.AutoIDRequest) (*
min, max, err = val.alloc4Signed(ctx, s.store, req.DbID, req.TblID, req.IsUnsigned, req.N, req.Increment, req.Offset)
}

if err != nil {
return &autoid.AutoIDResponse{Errmsg: []byte(err.Error())}, nil
}
return &autoid.AutoIDResponse{
Min: min,
Max: max,
}, err
}, nil
}

func (alloc *autoIDValue) forceRebase(ctx context.Context, store kv.Storage, dbID, tblID, requiredBase int64, isUnsigned bool) error {
@@ -478,14 +492,15 @@ func (alloc *autoIDValue) forceRebase(ctx context.Context, store kv.Storage, dbI
// req.N = 0 is handled specially, it is used to return the current auto ID value.
func (s *Service) Rebase(ctx context.Context, req *autoid.RebaseRequest) (*autoid.RebaseResponse, error) {
if s.leaderShip != nil && !s.leaderShip.IsOwner() {
logutil.BgLogger().Info("[autoid service] Rebase() fail, not leader")
return nil, errors.New("not leader")
}

val := s.getAlloc(req.DbID, req.TblID, req.IsUnsigned)
if req.Force {
err := val.forceRebase(ctx, s.store, req.DbID, req.TblID, req.Base, req.IsUnsigned)
if err != nil {
return nil, errors.Trace(err)
return &autoid.RebaseResponse{Errmsg: []byte(err.Error())}, nil
}
}

@@ -495,5 +510,8 @@ func (s *Service) Rebase(ctx context.Context, req *autoid.RebaseRequest) (*autoi
} else {
err = val.rebase4Signed(ctx, s.store, req.DbID, req.TblID, req.Base)
}
return &autoid.RebaseResponse{}, err
if err != nil {
return &autoid.RebaseResponse{Errmsg: []byte(err.Error())}, nil
}
return &autoid.RebaseResponse{}, nil
}
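
With this change, allocAutoID and Rebase report allocation and rebase failures in-band through the response's Errmsg field instead of failing the gRPC call; only the "not leader" check still returns an RPC error. Below is a minimal caller-side sketch, assuming the generated AutoIDAllocClient from kvproto's pkg/autoid and the request/response fields visible above (DbID, TblID, N, Errmsg, Min, Max); the helper name and its error wrapping are illustrative, not part of this change.

package autoidclient

import (
	"context"

	"github.com/pingcap/errors"
	"github.com/pingcap/kvproto/pkg/autoid"
)

// allocRange asks the autoid service for a batch of IDs and surfaces the
// service's in-band errors to the caller.
func allocRange(ctx context.Context, client autoid.AutoIDAllocClient, dbID, tblID int64, n uint64) (min, max int64, err error) {
	resp, err := client.AllocAutoID(ctx, &autoid.AutoIDRequest{DbID: dbID, TblID: tblID, N: n})
	if err != nil {
		// Transport problems and "not leader" still arrive as RPC errors.
		return 0, 0, err
	}
	if len(resp.Errmsg) != 0 {
		// Allocation failures are now carried in the response body.
		return 0, 0, errors.New(string(resp.Errmsg))
	}
	return resp.Min, resp.Max, nil
}

The new mockErr failpoint (together with the failpoint dependency added to autoid_service/BUILD.bazel) appears intended for tests that need to force such a failure. A sketch of enabling it, assuming the usual "<package import path>/<failpoint name>" path convention:

package autoid_test

import "github.com/pingcap/failpoint"

// enableMockErr turns on the failpoint so that subsequent AllocAutoID calls
// fail with "mock reload failed"; the returned func turns it off again.
func enableMockErr() (disable func()) {
	const fp = "github.com/pingcap/tidb/autoid_service/mockErr"
	if err := failpoint.Enable(fp, `return(true)`); err != nil {
		panic(err)
	}
	return func() { _ = failpoint.Disable(fp) }
}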
17 changes: 17 additions & 0 deletions bindinfo/bind_cache.go
@@ -146,6 +146,23 @@ func (c *bindCache) GetBindRecord(hash, normdOrigSQL, db string) *BindRecord {
return nil
}

// GetBindRecordBySQLDigest gets the BindRecord from the cache.
// The return value is not read-only, but it shouldn't be changed in the caller functions.
// The function is thread-safe.
func (c *bindCache) GetBindRecordBySQLDigest(sqlDigest string) (*BindRecord, error) {
c.lock.Lock()
defer c.lock.Unlock()
bindings := c.get(bindCacheKey(sqlDigest))
if len(bindings) > 1 {
// currently, we only allow one binding for a sql
return nil, errors.New("more than 1 binding matched")
}
if len(bindings) == 0 || len(bindings[0].Bindings) == 0 {
return nil, errors.New("can't find any binding for `" + sqlDigest + "`")
}
return bindings[0], nil
}

// GetAllBindRecords return all the bindRecords from the bindCache.
// The return value is not read-only, but it shouldn't be changed in the caller functions.
// The function is thread-safe.
84 changes: 84 additions & 0 deletions bindinfo/bind_test.go
@@ -1304,3 +1304,87 @@ func TestBindSQLDigest(t *testing.T) {
require.Equal(t, res[0][9], sqlDigestWithDB.String())
}
}

func TestDropBindBySQLDigest(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(pk int primary key, a int, b int, key(a), key(b))")

cases := []struct {
origin string
hint string
}{
// agg hints
{"select count(1) from t", "select /*+ hash_agg() */ count(1) from t"},
{"select count(1) from t", "select /*+ stream_agg() */ count(1) from t"},
// join hints
{"select * from t t1, t t2 where t1.a=t2.a", "select /*+ merge_join(t1, t2) */ * from t t1, t t2 where t1.a=t2.a"},
{"select * from t t1, t t2 where t1.a=t2.a", "select /*+ tidb_smj(t1, t2) */ * from t t1, t t2 where t1.a=t2.a"},
{"select * from t t1, t t2 where t1.a=t2.a", "select /*+ hash_join(t1, t2) */ * from t t1, t t2 where t1.a=t2.a"},
{"select * from t t1, t t2 where t1.a=t2.a", "select /*+ tidb_hj(t1, t2) */ * from t t1, t t2 where t1.a=t2.a"},
{"select * from t t1, t t2 where t1.a=t2.a", "select /*+ inl_join(t1, t2) */ * from t t1, t t2 where t1.a=t2.a"},
{"select * from t t1, t t2 where t1.a=t2.a", "select /*+ tidb_inlj(t1, t2) */ * from t t1, t t2 where t1.a=t2.a"},
{"select * from t t1, t t2 where t1.a=t2.a", "select /*+ inl_hash_join(t1, t2) */ * from t t1, t t2 where t1.a=t2.a"},
// index hints
{"select * from t", "select * from t use index(primary)"},
{"select * from t", "select /*+ use_index(primary) */ * from t"},
{"select * from t", "select * from t use index(a)"},
{"select * from t", "select /*+ use_index(a) */ * from t use index(a)"},
{"select * from t", "select * from t use index(b)"},
{"select * from t", "select /*+ use_index(b) */ * from t use index(b)"},
{"select a, b from t where a=1 or b=1", "select /*+ use_index_merge(t, a, b) */ a, b from t where a=1 or b=1"},
{"select * from t where a=1", "select /*+ ignore_index(t, a) */ * from t where a=1"},
// push-down hints
{"select * from t limit 10", "select /*+ limit_to_cop() */ * from t limit 10"},
{"select a, count(*) from t group by a", "select /*+ agg_to_cop() */ a, count(*) from t group by a"},
// index-merge hints
{"select a, b from t where a>1 or b>1", "select /*+ no_index_merge() */ a, b from t where a>1 or b>1"},
{"select a, b from t where a>1 or b>1", "select /*+ use_index_merge(t, a, b) */ a, b from t where a>1 or b>1"},
// runtime hints
{"select * from t", "select /*+ memory_quota(1024 MB) */ * from t"},
{"select * from t", "select /*+ max_execution_time(1000) */ * from t"},
// storage hints
{"select * from t", "select /*+ read_from_storage(tikv[t]) */ * from t"},
// others
{"select t1.a, t1.b from t t1 where t1.a in (select t2.a from t t2)", "select /*+ use_toja(true) */ t1.a, t1.b from t t1 where t1.a in (select t2.a from t t2)"},
}

h := dom.BindHandle()
// global scope
for _, c := range cases {
utilCleanBindingEnv(tk, dom)
sql := "create global binding for " + c.origin + " using " + c.hint
tk.MustExec(sql)
h.ReloadBindings()
res := tk.MustQuery(`show global bindings`).Rows()

require.Equal(t, len(res), 1)
require.Equal(t, len(res[0]), 11)
drop := fmt.Sprintf("drop global binding for sql digest '%s'", res[0][9])
tk.MustExec(drop)
require.NoError(t, h.GCBindRecord())
h.ReloadBindings()
tk.MustQuery("show global bindings").Check(testkit.Rows())
}

// session scope
for _, c := range cases {
utilCleanBindingEnv(tk, dom)
sql := "create binding for " + c.origin + " using " + c.hint
tk.MustExec(sql)
res := tk.MustQuery(`show bindings`).Rows()

require.Equal(t, len(res), 1)
require.Equal(t, len(res[0]), 11)
drop := fmt.Sprintf("drop binding for sql digest '%s'", res[0][9])
tk.MustExec(drop)
require.NoError(t, h.GCBindRecord())
tk.MustQuery("show bindings").Check(testkit.Rows())
}

// exception cases
tk.MustGetErrMsg(fmt.Sprintf("drop binding for sql digest '%s'", "1"), "can't find any binding for `1`")
tk.MustGetErrMsg(fmt.Sprintf("drop binding for sql digest '%s'", ""), "sql digest is empty")
}
14 changes: 14 additions & 0 deletions bindinfo/handle.go
@@ -437,6 +437,15 @@ func (h *BindHandle) DropBindRecord(originalSQL, db string, binding *Binding) (d
return h.sctx.Context.GetSessionVars().StmtCtx.AffectedRows(), nil
}

// DropBindRecordByDigest drops the BindRecord from the storage and the BindRecord in the cache.
func (h *BindHandle) DropBindRecordByDigest(sqlDigest string) (deletedRows uint64, err error) {
oldRecord, err := h.GetBindRecordBySQLDigest(sqlDigest)
if err != nil {
return 0, err
}
return h.DropBindRecord(oldRecord.OriginalSQL, strings.ToLower(oldRecord.Db), nil)
}

// SetBindRecordStatus set a BindRecord's status to the storage and bind cache.
func (h *BindHandle) SetBindRecordStatus(originalSQL string, binding *Binding, newStatus string) (ok bool, err error) {
h.bindInfo.Lock()
@@ -658,6 +667,11 @@ func (h *BindHandle) GetBindRecord(hash, normdOrigSQL, db string) *BindRecord {
return h.bindInfo.Load().(*bindCache).GetBindRecord(hash, normdOrigSQL, db)
}

// GetBindRecordBySQLDigest returns the BindRecord of the sql digest.
func (h *BindHandle) GetBindRecordBySQLDigest(sqlDigest string) (*BindRecord, error) {
return h.bindInfo.Load().(*bindCache).GetBindRecordBySQLDigest(sqlDigest)
}

// GetAllBindRecord returns all bind records in cache.
func (h *BindHandle) GetAllBindRecord() (bindRecords []*BindRecord) {
return h.bindInfo.Load().(*bindCache).GetAllBindRecords()
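
Taken together, the bindinfo changes let a caller resolve and drop a binding by its SQL digest. A minimal sketch of the intended call pattern, assuming h is a *bindinfo.BindHandle obtained elsewhere (for example from the domain); the wrapper function is illustrative only.

package example

import "github.com/pingcap/tidb/bindinfo"

// dropByDigest shows the new digest-based surface on BindHandle. The error
// strings come from GetBindRecordBySQLDigest in bind_cache.go.
func dropByDigest(h *bindinfo.BindHandle, sqlDigest string) error {
	record, err := h.GetBindRecordBySQLDigest(sqlDigest)
	if err != nil {
		// e.g. "can't find any binding for `<digest>`" or "more than 1 binding matched"
		return err
	}
	_ = record.OriginalSQL // the normalized SQL the binding was created for

	// DropBindRecordByDigest repeats the lookup internally and then removes
	// the binding from storage and cache through DropBindRecord.
	_, err = h.DropBindRecordByDigest(sqlDigest)
	return err
}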
14 changes: 14 additions & 0 deletions bindinfo/session_handle.go
@@ -97,11 +97,25 @@ func (h *SessionHandle) DropBindRecord(originalSQL, db string, binding *Binding)
return nil
}

// DropBindRecordByDigest drops the BindRecord in the cache.
func (h *SessionHandle) DropBindRecordByDigest(sqlDigest string) error {
oldRecord, err := h.GetBindRecordBySQLDigest(sqlDigest)
if err != nil {
return err
}
return h.DropBindRecord(oldRecord.OriginalSQL, strings.ToLower(oldRecord.Db), nil)
}

// GetBindRecord return the BindMeta of the (normdOrigSQL,db) if BindMeta exist.
func (h *SessionHandle) GetBindRecord(hash, normdOrigSQL, db string) *BindRecord {
return h.ch.GetBindRecord(hash, normdOrigSQL, db)
}

// GetBindRecordBySQLDigest returns the BindMeta corresponding to sqlDigest.
func (h *SessionHandle) GetBindRecordBySQLDigest(sqlDigest string) (*BindRecord, error) {
return h.ch.GetBindRecordBySQLDigest(sqlDigest)
}

// GetAllBindRecord return all session bind info.
func (h *SessionHandle) GetAllBindRecord() (bindRecords []*BindRecord) {
return h.ch.GetAllBindRecords()
2 changes: 2 additions & 0 deletions br/pkg/gluetidb/glue.go
@@ -208,6 +208,8 @@ func (gs *tidbSession) CreateTables(ctx context.Context, tables map[string][]*mo
d := domain.GetDomain(gs.se).DDL()
var dbName model.CIStr

// Disable foreign key checks when batch creating tables.
gs.se.GetSessionVars().ForeignKeyChecks = false
for db, tablesInDB := range tables {
dbName = model.NewCIStr(db)
queryBuilder := strings.Builder{}
1 change: 1 addition & 0 deletions br/pkg/lightning/BUILD.bazel
@@ -37,6 +37,7 @@ go_library(
"@com_github_prometheus_client_golang//prometheus/promhttp",
"@com_github_shurcool_httpgzip//:httpgzip",
"@org_golang_x_exp//slices",
"@org_uber_go_atomic//:atomic",
"@org_uber_go_zap//:zap",
"@org_uber_go_zap//zapcore",
],
19 changes: 10 additions & 9 deletions br/pkg/lightning/errormanager/errormanager.go
@@ -40,9 +40,10 @@ const (
CREATE SCHEMA IF NOT EXISTS %s;
`

syntaxErrorTableName = "syntax_error_v1"
typeErrorTableName = "type_error_v1"
conflictErrorTableName = "conflict_error_v1"
syntaxErrorTableName = "syntax_error_v1"
typeErrorTableName = "type_error_v1"
// ConflictErrorTableName is the table name for duplicate detection.
ConflictErrorTableName = "conflict_error_v1"

createSyntaxErrorTable = `
CREATE TABLE IF NOT EXISTS %s.` + syntaxErrorTableName + ` (
@@ -69,7 +70,7 @@ const (
`

createConflictErrorTable = `
CREATE TABLE IF NOT EXISTS %s.` + conflictErrorTableName + ` (
CREATE TABLE IF NOT EXISTS %s.` + ConflictErrorTableName + ` (
task_id bigint NOT NULL,
create_time datetime(6) NOT NULL DEFAULT now(6),
table_name varchar(261) NOT NULL,
@@ -91,15 +92,15 @@ const (
`

insertIntoConflictErrorData = `
INSERT INTO %s.` + conflictErrorTableName + `
INSERT INTO %s.` + ConflictErrorTableName + `
(task_id, table_name, index_name, key_data, row_data, raw_key, raw_value, raw_handle, raw_row)
VALUES
`

sqlValuesConflictErrorData = "(?,?,'PRIMARY',?,?,?,?,raw_key,raw_value)"

insertIntoConflictErrorIndex = `
INSERT INTO %s.` + conflictErrorTableName + `
INSERT INTO %s.` + ConflictErrorTableName + `
(task_id, table_name, index_name, key_data, row_data, raw_key, raw_value, raw_handle, raw_row)
VALUES
`
@@ -108,7 +109,7 @@ const (

selectConflictKeys = `
SELECT _tidb_rowid, raw_handle, raw_row
FROM %s.` + conflictErrorTableName + `
FROM %s.` + ConflictErrorTableName + `
WHERE table_name = ? AND _tidb_rowid >= ? and _tidb_rowid < ?
ORDER BY _tidb_rowid LIMIT ?;
`
@@ -468,7 +469,7 @@ func (em *ErrorManager) LogErrorDetails() {
em.logger.Warn(fmtErrMsg(errCnt, "data type", ""))
}
if errCnt := em.conflictError(); errCnt > 0 {
em.logger.Warn(fmtErrMsg(errCnt, "data type", conflictErrorTableName))
em.logger.Warn(fmtErrMsg(errCnt, "data type", ConflictErrorTableName))
}
}

@@ -511,7 +512,7 @@ func (em *ErrorManager) Output() string {
}
if errCnt := em.conflictError(); errCnt > 0 {
count++
t.AppendRow(table.Row{count, "Unique Key Conflict", errCnt, em.fmtTableName(conflictErrorTableName)})
t.AppendRow(table.Row{count, "Unique Key Conflict", errCnt, em.fmtTableName(ConflictErrorTableName)})
}

res := "\nImport Data Error Summary: \n"
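
Renaming conflictErrorTableName to the exported ConflictErrorTableName lets code outside errormanager (presumably the duplicate-detection path wired up through the new DupIndicator option in lightning.go below) refer to the conflict table directly. A hedged sketch of such a reference; the query and helper are illustrative, not taken from this change.

package example

import (
	"fmt"

	"github.com/pingcap/tidb/br/pkg/lightning/errormanager"
)

// conflictCountQuery builds a query against lightning's conflict table in the
// task's error-report schema; taskSchema is whatever schema the task configured.
func conflictCountQuery(taskSchema string) string {
	return fmt.Sprintf("SELECT COUNT(*) FROM %s.%s", taskSchema, errormanager.ConflictErrorTableName)
}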
1 change: 1 addition & 0 deletions br/pkg/lightning/lightning.go
@@ -522,6 +522,7 @@ func (l *Lightning) run(taskCtx context.Context, taskCfg *config.Config, o *opti
Glue: g,
CheckpointStorage: o.checkpointStorage,
CheckpointName: o.checkpointName,
DupIndicator: o.dupIndicator,
}

var procedure *restore.Controller
(Diff truncated: the remaining changed files are not shown here.)
