Skip to content

Commit

Permalink
Merge branch 'master' into issue27797
Browse files Browse the repository at this point in the history
  • Loading branch information
time-and-fate committed Sep 6, 2021
2 parents 88b502a + d6ce2ae commit 918cb53
Show file tree
Hide file tree
Showing 20 changed files with 889 additions and 340 deletions.
53 changes: 45 additions & 8 deletions br/pkg/lightning/backend/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ import (
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/br/pkg/conn"
"github.com/pingcap/tidb/br/pkg/lightning/backend"
"github.com/pingcap/tidb/br/pkg/lightning/backend/kv"
"github.com/pingcap/tidb/br/pkg/lightning/checkpoints"
Expand All @@ -69,6 +68,7 @@ import (
"github.com/pingcap/tidb/util/hack"
"github.com/pingcap/tidb/util/ranger"
"github.com/tikv/client-go/v2/oracle"
pd "github.com/tikv/pd/client"
"go.uber.org/atomic"
"go.uber.org/multierr"
"go.uber.org/zap"
Expand Down Expand Up @@ -967,25 +967,62 @@ func NewLocalBackend(
}

func (local *local) checkMultiIngestSupport(ctx context.Context, pdCtl *pdutil.PdController) error {
stores, err := conn.GetAllTiKVStores(ctx, pdCtl.GetPDClient(), conn.SkipTiFlash)
stores, err := pdCtl.GetPDClient().GetAllStores(ctx, pd.WithExcludeTombstone())
if err != nil {
return errors.Trace(err)
}

hasTiFlash := false
for _, s := range stores {
client, err := local.getImportClient(ctx, s.Id)
if err != nil {
return errors.Trace(err)
if version.IsTiFlash(s) {
hasTiFlash = true
break
}
_, err = client.MultiIngest(ctx, &sst.MultiIngestRequest{})
if err != nil {
}

for _, s := range stores {
// skip stores that are not online
if s.State != metapb.StoreState_Up || version.IsTiFlash(s) {
continue
}
var err error
for i := 0; i < maxRetryTimes; i++ {
if i > 0 {
select {
case <-time.After(100 * time.Millisecond):
case <-ctx.Done():
return ctx.Err()
}
}
client, err1 := local.getImportClient(ctx, s.Id)
if err1 != nil {
err = err1
log.L().Warn("get import client failed", zap.Error(err), zap.String("store", s.Address))
continue
}
_, err = client.MultiIngest(ctx, &sst.MultiIngestRequest{})
if err == nil {
break
}
if st, ok := status.FromError(err); ok {
if st.Code() == codes.Unimplemented {
log.L().Info("multi ingest not support", zap.Any("unsupported store", s))
local.supportMultiIngest = false
return nil
}
}
return errors.Trace(err)
log.L().Warn("check multi ingest support failed", zap.Error(err), zap.String("store", s.Address),
zap.Int("retry", i))
}
if err != nil {
// if the cluster contains no TiFlash store, we don't need the multi-ingest feature,
// so in this condition, downgrade the logic instead of return an error.
if hasTiFlash {
return errors.Trace(err)
}
log.L().Warn("check multi failed all retry, fallback to false", log.ShortError(err))
local.supportMultiIngest = false
return nil
}
}

Expand Down
4 changes: 4 additions & 0 deletions br/pkg/restore/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
backuppb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/kvproto/pkg/import_sstpb"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/conn"
berrors "github.com/pingcap/tidb/br/pkg/errors"
Expand Down Expand Up @@ -229,6 +230,9 @@ func (importer *FileImporter) CheckMultiIngestSupport(ctx context.Context, pdCli
}
storeIDs := make([]uint64, 0, len(allStores))
for _, s := range allStores {
if s.State != metapb.StoreState_Up {
continue
}
storeIDs = append(storeIDs, s.Id)
}

Expand Down
4 changes: 2 additions & 2 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -883,8 +883,8 @@ func (c *Config) Valid() error {
return err
}

if c.Performance.TxnTotalSizeLimit > 10<<30 {
return fmt.Errorf("txn-total-size-limit should be less than %d", 10<<30)
if c.Performance.TxnTotalSizeLimit > 1<<40 {
return fmt.Errorf("txn-total-size-limit should be less than %d", 1<<40)
}

if c.Performance.MemoryUsageAlarmRatio > 1 || c.Performance.MemoryUsageAlarmRatio < 0 {
Expand Down
4 changes: 3 additions & 1 deletion config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -466,7 +466,9 @@ func TestTxnTotalSizeLimitValid(t *testing.T) {
}{
{4 << 10, true},
{10 << 30, true},
{10<<30 + 1, false},
{10<<30 + 1, true},
{1 << 40, true},
{1<<40 + 1, false},
}

for _, tt := range tests {
Expand Down
224 changes: 214 additions & 10 deletions ddl/attributes_sql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,27 @@ package ddl_test

import (
. "github.com/pingcap/check"
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/store/mockstore"
"github.com/pingcap/tidb/util/testkit"
)

func (s *testDBSuite8) TestAlterTableAttributes(c *C) {
tk := testkit.NewTestKit(c, s.store)
store, err := mockstore.NewMockStore()
c.Assert(err, IsNil)
dom, err := session.BootstrapSession(store)
c.Assert(err, IsNil)
defer func() {
dom.Close()
err := store.Close()
c.Assert(err, IsNil)
}()
tk := testkit.NewTestKit(c, store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t1")
defer tk.MustExec("drop table if exists t1")

tk.MustExec(`create table t1 (c int);`)

// normal cases
_, err := tk.Exec(`alter table t1 attributes="nomerge";`)
_, err = tk.Exec(`alter table t1 attributes="nomerge";`)
c.Assert(err, IsNil)
_, err = tk.Exec(`alter table t1 attributes="nomerge,somethingelse";`)
c.Assert(err, IsNil)
Expand All @@ -47,11 +55,17 @@ func (s *testDBSuite8) TestAlterTableAttributes(c *C) {
}

func (s *testDBSuite8) TestAlterTablePartitionAttributes(c *C) {
tk := testkit.NewTestKit(c, s.store)
store, err := mockstore.NewMockStore()
c.Assert(err, IsNil)
dom, err := session.BootstrapSession(store)
c.Assert(err, IsNil)
defer func() {
dom.Close()
err := store.Close()
c.Assert(err, IsNil)
}()
tk := testkit.NewTestKit(c, store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t1")
defer tk.MustExec("drop table if exists t1")

tk.MustExec(`create table t1 (c int)
PARTITION BY RANGE (c) (
PARTITION p0 VALUES LESS THAN (6),
Expand All @@ -61,7 +75,7 @@ PARTITION BY RANGE (c) (
);`)

// normal cases
_, err := tk.Exec(`alter table t1 partition p0 attributes="nomerge";`)
_, err = tk.Exec(`alter table t1 partition p0 attributes="nomerge";`)
c.Assert(err, IsNil)
_, err = tk.Exec(`alter table t1 partition p1 attributes="nomerge,somethingelse";`)
c.Assert(err, IsNil)
Expand All @@ -78,3 +92,193 @@ PARTITION BY RANGE (c) (
_, err = tk.Exec(`alter table t1 partition p1 attributes " nomerge , somethingelse ";`)
c.Assert(err, IsNil)
}

func (s *testDBSuite8) TestTruncateTable(c *C) {
store, err := mockstore.NewMockStore()
c.Assert(err, IsNil)
dom, err := session.BootstrapSession(store)
c.Assert(err, IsNil)
defer func() {
dom.Close()
err := store.Close()
c.Assert(err, IsNil)
}()
tk := testkit.NewTestKit(c, store)
tk.MustExec("use test")
tk.MustExec(`create table t1 (c int)
PARTITION BY RANGE (c) (
PARTITION p0 VALUES LESS THAN (6),
PARTITION p1 VALUES LESS THAN (11)
);`)

// add rules
_, err = tk.Exec(`alter table t1 attributes="attr";`)
c.Assert(err, IsNil)
_, err = tk.Exec(`alter table t1 partition p0 attributes="attr1";`)
c.Assert(err, IsNil)
rows := tk.MustQuery(`select * from information_schema.region_label;`).Sort().Rows()
c.Assert(len(rows), Equals, 2)
// truncate table
_, err = tk.Exec(`truncate table t1;`)
c.Assert(err, IsNil)
rows1 := tk.MustQuery(`select * from information_schema.region_label;`).Sort().Rows()
c.Assert(len(rows1), Equals, 2)
// check table t1's rule
c.Assert(rows1[0][0], Equals, "schema/test/t1")
c.Assert(rows1[0][2], Equals, `"attr"`)
c.Assert(rows1[0][3], Not(Equals), rows[0][3])
c.Assert(rows1[0][4], Not(Equals), rows[0][4])
// check partition p0's rule
c.Assert(rows1[1][0], Equals, "schema/test/t1/p0")
c.Assert(rows1[1][2], Equals, `"attr1"`)
c.Assert(rows1[1][3], Not(Equals), rows[1][3])
c.Assert(rows1[1][4], Not(Equals), rows[1][4])
}

func (s *testDBSuite8) TestRenameTable(c *C) {
store, err := mockstore.NewMockStore()
c.Assert(err, IsNil)
dom, err := session.BootstrapSession(store)
c.Assert(err, IsNil)
defer func() {
dom.Close()
err := store.Close()
c.Assert(err, IsNil)
}()
tk := testkit.NewTestKit(c, store)
tk.MustExec("use test")
tk.MustExec(`create table t1 (c int)
PARTITION BY RANGE (c) (
PARTITION p0 VALUES LESS THAN (6),
PARTITION p1 VALUES LESS THAN (11)
);`)

// add rules
_, err = tk.Exec(`alter table t1 attributes="attr";`)
c.Assert(err, IsNil)
_, err = tk.Exec(`alter table t1 partition p0 attributes="attr1";`)
c.Assert(err, IsNil)
rows := tk.MustQuery(`select * from information_schema.region_label;`).Sort().Rows()
c.Assert(len(rows), Equals, 2)
// rename table
_, err = tk.Exec(`rename table t1 to t2;`)
c.Assert(err, IsNil)
rows1 := tk.MustQuery(`select * from information_schema.region_label;`).Sort().Rows()
c.Assert(len(rows1), Equals, 2)
// check table t1's rule
c.Assert(rows1[0][0], Equals, "schema/test/t2")
c.Assert(rows1[0][2], Equals, `"attr"`)
c.Assert(rows1[0][3], Equals, rows[0][3])
c.Assert(rows1[0][4], Equals, rows[0][4])
// // check partition p0's rule
c.Assert(rows1[1][0], Equals, "schema/test/t2/p0")
c.Assert(rows1[1][2], Equals, `"attr1"`)
c.Assert(rows1[1][3], Equals, rows[1][3])
c.Assert(rows1[1][4], Equals, rows[1][4])
}

func (s *testDBSuite8) TestPartition(c *C) {
store, err := mockstore.NewMockStore()
c.Assert(err, IsNil)
dom, err := session.BootstrapSession(store)
c.Assert(err, IsNil)
defer func() {
dom.Close()
err := store.Close()
c.Assert(err, IsNil)
}()
tk := testkit.NewTestKit(c, store)
tk.MustExec("use test")
tk.MustExec(`create table t1 (c int)
PARTITION BY RANGE (c) (
PARTITION p0 VALUES LESS THAN (6),
PARTITION p1 VALUES LESS THAN (11),
PARTITION p2 VALUES LESS THAN (20)
);`)
tk.MustExec(`create table t2 (c int);`)

// add rules
_, err = tk.Exec(`alter table t1 attributes="attr";`)
c.Assert(err, IsNil)
_, err = tk.Exec(`alter table t1 partition p0 attributes="attr1";`)
c.Assert(err, IsNil)
_, err = tk.Exec(`alter table t1 partition p1 attributes="attr2";`)
c.Assert(err, IsNil)
rows := tk.MustQuery(`select * from information_schema.region_label;`).Sort().Rows()
c.Assert(len(rows), Equals, 3)
// drop partition
// partition p0's rule will be deleted
_, err = tk.Exec(`alter table t1 drop partition p0;`)
c.Assert(err, IsNil)
rows1 := tk.MustQuery(`select * from information_schema.region_label;`).Sort().Rows()
c.Assert(len(rows1), Equals, 2)
c.Assert(rows1[0][0], Equals, "schema/test/t1")
c.Assert(rows1[0][2], Equals, `"attr"`)
c.Assert(rows1[0][3], Equals, rows[0][3])
c.Assert(rows1[0][4], Equals, rows[0][4])
c.Assert(rows1[1][0], Equals, "schema/test/t1/p1")
c.Assert(rows1[1][2], Equals, `"attr2"`)
c.Assert(rows1[1][3], Equals, rows[2][3])
c.Assert(rows1[1][4], Equals, rows[2][4])

// truncate partition
// partition p1's key range will be updated
_, err = tk.Exec(`alter table t1 truncate partition p1;`)
c.Assert(err, IsNil)
rows2 := tk.MustQuery(`select * from information_schema.region_label;`).Sort().Rows()
c.Assert(len(rows2), Equals, 2)
c.Assert(rows2[1][0], Equals, "schema/test/t1/p1")
c.Assert(rows2[1][2], Equals, `"attr2"`)
c.Assert(rows2[1][3], Not(Equals), rows1[1][3])
c.Assert(rows2[1][4], Not(Equals), rows1[1][4])

// exchange partition
// partition p1's rule will be exchanged to table t2
_, err = tk.Exec(`set @@tidb_enable_exchange_partition=1;`)
c.Assert(err, IsNil)
_, err = tk.Exec(`alter table t1 exchange partition p1 with table t2;`)
c.Assert(err, IsNil)
rows3 := tk.MustQuery(`select * from information_schema.region_label;`).Sort().Rows()
c.Assert(len(rows3), Equals, 2)
c.Assert(rows3[1][0], Equals, "schema/test/t2")
c.Assert(rows3[1][2], Equals, `"attr2"`)
c.Assert(rows3[1][3], Equals, rows2[1][3])
c.Assert(rows3[1][4], Equals, rows2[1][4])
}

func (s *testDBSuite8) TestDefaultKeyword(c *C) {
store, err := mockstore.NewMockStore()
c.Assert(err, IsNil)
dom, err := session.BootstrapSession(store)
c.Assert(err, IsNil)
defer func() {
dom.Close()
err := store.Close()
c.Assert(err, IsNil)
}()
tk := testkit.NewTestKit(c, store)
tk.MustExec("use test")
tk.MustExec(`create table t1 (c int)
PARTITION BY RANGE (c) (
PARTITION p0 VALUES LESS THAN (6),
PARTITION p1 VALUES LESS THAN (11)
);`)

// add rules
_, err = tk.Exec(`alter table t1 attributes="attr";`)
c.Assert(err, IsNil)
_, err = tk.Exec(`alter table t1 partition p0 attributes="attr1";`)
c.Assert(err, IsNil)
rows := tk.MustQuery(`select * from information_schema.region_label;`).Rows()
c.Assert(len(rows), Equals, 2)
// reset the table t1's rule
_, err = tk.Exec(`alter table t1 attributes=default;`)
c.Assert(err, IsNil)
rows = tk.MustQuery(`select * from information_schema.region_label;`).Rows()
c.Assert(len(rows), Equals, 1)
// reset the partition p0's rule
_, err = tk.Exec(`alter table t1 partition p0 attributes=default;`)
c.Assert(err, IsNil)
rows = tk.MustQuery(`select * from information_schema.region_label;`).Rows()
c.Assert(len(rows), Equals, 0)
}
2 changes: 1 addition & 1 deletion ddl/db_partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ func (s *testIntegrationSuite3) TestCreateTableWithPartition(c *C) {
c varchar(30))
partition by range columns (a, b)
(partition p0 values less than (10, 10.0))`)
tk.MustQuery("show warnings").Check(testkit.Rows("Warning 8200 Unsupported partition type, treat as normal table"))
tk.MustQuery("show warnings").Check(testkit.Rows("Warning 8200 Unsupported partition type RANGE, treat as normal table"))

tk.MustGetErrCode(`create table t31 (a int not null) partition by range( a );`, tmysql.ErrPartitionsMustBeDefined)
tk.MustGetErrCode(`create table t32 (a int not null) partition by range columns( a );`, tmysql.ErrPartitionsMustBeDefined)
Expand Down
Loading

0 comments on commit 918cb53

Please sign in to comment.