Skip to content

Commit

Permalink
Merge branch 'master' into fk-plan2
Browse files Browse the repository at this point in the history
  • Loading branch information
crazycs520 authored Dec 1, 2022
2 parents 2dfd99e + 9d9eaca commit fc7bc74
Show file tree
Hide file tree
Showing 94 changed files with 6,101 additions and 4,035 deletions.
20 changes: 20 additions & 0 deletions WORKSPACE
Original file line number Diff line number Diff line change
Expand Up @@ -58,3 +58,23 @@ http_archive(
load("@com_google_protobuf//:protobuf_deps.bzl", "protobuf_deps")

protobuf_deps()

http_archive(
name = "remote_java_tools",
sha256 = "5cd59ea6bf938a1efc1e11ea562d37b39c82f76781211b7cd941a2346ea8484d",
urls = [
"http://ats.apps.svc/bazel_java_tools/releases/java/v11.9/java_tools-v11.9.zip",
"https://mirror.bazel.build/bazel_java_tools/releases/java/v11.9/java_tools-v11.9.zip",
"https://github.com/bazelbuild/java_tools/releases/download/java_v11.9/java_tools-v11.9.zip",
],
)

http_archive(
name = "remote_java_tools_linux",
sha256 = "512582cac5b7ea7974a77b0da4581b21f546c9478f206eedf54687eeac035989",
urls = [
"http://ats.apps.svc/bazel_java_tools/releases/java/v11.9/java_tools_linux-v11.9.zip",
"https://mirror.bazel.build/bazel_java_tools/releases/java/v11.9/java_tools_linux-v11.9.zip",
"https://github.com/bazelbuild/java_tools/releases/download/java_v11.9/java_tools_linux-v11.9.zip",
],
)
2 changes: 1 addition & 1 deletion bindinfo/bind_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ func (c *bindCache) GetBindRecordBySQLDigest(sqlDigest string) (*BindRecord, err
return nil, errors.New("more than 1 binding matched")
}
if len(bindings) == 0 || len(bindings[0].Bindings) == 0 {
return nil, errors.New("can't find any binding for `" + sqlDigest + "`")
return nil, errors.New("can't find any binding for '" + sqlDigest + "'")
}
return bindings[0], nil
}
Expand Down
53 changes: 52 additions & 1 deletion bindinfo/bind_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1385,6 +1385,57 @@ func TestDropBindBySQLDigest(t *testing.T) {
}

// exception cases
tk.MustGetErrMsg(fmt.Sprintf("drop binding for sql digest '%s'", "1"), "can't find any binding for `1`")
tk.MustGetErrMsg(fmt.Sprintf("drop binding for sql digest '%s'", "1"), "can't find any binding for '1'")
tk.MustGetErrMsg(fmt.Sprintf("drop binding for sql digest '%s'", ""), "sql digest is empty")
}

func TestCreateBindingFromHistory(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
require.NoError(t, tk.Session().Auth(&auth.UserIdentity{Username: "root", Hostname: "%"}, nil, nil))

tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t1(id int primary key, a int, b int, key(a))")
tk.MustExec("create table t2(id int primary key, a int, b int, key(a))")

var testCases = []struct {
sqls []string
hint string
}{
{
sqls: []string{
"select %s * from t1, t2 where t1.id = t2.id",
"select %s * from test.t1, t2 where t1.id = t2.id",
"select %s * from test.t1, test.t2 where t1.id = t2.id",
"select %s * from t1, test.t2 where t1.id = t2.id",
},
hint: "/*+ merge_join(t1, t2) */",
},
{
sqls: []string{
"select %s * from t1 where a = 1",
"select %s * from test.t1 where a = 1",
},
hint: "/*+ ignore_index(t, a) */",
},
}

for _, testCase := range testCases {
for _, bind := range testCase.sqls {
stmtsummary.StmtSummaryByDigestMap.Clear()
bindSQL := fmt.Sprintf(bind, testCase.hint)
tk.MustExec(bindSQL)
planDigest := tk.MustQuery(fmt.Sprintf("select plan_digest from information_schema.statements_summary where query_sample_text = '%s'", bindSQL)).Rows()
tk.MustExec(fmt.Sprintf("create session binding from history using plan digest '%s'", planDigest[0][0]))
for _, sql := range testCase.sqls {
tk.MustExec(fmt.Sprintf(sql, ""))
tk.MustQuery("select @@last_plan_from_binding").Check(testkit.Rows("1"))
}
}
}

// exception cases
tk.MustGetErrMsg(fmt.Sprintf("create binding from history using plan digest '%s'", "1"), "can't find any plans for '1'")
tk.MustGetErrMsg(fmt.Sprintf("create binding from history using plan digest '%s'", ""), "plan digest is empty")
}
1 change: 1 addition & 0 deletions br/pkg/lightning/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ go_library(
"@com_github_prometheus_client_golang//prometheus/promhttp",
"@com_github_shurcool_httpgzip//:httpgzip",
"@org_golang_x_exp//slices",
"@org_uber_go_atomic//:atomic",
"@org_uber_go_zap//:zap",
"@org_uber_go_zap//zapcore",
],
Expand Down
19 changes: 10 additions & 9 deletions br/pkg/lightning/errormanager/errormanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,10 @@ const (
CREATE SCHEMA IF NOT EXISTS %s;
`

syntaxErrorTableName = "syntax_error_v1"
typeErrorTableName = "type_error_v1"
conflictErrorTableName = "conflict_error_v1"
syntaxErrorTableName = "syntax_error_v1"
typeErrorTableName = "type_error_v1"
// ConflictErrorTableName is the table name for duplicate detection.
ConflictErrorTableName = "conflict_error_v1"

createSyntaxErrorTable = `
CREATE TABLE IF NOT EXISTS %s.` + syntaxErrorTableName + ` (
Expand All @@ -69,7 +70,7 @@ const (
`

createConflictErrorTable = `
CREATE TABLE IF NOT EXISTS %s.` + conflictErrorTableName + ` (
CREATE TABLE IF NOT EXISTS %s.` + ConflictErrorTableName + ` (
task_id bigint NOT NULL,
create_time datetime(6) NOT NULL DEFAULT now(6),
table_name varchar(261) NOT NULL,
Expand All @@ -91,15 +92,15 @@ const (
`

insertIntoConflictErrorData = `
INSERT INTO %s.` + conflictErrorTableName + `
INSERT INTO %s.` + ConflictErrorTableName + `
(task_id, table_name, index_name, key_data, row_data, raw_key, raw_value, raw_handle, raw_row)
VALUES
`

sqlValuesConflictErrorData = "(?,?,'PRIMARY',?,?,?,?,raw_key,raw_value)"

insertIntoConflictErrorIndex = `
INSERT INTO %s.` + conflictErrorTableName + `
INSERT INTO %s.` + ConflictErrorTableName + `
(task_id, table_name, index_name, key_data, row_data, raw_key, raw_value, raw_handle, raw_row)
VALUES
`
Expand All @@ -108,7 +109,7 @@ const (

selectConflictKeys = `
SELECT _tidb_rowid, raw_handle, raw_row
FROM %s.` + conflictErrorTableName + `
FROM %s.` + ConflictErrorTableName + `
WHERE table_name = ? AND _tidb_rowid >= ? and _tidb_rowid < ?
ORDER BY _tidb_rowid LIMIT ?;
`
Expand Down Expand Up @@ -468,7 +469,7 @@ func (em *ErrorManager) LogErrorDetails() {
em.logger.Warn(fmtErrMsg(errCnt, "data type", ""))
}
if errCnt := em.conflictError(); errCnt > 0 {
em.logger.Warn(fmtErrMsg(errCnt, "data type", conflictErrorTableName))
em.logger.Warn(fmtErrMsg(errCnt, "data type", ConflictErrorTableName))
}
}

Expand Down Expand Up @@ -511,7 +512,7 @@ func (em *ErrorManager) Output() string {
}
if errCnt := em.conflictError(); errCnt > 0 {
count++
t.AppendRow(table.Row{count, "Unique Key Conflict", errCnt, em.fmtTableName(conflictErrorTableName)})
t.AppendRow(table.Row{count, "Unique Key Conflict", errCnt, em.fmtTableName(ConflictErrorTableName)})
}

res := "\nImport Data Error Summary: \n"
Expand Down
1 change: 1 addition & 0 deletions br/pkg/lightning/lightning.go
Original file line number Diff line number Diff line change
Expand Up @@ -522,6 +522,7 @@ func (l *Lightning) run(taskCtx context.Context, taskCfg *config.Config, o *opti
Glue: g,
CheckpointStorage: o.checkpointStorage,
CheckpointName: o.checkpointName,
DupIndicator: o.dupIndicator,
}

var procedure *restore.Controller
Expand Down
4 changes: 4 additions & 0 deletions br/pkg/lightning/restore/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ type Controller struct {
diskQuotaState atomic.Int32
compactState atomic.Int32
status *LightningStatus
dupIndicator *atomic.Bool

preInfoGetter PreRestoreInfoGetter
precheckItemBuilder *PrecheckItemBuilder
Expand Down Expand Up @@ -263,6 +264,8 @@ type ControllerParam struct {
CheckpointStorage storage.ExternalStorage
// when CheckpointStorage is not nil, save file checkpoint to it with this name
CheckpointName string
// DupIndicator can expose the duplicate detection result to the caller
DupIndicator *atomic.Bool
}

func NewRestoreController(
Expand Down Expand Up @@ -430,6 +433,7 @@ func NewRestoreControllerWithPauser(
errorMgr: errorMgr,
status: p.Status,
taskMgr: nil,
dupIndicator: p.DupIndicator,

preInfoGetter: preInfoGetter,
precheckItemBuilder: preCheckBuilder,
Expand Down
5 changes: 5 additions & 0 deletions br/pkg/lightning/restore/table_restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -813,6 +813,11 @@ func (tr *TableRestore) postProcess(
}
}

if rc.dupIndicator != nil {
tr.logger.Debug("set dupIndicator", zap.Bool("has-duplicate", hasDupe))
rc.dupIndicator.CompareAndSwap(false, hasDupe)
}

nextStage := checkpoints.CheckpointStatusChecksummed
if rc.cfg.PostRestore.Checksum != config.OpLevelOff && !hasDupe && needChecksum {
if cp.Checksum.SumKVS() > 0 || baseTotalChecksum.SumKVS() > 0 {
Expand Down
9 changes: 9 additions & 0 deletions br/pkg/lightning/run_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/pingcap/tidb/br/pkg/lightning/log"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/util/promutil"
"go.uber.org/atomic"
"go.uber.org/zap"
)

Expand All @@ -30,6 +31,7 @@ type options struct {
promFactory promutil.Factory
promRegistry promutil.Registry
logger log.Logger
dupIndicator *atomic.Bool
}

type Option func(*options)
Expand Down Expand Up @@ -81,3 +83,10 @@ func WithLogger(logger *zap.Logger) Option {
o.logger = log.Logger{Logger: logger}
}
}

// WithDupIndicator sets a *bool to indicate duplicate detection has found duplicate data.
func WithDupIndicator(b *atomic.Bool) Option {
return func(o *options) {
o.dupIndicator = b
}
}
11 changes: 10 additions & 1 deletion br/pkg/restore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -940,7 +940,9 @@ func (rc *Client) CheckSysTableCompatibility(dom *domain.Domain, tables []*metau
return errors.Annotate(berrors.ErrRestoreIncompatibleSys, "missed system table: "+table.Info.Name.O)
}
backupTi := table.Info
if len(ti.Columns) != len(backupTi.Columns) {
// skip checking the number of columns in mysql.user table,
// because higher versions of TiDB may add new columns.
if len(ti.Columns) != len(backupTi.Columns) && backupTi.Name.L != sysUserTableName {
log.Error("column count mismatch",
zap.Stringer("table", table.Info.Name),
zap.Int("col in cluster", len(ti.Columns)),
Expand All @@ -959,6 +961,13 @@ func (rc *Client) CheckSysTableCompatibility(dom *domain.Domain, tables []*metau
col := ti.Columns[i]
backupCol := backupColMap[col.Name.L]
if backupCol == nil {
// skip when the backed up mysql.user table is missing columns.
if backupTi.Name.L == sysUserTableName {
log.Warn("missing column in backup data",
zap.Stringer("table", table.Info.Name),
zap.String("col", fmt.Sprintf("%s %s", col.Name, col.FieldType.String())))
continue
}
log.Error("missing column in backup data",
zap.Stringer("table", table.Info.Name),
zap.String("col", fmt.Sprintf("%s %s", col.Name, col.FieldType.String())))
Expand Down
28 changes: 16 additions & 12 deletions br/pkg/restore/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,14 +195,14 @@ func TestCheckSysTableCompatibility(t *testing.T) {
userTI, err := client.GetTableSchema(cluster.Domain, sysDB, model.NewCIStr("user"))
require.NoError(t, err)

// column count mismatch
// user table in cluster have more columns(success)
mockedUserTI := userTI.Clone()
mockedUserTI.Columns = mockedUserTI.Columns[:len(mockedUserTI.Columns)-1]
userTI.Columns = append(userTI.Columns, &model.ColumnInfo{Name: model.NewCIStr("new-name")})
err = client.CheckSysTableCompatibility(cluster.Domain, []*metautil.Table{{
DB: tmpSysDB,
Info: mockedUserTI,
}})
require.True(t, berrors.ErrRestoreIncompatibleSys.Equal(err))
require.NoError(t, err)

// column order mismatch(success)
mockedUserTI = userTI.Clone()
Expand All @@ -213,15 +213,6 @@ func TestCheckSysTableCompatibility(t *testing.T) {
}})
require.NoError(t, err)

// missing column
mockedUserTI = userTI.Clone()
mockedUserTI.Columns[0].Name = model.NewCIStr("new-name")
err = client.CheckSysTableCompatibility(cluster.Domain, []*metautil.Table{{
DB: tmpSysDB,
Info: mockedUserTI,
}})
require.True(t, berrors.ErrRestoreIncompatibleSys.Equal(err))

// incompatible column type
mockedUserTI = userTI.Clone()
mockedUserTI.Columns[0].FieldType.SetFlen(2000) // Columns[0] is `Host` char(255)
Expand All @@ -238,6 +229,19 @@ func TestCheckSysTableCompatibility(t *testing.T) {
Info: mockedUserTI,
}})
require.NoError(t, err)

// use the mysql.db table to test for column count mismatch.
dbTI, err := client.GetTableSchema(cluster.Domain, sysDB, model.NewCIStr("db"))
require.NoError(t, err)

// other system tables in cluster have more columns(failed)
mockedDBTI := dbTI.Clone()
dbTI.Columns = append(dbTI.Columns, &model.ColumnInfo{Name: model.NewCIStr("new-name")})
err = client.CheckSysTableCompatibility(cluster.Domain, []*metautil.Table{{
DB: tmpSysDB,
Info: mockedDBTI,
}})
require.True(t, berrors.ErrRestoreIncompatibleSys.Equal(err))
}

func TestInitFullClusterRestore(t *testing.T) {
Expand Down
7 changes: 7 additions & 0 deletions br/pkg/restore/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"bytes"
"context"
"crypto/tls"
"fmt"
"math/rand"
"strings"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -247,6 +249,8 @@ type FileImporter struct {
rawStartKey []byte
rawEndKey []byte
supportMultiIngest bool

cacheKey string
}

// NewFileImporter returns a new file importClient.
Expand All @@ -261,6 +265,7 @@ func NewFileImporter(
backend: backend,
importClient: importClient,
isRawKvMode: isRawKvMode,
cacheKey: fmt.Sprintf("BR-%s-%d", time.Now().Format("20060102150405"), rand.Int63()),
}
}

Expand Down Expand Up @@ -636,6 +641,7 @@ func (importer *FileImporter) downloadSST(
Name: file.GetName(),
RewriteRule: rule,
CipherInfo: cipher,
StorageCacheId: importer.cacheKey,
}
log.Debug("download SST",
logutil.SSTMeta(&sstMeta),
Expand Down Expand Up @@ -715,6 +721,7 @@ func (importer *FileImporter) downloadRawKVSST(
RewriteRule: rule,
IsRawKv: true,
CipherInfo: cipher,
StorageCacheId: importer.cacheKey,
}
log.Debug("download SST", logutil.SSTMeta(&sstMeta), logutil.Region(regionInfo.Region))

Expand Down
2 changes: 2 additions & 0 deletions br/pkg/restore/systable_restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (
"go.uber.org/zap"
)

const sysUserTableName = "user"

var statsTables = map[string]struct{}{
"stats_buckets": {},
"stats_extended": {},
Expand Down
Loading

0 comments on commit fc7bc74

Please sign in to comment.