diff --git a/br/pkg/lightning/backend/local/local.go b/br/pkg/lightning/backend/local/local.go index 76af2f39d6f8e..96ff8893c6c13 100644 --- a/br/pkg/lightning/backend/local/local.go +++ b/br/pkg/lightning/backend/local/local.go @@ -44,7 +44,6 @@ import ( "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/parser/model" "github.com/pingcap/parser/mysql" - "github.com/pingcap/tidb/br/pkg/conn" "github.com/pingcap/tidb/br/pkg/lightning/backend" "github.com/pingcap/tidb/br/pkg/lightning/backend/kv" "github.com/pingcap/tidb/br/pkg/lightning/checkpoints" @@ -69,6 +68,7 @@ import ( "github.com/pingcap/tidb/util/hack" "github.com/pingcap/tidb/util/ranger" "github.com/tikv/client-go/v2/oracle" + pd "github.com/tikv/pd/client" "go.uber.org/atomic" "go.uber.org/multierr" "go.uber.org/zap" @@ -967,17 +967,43 @@ func NewLocalBackend( } func (local *local) checkMultiIngestSupport(ctx context.Context, pdCtl *pdutil.PdController) error { - stores, err := conn.GetAllTiKVStores(ctx, pdCtl.GetPDClient(), conn.SkipTiFlash) + stores, err := pdCtl.GetPDClient().GetAllStores(ctx, pd.WithExcludeTombstone()) if err != nil { return errors.Trace(err) } + + hasTiFlash := false for _, s := range stores { - client, err := local.getImportClient(ctx, s.Id) - if err != nil { - return errors.Trace(err) + if version.IsTiFlash(s) { + hasTiFlash = true + break } - _, err = client.MultiIngest(ctx, &sst.MultiIngestRequest{}) - if err != nil { + } + + for _, s := range stores { + // skip stores that are not online + if s.State != metapb.StoreState_Up || version.IsTiFlash(s) { + continue + } + var err error + for i := 0; i < maxRetryTimes; i++ { + if i > 0 { + select { + case <-time.After(100 * time.Millisecond): + case <-ctx.Done(): + return ctx.Err() + } + } + client, err1 := local.getImportClient(ctx, s.Id) + if err1 != nil { + err = err1 + log.L().Warn("get import client failed", zap.Error(err), zap.String("store", s.Address)) + continue + } + _, err = client.MultiIngest(ctx, &sst.MultiIngestRequest{}) + if err == nil { + break + } if st, ok := status.FromError(err); ok { if st.Code() == codes.Unimplemented { log.L().Info("multi ingest not support", zap.Any("unsupported store", s)) @@ -985,7 +1011,18 @@ func (local *local) checkMultiIngestSupport(ctx context.Context, pdCtl *pdutil.P return nil } } - return errors.Trace(err) + log.L().Warn("check multi ingest support failed", zap.Error(err), zap.String("store", s.Address), + zap.Int("retry", i)) + } + if err != nil { + // if the cluster contains no TiFlash store, we don't need the multi-ingest feature, + // so in this condition, downgrade the logic instead of return an error. + if hasTiFlash { + return errors.Trace(err) + } + log.L().Warn("check multi failed all retry, fallback to false", log.ShortError(err)) + local.supportMultiIngest = false + return nil } } diff --git a/br/pkg/restore/import.go b/br/pkg/restore/import.go index d82b7866b994a..e5f611eaeed42 100644 --- a/br/pkg/restore/import.go +++ b/br/pkg/restore/import.go @@ -15,6 +15,7 @@ import ( backuppb "github.com/pingcap/kvproto/pkg/brpb" "github.com/pingcap/kvproto/pkg/import_sstpb" "github.com/pingcap/kvproto/pkg/kvrpcpb" + "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/log" "github.com/pingcap/tidb/br/pkg/conn" berrors "github.com/pingcap/tidb/br/pkg/errors" @@ -229,6 +230,9 @@ func (importer *FileImporter) CheckMultiIngestSupport(ctx context.Context, pdCli } storeIDs := make([]uint64, 0, len(allStores)) for _, s := range allStores { + if s.State != metapb.StoreState_Up { + continue + } storeIDs = append(storeIDs, s.Id) } diff --git a/config/config.go b/config/config.go index 37f8774355b40..4a0fa3dc01620 100644 --- a/config/config.go +++ b/config/config.go @@ -883,8 +883,8 @@ func (c *Config) Valid() error { return err } - if c.Performance.TxnTotalSizeLimit > 10<<30 { - return fmt.Errorf("txn-total-size-limit should be less than %d", 10<<30) + if c.Performance.TxnTotalSizeLimit > 1<<40 { + return fmt.Errorf("txn-total-size-limit should be less than %d", 1<<40) } if c.Performance.MemoryUsageAlarmRatio > 1 || c.Performance.MemoryUsageAlarmRatio < 0 { diff --git a/config/config_test.go b/config/config_test.go index d851e49921a66..0abc4d374250e 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -466,7 +466,9 @@ func TestTxnTotalSizeLimitValid(t *testing.T) { }{ {4 << 10, true}, {10 << 30, true}, - {10<<30 + 1, false}, + {10<<30 + 1, true}, + {1 << 40, true}, + {1<<40 + 1, false}, } for _, tt := range tests { diff --git a/ddl/attributes_sql_test.go b/ddl/attributes_sql_test.go index e8eef36f80b91..ced356acc5e11 100644 --- a/ddl/attributes_sql_test.go +++ b/ddl/attributes_sql_test.go @@ -16,19 +16,27 @@ package ddl_test import ( . "github.com/pingcap/check" + "github.com/pingcap/tidb/session" + "github.com/pingcap/tidb/store/mockstore" "github.com/pingcap/tidb/util/testkit" ) func (s *testDBSuite8) TestAlterTableAttributes(c *C) { - tk := testkit.NewTestKit(c, s.store) + store, err := mockstore.NewMockStore() + c.Assert(err, IsNil) + dom, err := session.BootstrapSession(store) + c.Assert(err, IsNil) + defer func() { + dom.Close() + err := store.Close() + c.Assert(err, IsNil) + }() + tk := testkit.NewTestKit(c, store) tk.MustExec("use test") - tk.MustExec("drop table if exists t1") - defer tk.MustExec("drop table if exists t1") - tk.MustExec(`create table t1 (c int);`) // normal cases - _, err := tk.Exec(`alter table t1 attributes="nomerge";`) + _, err = tk.Exec(`alter table t1 attributes="nomerge";`) c.Assert(err, IsNil) _, err = tk.Exec(`alter table t1 attributes="nomerge,somethingelse";`) c.Assert(err, IsNil) @@ -47,11 +55,17 @@ func (s *testDBSuite8) TestAlterTableAttributes(c *C) { } func (s *testDBSuite8) TestAlterTablePartitionAttributes(c *C) { - tk := testkit.NewTestKit(c, s.store) + store, err := mockstore.NewMockStore() + c.Assert(err, IsNil) + dom, err := session.BootstrapSession(store) + c.Assert(err, IsNil) + defer func() { + dom.Close() + err := store.Close() + c.Assert(err, IsNil) + }() + tk := testkit.NewTestKit(c, store) tk.MustExec("use test") - tk.MustExec("drop table if exists t1") - defer tk.MustExec("drop table if exists t1") - tk.MustExec(`create table t1 (c int) PARTITION BY RANGE (c) ( PARTITION p0 VALUES LESS THAN (6), @@ -61,7 +75,7 @@ PARTITION BY RANGE (c) ( );`) // normal cases - _, err := tk.Exec(`alter table t1 partition p0 attributes="nomerge";`) + _, err = tk.Exec(`alter table t1 partition p0 attributes="nomerge";`) c.Assert(err, IsNil) _, err = tk.Exec(`alter table t1 partition p1 attributes="nomerge,somethingelse";`) c.Assert(err, IsNil) @@ -78,3 +92,193 @@ PARTITION BY RANGE (c) ( _, err = tk.Exec(`alter table t1 partition p1 attributes " nomerge , somethingelse ";`) c.Assert(err, IsNil) } + +func (s *testDBSuite8) TestTruncateTable(c *C) { + store, err := mockstore.NewMockStore() + c.Assert(err, IsNil) + dom, err := session.BootstrapSession(store) + c.Assert(err, IsNil) + defer func() { + dom.Close() + err := store.Close() + c.Assert(err, IsNil) + }() + tk := testkit.NewTestKit(c, store) + tk.MustExec("use test") + tk.MustExec(`create table t1 (c int) +PARTITION BY RANGE (c) ( + PARTITION p0 VALUES LESS THAN (6), + PARTITION p1 VALUES LESS THAN (11) +);`) + + // add rules + _, err = tk.Exec(`alter table t1 attributes="attr";`) + c.Assert(err, IsNil) + _, err = tk.Exec(`alter table t1 partition p0 attributes="attr1";`) + c.Assert(err, IsNil) + rows := tk.MustQuery(`select * from information_schema.region_label;`).Sort().Rows() + c.Assert(len(rows), Equals, 2) + // truncate table + _, err = tk.Exec(`truncate table t1;`) + c.Assert(err, IsNil) + rows1 := tk.MustQuery(`select * from information_schema.region_label;`).Sort().Rows() + c.Assert(len(rows1), Equals, 2) + // check table t1's rule + c.Assert(rows1[0][0], Equals, "schema/test/t1") + c.Assert(rows1[0][2], Equals, `"attr"`) + c.Assert(rows1[0][3], Not(Equals), rows[0][3]) + c.Assert(rows1[0][4], Not(Equals), rows[0][4]) + // check partition p0's rule + c.Assert(rows1[1][0], Equals, "schema/test/t1/p0") + c.Assert(rows1[1][2], Equals, `"attr1"`) + c.Assert(rows1[1][3], Not(Equals), rows[1][3]) + c.Assert(rows1[1][4], Not(Equals), rows[1][4]) +} + +func (s *testDBSuite8) TestRenameTable(c *C) { + store, err := mockstore.NewMockStore() + c.Assert(err, IsNil) + dom, err := session.BootstrapSession(store) + c.Assert(err, IsNil) + defer func() { + dom.Close() + err := store.Close() + c.Assert(err, IsNil) + }() + tk := testkit.NewTestKit(c, store) + tk.MustExec("use test") + tk.MustExec(`create table t1 (c int) +PARTITION BY RANGE (c) ( + PARTITION p0 VALUES LESS THAN (6), + PARTITION p1 VALUES LESS THAN (11) +);`) + + // add rules + _, err = tk.Exec(`alter table t1 attributes="attr";`) + c.Assert(err, IsNil) + _, err = tk.Exec(`alter table t1 partition p0 attributes="attr1";`) + c.Assert(err, IsNil) + rows := tk.MustQuery(`select * from information_schema.region_label;`).Sort().Rows() + c.Assert(len(rows), Equals, 2) + // rename table + _, err = tk.Exec(`rename table t1 to t2;`) + c.Assert(err, IsNil) + rows1 := tk.MustQuery(`select * from information_schema.region_label;`).Sort().Rows() + c.Assert(len(rows1), Equals, 2) + // check table t1's rule + c.Assert(rows1[0][0], Equals, "schema/test/t2") + c.Assert(rows1[0][2], Equals, `"attr"`) + c.Assert(rows1[0][3], Equals, rows[0][3]) + c.Assert(rows1[0][4], Equals, rows[0][4]) + // // check partition p0's rule + c.Assert(rows1[1][0], Equals, "schema/test/t2/p0") + c.Assert(rows1[1][2], Equals, `"attr1"`) + c.Assert(rows1[1][3], Equals, rows[1][3]) + c.Assert(rows1[1][4], Equals, rows[1][4]) +} + +func (s *testDBSuite8) TestPartition(c *C) { + store, err := mockstore.NewMockStore() + c.Assert(err, IsNil) + dom, err := session.BootstrapSession(store) + c.Assert(err, IsNil) + defer func() { + dom.Close() + err := store.Close() + c.Assert(err, IsNil) + }() + tk := testkit.NewTestKit(c, store) + tk.MustExec("use test") + tk.MustExec(`create table t1 (c int) +PARTITION BY RANGE (c) ( + PARTITION p0 VALUES LESS THAN (6), + PARTITION p1 VALUES LESS THAN (11), + PARTITION p2 VALUES LESS THAN (20) +);`) + tk.MustExec(`create table t2 (c int);`) + + // add rules + _, err = tk.Exec(`alter table t1 attributes="attr";`) + c.Assert(err, IsNil) + _, err = tk.Exec(`alter table t1 partition p0 attributes="attr1";`) + c.Assert(err, IsNil) + _, err = tk.Exec(`alter table t1 partition p1 attributes="attr2";`) + c.Assert(err, IsNil) + rows := tk.MustQuery(`select * from information_schema.region_label;`).Sort().Rows() + c.Assert(len(rows), Equals, 3) + // drop partition + // partition p0's rule will be deleted + _, err = tk.Exec(`alter table t1 drop partition p0;`) + c.Assert(err, IsNil) + rows1 := tk.MustQuery(`select * from information_schema.region_label;`).Sort().Rows() + c.Assert(len(rows1), Equals, 2) + c.Assert(rows1[0][0], Equals, "schema/test/t1") + c.Assert(rows1[0][2], Equals, `"attr"`) + c.Assert(rows1[0][3], Equals, rows[0][3]) + c.Assert(rows1[0][4], Equals, rows[0][4]) + c.Assert(rows1[1][0], Equals, "schema/test/t1/p1") + c.Assert(rows1[1][2], Equals, `"attr2"`) + c.Assert(rows1[1][3], Equals, rows[2][3]) + c.Assert(rows1[1][4], Equals, rows[2][4]) + + // truncate partition + // partition p1's key range will be updated + _, err = tk.Exec(`alter table t1 truncate partition p1;`) + c.Assert(err, IsNil) + rows2 := tk.MustQuery(`select * from information_schema.region_label;`).Sort().Rows() + c.Assert(len(rows2), Equals, 2) + c.Assert(rows2[1][0], Equals, "schema/test/t1/p1") + c.Assert(rows2[1][2], Equals, `"attr2"`) + c.Assert(rows2[1][3], Not(Equals), rows1[1][3]) + c.Assert(rows2[1][4], Not(Equals), rows1[1][4]) + + // exchange partition + // partition p1's rule will be exchanged to table t2 + _, err = tk.Exec(`set @@tidb_enable_exchange_partition=1;`) + c.Assert(err, IsNil) + _, err = tk.Exec(`alter table t1 exchange partition p1 with table t2;`) + c.Assert(err, IsNil) + rows3 := tk.MustQuery(`select * from information_schema.region_label;`).Sort().Rows() + c.Assert(len(rows3), Equals, 2) + c.Assert(rows3[1][0], Equals, "schema/test/t2") + c.Assert(rows3[1][2], Equals, `"attr2"`) + c.Assert(rows3[1][3], Equals, rows2[1][3]) + c.Assert(rows3[1][4], Equals, rows2[1][4]) +} + +func (s *testDBSuite8) TestDefaultKeyword(c *C) { + store, err := mockstore.NewMockStore() + c.Assert(err, IsNil) + dom, err := session.BootstrapSession(store) + c.Assert(err, IsNil) + defer func() { + dom.Close() + err := store.Close() + c.Assert(err, IsNil) + }() + tk := testkit.NewTestKit(c, store) + tk.MustExec("use test") + tk.MustExec(`create table t1 (c int) +PARTITION BY RANGE (c) ( + PARTITION p0 VALUES LESS THAN (6), + PARTITION p1 VALUES LESS THAN (11) +);`) + + // add rules + _, err = tk.Exec(`alter table t1 attributes="attr";`) + c.Assert(err, IsNil) + _, err = tk.Exec(`alter table t1 partition p0 attributes="attr1";`) + c.Assert(err, IsNil) + rows := tk.MustQuery(`select * from information_schema.region_label;`).Rows() + c.Assert(len(rows), Equals, 2) + // reset the table t1's rule + _, err = tk.Exec(`alter table t1 attributes=default;`) + c.Assert(err, IsNil) + rows = tk.MustQuery(`select * from information_schema.region_label;`).Rows() + c.Assert(len(rows), Equals, 1) + // reset the partition p0's rule + _, err = tk.Exec(`alter table t1 partition p0 attributes=default;`) + c.Assert(err, IsNil) + rows = tk.MustQuery(`select * from information_schema.region_label;`).Rows() + c.Assert(len(rows), Equals, 0) +} diff --git a/ddl/db_partition_test.go b/ddl/db_partition_test.go index 6a7a6175492c6..375bd2fafd669 100644 --- a/ddl/db_partition_test.go +++ b/ddl/db_partition_test.go @@ -247,7 +247,7 @@ func (s *testIntegrationSuite3) TestCreateTableWithPartition(c *C) { c varchar(30)) partition by range columns (a, b) (partition p0 values less than (10, 10.0))`) - tk.MustQuery("show warnings").Check(testkit.Rows("Warning 8200 Unsupported partition type, treat as normal table")) + tk.MustQuery("show warnings").Check(testkit.Rows("Warning 8200 Unsupported partition type RANGE, treat as normal table")) tk.MustGetErrCode(`create table t31 (a int not null) partition by range( a );`, tmysql.ErrPartitionsMustBeDefined) tk.MustGetErrCode(`create table t32 (a int not null) partition by range columns( a );`, tmysql.ErrPartitionsMustBeDefined) diff --git a/ddl/partition.go b/ddl/partition.go index 4d279dbae0921..96a8326bb428b 100644 --- a/ddl/partition.go +++ b/ddl/partition.go @@ -335,7 +335,7 @@ func buildTablePartitionInfo(ctx sessionctx.Context, s *ast.PartitionOptions, tb } if !enable { - ctx.GetSessionVars().StmtCtx.AppendWarning(errUnsupportedCreatePartition) + ctx.GetSessionVars().StmtCtx.AppendWarning(errUnsupportedCreatePartition.GenWithStack(fmt.Sprintf("Unsupported partition type %v, treat as normal table", s.Tp))) return nil } @@ -1136,11 +1136,9 @@ func onTruncateTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (int64, e oldRules := make([]string, 0, len(oldIDs)) newRules := make([]*label.Rule, 0, len(oldIDs)) - oldRuleMap := make(map[string]struct{}) for _, newPartition := range newPartitions { oldRuleID := fmt.Sprintf(label.PartitionIDFormat, label.IDPrefix, job.SchemaName, tblInfo.Name.L, newPartition.Name.L) oldRules = append(oldRules, oldRuleID) - oldRuleMap[oldRuleID] = struct{}{} } rules, err := infosync.GetLabelRules(context.TODO(), oldRules) @@ -1149,11 +1147,9 @@ func onTruncateTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (int64, e return ver, errors.Wrapf(err, "failed to get label rules from PD") } - for _, r := range rules { - for _, newPartition := range newPartitions { - if _, ok := oldRuleMap[r.ID]; ok { - newRules = append(newRules, r.Clone().Reset(newPartition.ID, job.SchemaName, tblInfo.Name.L, newPartition.Name.L)) - } + for idx, newPartition := range newPartitions { + if r, ok := rules[oldRules[idx]]; ok { + newRules = append(newRules, r.Clone().Reset(newPartition.ID, job.SchemaName, tblInfo.Name.L, newPartition.Name.L)) } } @@ -1380,16 +1376,8 @@ func (w *worker) onExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Jo return 0, errors.Wrapf(err, "failed to get PD the label rules") } - var ntr, ptr *label.Rule - for _, rule := range rules { - if rule.ID == ntrID { - ntr = rule - } else if rule.ID == ptrID { - ptr = rule - } else { - logutil.BgLogger().Error("[ddl] unexpected label rule", zap.Any("rule", rule)) - } - } + ntr := rules[ntrID] + ptr := rules[ptrID] var setRules []*label.Rule var deleteRules []string diff --git a/ddl/serial_test.go b/ddl/serial_test.go index dd76ba984d633..3f9fb2009209c 100644 --- a/ddl/serial_test.go +++ b/ddl/serial_test.go @@ -658,7 +658,7 @@ func (s *testSerialSuite) TestCreateTableWithLikeAtTemporaryMode(c *C) { ") ENGINE=memory DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin")) tk.MustExec("create temporary table if not exists tb12 like tb11;") c.Assert(tk.Se.(sessionctx.Context).GetSessionVars().StmtCtx.GetWarnings()[0].Err.Error(), Equals, - infoschema.ErrTableExists.GenWithStackByArgs("tb12").Error()) + infoschema.ErrTableExists.GenWithStackByArgs("test.tb12").Error()) defer tk.MustExec("drop table if exists tb11, tb12") // Test from->local temporary, to->local temporary tk.MustExec("drop table if exists tb13, tb14") diff --git a/ddl/table.go b/ddl/table.go index 9cabfe31381f3..795d52b8c4035 100644 --- a/ddl/table.go +++ b/ddl/table.go @@ -520,31 +520,28 @@ func onTruncateTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ erro tableRuleID := fmt.Sprintf(label.TableIDFormat, label.IDPrefix, job.SchemaName, tblInfo.Name.L) ids := []string{tableRuleID} + var partRuleIDs []string if tblInfo.GetPartitionInfo() != nil { for _, def := range tblInfo.GetPartitionInfo().Definitions { - ids = append(ids, fmt.Sprintf(label.PartitionIDFormat, label.IDPrefix, job.SchemaName, tblInfo.Name.L, def.Name.L)) + partRuleIDs = append(partRuleIDs, fmt.Sprintf(label.PartitionIDFormat, label.IDPrefix, job.SchemaName, tblInfo.Name.L, def.Name.L)) } } - oldRules, err := infosync.GetLabelRules(context.TODO(), ids) + oldRules, err := infosync.GetLabelRules(context.TODO(), append(ids, partRuleIDs...)) if err != nil { job.State = model.JobStateCancelled return 0, errors.Wrapf(err, "failed to get PD the label rule") } var newRules []*label.Rule - for _, r := range oldRules { - if r.ID == tableRuleID { - newRules = append(newRules, r.Clone().Reset(newTableID, job.SchemaName, tblInfo.Name.L)) - } + if r, ok := oldRules[tableRuleID]; ok { + newRules = append(newRules, r.Clone().Reset(newTableID, job.SchemaName, tblInfo.Name.L)) } if tblInfo.GetPartitionInfo() != nil { - for _, r := range oldRules { - for _, def := range tblInfo.GetPartitionInfo().Definitions { - if r.ID == fmt.Sprintf(label.PartitionIDFormat, label.IDPrefix, job.SchemaName, tblInfo.Name.L, def.Name.L) { - newRules = append(newRules, r.Clone().Reset(def.ID, job.SchemaName, tblInfo.Name.L, def.Name.L)) - } + for idx, def := range tblInfo.GetPartitionInfo().Definitions { + if r, ok := oldRules[partRuleIDs[idx]]; ok { + newRules = append(newRules, r.Clone().Reset(def.ID, job.SchemaName, tblInfo.Name.L, def.Name.L)) } } } @@ -825,13 +822,14 @@ func checkAndRenameTables(t *meta.Meta, job *model.Job, oldSchemaID, newSchemaID oldTableName := tblInfo.Name tableRuleID := fmt.Sprintf(label.TableIDFormat, label.IDPrefix, oldSchemaName.L, oldTableName.L) oldRuleIDs := []string{tableRuleID} - if pi := tblInfo.GetPartitionInfo(); pi != nil { - if len(pi.Definitions) != 0 { - for _, def := range pi.Definitions { - oldRuleIDs = append(oldRuleIDs, fmt.Sprintf(label.PartitionIDFormat, label.IDPrefix, oldSchemaName.L, oldTableName.L, def.Name.L)) - } + var partRuleIDs []string + if tblInfo.GetPartitionInfo() != nil { + for _, def := range tblInfo.GetPartitionInfo().Definitions { + partRuleIDs = append(partRuleIDs, fmt.Sprintf(label.PartitionIDFormat, label.IDPrefix, oldSchemaName.L, oldTableName.L, def.Name.L)) } } + + oldRuleIDs = append(oldRuleIDs, partRuleIDs...) oldRules, err := infosync.GetLabelRules(context.TODO(), oldRuleIDs) if err != nil { job.State = model.JobStateCancelled @@ -859,22 +857,17 @@ func checkAndRenameTables(t *meta.Meta, job *model.Job, oldSchemaID, newSchemaID } var newRules []*label.Rule - for _, r := range oldRules { - if r.ID == tableRuleID { - newRules = append(newRules, r.Clone().Reset(tblInfo.ID, job.SchemaName, tblInfo.Name.L)) - } + if r, ok := oldRules[tableRuleID]; ok { + newRules = append(newRules, r.Clone().Reset(tblInfo.ID, job.SchemaName, tblInfo.Name.L)) } if tblInfo.GetPartitionInfo() != nil { - for _, r := range oldRules { - for _, def := range tblInfo.GetPartitionInfo().Definitions { - if r.ID == fmt.Sprintf(label.PartitionIDFormat, label.IDPrefix, oldSchemaName.L, oldTableName.L, def.Name.L) { - newRules = append(newRules, r.Clone().Reset(def.ID, job.SchemaName, tblInfo.Name.L, def.Name.L)) - } + for idx, def := range tblInfo.GetPartitionInfo().Definitions { + if r, ok := oldRules[partRuleIDs[idx]]; ok { + newRules = append(newRules, r.Clone().Reset(def.ID, job.SchemaName, tblInfo.Name.L, def.Name.L)) } } } - patch := label.NewRulePatch(newRules, oldRuleIDs) err = infosync.UpdateLabelRules(context.TODO(), patch) if err != nil { diff --git a/domain/infosync/info.go b/domain/infosync/info.go index dbf62e276d210..266946ae1e51b 100644 --- a/domain/infosync/info.go +++ b/domain/infosync/info.go @@ -209,7 +209,7 @@ func (is *InfoSyncer) GetSessionManager() util2.SessionManager { func initLabelRuleManager(addrs []string) LabelRuleManager { if len(addrs) == 0 { - return &mockLabelManager{labelRules: map[string]*label.Rule{}} + return &mockLabelManager{labelRules: map[string][]byte{}} } return &PDLabelManager{addrs: addrs} } @@ -865,7 +865,7 @@ func GetAllLabelRules(ctx context.Context) ([]*label.Rule, error) { } // GetLabelRules gets the label rules according to the given IDs from PD. -func GetLabelRules(ctx context.Context, ruleIDs []string) ([]*label.Rule, error) { +func GetLabelRules(ctx context.Context, ruleIDs []string) (map[string]*label.Rule, error) { if len(ruleIDs) == 0 { return nil, nil } diff --git a/domain/infosync/label_manager.go b/domain/infosync/label_manager.go index bf6ba634fdf24..663d3f01976fc 100644 --- a/domain/infosync/label_manager.go +++ b/domain/infosync/label_manager.go @@ -30,7 +30,7 @@ type LabelRuleManager interface { PutLabelRule(ctx context.Context, rule *label.Rule) error UpdateLabelRules(ctx context.Context, patch *label.RulePatch) error GetAllLabelRules(ctx context.Context) ([]*label.Rule, error) - GetLabelRules(ctx context.Context, ruleIDs []string) ([]*label.Rule, error) + GetLabelRules(ctx context.Context, ruleIDs []string) (map[string]*label.Rule, error) } // PDLabelManager manages rules with pd @@ -71,7 +71,7 @@ func (lm *PDLabelManager) GetAllLabelRules(ctx context.Context) ([]*label.Rule, } // GetLabelRules implements GetLabelRules -func (lm *PDLabelManager) GetLabelRules(ctx context.Context, ruleIDs []string) ([]*label.Rule, error) { +func (lm *PDLabelManager) GetLabelRules(ctx context.Context, ruleIDs []string) (map[string]*label.Rule, error) { ids, err := json.Marshal(ruleIDs) if err != nil { return nil, err @@ -83,12 +83,17 @@ func (lm *PDLabelManager) GetLabelRules(ctx context.Context, ruleIDs []string) ( if err == nil && res != nil { err = json.Unmarshal(res, &rules) } - return rules, err + + ruleMap := make(map[string]*label.Rule, len((rules))) + for _, r := range rules { + ruleMap[r.ID] = r + } + return ruleMap, err } type mockLabelManager struct { sync.RWMutex - labelRules map[string]*label.Rule + labelRules map[string][]byte } // PutLabelRule implements PutLabelRule @@ -98,7 +103,11 @@ func (mm *mockLabelManager) PutLabelRule(ctx context.Context, rule *label.Rule) if rule == nil { return nil } - mm.labelRules[rule.ID] = rule + r, err := json.Marshal(rule) + if err != nil { + return err + } + mm.labelRules[rule.ID] = r return nil } @@ -116,7 +125,11 @@ func (mm *mockLabelManager) UpdateLabelRules(ctx context.Context, patch *label.R if p == nil { continue } - mm.labelRules[p.ID] = p + r, err := json.Marshal(p) + if err != nil { + return err + } + mm.labelRules[p.ID] = r } return nil } @@ -130,23 +143,33 @@ func (mm *mockLabelManager) GetAllLabelRules(ctx context.Context) ([]*label.Rule if labelRule == nil { continue } - r = append(r, labelRule) + var rule *label.Rule + err := json.Unmarshal(labelRule, &rule) + if err != nil { + return nil, err + } + r = append(r, rule) } return r, nil } // mockLabelManager implements GetLabelRules -func (mm *mockLabelManager) GetLabelRules(ctx context.Context, ruleIDs []string) ([]*label.Rule, error) { +func (mm *mockLabelManager) GetLabelRules(ctx context.Context, ruleIDs []string) (map[string]*label.Rule, error) { mm.RLock() defer mm.RUnlock() - r := make([]*label.Rule, 0, len(ruleIDs)) + r := make(map[string]*label.Rule, len(ruleIDs)) for _, ruleID := range ruleIDs { - for _, labelRule := range mm.labelRules { - if labelRule.ID == ruleID { + for id, labelRule := range mm.labelRules { + if id == ruleID { if labelRule == nil { continue } - r = append(r, labelRule) + var rule *label.Rule + err := json.Unmarshal(labelRule, &rule) + if err != nil { + return nil, err + } + r[ruleID] = rule break } } diff --git a/executor/builder.go b/executor/builder.go index 4be49f6eeb79b..5ce6c1a3136b8 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -46,6 +46,7 @@ import ( "github.com/pingcap/tidb/statistics" "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/table/tables" + "github.com/pingcap/tidb/table/temptable" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/admin" @@ -950,6 +951,7 @@ func (b *executorBuilder) buildDDL(v *plannercore.DDL) Executor { baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID()), stmt: v.Statement, is: b.is, + tempTableDDL: temptable.GetTemporaryTableDDL(b.ctx), } return e } diff --git a/executor/ddl.go b/executor/ddl.go index 02267089359a4..26c6f04111738 100644 --- a/executor/ddl.go +++ b/executor/ddl.go @@ -15,7 +15,6 @@ package executor import ( - "bytes" "context" "fmt" "strings" @@ -29,20 +28,16 @@ import ( "github.com/pingcap/tidb/ddl" "github.com/pingcap/tidb/domain" "github.com/pingcap/tidb/infoschema" - "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/meta" - "github.com/pingcap/tidb/meta/autoid" "github.com/pingcap/tidb/planner/core" "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/table" - "github.com/pingcap/tidb/table/tables" - "github.com/pingcap/tidb/tablecodec" + "github.com/pingcap/tidb/table/temptable" "github.com/pingcap/tidb/util/admin" "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/gcutil" "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/sqlexec" - "github.com/tikv/client-go/v2/tikv" "go.uber.org/zap" ) @@ -51,9 +46,10 @@ import ( type DDLExec struct { baseExecutor - stmt ast.StmtNode - is infoschema.InfoSchema - done bool + stmt ast.StmtNode + is infoschema.InfoSchema + tempTableDDL temptable.TemporaryTableDDL + done bool } // toErr converts the error to the ErrInfoSchemaChanged when the schema is outdated. @@ -73,47 +69,6 @@ func (e *DDLExec) toErr(err error) error { return err } -// deleteTemporaryTableRecords delete temporary table data. -func deleteTemporaryTableRecords(sessionData variable.TemporaryTableData, tblID int64) error { - if sessionData == nil { - return kv.ErrNotExist - } - - tblPrefix := tablecodec.EncodeTablePrefix(tblID) - endKey := tablecodec.EncodeTablePrefix(tblID + 1) - - iter, err := sessionData.Iter(tblPrefix, endKey) - if err != nil { - return err - } - for iter.Valid() { - key := iter.Key() - if !bytes.HasPrefix(key, tblPrefix) { - break - } - - err = sessionData.DeleteTableKey(tblID, key) - if err != nil { - return err - } - - err = iter.Next() - if err != nil { - return err - } - } - - return nil -} - -func (e *DDLExec) getLocalTemporaryTables() *infoschema.LocalTemporaryTables { - tempTables := e.ctx.GetSessionVars().LocalTemporaryTables - if tempTables != nil { - return tempTables.(*infoschema.LocalTemporaryTables) - } - return nil -} - func (e *DDLExec) getLocalTemporaryTable(schema model.CIStr, table model.CIStr) (table.Table, bool) { tbl, err := e.ctx.GetInfoSchema().(infoschema.InfoSchema).TableByName(schema, table) if infoschema.ErrTableNotExists.Equal(err) { @@ -146,14 +101,9 @@ func (e *DDLExec) Next(ctx context.Context, req *chunk.Chunk) (err error) { if s.IsView { break } - sessVars := e.ctx.GetSessionVars() - sessVarsTempTable := sessVars.LocalTemporaryTables - if sessVarsTempTable == nil { - break - } - localTemporaryTables := sessVarsTempTable.(*infoschema.LocalTemporaryTables) + for tbIdx := len(s.Tables) - 1; tbIdx >= 0; tbIdx-- { - if _, ok := localTemporaryTables.TableByName(s.Tables[tbIdx].Schema, s.Tables[tbIdx].Name); ok { + if _, ok := e.getLocalTemporaryTable(s.Tables[tbIdx].Schema, s.Tables[tbIdx].Name); ok { localTempTablesToDrop = append(localTempTablesToDrop, s.Tables[tbIdx]) s.Tables = append(s.Tables[:tbIdx], s.Tables[tbIdx+1:]...) } @@ -263,39 +213,12 @@ func (e *DDLExec) Next(ctx context.Context, req *chunk.Chunk) (err error) { func (e *DDLExec) executeTruncateTable(s *ast.TruncateTableStmt) error { ident := ast.Ident{Schema: s.Table.Schema, Name: s.Table.Name} if _, exist := e.getLocalTemporaryTable(s.Table.Schema, s.Table.Name); exist { - return e.executeTruncateLocalTemporaryTable(s) + return e.tempTableDDL.TruncateLocalTemporaryTable(s.Table.Schema, s.Table.Name) } err := domain.GetDomain(e.ctx).DDL().TruncateTable(e.ctx, ident) return err } -func (e *DDLExec) executeTruncateLocalTemporaryTable(s *ast.TruncateTableStmt) error { - tbl, exists := e.getLocalTemporaryTable(s.Table.Schema, s.Table.Name) - if !exists { - return infoschema.ErrTableNotExists.GenWithStackByArgs(s.Table.Schema, s.Table.Name) - } - - tblInfo := tbl.Meta() - - newTbl, err := e.newTemporaryTableFromTableInfo(tblInfo.Clone()) - if err != nil { - return err - } - - localTempTables := e.getLocalTemporaryTables() - localTempTables.RemoveTable(s.Table.Schema, s.Table.Name) - if err := localTempTables.AddTable(s.Table.Schema, newTbl); err != nil { - return err - } - - err = deleteTemporaryTableRecords(e.ctx.GetSessionVars().TemporaryTableData, tblInfo.ID) - if err != nil { - return err - } - - return nil -} - func (e *DDLExec) executeRenameTable(s *ast.RenameTableStmt) error { isAlterTable := false var err error @@ -361,70 +284,23 @@ func (e *DDLExec) createSessionTemporaryTable(s *ast.CreateTableStmt) error { if !ok { return infoschema.ErrDatabaseNotExists.GenWithStackByArgs(s.Table.Schema.O) } - tbInfo, err := ddl.BuildSessionTemporaryTableInfo(e.ctx, is, s, dbInfo.Charset, dbInfo.Collate) - if err != nil { - return err - } - - tbl, err := e.newTemporaryTableFromTableInfo(tbInfo) - if err != nil { - return err - } - - // Store this temporary table to the session. - sessVars := e.ctx.GetSessionVars() - if sessVars.LocalTemporaryTables == nil { - sessVars.LocalTemporaryTables = infoschema.NewLocalTemporaryTables() - } - localTempTables := sessVars.LocalTemporaryTables.(*infoschema.LocalTemporaryTables) - // Init MemBuffer in session - if sessVars.TemporaryTableData == nil { - // Create this txn just for getting a MemBuffer. It's a little tricky - bufferTxn, err := e.ctx.GetStore().BeginWithOption(tikv.DefaultStartTSOption().SetStartTS(0)) - if err != nil { - return err + _, exists := e.getLocalTemporaryTable(s.Table.Schema, s.Table.Name) + if exists { + err := infoschema.ErrTableExists.GenWithStackByArgs(ast.Ident{Schema: s.Table.Schema, Name: s.Table.Name}) + if s.IfNotExists { + e.ctx.GetSessionVars().StmtCtx.AppendNote(err) + return nil } - - sessVars.TemporaryTableData = variable.NewTemporaryTableData(bufferTxn.GetMemBuffer()) - } - - err = localTempTables.AddTable(dbInfo.Name, tbl) - - if err != nil && s.IfNotExists && infoschema.ErrTableExists.Equal(err) { - e.ctx.GetSessionVars().StmtCtx.AppendNote(err) - return nil + return err } - return err -} - -func (e *DDLExec) newTemporaryTableFromTableInfo(tbInfo *model.TableInfo) (table.Table, error) { - dom := domain.GetDomain(e.ctx) - // Local temporary table uses a real table ID. - // We could mock a table ID, but the mocked ID might be identical to an existing - // real table, and then we'll get into trouble. - err := kv.RunInNewTxn(context.Background(), dom.Store(), true, func(ctx context.Context, txn kv.Transaction) error { - m := meta.NewMeta(txn) - tblID, err := m.GenGlobalID() - if err != nil { - return errors.Trace(err) - } - tbInfo.ID = tblID - tbInfo.State = model.StatePublic - return nil - }) + tbInfo, err := ddl.BuildSessionTemporaryTableInfo(e.ctx, is, s, dbInfo.Charset, dbInfo.Collate) if err != nil { - return nil, err + return err } - // AutoID is allocated in mocked.. - alloc := autoid.NewAllocatorFromTempTblInfo(tbInfo) - allocs := make([]autoid.Allocator, 0, 1) - if alloc != nil { - allocs = append(allocs, alloc) - } - return tables.TableFromMeta(allocs, tbInfo) + return e.tempTableDDL.CreateLocalTemporaryTable(dbInfo.Name, tbInfo) } func (e *DDLExec) executeCreateView(s *ast.CreateViewStmt) error { @@ -604,21 +480,14 @@ func (e *DDLExec) dropLocalTemporaryTables(localTempTables []*ast.TableName) err if len(localTempTables) == 0 { return nil } - sessVars := e.ctx.GetSessionVars() - sessVarsTempTable := sessVars.LocalTemporaryTables - if sessVarsTempTable == nil { - return nil - } - localTemporaryTables := sessVarsTempTable.(*infoschema.LocalTemporaryTables) - // if all tables are local temporary, directly drop those tables. + for _, tb := range localTempTables { - tableInfo, _ := localTemporaryTables.TableByName(tb.Schema, tb.Name) - localTemporaryTables.RemoveTable(tb.Schema, tb.Name) - err := deleteTemporaryTableRecords(sessVars.TemporaryTableData, tableInfo.Meta().ID) + err := e.tempTableDDL.DropLocalTemporaryTable(tb.Schema, tb.Name) if err != nil { return err } } + return nil } diff --git a/planner/core/integration_partition_test.go b/planner/core/integration_partition_test.go index 009b37b883c8f..e9c90a04f7578 100644 --- a/planner/core/integration_partition_test.go +++ b/planner/core/integration_partition_test.go @@ -921,6 +921,14 @@ PARTITION BY LIST COLUMNS(col1) ( tk.MustQuery(`SELECT COL1 FROM PK_LCP9290 WHERE COL1 IN (x'ffacadeb424179bc4b5c',x'ae9f733168669fa900be',x'32d8fb9da8b63508a6b8')`).Sort().Check(testkit.Rows("2\xd8\xfb\x9d\xa8\xb65\b\xa6\xb8", "\xae\x9fs1hf\x9f\xa9\x00\xbe", "\xff\xac\xad\xebBAy\xbcK\\")) } +func (s *testIntegrationPartitionSerialSuite) TestIssue27070(c *C) { + tk := testkit.NewTestKitWithInit(c, s.store) + tk.MustExec("create database issue_27070") + tk.MustExec("use issue_27070") + tk.MustExec(`create table if not exists t (id int, create_date date NOT NULL DEFAULT '2000-01-01', PRIMARY KEY (id,create_date) ) PARTITION BY list COLUMNS(create_date) ( PARTITION p20210506 VALUES IN ("20210507"), PARTITION p20210507 VALUES IN ("20210508") )`) + tk.MustQuery("show warnings").Check(testkit.Rows("Warning 8200 Unsupported partition type LIST, treat as normal table")) +} + func (s *testIntegrationPartitionSerialSuite) TestIssue27031(c *C) { tk := testkit.NewTestKitWithInit(c, s.store) tk.MustExec("create database issue_27031") diff --git a/table/temptable/ddl.go b/table/temptable/ddl.go new file mode 100644 index 0000000000000..7fb5f45135599 --- /dev/null +++ b/table/temptable/ddl.go @@ -0,0 +1,211 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package temptable + +import ( + "bytes" + "context" + + "github.com/pingcap/errors" + "github.com/pingcap/parser/ast" + "github.com/pingcap/parser/model" + "github.com/pingcap/tidb/infoschema" + "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/meta" + "github.com/pingcap/tidb/meta/autoid" + "github.com/pingcap/tidb/sessionctx" + "github.com/pingcap/tidb/sessionctx/variable" + "github.com/pingcap/tidb/table" + "github.com/pingcap/tidb/table/tables" + "github.com/pingcap/tidb/tablecodec" + "github.com/tikv/client-go/v2/tikv" +) + +// TemporaryTableDDL is an interface providing ddl operations for temporary table +type TemporaryTableDDL interface { + CreateLocalTemporaryTable(schema model.CIStr, info *model.TableInfo) error + DropLocalTemporaryTable(schema model.CIStr, tblName model.CIStr) error + TruncateLocalTemporaryTable(schema model.CIStr, tblName model.CIStr) error +} + +// temporaryTableDDL implements temptable.TemporaryTableDDL +type temporaryTableDDL struct { + sctx sessionctx.Context +} + +func (d *temporaryTableDDL) CreateLocalTemporaryTable(schema model.CIStr, info *model.TableInfo) error { + if _, err := ensureSessionData(d.sctx); err != nil { + return err + } + + tbl, err := newTemporaryTableFromTableInfo(d.sctx, info) + if err != nil { + return err + } + + return ensureLocalTemporaryTables(d.sctx).AddTable(schema, tbl) +} + +func (d *temporaryTableDDL) DropLocalTemporaryTable(schema model.CIStr, tblName model.CIStr) error { + tbl, err := checkLocalTemporaryExistsAndReturn(d.sctx, schema, tblName) + if err != nil { + return err + } + + getLocalTemporaryTables(d.sctx).RemoveTable(schema, tblName) + return d.clearTemporaryTableRecords(tbl.Meta().ID) +} + +func (d *temporaryTableDDL) TruncateLocalTemporaryTable(schema model.CIStr, tblName model.CIStr) error { + oldTbl, err := checkLocalTemporaryExistsAndReturn(d.sctx, schema, tblName) + if err != nil { + return err + } + + oldTblInfo := oldTbl.Meta() + newTbl, err := newTemporaryTableFromTableInfo(d.sctx, oldTblInfo.Clone()) + if err != nil { + return err + } + + localTempTables := getLocalTemporaryTables(d.sctx) + localTempTables.RemoveTable(schema, tblName) + if err = localTempTables.AddTable(schema, newTbl); err != nil { + return err + } + + return d.clearTemporaryTableRecords(oldTblInfo.ID) +} + +func (d *temporaryTableDDL) clearTemporaryTableRecords(tblID int64) error { + sessionData := getSessionData(d.sctx) + if sessionData == nil { + return nil + } + + tblPrefix := tablecodec.EncodeTablePrefix(tblID) + endKey := tablecodec.EncodeTablePrefix(tblID + 1) + iter, err := sessionData.Iter(tblPrefix, endKey) + if err != nil { + return err + } + + for iter.Valid() { + key := iter.Key() + if !bytes.HasPrefix(key, tblPrefix) { + break + } + + err = sessionData.DeleteTableKey(tblID, key) + if err != nil { + return err + } + + err = iter.Next() + if err != nil { + return err + } + } + + return nil +} + +func checkLocalTemporaryExistsAndReturn(sctx sessionctx.Context, schema model.CIStr, tblName model.CIStr) (table.Table, error) { + ident := ast.Ident{Schema: schema, Name: tblName} + localTemporaryTables := getLocalTemporaryTables(sctx) + if localTemporaryTables == nil { + return nil, infoschema.ErrTableNotExists.GenWithStackByArgs(ident.String()) + } + + tbl, exist := localTemporaryTables.TableByName(schema, tblName) + if !exist { + return nil, infoschema.ErrTableNotExists.GenWithStackByArgs(ident.String()) + } + + return tbl, nil +} + +func getSessionData(sctx sessionctx.Context) variable.TemporaryTableData { + return sctx.GetSessionVars().TemporaryTableData +} + +func ensureSessionData(sctx sessionctx.Context) (variable.TemporaryTableData, error) { + sessVars := sctx.GetSessionVars() + if sessVars.TemporaryTableData == nil { + // Create this txn just for getting a MemBuffer. It's a little tricky + bufferTxn, err := sctx.GetStore().BeginWithOption(tikv.DefaultStartTSOption().SetStartTS(0)) + if err != nil { + return nil, err + } + + sessVars.TemporaryTableData = variable.NewTemporaryTableData(bufferTxn.GetMemBuffer()) + } + + return sessVars.TemporaryTableData, nil +} + +func getLocalTemporaryTables(sctx sessionctx.Context) *infoschema.LocalTemporaryTables { + localTemporaryTables := sctx.GetSessionVars().LocalTemporaryTables + if localTemporaryTables == nil { + return nil + } + + return localTemporaryTables.(*infoschema.LocalTemporaryTables) +} + +func ensureLocalTemporaryTables(sctx sessionctx.Context) *infoschema.LocalTemporaryTables { + sessVars := sctx.GetSessionVars() + if sessVars.LocalTemporaryTables == nil { + localTempTables := infoschema.NewLocalTemporaryTables() + sessVars.LocalTemporaryTables = localTempTables + return localTempTables + } + + return sessVars.LocalTemporaryTables.(*infoschema.LocalTemporaryTables) +} + +func newTemporaryTableFromTableInfo(sctx sessionctx.Context, tbInfo *model.TableInfo) (table.Table, error) { + // Local temporary table uses a real table ID. + // We could mock a table ID, but the mocked ID might be identical to an existing + // real table, and then we'll get into trouble. + err := kv.RunInNewTxn(context.Background(), sctx.GetStore(), true, func(ctx context.Context, txn kv.Transaction) error { + m := meta.NewMeta(txn) + tblID, err := m.GenGlobalID() + if err != nil { + return errors.Trace(err) + } + tbInfo.ID = tblID + tbInfo.State = model.StatePublic + return nil + }) + if err != nil { + return nil, err + } + + // AutoID is allocated in mocked.. + alloc := autoid.NewAllocatorFromTempTblInfo(tbInfo) + allocs := make([]autoid.Allocator, 0, 1) + if alloc != nil { + allocs = append(allocs, alloc) + } + return tables.TableFromMeta(allocs, tbInfo) +} + +// GetTemporaryTableDDL gets the temptable.TemporaryTableDDL from session context +func GetTemporaryTableDDL(sctx sessionctx.Context) TemporaryTableDDL { + return &temporaryTableDDL{ + sctx: sctx, + } +} diff --git a/table/temptable/ddl_test.go b/table/temptable/ddl_test.go new file mode 100644 index 0000000000000..a32c9ef7dbdbb --- /dev/null +++ b/table/temptable/ddl_test.go @@ -0,0 +1,234 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package temptable + +import ( + "context" + "testing" + + "github.com/pingcap/parser/model" + "github.com/pingcap/parser/mysql" + "github.com/pingcap/tidb/infoschema" + "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/sessionctx" + "github.com/pingcap/tidb/store/mockstore" + "github.com/pingcap/tidb/tablecodec" + "github.com/pingcap/tidb/types" + "github.com/pingcap/tidb/util/mock" + "github.com/pingcap/tidb/util/testbridge" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/suite" + "go.uber.org/goleak" +) + +func TestMain(m *testing.M) { + testbridge.WorkaroundGoCheckFlags() + goleak.VerifyTestMain(m) +} + +type TemporaryTableDDLSuite struct { + suite.Suite + sctx sessionctx.Context + ddl *temporaryTableDDL +} + +func TestTemporaryTableDDLSuit(t *testing.T) { + suite.Run(t, new(TemporaryTableDDLSuite)) +} + +func (s *TemporaryTableDDLSuite) SetupTest() { + store, err := mockstore.NewMockStore() + assert.Nil(s.T(), err) + + sctx := mock.NewContext() + sctx.Store = store + + s.sctx = sctx + s.ddl = GetTemporaryTableDDL(sctx).(*temporaryTableDDL) +} + +func (s *TemporaryTableDDLSuite) TearDownTest() { + assert.Nil(s.T(), s.sctx.GetStore().Close()) +} + +func (s *TemporaryTableDDLSuite) TestAddLocalTemporaryTable() { + assert := assert.New(s.T()) + sessVars := s.sctx.GetSessionVars() + + tbl1 := newMockTable("t1") + tbl2 := newMockTable("t2") + + assert.Nil(sessVars.LocalTemporaryTables) + assert.Nil(sessVars.TemporaryTableData) + + // insert t1 + err := s.ddl.CreateLocalTemporaryTable(model.NewCIStr("db1"), tbl1) + assert.Nil(err) + assert.NotNil(sessVars.LocalTemporaryTables) + assert.NotNil(sessVars.TemporaryTableData) + assert.Equal(int64(1), tbl1.ID) + got, exists := sessVars.LocalTemporaryTables.(*infoschema.LocalTemporaryTables).TableByName(model.NewCIStr("db1"), model.NewCIStr("t1")) + assert.True(exists) + assert.Equal(got.Meta(), tbl1) + + // insert t2 with data + err = s.ddl.CreateLocalTemporaryTable(model.NewCIStr("db1"), tbl2) + assert.Nil(err) + assert.Equal(int64(2), tbl2.ID) + got, exists = sessVars.LocalTemporaryTables.(*infoschema.LocalTemporaryTables).TableByName(model.NewCIStr("db1"), model.NewCIStr("t2")) + assert.True(exists) + assert.Equal(got.Meta(), tbl2) + + // should success to set a key for a table + k := tablecodec.EncodeRowKeyWithHandle(tbl1.ID, kv.IntHandle(1)) + err = sessVars.TemporaryTableData.SetTableKey(tbl1.ID, k, []byte("v1")) + assert.Nil(err) + + val, err := sessVars.TemporaryTableData.Get(context.Background(), k) + assert.Nil(err) + assert.Equal([]byte("v1"), val) + + // insert dup table + tbl1x := newMockTable("t1") + err = s.ddl.CreateLocalTemporaryTable(model.NewCIStr("db1"), tbl1x) + assert.True(infoschema.ErrTableExists.Equal(err)) + got, exists = sessVars.LocalTemporaryTables.(*infoschema.LocalTemporaryTables).TableByName(model.NewCIStr("db1"), model.NewCIStr("t1")) + assert.True(exists) + assert.Equal(got.Meta(), tbl1) + + // insert should be success for same table name in different db + err = s.ddl.CreateLocalTemporaryTable(model.NewCIStr("db2"), tbl1x) + assert.Nil(err) + got, exists = sessVars.LocalTemporaryTables.(*infoschema.LocalTemporaryTables).TableByName(model.NewCIStr("db2"), model.NewCIStr("t1")) + assert.Equal(int64(4), got.Meta().ID) + assert.True(exists) + assert.Equal(got.Meta(), tbl1x) + + // tbl1 still exist + got, exists = sessVars.LocalTemporaryTables.(*infoschema.LocalTemporaryTables).TableByName(model.NewCIStr("db1"), model.NewCIStr("t1")) + assert.True(exists) + assert.Equal(got.Meta(), tbl1) +} + +func (s *TemporaryTableDDLSuite) TestRemoveLocalTemporaryTable() { + assert := assert.New(s.T()) + sessVars := s.sctx.GetSessionVars() + + // remove when empty + err := s.ddl.DropLocalTemporaryTable(model.NewCIStr("db1"), model.NewCIStr("t1")) + assert.True(infoschema.ErrTableNotExists.Equal(err)) + + // add one table + tbl1 := newMockTable("t1") + err = s.ddl.CreateLocalTemporaryTable(model.NewCIStr("db1"), tbl1) + assert.Nil(err) + assert.Equal(int64(1), tbl1.ID) + k := tablecodec.EncodeRowKeyWithHandle(1, kv.IntHandle(1)) + err = sessVars.TemporaryTableData.SetTableKey(tbl1.ID, k, []byte("v1")) + assert.Nil(err) + + // remove failed when table not found + err = s.ddl.DropLocalTemporaryTable(model.NewCIStr("db1"), model.NewCIStr("t2")) + assert.True(infoschema.ErrTableNotExists.Equal(err)) + + // remove failed when table not found (same table name in different db) + err = s.ddl.DropLocalTemporaryTable(model.NewCIStr("db2"), model.NewCIStr("t1")) + assert.True(infoschema.ErrTableNotExists.Equal(err)) + + // check failed remove should have no effects + got, exists := sessVars.LocalTemporaryTables.(*infoschema.LocalTemporaryTables).TableByID(tbl1.ID) + assert.True(exists) + assert.Equal(got.Meta(), tbl1) + val, err := sessVars.TemporaryTableData.Get(context.Background(), k) + assert.Nil(err) + assert.Equal([]byte("v1"), val) + + // remove success + err = s.ddl.DropLocalTemporaryTable(model.NewCIStr("db1"), model.NewCIStr("t1")) + assert.Nil(err) + got, exists = sessVars.LocalTemporaryTables.(*infoschema.LocalTemporaryTables).TableByName(model.NewCIStr("db1"), model.NewCIStr("t1")) + assert.Nil(got) + assert.False(exists) + val, err = sessVars.TemporaryTableData.Get(context.Background(), k) + assert.Nil(err) + assert.Equal([]byte{}, val) +} + +func (s *TemporaryTableDDLSuite) TestTruncateLocalTemporaryTable() { + assert := assert.New(s.T()) + sessVars := s.sctx.GetSessionVars() + + // truncate when empty + err := s.ddl.TruncateLocalTemporaryTable(model.NewCIStr("db1"), model.NewCIStr("t1")) + assert.True(infoschema.ErrTableNotExists.Equal(err)) + assert.Nil(sessVars.LocalTemporaryTables) + assert.Nil(sessVars.TemporaryTableData) + + // add one table + tbl1 := newMockTable("t1") + err = s.ddl.CreateLocalTemporaryTable(model.NewCIStr("db1"), tbl1) + assert.Equal(int64(1), tbl1.ID) + assert.Nil(err) + k := tablecodec.EncodeRowKeyWithHandle(1, kv.IntHandle(1)) + err = sessVars.TemporaryTableData.SetTableKey(1, k, []byte("v1")) + assert.Nil(err) + + // truncate failed for table not exist + err = s.ddl.TruncateLocalTemporaryTable(model.NewCIStr("db1"), model.NewCIStr("t2")) + assert.True(infoschema.ErrTableNotExists.Equal(err)) + err = s.ddl.TruncateLocalTemporaryTable(model.NewCIStr("db2"), model.NewCIStr("t1")) + assert.True(infoschema.ErrTableNotExists.Equal(err)) + + // check failed should have no effects + got, exists := sessVars.LocalTemporaryTables.(*infoschema.LocalTemporaryTables).TableByName(model.NewCIStr("db1"), model.NewCIStr("t1")) + assert.True(exists) + assert.Equal(got.Meta(), tbl1) + val, err := sessVars.TemporaryTableData.Get(context.Background(), k) + assert.Nil(err) + assert.Equal([]byte("v1"), val) + + // insert a new tbl + tbl2 := newMockTable("t2") + err = s.ddl.CreateLocalTemporaryTable(model.NewCIStr("db1"), tbl2) + assert.Equal(int64(2), tbl2.ID) + assert.Nil(err) + k2 := tablecodec.EncodeRowKeyWithHandle(2, kv.IntHandle(1)) + err = sessVars.TemporaryTableData.SetTableKey(2, k2, []byte("v2")) + assert.Nil(err) + + // truncate success + err = s.ddl.TruncateLocalTemporaryTable(model.NewCIStr("db1"), model.NewCIStr("t1")) + assert.Nil(err) + got, exists = sessVars.LocalTemporaryTables.(*infoschema.LocalTemporaryTables).TableByName(model.NewCIStr("db1"), model.NewCIStr("t1")) + assert.True(exists) + assert.NotEqual(got.Meta(), tbl1) + assert.Equal(int64(3), got.Meta().ID) + val, err = sessVars.TemporaryTableData.Get(context.Background(), k) + assert.Nil(err) + assert.Equal([]byte{}, val) + + // truncate just effect its own data + val, err = sessVars.TemporaryTableData.Get(context.Background(), k2) + assert.Nil(err) + assert.Equal([]byte("v2"), val) +} + +func newMockTable(tblName string) *model.TableInfo { + c1 := &model.ColumnInfo{ID: 1, Name: model.NewCIStr("c1"), State: model.StatePublic, Offset: 0, FieldType: *types.NewFieldType(mysql.TypeLonglong)} + c2 := &model.ColumnInfo{ID: 2, Name: model.NewCIStr("c2"), State: model.StatePublic, Offset: 1, FieldType: *types.NewFieldType(mysql.TypeVarchar)} + + tblInfo := &model.TableInfo{Name: model.NewCIStr(tblName), Columns: []*model.ColumnInfo{c1, c2}, PKIsHandle: true} + return tblInfo +} diff --git a/telemetry/data_feature_usage_test.go b/telemetry/data_feature_usage_test.go index bf1727b58e11f..e13ec31373935 100644 --- a/telemetry/data_feature_usage_test.go +++ b/telemetry/data_feature_usage_test.go @@ -18,20 +18,16 @@ import ( "fmt" "testing" - "github.com/pingcap/tidb/kv" - "github.com/pingcap/tidb/session" "github.com/pingcap/tidb/sessionctx/variable" - "github.com/pingcap/tidb/store/mockstore" "github.com/pingcap/tidb/telemetry" "github.com/pingcap/tidb/testkit" "github.com/stretchr/testify/require" - "github.com/tikv/client-go/v2/testutils" ) func TestTxnUsageInfo(t *testing.T) { t.Parallel() - store, clean := newMockStore(t) + store, clean := testkit.CreateMockStore(t) defer clean() t.Run("Used", func(t *testing.T) { @@ -76,7 +72,7 @@ func TestTxnUsageInfo(t *testing.T) { func TestTemporaryTable(t *testing.T) { t.Parallel() - store, clean := newMockStore(t) + store, clean := testkit.CreateMockStore(t) defer clean() tk := testkit.NewTestKit(t, store) @@ -92,24 +88,3 @@ func TestTemporaryTable(t *testing.T) { require.NoError(t, err) require.True(t, usage.TemporaryTable) } - -func newMockStore(t *testing.T) (store kv.Storage, clean func()) { - store, err := mockstore.NewMockStore( - mockstore.WithClusterInspector(func(c testutils.Cluster) { - mockstore.BootstrapWithSingleStore(c) - }), - mockstore.WithStoreType(mockstore.EmbedUnistore), - ) - require.NoError(t, err) - - dom, err := session.BootstrapSession(store) - require.NoError(t, err) - - clean = func() { - dom.Close() - err := store.Close() - require.NoError(t, err) - } - - return -} diff --git a/telemetry/data_window_test.go b/telemetry/data_window_test.go index 83e6540a94cca..1fb233ca163be 100644 --- a/telemetry/data_window_test.go +++ b/telemetry/data_window_test.go @@ -8,8 +8,10 @@ // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. + package telemetry_test import ( @@ -21,7 +23,9 @@ import ( ) func TestBuiltinFunctionsUsage(t *testing.T) { - store, clean := newMockStore(t) + t.Parallel() + + store, clean := testkit.CreateMockStore(t) defer clean() tk := testkit.NewTestKit(t, store) diff --git a/telemetry/telemetry_serial_test.go b/telemetry/telemetry_serial_test.go new file mode 100644 index 0000000000000..c464d46c4c6de --- /dev/null +++ b/telemetry/telemetry_serial_test.go @@ -0,0 +1,54 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package telemetry_test + +import ( + "runtime" + "testing" + + "github.com/Jeffail/gabs/v2" + "github.com/pingcap/tidb/config" + "github.com/pingcap/tidb/session" + "github.com/pingcap/tidb/telemetry" + "github.com/pingcap/tidb/testkit" + "github.com/stretchr/testify/require" + "go.etcd.io/etcd/integration" +) + +func TestReport(t *testing.T) { + if runtime.GOOS == "windows" { + t.Skip("integration.NewClusterV3 will create file contains a colon which is not allowed on Windows") + } + + etcdCluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + defer etcdCluster.Terminate(t) + store, clean := testkit.CreateMockStore(t) + defer clean() + se, err := session.CreateSession4Test(store) + require.NoError(t, err) + defer se.Close() + + config.GetGlobalConfig().EnableTelemetry = false + require.NoError(t, telemetry.ReportUsageData(se, etcdCluster.RandClient())) + + status, err := telemetry.GetTelemetryStatus(etcdCluster.RandClient()) + require.NoError(t, err) + + jsonParsed, err := gabs.ParseJSON([]byte(status)) + require.NoError(t, err) + require.True(t, jsonParsed.Path("is_error").Data().(bool)) + require.Equal(t, "telemetry is disabled", jsonParsed.Path("error_msg").Data().(string)) + require.False(t, jsonParsed.Path("is_request_sent").Data().(bool)) +} diff --git a/telemetry/telemetry_test.go b/telemetry/telemetry_test.go index 6978c0ef42da3..8b4ecfdfd3a16 100644 --- a/telemetry/telemetry_test.go +++ b/telemetry/telemetry_test.go @@ -21,11 +21,9 @@ import ( "github.com/Jeffail/gabs/v2" "github.com/pingcap/tidb/config" - "github.com/pingcap/tidb/domain" - "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/session" - "github.com/pingcap/tidb/store/mockstore" "github.com/pingcap/tidb/telemetry" + "github.com/pingcap/tidb/testkit" "github.com/stretchr/testify/require" "go.etcd.io/etcd/integration" ) @@ -37,18 +35,18 @@ func TestTrackingID(t *testing.T) { t.Parallel() - s := newSuite(t) - defer s.close() + etcdCluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + defer etcdCluster.Terminate(t) - id, err := telemetry.GetTrackingID(s.etcdCluster.RandClient()) + id, err := telemetry.GetTrackingID(etcdCluster.RandClient()) require.NoError(t, err) require.Equal(t, "", id) - id2, err := telemetry.ResetTrackingID(s.etcdCluster.RandClient()) + id2, err := telemetry.ResetTrackingID(etcdCluster.RandClient()) require.NoError(t, err) require.NotEqual(t, "", id2) - id3, err := telemetry.GetTrackingID(s.etcdCluster.RandClient()) + id3, err := telemetry.GetTrackingID(etcdCluster.RandClient()) require.NoError(t, err) require.Equal(t, id2, id3) } @@ -58,19 +56,24 @@ func TestPreview(t *testing.T) { t.Skip("integration.NewClusterV3 will create file contains a colon which is not allowed on Windows") } - s := newSuite(t) - defer s.close() + etcdCluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + defer etcdCluster.Terminate(t) + store, clean := testkit.CreateMockStore(t) + defer clean() + se, err := session.CreateSession4Test(store) + require.NoError(t, err) + defer se.Close() config.GetGlobalConfig().EnableTelemetry = false - r, err := telemetry.PreviewUsageData(s.se, s.etcdCluster.RandClient()) + r, err := telemetry.PreviewUsageData(se, etcdCluster.RandClient()) require.NoError(t, err) require.Equal(t, "", r) - trackingID, err := telemetry.ResetTrackingID(s.etcdCluster.RandClient()) + trackingID, err := telemetry.ResetTrackingID(etcdCluster.RandClient()) require.NoError(t, err) config.GetGlobalConfig().EnableTelemetry = true - r, err = telemetry.PreviewUsageData(s.se, s.etcdCluster.RandClient()) + r, err = telemetry.PreviewUsageData(se, etcdCluster.RandClient()) require.NoError(t, err) jsonParsed, err := gabs.ParseJSON([]byte(r)) @@ -86,79 +89,17 @@ func TestPreview(t *testing.T) { require.Equal(t, "tikv", jsonParsed.Path("instances.1.instanceType").Data().(string)) require.True(t, jsonParsed.ExistsP("hardware")) - _, err = s.se.Execute(context.Background(), "SET @@global.tidb_enable_telemetry = 0") + _, err = se.Execute(context.Background(), "SET @@global.tidb_enable_telemetry = 0") require.NoError(t, err) - r, err = telemetry.PreviewUsageData(s.se, s.etcdCluster.RandClient()) + r, err = telemetry.PreviewUsageData(se, etcdCluster.RandClient()) require.NoError(t, err) require.Equal(t, "", r) - _, err = s.se.Execute(context.Background(), "SET @@global.tidb_enable_telemetry = 1") + _, err = se.Execute(context.Background(), "SET @@global.tidb_enable_telemetry = 1") config.GetGlobalConfig().EnableTelemetry = false require.NoError(t, err) - r, err = telemetry.PreviewUsageData(s.se, s.etcdCluster.RandClient()) + r, err = telemetry.PreviewUsageData(se, etcdCluster.RandClient()) require.NoError(t, err) require.Equal(t, "", r) } - -func TestReport(t *testing.T) { - if runtime.GOOS == "windows" { - t.Skip("integration.NewClusterV3 will create file contains a colon which is not allowed on Windows") - } - - s := newSuite(t) - defer s.close() - - config.GetGlobalConfig().EnableTelemetry = false - err := telemetry.ReportUsageData(s.se, s.etcdCluster.RandClient()) - require.NoError(t, err) - - status, err := telemetry.GetTelemetryStatus(s.etcdCluster.RandClient()) - require.NoError(t, err) - - jsonParsed, err := gabs.ParseJSON([]byte(status)) - require.NoError(t, err) - require.True(t, jsonParsed.Path("is_error").Data().(bool)) - require.Equal(t, "telemetry is disabled", jsonParsed.Path("error_msg").Data().(string)) - require.False(t, jsonParsed.Path("is_request_sent").Data().(bool)) -} - -type testSuite struct { - store kv.Storage - dom *domain.Domain - etcdCluster *integration.ClusterV3 - se session.Session - close func() -} - -func newSuite(t *testing.T) *testSuite { - suite := new(testSuite) - - store, err := mockstore.NewMockStore() - require.NoError(t, err) - suite.store = store - - session.SetSchemaLease(0) - session.DisableStats4Test() - - dom, err := session.BootstrapSession(store) - require.NoError(t, err) - suite.dom = dom - - etcdCluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) - suite.etcdCluster = etcdCluster - - se, err := session.CreateSession4Test(store) - require.NoError(t, err) - suite.se = se - - suite.close = func() { - suite.se.Close() - suite.etcdCluster.Terminate(t) - suite.dom.Close() - err = suite.store.Close() - require.NoError(t, err) - } - - return suite -}