Skip to content

Commit

Permalink
Merge branch 'master' into chore/typo
Browse files Browse the repository at this point in the history
  • Loading branch information
LittleFall authored Oct 18, 2022
2 parents 8f46997 + 5cb35c6 commit 25ce455
Show file tree
Hide file tree
Showing 110 changed files with 2,628 additions and 1,238 deletions.
2 changes: 1 addition & 1 deletion .bazelrc
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
startup --host_jvm_args=-Xmx6g
startup --host_jvm_args=-Xmx5g
startup --unlimit_coredumps

run:ci --color=yes
Expand Down
2 changes: 1 addition & 1 deletion Makefile.common
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,6 @@ DUMPLING_GOTEST := CGO_ENABLED=1 GO111MODULE=on go test -ldflags '$(DUMPLING_LD
TEST_COVERAGE_DIR := "test_coverage"

ifneq ("$(CI)", "0")
BAZEL_GLOBAL_CONFIG := --output_user_root=/home/jenkins/.tidb/tmp
BAZEL_GLOBAL_CONFIG := --output_user_root=/home/jenkins/.tidb/tmp --host_jvm_args=-XX:+UnlockExperimentalVMOptions --host_jvm_args=-XX:+UseZGC
BAZEL_CMD_CONFIG := --config=ci --repository_cache=/home/jenkins/.tidb/tmp
endif
1 change: 1 addition & 0 deletions bindinfo/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ go_test(
],
embed = [":bindinfo"],
flaky = True,
race = "on",
shard_count = 50,
deps = [
"//config",
Expand Down
1 change: 1 addition & 0 deletions br/pkg/glue/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//br/pkg/logutil",
"//ddl",
"//domain",
"//kv",
"//parser/model",
Expand Down
5 changes: 3 additions & 2 deletions br/pkg/glue/glue.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package glue
import (
"context"

"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/model"
Expand Down Expand Up @@ -42,7 +43,7 @@ type Session interface {
Execute(ctx context.Context, sql string) error
ExecuteInternal(ctx context.Context, sql string, args ...interface{}) error
CreateDatabase(ctx context.Context, schema *model.DBInfo) error
CreateTable(ctx context.Context, dbName model.CIStr, table *model.TableInfo) error
CreateTable(ctx context.Context, dbName model.CIStr, table *model.TableInfo, cs ...ddl.CreateTableWithInfoConfigurier) error
CreatePlacementPolicy(ctx context.Context, policy *model.PolicyInfo) error
Close()
GetGlobalVariable(name string) (string, error)
Expand All @@ -51,7 +52,7 @@ type Session interface {

// BatchCreateTableSession is an interface to batch create table parallelly
type BatchCreateTableSession interface {
CreateTables(ctx context.Context, tables map[string][]*model.TableInfo) error
CreateTables(ctx context.Context, tables map[string][]*model.TableInfo, cs ...ddl.CreateTableWithInfoConfigurier) error
}

// Progress is an interface recording the current execution progress.
Expand Down
13 changes: 7 additions & 6 deletions br/pkg/gluetidb/glue.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ func (gs *tidbSession) CreatePlacementPolicy(ctx context.Context, policy *model.
}

// CreateTables implements glue.BatchCreateTableSession.
func (gs *tidbSession) CreateTables(ctx context.Context, tables map[string][]*model.TableInfo) error {
func (gs *tidbSession) CreateTables(ctx context.Context, tables map[string][]*model.TableInfo, cs ...ddl.CreateTableWithInfoConfigurier) error {
d := domain.GetDomain(gs.se).DDL()
var dbName model.CIStr

Expand All @@ -231,7 +231,7 @@ func (gs *tidbSession) CreateTables(ctx context.Context, tables map[string][]*mo
cloneTables = append(cloneTables, table)
}
gs.se.SetValue(sessionctx.QueryString, queryBuilder.String())
err := d.BatchCreateTableWithInfo(gs.se, dbName, cloneTables, ddl.OnExistIgnore)
err := d.BatchCreateTableWithInfo(gs.se, dbName, cloneTables, append(cs, ddl.OnExistIgnore)...)
if err != nil {
//It is possible to failure when TiDB does not support model.ActionCreateTables.
//In this circumstance, BatchCreateTableWithInfo returns errno.ErrInvalidDDLJob,
Expand All @@ -245,7 +245,7 @@ func (gs *tidbSession) CreateTables(ctx context.Context, tables map[string][]*mo
}

// CreateTable implements glue.Session.
func (gs *tidbSession) CreateTable(ctx context.Context, dbName model.CIStr, table *model.TableInfo) error {
func (gs *tidbSession) CreateTable(ctx context.Context, dbName model.CIStr, table *model.TableInfo, cs ...ddl.CreateTableWithInfoConfigurier) error {
d := domain.GetDomain(gs.se).DDL()
query, err := gs.showCreateTable(table)
if err != nil {
Expand All @@ -259,7 +259,8 @@ func (gs *tidbSession) CreateTable(ctx context.Context, dbName model.CIStr, tabl
newPartition.Definitions = append([]model.PartitionDefinition{}, table.Partition.Definitions...)
table.Partition = &newPartition
}
return d.CreateTableWithInfo(gs.se, dbName, table, ddl.OnExistIgnore)

return d.CreateTableWithInfo(gs.se, dbName, table, append(cs, ddl.OnExistIgnore)...)
}

// Close implements glue.Session.
Expand Down Expand Up @@ -349,13 +350,13 @@ func (s *mockSession) CreatePlacementPolicy(ctx context.Context, policy *model.P
}

// CreateTables implements glue.BatchCreateTableSession.
func (s *mockSession) CreateTables(ctx context.Context, tables map[string][]*model.TableInfo) error {
func (s *mockSession) CreateTables(ctx context.Context, tables map[string][]*model.TableInfo, cs ...ddl.CreateTableWithInfoConfigurier) error {
log.Fatal("unimplemented CreateDatabase for mock session")
return nil
}

// CreateTable implements glue.Session.
func (s *mockSession) CreateTable(ctx context.Context, dbName model.CIStr, table *model.TableInfo) error {
func (s *mockSession) CreateTable(ctx context.Context, dbName model.CIStr, table *model.TableInfo, cs ...ddl.CreateTableWithInfoConfigurier) error {
log.Fatal("unimplemented CreateDatabase for mock session")
return nil
}
Expand Down
10 changes: 5 additions & 5 deletions br/pkg/lightning/mydump/parquet_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,7 @@ func setDatumByString(d *types.Datum, v string, meta *parquet.SchemaElement) {
ts = ts.UTC()
v = ts.Format(utcTimeLayout)
}
d.SetString(v, "")
d.SetString(v, "utf8mb4_bin")
}

func binaryToDecimalStr(rawBytes []byte, scale int) string {
Expand Down Expand Up @@ -515,20 +515,20 @@ func setDatumByInt(d *types.Datum, v int64, meta *parquet.SchemaElement) error {
}
val := fmt.Sprintf("%0*d", minLen, v)
dotIndex := len(val) - int(*meta.Scale)
d.SetString(val[:dotIndex]+"."+val[dotIndex:], "")
d.SetString(val[:dotIndex]+"."+val[dotIndex:], "utf8mb4_bin")
case logicalType.DATE != nil:
dateStr := time.Unix(v*86400, 0).Format("2006-01-02")
d.SetString(dateStr, "")
d.SetString(dateStr, "utf8mb4_bin")
case logicalType.TIMESTAMP != nil:
// convert all timestamp types (datetime/timestamp) to string
timeStr := formatTime(v, logicalType.TIMESTAMP.Unit, timeLayout,
utcTimeLayout, logicalType.TIMESTAMP.IsAdjustedToUTC)
d.SetString(timeStr, "")
d.SetString(timeStr, "utf8mb4_bin")
case logicalType.TIME != nil:
// convert all timestamp types (datetime/timestamp) to string
timeStr := formatTime(v, logicalType.TIME.Unit, "15:04:05.999999", "15:04:05.999999Z",
logicalType.TIME.IsAdjustedToUTC)
d.SetString(timeStr, "")
d.SetString(timeStr, "utf8mb4_bin")
default:
d.SetInt64(v)
}
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/lightning/mydump/parquet_parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func TestParquetParser(t *testing.T) {
verifyRow := func(i int) {
require.Equal(t, int64(i+1), reader.lastRow.RowID)
require.Len(t, reader.lastRow.Row, 2)
require.Equal(t, types.NewCollationStringDatum(strconv.Itoa(i), ""), reader.lastRow.Row[0])
require.Equal(t, types.NewCollationStringDatum(strconv.Itoa(i), "utf8mb4_bin"), reader.lastRow.Row[0])
require.Equal(t, types.NewIntDatum(int64(i)), reader.lastRow.Row[1])
}

Expand Down
6 changes: 3 additions & 3 deletions br/pkg/lightning/restore/get_pre_info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,15 +349,15 @@ INSERT INTO db01.tbl01 (ival, sval) VALUES (444, 'ddd');`
ExpectFirstRowDatums: [][]types.Datum{
{
types.NewIntDatum(1),
types.NewCollationStringDatum("name_1", ""),
types.NewCollationStringDatum("name_1", "utf8mb4_bin"),
},
{
types.NewIntDatum(2),
types.NewCollationStringDatum("name_2", ""),
types.NewCollationStringDatum("name_2", "utf8mb4_bin"),
},
{
types.NewIntDatum(3),
types.NewCollationStringDatum("name_3", ""),
types.NewCollationStringDatum("name_3", "utf8mb4_bin"),
},
},
ExpectColumns: []string{"id", "name"},
Expand Down
3 changes: 3 additions & 0 deletions br/pkg/restore/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ go_library(
"//br/pkg/metautil",
"//br/pkg/pdutil",
"//br/pkg/redact",
"//br/pkg/restore/prealloc_table_id",
"//br/pkg/restore/split",
"//br/pkg/restore/tiflashrec",
"//br/pkg/rtree",
Expand All @@ -41,6 +42,7 @@ go_library(
"//br/pkg/summary",
"//br/pkg/utils",
"//config",
"//ddl",
"//ddl/util",
"//domain",
"//kv",
Expand Down Expand Up @@ -143,6 +145,7 @@ go_test(
"//types",
"//util/codec",
"//util/mathutil",
"@com_github_fsouza_fake_gcs_server//fakestorage",
"@com_github_golang_protobuf//proto",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
Expand Down
29 changes: 29 additions & 0 deletions br/pkg/restore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/pingcap/tidb/br/pkg/metautil"
"github.com/pingcap/tidb/br/pkg/pdutil"
"github.com/pingcap/tidb/br/pkg/redact"
tidalloc "github.com/pingcap/tidb/br/pkg/restore/prealloc_table_id"
"github.com/pingcap/tidb/br/pkg/restore/split"
"github.com/pingcap/tidb/br/pkg/restore/tiflashrec"
"github.com/pingcap/tidb/br/pkg/rtree"
Expand Down Expand Up @@ -173,6 +174,9 @@ type Client struct {

// see RestoreCommonConfig.WithSysTable
withSysTable bool

// the successfully preallocated table IDs.
preallocedTableIDs *tidalloc.PreallocIDs
}

// NewRestoreClient returns a new RestoreClient.
Expand Down Expand Up @@ -237,6 +241,26 @@ func (rc *Client) Init(g glue.Glue, store kv.Storage) error {
return errors.Trace(err)
}

func (rc *Client) allocTableIDs(ctx context.Context, tables []*metautil.Table) error {
rc.preallocedTableIDs = tidalloc.New(tables)
ctx = kv.WithInternalSourceType(ctx, kv.InternalTxnBR)
err := kv.RunInNewTxn(ctx, rc.GetDomain().Store(), true, func(_ context.Context, txn kv.Transaction) error {
return rc.preallocedTableIDs.Alloc(meta.NewMeta(txn))
})
if err != nil {
return err
}

log.Info("registering the table IDs", zap.Stringer("ids", rc.preallocedTableIDs))
for i := range rc.dbPool {
rc.dbPool[i].registerPreallocatedIDs(rc.preallocedTableIDs)
}
if rc.db != nil {
rc.db.registerPreallocatedIDs(rc.preallocedTableIDs)
}
return nil
}

// SetPlacementPolicyMode to policy mode.
func (rc *Client) SetPlacementPolicyMode(withPlacementPolicy string) {
switch strings.ToUpper(withPlacementPolicy) {
Expand Down Expand Up @@ -724,6 +748,11 @@ func (rc *Client) GoCreateTables(
}
outCh := make(chan CreatedTable, len(tables))
rater := logutil.TraceRateOver(logutil.MetricTableCreatedCounter)
if err := rc.allocTableIDs(ctx, tables); err != nil {
errCh <- err
close(outCh)
return outCh
}

var err error

Expand Down
26 changes: 23 additions & 3 deletions br/pkg/restore/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ import (
"github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/glue"
"github.com/pingcap/tidb/br/pkg/metautil"
prealloctableid "github.com/pingcap/tidb/br/pkg/restore/prealloc_table_id"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/model"
Expand All @@ -24,7 +26,8 @@ import (

// DB is a TiDB instance, not thread-safe.
type DB struct {
se glue.Session
se glue.Session
preallocedIDs *prealloctableid.PreallocIDs
}

type UniqueTableName struct {
Expand Down Expand Up @@ -78,6 +81,10 @@ func NewDB(g glue.Glue, store kv.Storage, policyMode string) (*DB, bool, error)
}, supportPolicy, nil
}

func (db *DB) registerPreallocatedIDs(ids *prealloctableid.PreallocIDs) {
db.preallocedIDs = ids
}

// ExecDDL executes the query of a ddl job.
func (db *DB) ExecDDL(ctx context.Context, ddlJob *model.Job) error {
var err error
Expand Down Expand Up @@ -272,6 +279,19 @@ func (db *DB) CreateTablePostRestore(ctx context.Context, table *metautil.Table,
return nil
}

func (db *DB) tableIDAllocFilter() ddl.AllocTableIDIf {
return func(ti *model.TableInfo) bool {
if db.preallocedIDs == nil {
return true
}
prealloced := db.preallocedIDs.Prealloced(ti.ID)
if prealloced {
log.Info("reusing table ID", zap.Stringer("table", ti.Name))
}
return !prealloced
}
}

// CreateTables execute a internal CREATE TABLES.
func (db *DB) CreateTables(ctx context.Context, tables []*metautil.Table,
ddlTables map[UniqueTableName]bool, supportPolicy bool, policyMap *sync.Map) error {
Expand All @@ -289,7 +309,7 @@ func (db *DB) CreateTables(ctx context.Context, tables []*metautil.Table,
}
}
}
if err := batchSession.CreateTables(ctx, m); err != nil {
if err := batchSession.CreateTables(ctx, m, db.tableIDAllocFilter()); err != nil {
return err
}

Expand All @@ -316,7 +336,7 @@ func (db *DB) CreateTable(ctx context.Context, table *metautil.Table,
}
}

err := db.se.CreateTable(ctx, table.DB.Name, table.Info)
err := db.se.CreateTable(ctx, table.DB.Name, table.Info, db.tableIDAllocFilter())
if err != nil {
log.Error("create table failed",
zap.Stringer("db", table.DB.Name),
Expand Down
22 changes: 22 additions & 0 deletions br/pkg/restore/prealloc_table_id/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "prealloc_table_id",
srcs = ["alloc.go"],
importpath = "github.com/pingcap/tidb/br/pkg/restore/prealloc_table_id",
visibility = ["//visibility:public"],
deps = ["//br/pkg/metautil"],
)

go_test(
name = "prealloc_table_id_test",
srcs = ["alloc_test.go"],
flaky = True,
race = "on",
deps = [
":prealloc_table_id",
"//br/pkg/metautil",
"//parser/model",
"@com_github_stretchr_testify//require",
],
)
Loading

0 comments on commit 25ce455

Please sign in to comment.