From 1dced1bed1f04036345560f964c0eff0163ffbc8 Mon Sep 17 00:00:00 2001 From: littlelittlehorse Date: Wed, 22 Jun 2022 16:28:40 +0800 Subject: [PATCH 01/23] Add list columns test --- ddl/db_partition_test.go | 33 +++++++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/ddl/db_partition_test.go b/ddl/db_partition_test.go index 4c3bdfb66faf9..1382e7d32dd5a 100644 --- a/ddl/db_partition_test.go +++ b/ddl/db_partition_test.go @@ -1959,6 +1959,39 @@ func TestAlterTableExchangePartition(t *testing.T) { tk.MustQuery("select * from e7").Check(testkit.Rows("1")) tk.MustGetErrCode("alter table e6 exchange partition p1 with table e7", tmysql.ErrRowDoesNotMatchPartition) + // validation test for list partition + tk.MustExec("set @@tidb_enable_list_partition=true") + tk.MustExec(`CREATE TABLE t1 (store_id int) + PARTITION BY LIST (store_id) ( + PARTITION pNorth VALUES IN (1, 2, 3, 4, 5), + PARTITION pEast VALUES IN (6, 7, 8, 9, 10), + PARTITION pWest VALUES IN (11, 12, 13, 14, 15), + PARTITION pCentral VALUES IN (16, 17, 18, 19, 20) + );`) + tk.MustExec(`create table t2 (store_id int);`) + tk.MustExec(`insert into t1 values (1);`) + tk.MustExec(`insert into t1 values (6);`) + tk.MustExec(`insert into t1 values (11);`) + tk.MustExec(`insert into t2 values (3);`) + tk.MustExec("alter table t1 exchange partition pNorth with table t2") + + tk.MustQuery("select * from t1 partition(pNorth)").Check(testkit.Rows("3")) + tk.MustGetErrCode("alter table t1 exchange partition pEast with table t2", tmysql.ErrRowDoesNotMatchPartition) + + // validation test for list columns partition + tk.MustExec(`CREATE TABLE t3 (id int, store_id int) + PARTITION BY LIST COLUMNS (id, store_id) ( + PARTITION pNorth VALUES IN ((1, 1), (2, 2)), + PARTITION pEast VALUES IN ((3, 3), (4, 4)) + );`) + tk.MustExec(`create table t4 (id int, store_id int);`) + tk.MustExec(`insert into t3 values (1, 1);`) + tk.MustExec(`insert into t4 values (2, 2);`) + tk.MustExec("alter table t3 exchange partition pNorth with table t4") + + tk.MustQuery("select * from t3 partition(p0)").Check(testkit.Rows("")) + tk.MustGetErrCode("alter table t3 exchange partition p1 with table t4", tmysql.ErrRowDoesNotMatchPartition) + // test exchange partition from different databases tk.MustExec("create table e8 (a int) partition by hash(a) partitions 2;") tk.MustExec("create database if not exists exchange_partition") From bfc0fba21f1595f2ea2f76a0c08e2d31d82d0fb0 Mon Sep 17 00:00:00 2001 From: littlelittlehorse Date: Mon, 27 Jun 2022 17:19:54 +0800 Subject: [PATCH 02/23] exchange-partition-GA --- ddl/db_partition_test.go | 132 +++++++++++++++++++++++++++++---------- ddl/ddl_api.go | 1 + ddl/partition.go | 6 ++ 3 files changed, 105 insertions(+), 34 deletions(-) diff --git a/ddl/db_partition_test.go b/ddl/db_partition_test.go index 1382e7d32dd5a..10cfa6bac8d07 100644 --- a/ddl/db_partition_test.go +++ b/ddl/db_partition_test.go @@ -1981,15 +1981,16 @@ func TestAlterTableExchangePartition(t *testing.T) { // validation test for list columns partition tk.MustExec(`CREATE TABLE t3 (id int, store_id int) PARTITION BY LIST COLUMNS (id, store_id) ( - PARTITION pNorth VALUES IN ((1, 1), (2, 2)), - PARTITION pEast VALUES IN ((3, 3), (4, 4)) + PARTITION p0 VALUES IN ((1, 1), (2, 2)), + PARTITION p1 VALUES IN ((3, 3), (4, 4)) );`) tk.MustExec(`create table t4 (id int, store_id int);`) tk.MustExec(`insert into t3 values (1, 1);`) tk.MustExec(`insert into t4 values (2, 2);`) - tk.MustExec("alter table t3 exchange partition pNorth with table t4") + tk.MustExec("alter table t3 exchange partition p0 with table t4") + tk.MustQuery("show warnings").Check(testkit.Rows("Warning 1105 after the exchange, please analyze related table of the exchange to update statistics")) - tk.MustQuery("select * from t3 partition(p0)").Check(testkit.Rows("")) + tk.MustQuery("select * from t3 partition(p0)").Check(testkit.Rows("2 2")) tk.MustGetErrCode("alter table t3 exchange partition p1 with table t4", tmysql.ErrRowDoesNotMatchPartition) // test exchange partition from different databases @@ -2227,67 +2228,93 @@ func TestExchangePartitionTableCompatiable(t *testing.T) { dbterror.ErrTablesDifferentMetadata, }, { - // default - "create table pt17 (id int not null default 1) partition by hash(id) partitions 1;", - "create table nt17 (id int not null);", + // auto_increment + "create table pt17 (id bigint not null primary key auto_increment) partition by hash(id) partitions 1;", + "create table nt17 (id bigint not null primary key);", "alter table pt17 exchange partition p0 with table nt17;", + dbterror.ErrTablesDifferentMetadata, + }, + { + // auto_random + "create table pt18 (id bigint not null primary key AUTO_RANDOM) partition by hash(id) partitions 1;", + "create table nt18 (id bigint not null primary key);", + "alter table pt18 exchange partition p0 with table nt18;", + dbterror.ErrTablesDifferentMetadata, + }, + { + // default + "create table pt19 (id int not null default 1) partition by hash(id) partitions 1;", + "create table nt19 (id int not null);", + "alter table pt19 exchange partition p0 with table nt19;", nil, }, { // view test - "create table pt18 (id int not null) partition by hash(id) partitions 1;", - "create view nt18 as select id from nt17;", - "alter table pt18 exchange partition p0 with table nt18", + "create table pt20 (id int not null) partition by hash(id) partitions 1;", + "create view nt20 as select id from nt17;", + "alter table pt20 exchange partition p0 with table nt20", dbterror.ErrCheckNoSuchTable, }, { - "create table pt19 (id int not null, lname varchar(30), fname varchar(100) generated always as (concat(lname, ' ')) stored) partition by hash(id) partitions 1;", - "create table nt19 (id int not null, lname varchar(30), fname varchar(100) generated always as (concat(lname, ' ')) virtual);", - "alter table pt19 exchange partition p0 with table nt19;", + "create table pt21 (id int not null, lname varchar(30), fname varchar(100) generated always as (concat(lname, ' ')) stored) partition by hash(id) partitions 1;", + "create table nt21 (id int not null, lname varchar(30), fname varchar(100) generated always as (concat(lname, ' ')) virtual);", + "alter table pt21 exchange partition p0 with table nt21;", dbterror.ErrUnsupportedOnGeneratedColumn, }, { - "create table pt20 (id int not null) partition by hash(id) partitions 1;", - "create table nt20 (id int default null);", - "alter table pt20 exchange partition p0 with table nt20;", + "create table pt22 (id int not null) partition by hash(id) partitions 1;", + "create table nt22 (id int default null);", + "alter table pt22 exchange partition p0 with table nt22;", dbterror.ErrTablesDifferentMetadata, }, { // unsigned - "create table pt21 (id int unsigned) partition by hash(id) partitions 1;", - "create table nt21 (id int);", - "alter table pt21 exchange partition p0 with table nt21;", + "create table pt23 (id int unsigned) partition by hash(id) partitions 1;", + "create table nt23 (id int);", + "alter table pt23 exchange partition p0 with table nt23;", dbterror.ErrTablesDifferentMetadata, }, { // zerofill - "create table pt22 (id int) partition by hash(id) partitions 1;", - "create table nt22 (id int zerofill);", - "alter table pt22 exchange partition p0 with table nt22;", + "create table pt24 (id int) partition by hash(id) partitions 1;", + "create table nt24 (id int zerofill);", + "alter table pt24 exchange partition p0 with table nt24;", dbterror.ErrTablesDifferentMetadata, }, { - "create table pt23 (id int, lname varchar(10) charset binary) partition by hash(id) partitions 1;", - "create table nt23 (id int, lname varchar(10));", - "alter table pt23 exchange partition p0 with table nt23;", + "create table pt25 (id int, lname varchar(10) charset binary) partition by hash(id) partitions 1;", + "create table nt25 (id int, lname varchar(10));", + "alter table pt25 exchange partition p0 with table nt25;", dbterror.ErrTablesDifferentMetadata, }, { - "create table pt25 (id int, a datetime on update current_timestamp) partition by hash(id) partitions 1;", - "create table nt25 (id int, a datetime);", - "alter table pt25 exchange partition p0 with table nt25;", + "create table pt26 (id int, a datetime on update current_timestamp) partition by hash(id) partitions 1;", + "create table nt26 (id int, a datetime);", + "alter table pt26 exchange partition p0 with table nt26;", nil, }, { - "create table pt26 (id int not null, lname varchar(30), fname varchar(100) generated always as (concat(lname, ' ')) virtual) partition by hash(id) partitions 1;", - "create table nt26 (id int not null, lname varchar(30), fname varchar(100) generated always as (concat(id, ' ')) virtual);", - "alter table pt26 exchange partition p0 with table nt26;", + "create table pt27 (id int not null, lname varchar(30), fname varchar(100) generated always as (concat(lname, ' ')) virtual) partition by hash(id) partitions 1;", + "create table nt27 (id int not null, lname varchar(30), fname varchar(100) generated always as (concat(id, ' ')) virtual);", + "alter table pt27 exchange partition p0 with table nt27;", dbterror.ErrTablesDifferentMetadata, }, { - "create table pt27 (a int key, b int, index(a)) partition by hash(a) partitions 1;", - "create table nt27 (a int not null, b int, index(a));", - "alter table pt27 exchange partition p0 with table nt27;", + "create table pt28 (a int primary key, b int, index(a)) partition by hash(a) partitions 1;", + "create table nt28 (a int not null, b int, index(a));", + "alter table pt28 exchange partition p0 with table nt28;", + dbterror.ErrTablesDifferentMetadata, + }, + { + "create table pt29 (a int primary key, b int) partition by hash(a) partitions 1;", + "create table nt29 (a int not null, b int, index(a));", + "alter table pt29 exchange partition p0 with table nt29;", + dbterror.ErrTablesDifferentMetadata, + }, + { + "create table pt30 (a int primary key, b int) partition by hash(a) partitions 1;", + "create table nt30 (a int, b int, unique index(a));", + "alter table pt30 exchange partition p0 with table nt30;", dbterror.ErrTablesDifferentMetadata, }, } @@ -2313,6 +2340,43 @@ func TestExchangePartitionTableCompatiable(t *testing.T) { require.NoError(t, err) } +// TestExchangePartitionHook is temporary test, needs to be deleted before merging into master. +func TestExchangePartitionHook(t *testing.T) { + store, dom, clean := testkit.CreateMockStoreAndDomain(t) + defer clean() + tk := testkit.NewTestKit(t, store) + // why use tkCancel, not tk. + tkCancel := testkit.NewTestKit(t, store) + + tk.MustExec("set @@tidb_enable_exchange_partition=1") + defer tk.MustExec("set @@tidb_enable_exchange_partition=0") + + tk.MustExec("use test") + tk.MustExec(`create table pt (a int) partition by range(a) ( + partition p0 values less than (3), + partition p1 values less than (6), + PARTITION p2 VALUES LESS THAN (9), + PARTITION p3 VALUES LESS THAN (MAXVALUE) + );`) + tk.MustExec(`create table nt(a int);`) + + tk.MustExec(`insert into pt values (0), (4), (7)`) + tk.MustExec("insert into nt values (1)") + + hook := &ddl.TestDDLCallback{Do: dom} + hookFunc := func(job *model.Job) { + if job.Type == model.ActionExchangeTablePartition { + tkCancel.MustExec("use test") + tkCancel.MustExec("insert into nt values (5)") + } + } + dom.DDL().SetHook(hook) + hook.OnJobUpdatedExported = hookFunc + + tk.MustExec("alter table pt exchange partition p0 with table nt") + tk.MustQuery("select * from pt partition(p0)").Check(testkit.Rows("1]\n[5")) +} + func TestExchangePartitionExpressIndex(t *testing.T) { restore := config.RestoreFunc() defer restore() diff --git a/ddl/ddl_api.go b/ddl/ddl_api.go index 01f57c3ce1d18..40991d2c1853a 100644 --- a/ddl/ddl_api.go +++ b/ddl/ddl_api.go @@ -4054,6 +4054,7 @@ func (d *ddl) ExchangeTablePartition(ctx sessionctx.Context, ident ast.Ident, sp if err != nil { return errors.Trace(err) } + ctx.GetSessionVars().StmtCtx.AppendWarning(fmt.Errorf("after the exchange, please analyze related table of the exchange to update statistics")) err = d.callHookOnChanged(job, err) return errors.Trace(err) } diff --git a/ddl/partition.go b/ddl/partition.go index 3ab281c7f1afc..a7e7e99a9a358 100644 --- a/ddl/partition.go +++ b/ddl/partition.go @@ -1397,6 +1397,10 @@ func (w *worker) onExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Jo } } + d.mu.RLock() + d.mu.hook.OnJobUpdated(job) + d.mu.RUnlock() + // partition table auto IDs. ptAutoIDs, err := t.GetAutoIDAccessors(ptSchemaID, ptID).Get() if err != nil { @@ -1605,6 +1609,8 @@ func checkExchangePartitionRecordValidation(w *worker, pt *model.TableInfo, inde sql, paramList = buildCheckSQLForListPartition(pi, index, schemaName, tableName) } else if len(pi.Columns) == 1 { sql, paramList = buildCheckSQLForListColumnsPartition(pi, index, schemaName, tableName) + } else { + sql, paramList = buildCheckSQLForListColumnsPartition(pi, index, schemaName, tableName) } default: return dbterror.ErrUnsupportedPartitionType.GenWithStackByArgs(pt.Name.O) From f57d9d7f2a21e74216c3cb37981929c69355f3b0 Mon Sep 17 00:00:00 2001 From: littlelittlehorse Date: Wed, 6 Jul 2022 15:15:02 +0800 Subject: [PATCH 03/23] intermediate state changes are implemented --- ddl/db_partition_test.go | 16 ++++++++++++++-- ddl/ddl_worker.go | 14 ++++++++------ ddl/partition.go | 21 ++++++++++++++++++++- executor/insert_common.go | 17 +++++++++++++++++ executor/write.go | 18 ++++++++++++++++++ infoschema/metrics_schema.go | 5 +++++ infoschema/perfschema/tables.go | 4 ++++ infoschema/tables.go | 4 ++++ parser/model/model.go | 5 +++++ table/table.go | 2 ++ table/tables/partition.go | 15 +++++++++++++++ table/tables/tables.go | 4 ++++ 12 files changed, 116 insertions(+), 9 deletions(-) diff --git a/ddl/db_partition_test.go b/ddl/db_partition_test.go index 10cfa6bac8d07..99cad8bc5cf4a 100644 --- a/ddl/db_partition_test.go +++ b/ddl/db_partition_test.go @@ -2364,17 +2364,29 @@ func TestExchangePartitionHook(t *testing.T) { tk.MustExec("insert into nt values (1)") hook := &ddl.TestDDLCallback{Do: dom} + dom.DDL().SetHook(hook) + hookFunc := func(job *model.Job) { + if job.Type == model.ActionExchangeTablePartition { + tkCancel.MustExec("use test") + tkCancel.MustExec("insert into nt values (2)") + } + } + hook.OnJobUpdatedExported = hookFunc + + tk.MustExec("alter table pt exchange partition p0 with table nt") + tk.MustQuery("select * from pt partition(p0)").Check(testkit.Rows("1]\n[2")) + + hookFunc = func(job *model.Job) { if job.Type == model.ActionExchangeTablePartition { tkCancel.MustExec("use test") tkCancel.MustExec("insert into nt values (5)") } } - dom.DDL().SetHook(hook) hook.OnJobUpdatedExported = hookFunc tk.MustExec("alter table pt exchange partition p0 with table nt") - tk.MustQuery("select * from pt partition(p0)").Check(testkit.Rows("1]\n[5")) + tk.MustQuery("select * from pt partition(p0)").Check(testkit.Rows("1]\n[2")) } func TestExchangePartitionExpressIndex(t *testing.T) { diff --git a/ddl/ddl_worker.go b/ddl/ddl_worker.go index 50362923488a8..c4d556b61f947 100644 --- a/ddl/ddl_worker.go +++ b/ddl/ddl_worker.go @@ -703,9 +703,9 @@ func (w *worker) handleDDLJobQueue(d *ddlCtx) error { d.mu.RUnlock() } - d.mu.RLock() - d.mu.hook.OnJobUpdated(job) - d.mu.RUnlock() + //d.mu.RLock() + //d.mu.hook.OnJobUpdated(job) + //d.mu.RUnlock() if job.IsSynced() || job.IsCancelled() || job.IsRollbackDone() { asyncNotify(d.ddlJobDoneCh) @@ -1160,10 +1160,12 @@ func updateSchemaVersion(_ *ddlCtx, t *meta.Meta, job *model.Job) (int64, error) diff.AffectedOpts = affects case model.ActionExchangeTablePartition: var ( - ptSchemaID int64 - ptTableID int64 + ptSchemaID int64 + ptTableID int64 + partName string + withValidation bool ) - err = job.DecodeArgs(&diff.TableID, &ptSchemaID, &ptTableID) + err = job.DecodeArgs(&diff.TableID, &ptSchemaID, &ptTableID, &partName, &withValidation) if err != nil { return 0, errors.Trace(err) } diff --git a/ddl/partition.go b/ddl/partition.go index a7e7e99a9a358..9b20cd57c775b 100644 --- a/ddl/partition.go +++ b/ddl/partition.go @@ -1389,6 +1389,24 @@ func (w *worker) onExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Jo return ver, errors.Trace(err) } + if !nt.ExchangePartitionFlag { + if withValidation { + err = checkExchangePartitionRecordValidation(w, pt, index, ntDbInfo.Name, nt.Name) + if err != nil { + job.State = model.JobStateCancelled + return ver, errors.Trace(err) + } + } + nt.ExchangePartitionFlag = true + nt.ExchangePartitionId = ptID + nt.ExchangePartitionDefId = defID + return updateVersionAndTableInfoWithCheck(d, t, job, nt, true) + } + + if d.lease > 0 { + delayForAsyncCommit() + } + if withValidation { err = checkExchangePartitionRecordValidation(w, pt, index, ntDbInfo.Name, nt.Name) if err != nil { @@ -1525,7 +1543,8 @@ func (w *worker) onExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Jo return ver, errors.Wrapf(err, "failed to notify PD the label rules") } - ver, err = updateSchemaVersion(d, t, job) + nt.ExchangePartitionFlag = false + ver, err = updateVersionAndTableInfoWithCheck(d, t, job, nt, true) if err != nil { return ver, errors.Trace(err) } diff --git a/executor/insert_common.go b/executor/insert_common.go index 9d31cf44f030f..59cc333814faa 100644 --- a/executor/insert_common.go +++ b/executor/insert_common.go @@ -18,6 +18,7 @@ import ( "bytes" "context" "fmt" + "github.com/pingcap/tidb/infoschema" "math" "sync" "time" @@ -642,6 +643,22 @@ func (e *InsertValues) fillRow(ctx context.Context, row []types.Datum, hasValue } } } + tbl := e.Table.Meta() + // Handle exchange partition + if tbl.ExchangePartitionFlag { + is := e.ctx.GetDomainInfoSchema().(infoschema.InfoSchema) + pt, tableFound := is.TableByID(tbl.ExchangePartitionId) + if !tableFound { + return nil, errors.Errorf("insert record TableByID Failed") + } + canExchange, err := pt.CheckForExchangePartition(e.ctx, pt.Meta().Partition, row, tbl.ExchangePartitionDefId) + if err != nil { + return nil, err + } + if !canExchange { + return nil, errors.Errorf("update data not match partition constraint during exchange partition with table.") + } + } for i, gCol := range gCols { colIdx := gCol.ColumnInfo.Offset val, err := e.GenExprs[i].Eval(chunk.MutRowFromDatums(row).ToRow()) diff --git a/executor/write.go b/executor/write.go index 49707f6417bf5..01b786ea0a05b 100644 --- a/executor/write.go +++ b/executor/write.go @@ -16,6 +16,7 @@ package executor import ( "context" + "github.com/pingcap/tidb/infoschema" "strings" "github.com/opentracing/opentracing-go" @@ -80,6 +81,23 @@ func updateRecord(ctx context.Context, sctx sessionctx.Context, h kv.Handle, old } } + // Handle exchange partition + tbl := t.Meta() + if tbl.ExchangePartitionFlag { + is := sctx.GetDomainInfoSchema().(infoschema.InfoSchema) + pt, tableFound := is.TableByID(tbl.ExchangePartitionId) + if !tableFound { + return false, errors.Errorf("exchange partition with table TableByID Failed") + } + canExchange, err := pt.CheckForExchangePartition(sctx, pt.Meta().Partition, newData, tbl.ExchangePartitionDefId) + if err != nil { + return false, err + } + if !canExchange { + return false, errors.Errorf("update data not match partition constraint during exchange partition with table.") + } + } + // Compare datum, then handle some flags. for i, col := range t.Cols() { // We should use binary collation to compare datum, otherwise the result will be incorrect. diff --git a/infoschema/metrics_schema.go b/infoschema/metrics_schema.go index 9ffe6a5a8e8a2..61387df94880f 100644 --- a/infoschema/metrics_schema.go +++ b/infoschema/metrics_schema.go @@ -17,6 +17,7 @@ package infoschema import ( "bytes" "fmt" + "github.com/pingcap/tidb/types" "sort" "strconv" "strings" @@ -144,6 +145,10 @@ type metricSchemaTable struct { infoschemaTable } +func (m metricSchemaTable) CheckForExchangePartition(ctx sessionctx.Context, pi *model.PartitionInfo, r []types.Datum, pid int64) (bool, error) { + panic("implement me") +} + func tableFromMeta(alloc autoid.Allocators, meta *model.TableInfo) (table.Table, error) { columns := make([]*table.Column, 0, len(meta.Columns)) for _, colInfo := range meta.Columns { diff --git a/infoschema/perfschema/tables.go b/infoschema/perfschema/tables.go index c0006f9f7413a..ff7fa391c7457 100644 --- a/infoschema/perfschema/tables.go +++ b/infoschema/perfschema/tables.go @@ -112,6 +112,10 @@ type perfSchemaTable struct { indices []table.Index } +func (vt *perfSchemaTable) CheckForExchangePartition(ctx sessionctx.Context, pi *model.PartitionInfo, r []types.Datum, pid int64) (bool, error) { + panic("implement me") +} + var pluginTable = make(map[string]func(autoid.Allocators, *model.TableInfo) (table.Table, error)) // IsPredefinedTable judges whether this table is predefined. diff --git a/infoschema/tables.go b/infoschema/tables.go index 622eb1ef9452c..51e16818610af 100644 --- a/infoschema/tables.go +++ b/infoschema/tables.go @@ -1910,6 +1910,10 @@ type infoschemaTable struct { tp table.Type } +func (it *infoschemaTable) CheckForExchangePartition(ctx sessionctx.Context, pi *model.PartitionInfo, r []types.Datum, pid int64) (bool, error) { + panic("implement me") +} + // SchemasSorter implements the sort.Interface interface, sorts DBInfo by name. type SchemasSorter []*model.DBInfo diff --git a/parser/model/model.go b/parser/model/model.go index 43e3e4bc5bcfb..9182f83d45a03 100644 --- a/parser/model/model.go +++ b/parser/model/model.go @@ -444,6 +444,11 @@ type TableInfo struct { // StatsOptions is used when do analyze/auto-analyze for each table StatsOptions *StatsOptions `json:"stats_options"` + + // ExchangePartitionFlag、ExchangePartitionId、ExchangePartitionDefId is used when do exchange partition with table + ExchangePartitionFlag bool `json:"exchange_partition_flag"` + ExchangePartitionId int64 `json:"exchange_partition_id"` + ExchangePartitionDefId int64 `json:"exchange_partition_def_id"` } type TableCacheStatusType int diff --git a/table/table.go b/table/table.go index 775cb03bb6cf9..023dadac8b53f 100644 --- a/table/table.go +++ b/table/table.go @@ -194,6 +194,8 @@ type Table interface { // Type returns the type of table Type() Type + + CheckForExchangePartition(ctx sessionctx.Context, pi *model.PartitionInfo, r []types.Datum, pid int64) (bool, error) } // AllocAutoIncrementValue allocates an auto_increment value for a new row. diff --git a/table/tables/partition.go b/table/tables/partition.go index 9f84333213a82..6902d7b08411c 100644 --- a/table/tables/partition.go +++ b/table/tables/partition.go @@ -68,6 +68,10 @@ type partition struct { TableCommon } +func (p *partition) CheckForExchangePartition(ctx sessionctx.Context, pi *model.PartitionInfo, r []types.Datum, pid int64) (bool, error) { + panic("implement me") +} + // GetPhysicalID implements table.Table GetPhysicalID interface. func (p *partition) GetPhysicalID() int64 { return p.physicalTableID @@ -964,6 +968,17 @@ func PartitionRecordKey(pid int64, handle int64) kv.Key { return tablecodec.EncodeRecordKey(recordPrefix, kv.IntHandle(handle)) } +func (t *partitionedTable) CheckForExchangePartition(ctx sessionctx.Context, pi *model.PartitionInfo, r []types.Datum, pid int64) (bool, error) { + defID, err := t.locatePartition(ctx, pi, r) + if err != nil { + return false, err + } + if defID != pid { + return false, errors.Errorf("insert data doesn't match partition constraint during exchange partition with table") + } + return true, nil +} + // locatePartition returns the partition ID of the input record. func (t *partitionedTable) locatePartition(ctx sessionctx.Context, pi *model.PartitionInfo, r []types.Datum) (int64, error) { var err error diff --git a/table/tables/tables.go b/table/tables/tables.go index aab32e1f18d8f..b08815d2f9fcf 100644 --- a/table/tables/tables.go +++ b/table/tables/tables.go @@ -75,6 +75,10 @@ type TableCommon struct { indexPrefix kv.Key } +func (t *TableCommon) CheckForExchangePartition(ctx sessionctx.Context, pi *model.PartitionInfo, r []types.Datum, pid int64) (bool, error) { + panic("implement me") +} + // MockTableFromMeta only serves for test. func MockTableFromMeta(tblInfo *model.TableInfo) table.Table { columns := make([]*table.Column, 0, len(tblInfo.Columns)) From ccdb0134b82f1de1dfd88625690bd9e154812f6b Mon Sep 17 00:00:00 2001 From: littlelittlehorse Date: Wed, 6 Jul 2022 15:43:15 +0800 Subject: [PATCH 04/23] fix --- infoschema/metrics_schema.go | 4 ++-- infoschema/tables.go | 35 +++++++++++------------------------ 2 files changed, 13 insertions(+), 26 deletions(-) diff --git a/infoschema/metrics_schema.go b/infoschema/metrics_schema.go index 61387df94880f..8508350bc444e 100644 --- a/infoschema/metrics_schema.go +++ b/infoschema/metrics_schema.go @@ -18,7 +18,7 @@ import ( "bytes" "fmt" "github.com/pingcap/tidb/types" - "sort" + "golang.org/x/exp/slices" "strconv" "strings" @@ -136,7 +136,7 @@ func GenLabelConditionValues(values set.StringSet) string { for k := range values { vs = append(vs, k) } - sort.Strings(vs) + slices.Sort(vs) return strings.Join(vs, "|") } diff --git a/infoschema/tables.go b/infoschema/tables.go index e9eaf5832d64d..407ee7f807646 100644 --- a/infoschema/tables.go +++ b/infoschema/tables.go @@ -26,19 +26,15 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/metapb" - "github.com/pingcap/tidb/parser/charset" - "github.com/pingcap/tidb/parser/model" - "github.com/pingcap/tidb/parser/mysql" - "github.com/pingcap/tidb/parser/terror" - "github.com/pingcap/tidb/util/logutil" - "github.com/pingcap/tidb/util/stmtsummary" - "go.uber.org/zap" - "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/ddl/placement" "github.com/pingcap/tidb/domain/infosync" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/meta/autoid" + "github.com/pingcap/tidb/parser/charset" + "github.com/pingcap/tidb/parser/model" + "github.com/pingcap/tidb/parser/mysql" + "github.com/pingcap/tidb/parser/terror" "github.com/pingcap/tidb/session/txninfo" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/variable" @@ -47,8 +43,12 @@ import ( "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/deadlockhistory" "github.com/pingcap/tidb/util/execdetails" + "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/pdapi" + "github.com/pingcap/tidb/util/stmtsummary" "github.com/tikv/client-go/v2/tikv" + "go.uber.org/zap" + "golang.org/x/exp/slices" ) const ( @@ -1913,25 +1913,12 @@ func (it *infoschemaTable) CheckForExchangePartition(ctx sessionctx.Context, pi panic("implement me") } -// SchemasSorter implements the sort.Interface interface, sorts DBInfo by name. -type SchemasSorter []*model.DBInfo - -func (s SchemasSorter) Len() int { - return len(s) -} - -func (s SchemasSorter) Swap(i, j int) { - s[i], s[j] = s[j], s[i] -} - -func (s SchemasSorter) Less(i, j int) bool { - return s[i].Name.L < s[j].Name.L -} - func (it *infoschemaTable) getRows(ctx sessionctx.Context, cols []*table.Column) (fullRows [][]types.Datum, err error) { is := ctx.GetInfoSchema().(InfoSchema) dbs := is.AllSchemas() - sort.Sort(SchemasSorter(dbs)) + slices.SortFunc(dbs, func(i, j *model.DBInfo) bool { + return i.Name.L < j.Name.L + }) switch it.meta.Name.O { case tableFiles: case tablePlugins, tableTriggers: From 8414fedf10107b414ca75ea42edef3cdbdc751fc Mon Sep 17 00:00:00 2001 From: littlelittlehorse Date: Thu, 7 Jul 2022 12:15:10 +0800 Subject: [PATCH 05/23] fix --- ddl/db_partition_test.go | 4 ++-- executor/insert_common.go | 2 +- executor/write.go | 2 +- infoschema/metrics_schema.go | 5 ----- infoschema/perfschema/tables.go | 4 ---- infoschema/tables.go | 4 ---- table/table.go | 3 +-- table/tables/partition.go | 4 ---- table/tables/tables.go | 4 ---- 9 files changed, 5 insertions(+), 27 deletions(-) diff --git a/ddl/db_partition_test.go b/ddl/db_partition_test.go index c8de6d9771ca4..91ae2dacba261 100644 --- a/ddl/db_partition_test.go +++ b/ddl/db_partition_test.go @@ -2380,13 +2380,13 @@ func TestExchangePartitionHook(t *testing.T) { hookFunc = func(job *model.Job) { if job.Type == model.ActionExchangeTablePartition { tkCancel.MustExec("use test") - tkCancel.MustExec("insert into nt values (5)") + tkCancel.MustGetErrMsg("insert into nt values (5)", "insert data doesn't match partition constraint during exchange partition with table") } } hook.OnJobUpdatedExported = hookFunc tk.MustExec("alter table pt exchange partition p0 with table nt") - tk.MustQuery("select * from pt partition(p0)").Check(testkit.Rows("1]\n[2")) + tk.MustQuery("select * from pt partition(p0)").Check(testkit.Rows("0")) } func TestExchangePartitionExpressIndex(t *testing.T) { diff --git a/executor/insert_common.go b/executor/insert_common.go index 59cc333814faa..1cff9975258ee 100644 --- a/executor/insert_common.go +++ b/executor/insert_common.go @@ -651,7 +651,7 @@ func (e *InsertValues) fillRow(ctx context.Context, row []types.Datum, hasValue if !tableFound { return nil, errors.Errorf("insert record TableByID Failed") } - canExchange, err := pt.CheckForExchangePartition(e.ctx, pt.Meta().Partition, row, tbl.ExchangePartitionDefId) + canExchange, err := pt.(table.PartitionedTable).CheckForExchangePartition(e.ctx, pt.Meta().Partition, row, tbl.ExchangePartitionDefId) if err != nil { return nil, err } diff --git a/executor/write.go b/executor/write.go index 01b786ea0a05b..3224abc1eda5e 100644 --- a/executor/write.go +++ b/executor/write.go @@ -89,7 +89,7 @@ func updateRecord(ctx context.Context, sctx sessionctx.Context, h kv.Handle, old if !tableFound { return false, errors.Errorf("exchange partition with table TableByID Failed") } - canExchange, err := pt.CheckForExchangePartition(sctx, pt.Meta().Partition, newData, tbl.ExchangePartitionDefId) + canExchange, err := pt.(table.PartitionedTable).CheckForExchangePartition(sctx, pt.Meta().Partition, newData, tbl.ExchangePartitionDefId) if err != nil { return false, err } diff --git a/infoschema/metrics_schema.go b/infoschema/metrics_schema.go index 8508350bc444e..15f6cd3163898 100644 --- a/infoschema/metrics_schema.go +++ b/infoschema/metrics_schema.go @@ -17,7 +17,6 @@ package infoschema import ( "bytes" "fmt" - "github.com/pingcap/tidb/types" "golang.org/x/exp/slices" "strconv" "strings" @@ -145,10 +144,6 @@ type metricSchemaTable struct { infoschemaTable } -func (m metricSchemaTable) CheckForExchangePartition(ctx sessionctx.Context, pi *model.PartitionInfo, r []types.Datum, pid int64) (bool, error) { - panic("implement me") -} - func tableFromMeta(alloc autoid.Allocators, meta *model.TableInfo) (table.Table, error) { columns := make([]*table.Column, 0, len(meta.Columns)) for _, colInfo := range meta.Columns { diff --git a/infoschema/perfschema/tables.go b/infoschema/perfschema/tables.go index 3606ba4febdc2..149336c3bf87d 100644 --- a/infoschema/perfschema/tables.go +++ b/infoschema/perfschema/tables.go @@ -112,10 +112,6 @@ type perfSchemaTable struct { indices []table.Index } -func (vt *perfSchemaTable) CheckForExchangePartition(ctx sessionctx.Context, pi *model.PartitionInfo, r []types.Datum, pid int64) (bool, error) { - panic("implement me") -} - var pluginTable = make(map[string]func(autoid.Allocators, *model.TableInfo) (table.Table, error)) // IsPredefinedTable judges whether this table is predefined. diff --git a/infoschema/tables.go b/infoschema/tables.go index 407ee7f807646..9f6a21f3727b6 100644 --- a/infoschema/tables.go +++ b/infoschema/tables.go @@ -1909,10 +1909,6 @@ type infoschemaTable struct { tp table.Type } -func (it *infoschemaTable) CheckForExchangePartition(ctx sessionctx.Context, pi *model.PartitionInfo, r []types.Datum, pid int64) (bool, error) { - panic("implement me") -} - func (it *infoschemaTable) getRows(ctx sessionctx.Context, cols []*table.Column) (fullRows [][]types.Datum, err error) { is := ctx.GetInfoSchema().(InfoSchema) dbs := is.AllSchemas() diff --git a/table/table.go b/table/table.go index 023dadac8b53f..dfa057415f65f 100644 --- a/table/table.go +++ b/table/table.go @@ -194,8 +194,6 @@ type Table interface { // Type returns the type of table Type() Type - - CheckForExchangePartition(ctx sessionctx.Context, pi *model.PartitionInfo, r []types.Datum, pid int64) (bool, error) } // AllocAutoIncrementValue allocates an auto_increment value for a new row. @@ -244,6 +242,7 @@ type PartitionedTable interface { GetPartitionByRow(sessionctx.Context, []types.Datum) (PhysicalTable, error) GetAllPartitionIDs() []int64 GetPartitionColumnNames() []model.CIStr + CheckForExchangePartition(ctx sessionctx.Context, pi *model.PartitionInfo, r []types.Datum, pid int64) (bool, error) } // TableFromMeta builds a table.Table from *model.TableInfo. diff --git a/table/tables/partition.go b/table/tables/partition.go index 6902d7b08411c..2e795620e59f9 100644 --- a/table/tables/partition.go +++ b/table/tables/partition.go @@ -68,10 +68,6 @@ type partition struct { TableCommon } -func (p *partition) CheckForExchangePartition(ctx sessionctx.Context, pi *model.PartitionInfo, r []types.Datum, pid int64) (bool, error) { - panic("implement me") -} - // GetPhysicalID implements table.Table GetPhysicalID interface. func (p *partition) GetPhysicalID() int64 { return p.physicalTableID diff --git a/table/tables/tables.go b/table/tables/tables.go index 621e5f83659ea..9b35a1af8cc3b 100644 --- a/table/tables/tables.go +++ b/table/tables/tables.go @@ -75,10 +75,6 @@ type TableCommon struct { indexPrefix kv.Key } -func (t *TableCommon) CheckForExchangePartition(ctx sessionctx.Context, pi *model.PartitionInfo, r []types.Datum, pid int64) (bool, error) { - panic("implement me") -} - // MockTableFromMeta only serves for test. func MockTableFromMeta(tblInfo *model.TableInfo) table.Table { columns := make([]*table.Column, 0, len(tblInfo.Columns)) From bb82e86e743139151df0924c8c49db0a8b3f1c32 Mon Sep 17 00:00:00 2001 From: littlelittlehorse Date: Thu, 7 Jul 2022 12:28:44 +0800 Subject: [PATCH 06/23] fix --- ddl/db_partition_test.go | 2 +- executor/insert_common.go | 7 ++----- executor/write.go | 5 +---- table/table.go | 2 +- table/tables/partition.go | 8 ++++---- 5 files changed, 9 insertions(+), 15 deletions(-) diff --git a/ddl/db_partition_test.go b/ddl/db_partition_test.go index 91ae2dacba261..365f0f21205f7 100644 --- a/ddl/db_partition_test.go +++ b/ddl/db_partition_test.go @@ -2380,7 +2380,7 @@ func TestExchangePartitionHook(t *testing.T) { hookFunc = func(job *model.Job) { if job.Type == model.ActionExchangeTablePartition { tkCancel.MustExec("use test") - tkCancel.MustGetErrMsg("insert into nt values (5)", "insert data doesn't match partition constraint during exchange partition with table") + tkCancel.MustGetErrCode("insert into nt values (5)", tmysql.ErrRowDoesNotMatchGivenPartitionSet) } } hook.OnJobUpdatedExported = hookFunc diff --git a/executor/insert_common.go b/executor/insert_common.go index 1cff9975258ee..e69611954b060 100644 --- a/executor/insert_common.go +++ b/executor/insert_common.go @@ -649,15 +649,12 @@ func (e *InsertValues) fillRow(ctx context.Context, row []types.Datum, hasValue is := e.ctx.GetDomainInfoSchema().(infoschema.InfoSchema) pt, tableFound := is.TableByID(tbl.ExchangePartitionId) if !tableFound { - return nil, errors.Errorf("insert record TableByID Failed") + return nil, errors.Errorf("exchange partition with table TableByID Failed") } - canExchange, err := pt.(table.PartitionedTable).CheckForExchangePartition(e.ctx, pt.Meta().Partition, row, tbl.ExchangePartitionDefId) + err := pt.(table.PartitionedTable).CheckForExchangePartition(e.ctx, pt.Meta().Partition, row, tbl.ExchangePartitionDefId) if err != nil { return nil, err } - if !canExchange { - return nil, errors.Errorf("update data not match partition constraint during exchange partition with table.") - } } for i, gCol := range gCols { colIdx := gCol.ColumnInfo.Offset diff --git a/executor/write.go b/executor/write.go index 3224abc1eda5e..61b2d27123a90 100644 --- a/executor/write.go +++ b/executor/write.go @@ -89,13 +89,10 @@ func updateRecord(ctx context.Context, sctx sessionctx.Context, h kv.Handle, old if !tableFound { return false, errors.Errorf("exchange partition with table TableByID Failed") } - canExchange, err := pt.(table.PartitionedTable).CheckForExchangePartition(sctx, pt.Meta().Partition, newData, tbl.ExchangePartitionDefId) + err := pt.(table.PartitionedTable).CheckForExchangePartition(sctx, pt.Meta().Partition, newData, tbl.ExchangePartitionDefId) if err != nil { return false, err } - if !canExchange { - return false, errors.Errorf("update data not match partition constraint during exchange partition with table.") - } } // Compare datum, then handle some flags. diff --git a/table/table.go b/table/table.go index dfa057415f65f..09dc61feb0fdc 100644 --- a/table/table.go +++ b/table/table.go @@ -242,7 +242,7 @@ type PartitionedTable interface { GetPartitionByRow(sessionctx.Context, []types.Datum) (PhysicalTable, error) GetAllPartitionIDs() []int64 GetPartitionColumnNames() []model.CIStr - CheckForExchangePartition(ctx sessionctx.Context, pi *model.PartitionInfo, r []types.Datum, pid int64) (bool, error) + CheckForExchangePartition(ctx sessionctx.Context, pi *model.PartitionInfo, r []types.Datum, pid int64) error } // TableFromMeta builds a table.Table from *model.TableInfo. diff --git a/table/tables/partition.go b/table/tables/partition.go index 2e795620e59f9..335d8db214e28 100644 --- a/table/tables/partition.go +++ b/table/tables/partition.go @@ -964,15 +964,15 @@ func PartitionRecordKey(pid int64, handle int64) kv.Key { return tablecodec.EncodeRecordKey(recordPrefix, kv.IntHandle(handle)) } -func (t *partitionedTable) CheckForExchangePartition(ctx sessionctx.Context, pi *model.PartitionInfo, r []types.Datum, pid int64) (bool, error) { +func (t *partitionedTable) CheckForExchangePartition(ctx sessionctx.Context, pi *model.PartitionInfo, r []types.Datum, pid int64) error { defID, err := t.locatePartition(ctx, pi, r) if err != nil { - return false, err + return err } if defID != pid { - return false, errors.Errorf("insert data doesn't match partition constraint during exchange partition with table") + return errors.WithStack(table.ErrRowDoesNotMatchGivenPartitionSet) } - return true, nil + return nil } // locatePartition returns the partition ID of the input record. From 675a7b168a998ab9c20614d0a1149fb3ca514bb8 Mon Sep 17 00:00:00 2001 From: littlelittlehorse Date: Mon, 11 Jul 2022 12:56:28 +0800 Subject: [PATCH 07/23] add placement policy --- ddl/partition.go | 37 +++++++++++++++++++++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/ddl/partition.go b/ddl/partition.go index 9b20cd57c775b..e1f5bfc56e589 100644 --- a/ddl/partition.go +++ b/ddl/partition.go @@ -1493,6 +1493,12 @@ func (w *worker) onExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Jo return ver, errors.Trace(err) } + placementPolicyEqual := checkExchangePartitionPlacementPolicy(t, pt.PlacementPolicyRef, nt.PlacementPolicyRef) + if !placementPolicyEqual { + job.State = model.JobStateCancelled + return ver, errors.Trace(err) + } + // the follow code is a swap function for rules of two partitions // though partitions has exchanged their ID, swap still take effect @@ -1653,6 +1659,37 @@ func checkExchangePartitionRecordValidation(w *worker, pt *model.TableInfo, inde return nil } +func checkExchangePartitionPlacementPolicy(t *meta.Meta, ntPlacementPolicyRef *model.PolicyRefInfo, ptPlacementPolicyRef *model.PolicyRefInfo) bool { + if ntPlacementPolicyRef == nil && ptPlacementPolicyRef == nil { + return true + } + if ntPlacementPolicyRef == nil || ptPlacementPolicyRef == nil { + return false + } + + ptPlacementPolicyInfo, _ := getPolicyInfo(t, ptPlacementPolicyRef.ID) + ntPlacementPolicyInfo, _ := getPolicyInfo(t, ntPlacementPolicyRef.ID) + + if ntPlacementPolicyInfo == nil && ptPlacementPolicyInfo == nil { + return true + } + if ntPlacementPolicyInfo == nil || ptPlacementPolicyInfo == nil { + return false + } + + ptSetting, ntSetting := ntPlacementPolicyInfo.PlacementSettings, ptPlacementPolicyInfo.PlacementSettings + + if ptSetting.PrimaryRegion != ntSetting.PrimaryRegion || ptSetting.Regions != ntSetting.Regions || + ptSetting.Learners != ntSetting.Learners || ptSetting.Followers != ntSetting.Followers || ptSetting.Voters != ntSetting.Voters || + ptSetting.Schedule != ntSetting.Schedule || ptSetting.Constraints != ntSetting.Constraints || + ptSetting.LeaderConstraints != ntSetting.LeaderConstraints || ptSetting.LearnerConstraints != ntSetting.LearnerConstraints || + ptSetting.FollowerConstraints != ntSetting.FollowerConstraints || ptSetting.VoterConstraints != ntSetting.VoterConstraints { + return false + } + + return true +} + func buildCheckSQLForRangeExprPartition(pi *model.PartitionInfo, index int, schemaName, tableName model.CIStr) (string, []interface{}) { var buf strings.Builder paramList := make([]interface{}, 0, 4) From c21dd239a2ff41a53dc779ee11713811692bbdc7 Mon Sep 17 00:00:00 2001 From: littlelittlehorse Date: Mon, 11 Jul 2022 15:03:06 +0800 Subject: [PATCH 08/23] fix --- ddl/db_partition_test.go | 15 ++------------- ddl/ddl_worker.go | 6 +++--- ddl/partition.go | 4 ---- 3 files changed, 5 insertions(+), 20 deletions(-) diff --git a/ddl/db_partition_test.go b/ddl/db_partition_test.go index 365f0f21205f7..6dc4a76407137 100644 --- a/ddl/db_partition_test.go +++ b/ddl/db_partition_test.go @@ -2367,18 +2367,7 @@ func TestExchangePartitionHook(t *testing.T) { dom.DDL().SetHook(hook) hookFunc := func(job *model.Job) { - if job.Type == model.ActionExchangeTablePartition { - tkCancel.MustExec("use test") - tkCancel.MustExec("insert into nt values (2)") - } - } - hook.OnJobUpdatedExported = hookFunc - - tk.MustExec("alter table pt exchange partition p0 with table nt") - tk.MustQuery("select * from pt partition(p0)").Check(testkit.Rows("1]\n[2")) - - hookFunc = func(job *model.Job) { - if job.Type == model.ActionExchangeTablePartition { + if job.Type == model.ActionExchangeTablePartition && job.SchemaState != model.StateNone { tkCancel.MustExec("use test") tkCancel.MustGetErrCode("insert into nt values (5)", tmysql.ErrRowDoesNotMatchGivenPartitionSet) } @@ -2386,7 +2375,7 @@ func TestExchangePartitionHook(t *testing.T) { hook.OnJobUpdatedExported = hookFunc tk.MustExec("alter table pt exchange partition p0 with table nt") - tk.MustQuery("select * from pt partition(p0)").Check(testkit.Rows("0")) + tk.MustQuery("select * from pt partition(p0)").Check(testkit.Rows("1")) } func TestExchangePartitionExpressIndex(t *testing.T) { diff --git a/ddl/ddl_worker.go b/ddl/ddl_worker.go index e46d13523a9a5..1ff8f11700f19 100644 --- a/ddl/ddl_worker.go +++ b/ddl/ddl_worker.go @@ -756,9 +756,9 @@ func (w *worker) handleDDLJobQueue(d *ddlCtx) error { d.mu.RUnlock() } - //d.mu.RLock() - //d.mu.hook.OnJobUpdated(job) - //d.mu.RUnlock() + d.mu.RLock() + d.mu.hook.OnJobUpdated(job) + d.mu.RUnlock() if job.IsSynced() || job.IsCancelled() || job.IsRollbackDone() { asyncNotify(d.ddlJobDoneCh) diff --git a/ddl/partition.go b/ddl/partition.go index e1f5bfc56e589..f2b0e3b6a4762 100644 --- a/ddl/partition.go +++ b/ddl/partition.go @@ -1415,10 +1415,6 @@ func (w *worker) onExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Jo } } - d.mu.RLock() - d.mu.hook.OnJobUpdated(job) - d.mu.RUnlock() - // partition table auto IDs. ptAutoIDs, err := t.GetAutoIDAccessors(ptSchemaID, ptID).Get() if err != nil { From 61e5f7902fb240b2649b39f6bbe7cbfafaef909c Mon Sep 17 00:00:00 2001 From: littlelittlehorse Date: Mon, 11 Jul 2022 15:21:54 +0800 Subject: [PATCH 09/23] fix --- infoschema/metrics_schema.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/infoschema/metrics_schema.go b/infoschema/metrics_schema.go index 15f6cd3163898..8efd1273d5d1e 100644 --- a/infoschema/metrics_schema.go +++ b/infoschema/metrics_schema.go @@ -17,7 +17,6 @@ package infoschema import ( "bytes" "fmt" - "golang.org/x/exp/slices" "strconv" "strings" @@ -29,6 +28,7 @@ import ( "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/set" + "golang.org/x/exp/slices" ) const ( From a730278c7506c1e05bf45c209fd329914f89d56d Mon Sep 17 00:00:00 2001 From: littlelittlehorse Date: Mon, 11 Jul 2022 15:37:42 +0800 Subject: [PATCH 10/23] fix --- executor/insert_common.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/executor/insert_common.go b/executor/insert_common.go index e69611954b060..392cc524d68e6 100644 --- a/executor/insert_common.go +++ b/executor/insert_common.go @@ -18,7 +18,6 @@ import ( "bytes" "context" "fmt" - "github.com/pingcap/tidb/infoschema" "math" "sync" "time" @@ -27,6 +26,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/tidb/ddl" "github.com/pingcap/tidb/expression" + "github.com/pingcap/tidb/infoschema" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/meta/autoid" "github.com/pingcap/tidb/parser/ast" From ec6e029c1c54b9847a031a61b928ee5791fc8b53 Mon Sep 17 00:00:00 2001 From: littlelittlehorse Date: Mon, 11 Jul 2022 15:38:32 +0800 Subject: [PATCH 11/23] fix --- executor/write.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/executor/write.go b/executor/write.go index 61b2d27123a90..c6db1e99556f8 100644 --- a/executor/write.go +++ b/executor/write.go @@ -16,13 +16,13 @@ package executor import ( "context" - "github.com/pingcap/tidb/infoschema" "strings" "github.com/opentracing/opentracing-go" "github.com/pingcap/errors" "github.com/pingcap/tidb/errno" "github.com/pingcap/tidb/expression" + "github.com/pingcap/tidb/infoschema" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/meta/autoid" "github.com/pingcap/tidb/parser/ast" From c722978ec0e42ad185d61ec5edda28897b4d0eaa Mon Sep 17 00:00:00 2001 From: littlelittlehorse Date: Mon, 11 Jul 2022 22:04:54 +0800 Subject: [PATCH 12/23] fix --- ddl/partition.go | 16 ++++----- ddl/placement_policy_test.go | 64 ++++-------------------------------- errno/errcode.go | 1 + errno/errname.go | 3 +- util/dbterror/ddl_terror.go | 3 ++ 5 files changed, 20 insertions(+), 67 deletions(-) diff --git a/ddl/partition.go b/ddl/partition.go index f2b0e3b6a4762..14e7ed008eef4 100644 --- a/ddl/partition.go +++ b/ddl/partition.go @@ -1489,7 +1489,7 @@ func (w *worker) onExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Jo return ver, errors.Trace(err) } - placementPolicyEqual := checkExchangePartitionPlacementPolicy(t, pt.PlacementPolicyRef, nt.PlacementPolicyRef) + placementPolicyEqual, err := checkExchangePartitionPlacementPolicy(t, partDef.PlacementPolicyRef, nt.PlacementPolicyRef) if !placementPolicyEqual { job.State = model.JobStateCancelled return ver, errors.Trace(err) @@ -1655,22 +1655,22 @@ func checkExchangePartitionRecordValidation(w *worker, pt *model.TableInfo, inde return nil } -func checkExchangePartitionPlacementPolicy(t *meta.Meta, ntPlacementPolicyRef *model.PolicyRefInfo, ptPlacementPolicyRef *model.PolicyRefInfo) bool { +func checkExchangePartitionPlacementPolicy(t *meta.Meta, ntPlacementPolicyRef *model.PolicyRefInfo, ptPlacementPolicyRef *model.PolicyRefInfo) (bool, error) { if ntPlacementPolicyRef == nil && ptPlacementPolicyRef == nil { - return true + return true, nil } if ntPlacementPolicyRef == nil || ptPlacementPolicyRef == nil { - return false + return false, dbterror.ErrPlacementPolicyNotEqual } ptPlacementPolicyInfo, _ := getPolicyInfo(t, ptPlacementPolicyRef.ID) ntPlacementPolicyInfo, _ := getPolicyInfo(t, ntPlacementPolicyRef.ID) if ntPlacementPolicyInfo == nil && ptPlacementPolicyInfo == nil { - return true + return true, nil } if ntPlacementPolicyInfo == nil || ptPlacementPolicyInfo == nil { - return false + return false, dbterror.ErrPlacementPolicyNotEqual } ptSetting, ntSetting := ntPlacementPolicyInfo.PlacementSettings, ptPlacementPolicyInfo.PlacementSettings @@ -1680,10 +1680,10 @@ func checkExchangePartitionPlacementPolicy(t *meta.Meta, ntPlacementPolicyRef *m ptSetting.Schedule != ntSetting.Schedule || ptSetting.Constraints != ntSetting.Constraints || ptSetting.LeaderConstraints != ntSetting.LeaderConstraints || ptSetting.LearnerConstraints != ntSetting.LearnerConstraints || ptSetting.FollowerConstraints != ntSetting.FollowerConstraints || ptSetting.VoterConstraints != ntSetting.VoterConstraints { - return false + return false, dbterror.ErrPlacementPolicyNotEqual } - return true + return true, nil } func buildCheckSQLForRangeExprPartition(pi *model.PartitionInfo, index int, schemaName, tableName model.CIStr) (string, []interface{}) { diff --git a/ddl/placement_policy_test.go b/ddl/placement_policy_test.go index 3c4c1b4cd3109..74b3c273238cf 100644 --- a/ddl/placement_policy_test.go +++ b/ddl/placement_policy_test.go @@ -1928,9 +1928,6 @@ func TestExchangePartitionWithPlacement(t *testing.T) { policy1, ok := dom.InfoSchema().PolicyByName(model.NewCIStr("p1")) require.True(t, ok) - policy2, ok := dom.InfoSchema().PolicyByName(model.NewCIStr("p2")) - require.True(t, ok) - tk.MustExec(`CREATE TABLE t1 (id INT) placement policy p1`) defer tk.MustExec("drop table t1") @@ -1941,12 +1938,8 @@ func TestExchangePartitionWithPlacement(t *testing.T) { require.NoError(t, err) t1ID := t1.Meta().ID - t2, err := dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t2")) - require.NoError(t, err) - t2ID := t2.Meta().ID - tk.MustExec(`CREATE TABLE tp (id INT) placement policy p3 PARTITION BY RANGE (id) ( - PARTITION p0 VALUES LESS THAN (100), + PARTITION p0 VALUES LESS THAN (100) placement policy p1, PARTITION p1 VALUES LESS THAN (1000) placement policy p2, PARTITION p2 VALUES LESS THAN (10000) );`) @@ -1956,7 +1949,6 @@ func TestExchangePartitionWithPlacement(t *testing.T) { require.NoError(t, err) tpID := tp.Meta().ID par0ID := tp.Meta().Partition.Definitions[0].ID - par1ID := tp.Meta().Partition.Definitions[1].ID // exchange par0, t1 tk.MustExec("alter table tp exchange partition p0 with table t1") @@ -1969,14 +1961,14 @@ func TestExchangePartitionWithPlacement(t *testing.T) { " `id` int(11) DEFAULT NULL\n" + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin /*T![placement] PLACEMENT POLICY=`p3` */\n" + "PARTITION BY RANGE (`id`)\n" + - "(PARTITION `p0` VALUES LESS THAN (100),\n" + + "(PARTITION `p0` VALUES LESS THAN (100) /*T![placement] PLACEMENT POLICY=`p1` */,\n" + " PARTITION `p1` VALUES LESS THAN (1000) /*T![placement] PLACEMENT POLICY=`p2` */,\n" + " PARTITION `p2` VALUES LESS THAN (10000))")) tp, err = dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("tp")) require.NoError(t, err) require.Equal(t, tpID, tp.Meta().ID) require.Equal(t, t1ID, tp.Meta().Partition.Definitions[0].ID) - require.Nil(t, tp.Meta().Partition.Definitions[0].PlacementPolicyRef) + require.NotNil(t, tp.Meta().Partition.Definitions[0].PlacementPolicyRef) t1, err = dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t1")) require.NoError(t, err) require.Equal(t, par0ID, t1.Meta().ID) @@ -1984,54 +1976,10 @@ func TestExchangePartitionWithPlacement(t *testing.T) { checkExistTableBundlesInPD(t, dom, "test", "tp") // exchange par0, t2 - tk.MustExec("alter table tp exchange partition p0 with table t2") - tk.MustQuery("show create table t2").Check(testkit.Rows("" + - "t2 CREATE TABLE `t2` (\n" + - " `id` int(11) DEFAULT NULL\n" + - ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin")) - tk.MustQuery("show create table tp").Check(testkit.Rows("" + - "tp CREATE TABLE `tp` (\n" + - " `id` int(11) DEFAULT NULL\n" + - ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin /*T![placement] PLACEMENT POLICY=`p3` */\n" + - "PARTITION BY RANGE (`id`)\n" + - "(PARTITION `p0` VALUES LESS THAN (100),\n" + - " PARTITION `p1` VALUES LESS THAN (1000) /*T![placement] PLACEMENT POLICY=`p2` */,\n" + - " PARTITION `p2` VALUES LESS THAN (10000))")) - tp, err = dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("tp")) - require.NoError(t, err) - require.Equal(t, tpID, tp.Meta().ID) - require.Equal(t, t2ID, tp.Meta().Partition.Definitions[0].ID) - require.Nil(t, tp.Meta().Partition.Definitions[0].PlacementPolicyRef) - t2, err = dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t2")) - require.NoError(t, err) - require.Equal(t, t1ID, t2.Meta().ID) - require.Nil(t, t2.Meta().PlacementPolicyRef) - checkExistTableBundlesInPD(t, dom, "test", "tp") + tk.MustGetErrCode("alter table tp exchange partition p0 with table t2", mysql.ErrPlacementPolicyNotEqual) - // exchange par1, t1 - tk.MustExec("alter table tp exchange partition p1 with table t1") - tk.MustQuery("show create table t1").Check(testkit.Rows("" + - "t1 CREATE TABLE `t1` (\n" + - " `id` int(11) DEFAULT NULL\n" + - ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin /*T![placement] PLACEMENT POLICY=`p1` */")) - tk.MustQuery("show create table tp").Check(testkit.Rows("" + - "tp CREATE TABLE `tp` (\n" + - " `id` int(11) DEFAULT NULL\n" + - ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin /*T![placement] PLACEMENT POLICY=`p3` */\n" + - "PARTITION BY RANGE (`id`)\n" + - "(PARTITION `p0` VALUES LESS THAN (100),\n" + - " PARTITION `p1` VALUES LESS THAN (1000) /*T![placement] PLACEMENT POLICY=`p2` */,\n" + - " PARTITION `p2` VALUES LESS THAN (10000))")) - tp, err = dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("tp")) - require.NoError(t, err) - require.Equal(t, tpID, tp.Meta().ID) - require.Equal(t, par0ID, tp.Meta().Partition.Definitions[1].ID) - require.Equal(t, policy2.ID, tp.Meta().Partition.Definitions[1].PlacementPolicyRef.ID) - t1, err = dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t1")) - require.NoError(t, err) - require.Equal(t, par1ID, t1.Meta().ID) - require.Equal(t, policy1.ID, t1.Meta().PlacementPolicyRef.ID) - checkExistTableBundlesInPD(t, dom, "test", "tp") + // exchange par1, t2 + tk.MustGetErrCode("alter table tp exchange partition p1 with table t2", mysql.ErrPlacementPolicyNotEqual) } func TestPDFail(t *testing.T) { diff --git a/errno/errcode.go b/errno/errcode.go index 4054229f8dce5..da049399f1039 100644 --- a/errno/errcode.go +++ b/errno/errcode.go @@ -1073,6 +1073,7 @@ const ( ErrHTTPServiceError = 8243 ErrPartitionColumnStatsMissing = 8244 ErrColumnInChange = 8245 + ErrPlacementPolicyNotEqual = 8246 // TiKV/PD/TiFlash errors. ErrPDServerTimeout = 9001 ErrTiKVServerTimeout = 9002 diff --git a/errno/errname.go b/errno/errname.go index a8be48e6eed06..d4aba2b107d74 100644 --- a/errno/errname.go +++ b/errno/errname.go @@ -1079,7 +1079,8 @@ var MySQLErrName = map[uint16]*mysql.ErrMessage{ ErrPlacementPolicyInUse: mysql.Message("Placement policy '%-.192s' is still in use", nil), ErrOptOnCacheTable: mysql.Message("'%s' is unsupported on cache tables.", nil), - ErrColumnInChange: mysql.Message("column %s id %d does not exist, this column may have been updated by other DDL ran in parallel", nil), + ErrColumnInChange: mysql.Message("column %s id %d does not exist, this column may have been updated by other DDL ran in parallel", nil), + ErrPlacementPolicyNotEqual: mysql.Message("Placement policy does not equal during exchange partition with table", nil), // TiKV/PD errors. ErrPDServerTimeout: mysql.Message("PD server timeout", nil), ErrTiKVServerTimeout: mysql.Message("TiKV server timeout", nil), diff --git a/util/dbterror/ddl_terror.go b/util/dbterror/ddl_terror.go index d9e268c650948..087f7cb0e093f 100644 --- a/util/dbterror/ddl_terror.go +++ b/util/dbterror/ddl_terror.go @@ -330,6 +330,9 @@ var ( // ErrPlacementPolicyInUse is returned when placement policy is in use in drop/alter. ErrPlacementPolicyInUse = ClassDDL.NewStd(mysql.ErrPlacementPolicyInUse) + // ErrPlacementPolicyNotEqual is returns when exchange partition with table. + ErrPlacementPolicyNotEqual = ClassDDL.NewStd(mysql.ErrPlacementPolicyNotEqual) + // ErrMultipleDefConstInListPart returns multiple definition of same constant in list partitioning. ErrMultipleDefConstInListPart = ClassDDL.NewStd(mysql.ErrMultipleDefConstInListPart) From b0a742b40f7d264f8cda66613fcf5d30d171d87c Mon Sep 17 00:00:00 2001 From: littlelittlehorse Date: Thu, 14 Jul 2022 11:14:38 +0800 Subject: [PATCH 13/23] fix --- ddl/partition.go | 18 +++++++++--------- ddl/placement_policy_test.go | 4 ++-- errno/errcode.go | 1 - errno/errname.go | 3 +-- util/dbterror/ddl_terror.go | 3 --- 5 files changed, 12 insertions(+), 17 deletions(-) diff --git a/ddl/partition.go b/ddl/partition.go index 14e7ed008eef4..54ac4c845cfa1 100644 --- a/ddl/partition.go +++ b/ddl/partition.go @@ -1489,8 +1489,8 @@ func (w *worker) onExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Jo return ver, errors.Trace(err) } - placementPolicyEqual, err := checkExchangePartitionPlacementPolicy(t, partDef.PlacementPolicyRef, nt.PlacementPolicyRef) - if !placementPolicyEqual { + err = checkExchangePartitionPlacementPolicy(t, partDef.PlacementPolicyRef, nt.PlacementPolicyRef) + if err != nil { job.State = model.JobStateCancelled return ver, errors.Trace(err) } @@ -1655,22 +1655,22 @@ func checkExchangePartitionRecordValidation(w *worker, pt *model.TableInfo, inde return nil } -func checkExchangePartitionPlacementPolicy(t *meta.Meta, ntPlacementPolicyRef *model.PolicyRefInfo, ptPlacementPolicyRef *model.PolicyRefInfo) (bool, error) { +func checkExchangePartitionPlacementPolicy(t *meta.Meta, ntPlacementPolicyRef *model.PolicyRefInfo, ptPlacementPolicyRef *model.PolicyRefInfo) error { if ntPlacementPolicyRef == nil && ptPlacementPolicyRef == nil { - return true, nil + return nil } if ntPlacementPolicyRef == nil || ptPlacementPolicyRef == nil { - return false, dbterror.ErrPlacementPolicyNotEqual + return nil } ptPlacementPolicyInfo, _ := getPolicyInfo(t, ptPlacementPolicyRef.ID) ntPlacementPolicyInfo, _ := getPolicyInfo(t, ntPlacementPolicyRef.ID) if ntPlacementPolicyInfo == nil && ptPlacementPolicyInfo == nil { - return true, nil + return nil } if ntPlacementPolicyInfo == nil || ptPlacementPolicyInfo == nil { - return false, dbterror.ErrPlacementPolicyNotEqual + return nil } ptSetting, ntSetting := ntPlacementPolicyInfo.PlacementSettings, ptPlacementPolicyInfo.PlacementSettings @@ -1680,10 +1680,10 @@ func checkExchangePartitionPlacementPolicy(t *meta.Meta, ntPlacementPolicyRef *m ptSetting.Schedule != ntSetting.Schedule || ptSetting.Constraints != ntSetting.Constraints || ptSetting.LeaderConstraints != ntSetting.LeaderConstraints || ptSetting.LearnerConstraints != ntSetting.LearnerConstraints || ptSetting.FollowerConstraints != ntSetting.FollowerConstraints || ptSetting.VoterConstraints != ntSetting.VoterConstraints { - return false, dbterror.ErrPlacementPolicyNotEqual + return nil } - return true, nil + return nil } func buildCheckSQLForRangeExprPartition(pi *model.PartitionInfo, index int, schemaName, tableName model.CIStr) (string, []interface{}) { diff --git a/ddl/placement_policy_test.go b/ddl/placement_policy_test.go index 74b3c273238cf..0a1663abd85cd 100644 --- a/ddl/placement_policy_test.go +++ b/ddl/placement_policy_test.go @@ -1976,10 +1976,10 @@ func TestExchangePartitionWithPlacement(t *testing.T) { checkExistTableBundlesInPD(t, dom, "test", "tp") // exchange par0, t2 - tk.MustGetErrCode("alter table tp exchange partition p0 with table t2", mysql.ErrPlacementPolicyNotEqual) + tk.MustGetErrCode("alter table tp exchange partition p0 with table t2", mysql.ErrTablesDifferentMetadata) // exchange par1, t2 - tk.MustGetErrCode("alter table tp exchange partition p1 with table t2", mysql.ErrPlacementPolicyNotEqual) + tk.MustGetErrCode("alter table tp exchange partition p1 with table t2", mysql.ErrTablesDifferentMetadata) } func TestPDFail(t *testing.T) { diff --git a/errno/errcode.go b/errno/errcode.go index da049399f1039..4054229f8dce5 100644 --- a/errno/errcode.go +++ b/errno/errcode.go @@ -1073,7 +1073,6 @@ const ( ErrHTTPServiceError = 8243 ErrPartitionColumnStatsMissing = 8244 ErrColumnInChange = 8245 - ErrPlacementPolicyNotEqual = 8246 // TiKV/PD/TiFlash errors. ErrPDServerTimeout = 9001 ErrTiKVServerTimeout = 9002 diff --git a/errno/errname.go b/errno/errname.go index d4aba2b107d74..a8be48e6eed06 100644 --- a/errno/errname.go +++ b/errno/errname.go @@ -1079,8 +1079,7 @@ var MySQLErrName = map[uint16]*mysql.ErrMessage{ ErrPlacementPolicyInUse: mysql.Message("Placement policy '%-.192s' is still in use", nil), ErrOptOnCacheTable: mysql.Message("'%s' is unsupported on cache tables.", nil), - ErrColumnInChange: mysql.Message("column %s id %d does not exist, this column may have been updated by other DDL ran in parallel", nil), - ErrPlacementPolicyNotEqual: mysql.Message("Placement policy does not equal during exchange partition with table", nil), + ErrColumnInChange: mysql.Message("column %s id %d does not exist, this column may have been updated by other DDL ran in parallel", nil), // TiKV/PD errors. ErrPDServerTimeout: mysql.Message("PD server timeout", nil), ErrTiKVServerTimeout: mysql.Message("TiKV server timeout", nil), diff --git a/util/dbterror/ddl_terror.go b/util/dbterror/ddl_terror.go index 087f7cb0e093f..d9e268c650948 100644 --- a/util/dbterror/ddl_terror.go +++ b/util/dbterror/ddl_terror.go @@ -330,9 +330,6 @@ var ( // ErrPlacementPolicyInUse is returned when placement policy is in use in drop/alter. ErrPlacementPolicyInUse = ClassDDL.NewStd(mysql.ErrPlacementPolicyInUse) - // ErrPlacementPolicyNotEqual is returns when exchange partition with table. - ErrPlacementPolicyNotEqual = ClassDDL.NewStd(mysql.ErrPlacementPolicyNotEqual) - // ErrMultipleDefConstInListPart returns multiple definition of same constant in list partitioning. ErrMultipleDefConstInListPart = ClassDDL.NewStd(mysql.ErrMultipleDefConstInListPart) From db8c32b317f8a93f2456dbafd9385934108f34d9 Mon Sep 17 00:00:00 2001 From: littlelittlehorse Date: Thu, 14 Jul 2022 11:33:40 +0800 Subject: [PATCH 14/23] fix --- ddl/partition.go | 16 ++++------------ 1 file changed, 4 insertions(+), 12 deletions(-) diff --git a/ddl/partition.go b/ddl/partition.go index 54ac4c845cfa1..865ba3d6768fa 100644 --- a/ddl/partition.go +++ b/ddl/partition.go @@ -1660,27 +1660,19 @@ func checkExchangePartitionPlacementPolicy(t *meta.Meta, ntPlacementPolicyRef *m return nil } if ntPlacementPolicyRef == nil || ptPlacementPolicyRef == nil { - return nil + return dbterror.ErrTablesDifferentMetadata } ptPlacementPolicyInfo, _ := getPolicyInfo(t, ptPlacementPolicyRef.ID) ntPlacementPolicyInfo, _ := getPolicyInfo(t, ntPlacementPolicyRef.ID) - if ntPlacementPolicyInfo == nil && ptPlacementPolicyInfo == nil { return nil } if ntPlacementPolicyInfo == nil || ptPlacementPolicyInfo == nil { - return nil + return dbterror.ErrTablesDifferentMetadata } - - ptSetting, ntSetting := ntPlacementPolicyInfo.PlacementSettings, ptPlacementPolicyInfo.PlacementSettings - - if ptSetting.PrimaryRegion != ntSetting.PrimaryRegion || ptSetting.Regions != ntSetting.Regions || - ptSetting.Learners != ntSetting.Learners || ptSetting.Followers != ntSetting.Followers || ptSetting.Voters != ntSetting.Voters || - ptSetting.Schedule != ntSetting.Schedule || ptSetting.Constraints != ntSetting.Constraints || - ptSetting.LeaderConstraints != ntSetting.LeaderConstraints || ptSetting.LearnerConstraints != ntSetting.LearnerConstraints || - ptSetting.FollowerConstraints != ntSetting.FollowerConstraints || ptSetting.VoterConstraints != ntSetting.VoterConstraints { - return nil + if ntPlacementPolicyInfo.Name.L != ptPlacementPolicyInfo.Name.L { + return dbterror.ErrTablesDifferentMetadata } return nil From 481e2a6955afc653d45fcdef3d902fe5c109cdd3 Mon Sep 17 00:00:00 2001 From: littlelittlehorse Date: Thu, 14 Jul 2022 13:14:25 +0800 Subject: [PATCH 15/23] fix --- ddl/ddl_api.go | 9 +++++---- ddl/partition.go | 10 +++++----- executor/insert_common.go | 6 +++--- executor/write.go | 6 +++--- parser/model/model.go | 11 +++++++---- 5 files changed, 23 insertions(+), 19 deletions(-) diff --git a/ddl/ddl_api.go b/ddl/ddl_api.go index 6a5acb7072ba9..c3803bebdecf2 100644 --- a/ddl/ddl_api.go +++ b/ddl/ddl_api.go @@ -1792,10 +1792,11 @@ func BuildTableInfo( collate string, ) (tbInfo *model.TableInfo, err error) { tbInfo = &model.TableInfo{ - Name: tableName, - Version: model.CurrLatestTableInfoVersion, - Charset: charset, - Collate: collate, + Name: tableName, + Version: model.CurrLatestTableInfoVersion, + Charset: charset, + Collate: collate, + ExchangePartitionInfo: &model.ExchangePartitionInfo{}, } tblColumns := make([]*table.Column, 0, len(cols)) for _, v := range cols { diff --git a/ddl/partition.go b/ddl/partition.go index 865ba3d6768fa..8f9e4318dbe38 100644 --- a/ddl/partition.go +++ b/ddl/partition.go @@ -1389,7 +1389,7 @@ func (w *worker) onExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Jo return ver, errors.Trace(err) } - if !nt.ExchangePartitionFlag { + if !nt.ExchangePartitionInfo.ExchangePartitionFlag { if withValidation { err = checkExchangePartitionRecordValidation(w, pt, index, ntDbInfo.Name, nt.Name) if err != nil { @@ -1397,9 +1397,9 @@ func (w *worker) onExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Jo return ver, errors.Trace(err) } } - nt.ExchangePartitionFlag = true - nt.ExchangePartitionId = ptID - nt.ExchangePartitionDefId = defID + nt.ExchangePartitionInfo.ExchangePartitionFlag = true + nt.ExchangePartitionInfo.ExchangePartitionId = ptID + nt.ExchangePartitionInfo.ExchangePartitionDefId = defID return updateVersionAndTableInfoWithCheck(d, t, job, nt, true) } @@ -1545,7 +1545,7 @@ func (w *worker) onExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Jo return ver, errors.Wrapf(err, "failed to notify PD the label rules") } - nt.ExchangePartitionFlag = false + nt.ExchangePartitionInfo.ExchangePartitionFlag = false ver, err = updateVersionAndTableInfoWithCheck(d, t, job, nt, true) if err != nil { return ver, errors.Trace(err) diff --git a/executor/insert_common.go b/executor/insert_common.go index 392cc524d68e6..b6a1368c94096 100644 --- a/executor/insert_common.go +++ b/executor/insert_common.go @@ -645,13 +645,13 @@ func (e *InsertValues) fillRow(ctx context.Context, row []types.Datum, hasValue } tbl := e.Table.Meta() // Handle exchange partition - if tbl.ExchangePartitionFlag { + if tbl.ExchangePartitionInfo.ExchangePartitionFlag { is := e.ctx.GetDomainInfoSchema().(infoschema.InfoSchema) - pt, tableFound := is.TableByID(tbl.ExchangePartitionId) + pt, tableFound := is.TableByID(tbl.ExchangePartitionInfo.ExchangePartitionId) if !tableFound { return nil, errors.Errorf("exchange partition with table TableByID Failed") } - err := pt.(table.PartitionedTable).CheckForExchangePartition(e.ctx, pt.Meta().Partition, row, tbl.ExchangePartitionDefId) + err := pt.(table.PartitionedTable).CheckForExchangePartition(e.ctx, pt.Meta().Partition, row, tbl.ExchangePartitionInfo.ExchangePartitionDefId) if err != nil { return nil, err } diff --git a/executor/write.go b/executor/write.go index c6db1e99556f8..5d3616cd2fc4b 100644 --- a/executor/write.go +++ b/executor/write.go @@ -83,13 +83,13 @@ func updateRecord(ctx context.Context, sctx sessionctx.Context, h kv.Handle, old // Handle exchange partition tbl := t.Meta() - if tbl.ExchangePartitionFlag { + if tbl.ExchangePartitionInfo.ExchangePartitionFlag { is := sctx.GetDomainInfoSchema().(infoschema.InfoSchema) - pt, tableFound := is.TableByID(tbl.ExchangePartitionId) + pt, tableFound := is.TableByID(tbl.ExchangePartitionInfo.ExchangePartitionId) if !tableFound { return false, errors.Errorf("exchange partition with table TableByID Failed") } - err := pt.(table.PartitionedTable).CheckForExchangePartition(sctx, pt.Meta().Partition, newData, tbl.ExchangePartitionDefId) + err := pt.(table.PartitionedTable).CheckForExchangePartition(sctx, pt.Meta().Partition, newData, tbl.ExchangePartitionInfo.ExchangePartitionDefId) if err != nil { return false, err } diff --git a/parser/model/model.go b/parser/model/model.go index 980d57c7f7736..514895e273c8a 100644 --- a/parser/model/model.go +++ b/parser/model/model.go @@ -449,10 +449,7 @@ type TableInfo struct { // StatsOptions is used when do analyze/auto-analyze for each table StatsOptions *StatsOptions `json:"stats_options"` - // ExchangePartitionFlag、ExchangePartitionId、ExchangePartitionDefId is used when do exchange partition with table - ExchangePartitionFlag bool `json:"exchange_partition_flag"` - ExchangePartitionId int64 `json:"exchange_partition_id"` - ExchangePartitionDefId int64 `json:"exchange_partition_def_id"` + ExchangePartitionInfo *ExchangePartitionInfo `json:"exchange_partition_info"` } type TableCacheStatusType int @@ -1045,6 +1042,12 @@ func (p PartitionType) String() string { } +type ExchangePartitionInfo struct { + ExchangePartitionFlag bool `json:"exchange_partition_flag"` + ExchangePartitionId int64 `json:"exchange_partition_id"` + ExchangePartitionDefId int64 `json:"exchange_partition_def_id"` +} + // PartitionInfo provides table partition info. type PartitionInfo struct { Type PartitionType `json:"type"` From 6d11d8711b5657e48013d704d4b4367d4b87e98c Mon Sep 17 00:00:00 2001 From: littlelittlehorse Date: Thu, 14 Jul 2022 14:59:39 +0800 Subject: [PATCH 16/23] fix --- executor/insert_common.go | 6 +++++- executor/write.go | 6 +++++- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/executor/insert_common.go b/executor/insert_common.go index b6a1368c94096..c69fc470f99c2 100644 --- a/executor/insert_common.go +++ b/executor/insert_common.go @@ -651,7 +651,11 @@ func (e *InsertValues) fillRow(ctx context.Context, row []types.Datum, hasValue if !tableFound { return nil, errors.Errorf("exchange partition with table TableByID Failed") } - err := pt.(table.PartitionedTable).CheckForExchangePartition(e.ctx, pt.Meta().Partition, row, tbl.ExchangePartitionInfo.ExchangePartitionDefId) + p, ok := pt.(table.PartitionedTable) + if !ok { + return nil, errors.Errorf("exchange partition process assert table partition failed.") + } + err := p.CheckForExchangePartition(e.ctx, pt.Meta().Partition, row, tbl.ExchangePartitionInfo.ExchangePartitionDefId) if err != nil { return nil, err } diff --git a/executor/write.go b/executor/write.go index 5d3616cd2fc4b..ea89668d83402 100644 --- a/executor/write.go +++ b/executor/write.go @@ -89,7 +89,11 @@ func updateRecord(ctx context.Context, sctx sessionctx.Context, h kv.Handle, old if !tableFound { return false, errors.Errorf("exchange partition with table TableByID Failed") } - err := pt.(table.PartitionedTable).CheckForExchangePartition(sctx, pt.Meta().Partition, newData, tbl.ExchangePartitionInfo.ExchangePartitionDefId) + p, ok := pt.(table.PartitionedTable) + if !ok { + return false, errors.Errorf("exchange partition process assert table partition failed.") + } + err := p.CheckForExchangePartition(sctx, pt.Meta().Partition, newData, tbl.ExchangePartitionInfo.ExchangePartitionDefId) if err != nil { return false, err } From e2f7664eb83dd00ad7aa294bfca3c3fc1181572e Mon Sep 17 00:00:00 2001 From: littlelittlehorse Date: Thu, 14 Jul 2022 17:08:55 +0800 Subject: [PATCH 17/23] fix --- executor/insert_common.go | 4 ++-- executor/write.go | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/executor/insert_common.go b/executor/insert_common.go index c69fc470f99c2..139fc4e978179 100644 --- a/executor/insert_common.go +++ b/executor/insert_common.go @@ -649,11 +649,11 @@ func (e *InsertValues) fillRow(ctx context.Context, row []types.Datum, hasValue is := e.ctx.GetDomainInfoSchema().(infoschema.InfoSchema) pt, tableFound := is.TableByID(tbl.ExchangePartitionInfo.ExchangePartitionId) if !tableFound { - return nil, errors.Errorf("exchange partition with table TableByID Failed") + return nil, errors.Errorf("exchange partition process table by id failed") } p, ok := pt.(table.PartitionedTable) if !ok { - return nil, errors.Errorf("exchange partition process assert table partition failed.") + return nil, errors.Errorf("exchange partition process assert table partition failed") } err := p.CheckForExchangePartition(e.ctx, pt.Meta().Partition, row, tbl.ExchangePartitionInfo.ExchangePartitionDefId) if err != nil { diff --git a/executor/write.go b/executor/write.go index ea89668d83402..9aa38936df301 100644 --- a/executor/write.go +++ b/executor/write.go @@ -87,11 +87,11 @@ func updateRecord(ctx context.Context, sctx sessionctx.Context, h kv.Handle, old is := sctx.GetDomainInfoSchema().(infoschema.InfoSchema) pt, tableFound := is.TableByID(tbl.ExchangePartitionInfo.ExchangePartitionId) if !tableFound { - return false, errors.Errorf("exchange partition with table TableByID Failed") + return false, errors.Errorf("exchange partition process table by id failed") } p, ok := pt.(table.PartitionedTable) if !ok { - return false, errors.Errorf("exchange partition process assert table partition failed.") + return false, errors.Errorf("exchange partition process assert table partition failed") } err := p.CheckForExchangePartition(sctx, pt.Meta().Partition, newData, tbl.ExchangePartitionInfo.ExchangePartitionDefId) if err != nil { From c39533c9280c127b099f1dced0473ab04e52ec15 Mon Sep 17 00:00:00 2001 From: littlelittlehorse Date: Thu, 14 Jul 2022 17:36:11 +0800 Subject: [PATCH 18/23] fix test --- ddl/db_partition_test.go | 87 +++++++++++++++++++++------------------- 1 file changed, 46 insertions(+), 41 deletions(-) diff --git a/ddl/db_partition_test.go b/ddl/db_partition_test.go index fbbaa45c30bcb..2b0cabaf44d6e 100644 --- a/ddl/db_partition_test.go +++ b/ddl/db_partition_test.go @@ -2262,75 +2262,67 @@ func TestExchangePartitionTableCompatiable(t *testing.T) { "alter table pt16 exchange partition p0 with table nt16;", dbterror.ErrTablesDifferentMetadata, }, - { - // auto_increment - "create table pt17 (id bigint not null primary key auto_increment) partition by hash(id) partitions 1;", - "create table nt17 (id bigint not null primary key);", - "alter table pt17 exchange partition p0 with table nt17;", - dbterror.ErrTablesDifferentMetadata, - }, - { - // auto_random - "create table pt18 (id bigint not null primary key AUTO_RANDOM) partition by hash(id) partitions 1;", - "create table nt18 (id bigint not null primary key);", - "alter table pt18 exchange partition p0 with table nt18;", - dbterror.ErrTablesDifferentMetadata, - }, { // default - "create table pt19 (id int not null default 1) partition by hash(id) partitions 1;", - "create table nt19 (id int not null);", - "alter table pt19 exchange partition p0 with table nt19;", + "create table pt17 (id int not null default 1) partition by hash(id) partitions 1;", + "create table nt17 (id int not null);", + "alter table pt17 exchange partition p0 with table nt17;", nil, }, { // view test - "create table pt20 (id int not null) partition by hash(id) partitions 1;", - "create view nt20 as select id from nt17;", - "alter table pt20 exchange partition p0 with table nt20", + "create table pt18 (id int not null) partition by hash(id) partitions 1;", + "create view nt18 as select id from nt17;", + "alter table pt18 exchange partition p0 with table nt18", dbterror.ErrCheckNoSuchTable, }, { - "create table pt21 (id int not null, lname varchar(30), fname varchar(100) generated always as (concat(lname, ' ')) stored) partition by hash(id) partitions 1;", - "create table nt21 (id int not null, lname varchar(30), fname varchar(100) generated always as (concat(lname, ' ')) virtual);", - "alter table pt21 exchange partition p0 with table nt21;", + "create table pt19 (id int not null, lname varchar(30), fname varchar(100) generated always as (concat(lname, ' ')) stored) partition by hash(id) partitions 1;", + "create table nt19 (id int not null, lname varchar(30), fname varchar(100) generated always as (concat(lname, ' ')) virtual);", + "alter table pt19 exchange partition p0 with table nt19;", dbterror.ErrUnsupportedOnGeneratedColumn, }, { - "create table pt22 (id int not null) partition by hash(id) partitions 1;", - "create table nt22 (id int default null);", - "alter table pt22 exchange partition p0 with table nt22;", + "create table pt20 (id int not null) partition by hash(id) partitions 1;", + "create table nt20 (id int default null);", + "alter table pt20 exchange partition p0 with table nt20;", dbterror.ErrTablesDifferentMetadata, }, { // unsigned - "create table pt23 (id int unsigned) partition by hash(id) partitions 1;", - "create table nt23 (id int);", - "alter table pt23 exchange partition p0 with table nt23;", + "create table pt21 (id int unsigned) partition by hash(id) partitions 1;", + "create table nt21 (id int);", + "alter table pt21 exchange partition p0 with table nt21;", dbterror.ErrTablesDifferentMetadata, }, { // zerofill - "create table pt24 (id int) partition by hash(id) partitions 1;", - "create table nt24 (id int zerofill);", - "alter table pt24 exchange partition p0 with table nt24;", + "create table pt22 (id int) partition by hash(id) partitions 1;", + "create table nt22 (id int zerofill);", + "alter table pt22 exchange partition p0 with table nt22;", dbterror.ErrTablesDifferentMetadata, }, { - "create table pt25 (id int, lname varchar(10) charset binary) partition by hash(id) partitions 1;", - "create table nt25 (id int, lname varchar(10));", - "alter table pt25 exchange partition p0 with table nt25;", + "create table pt23 (id int, lname varchar(10) charset binary) partition by hash(id) partitions 1;", + "create table nt23 (id int, lname varchar(10));", + "alter table pt23 exchange partition p0 with table nt23;", dbterror.ErrTablesDifferentMetadata, }, { - "create table pt26 (id int, a datetime on update current_timestamp) partition by hash(id) partitions 1;", - "create table nt26 (id int, a datetime);", - "alter table pt26 exchange partition p0 with table nt26;", + "create table pt25 (id int, a datetime on update current_timestamp) partition by hash(id) partitions 1;", + "create table nt25 (id int, a datetime);", + "alter table pt25 exchange partition p0 with table nt25;", nil, }, { - "create table pt27 (id int not null, lname varchar(30), fname varchar(100) generated always as (concat(lname, ' ')) virtual) partition by hash(id) partitions 1;", - "create table nt27 (id int not null, lname varchar(30), fname varchar(100) generated always as (concat(id, ' ')) virtual);", + "create table pt26 (id int not null, lname varchar(30), fname varchar(100) generated always as (concat(lname, ' ')) virtual) partition by hash(id) partitions 1;", + "create table nt26 (id int not null, lname varchar(30), fname varchar(100) generated always as (concat(id, ' ')) virtual);", + "alter table pt26 exchange partition p0 with table nt26;", + dbterror.ErrTablesDifferentMetadata, + }, + { + "create table pt27 (a int key, b int, index(a)) partition by hash(a) partitions 1;", + "create table nt27 (a int not null, b int, index(a));", "alter table pt27 exchange partition p0 with table nt27;", dbterror.ErrTablesDifferentMetadata, }, @@ -2352,6 +2344,20 @@ func TestExchangePartitionTableCompatiable(t *testing.T) { "alter table pt30 exchange partition p0 with table nt30;", dbterror.ErrTablesDifferentMetadata, }, + { + // auto_increment + "create table pt31 (id bigint not null primary key auto_increment) partition by hash(id) partitions 1;", + "create table nt31 (id bigint not null primary key);", + "alter table pt31 exchange partition p0 with table nt31;", + dbterror.ErrTablesDifferentMetadata, + }, + { + // auto_random + "create table pt32 (id bigint not null primary key AUTO_RANDOM) partition by hash(id) partitions 1;", + "create table nt32 (id bigint not null primary key);", + "alter table pt32 exchange partition p0 with table nt32;", + dbterror.ErrTablesDifferentMetadata, + }, } tk := testkit.NewTestKit(t, store) @@ -2375,7 +2381,6 @@ func TestExchangePartitionTableCompatiable(t *testing.T) { require.NoError(t, err) } -// TestExchangePartitionHook is temporary test, needs to be deleted before merging into master. func TestExchangePartitionHook(t *testing.T) { store, dom, clean := testkit.CreateMockStoreAndDomain(t) defer clean() From 65b00cb5bd2d9b0b0d136015873b451cba20a24d Mon Sep 17 00:00:00 2001 From: littlelittlehorse Date: Fri, 15 Jul 2022 09:32:00 +0800 Subject: [PATCH 19/23] fix --- ddl/ddl_api.go | 9 ++++----- ddl/partition.go | 18 +++++------------- executor/insert_common.go | 2 +- executor/write.go | 2 +- 4 files changed, 11 insertions(+), 20 deletions(-) diff --git a/ddl/ddl_api.go b/ddl/ddl_api.go index c3803bebdecf2..6a5acb7072ba9 100644 --- a/ddl/ddl_api.go +++ b/ddl/ddl_api.go @@ -1792,11 +1792,10 @@ func BuildTableInfo( collate string, ) (tbInfo *model.TableInfo, err error) { tbInfo = &model.TableInfo{ - Name: tableName, - Version: model.CurrLatestTableInfoVersion, - Charset: charset, - Collate: collate, - ExchangePartitionInfo: &model.ExchangePartitionInfo{}, + Name: tableName, + Version: model.CurrLatestTableInfoVersion, + Charset: charset, + Collate: collate, } tblColumns := make([]*table.Column, 0, len(cols)) for _, v := range cols { diff --git a/ddl/partition.go b/ddl/partition.go index 8f9e4318dbe38..bf6b07259016b 100644 --- a/ddl/partition.go +++ b/ddl/partition.go @@ -1388,18 +1388,12 @@ func (w *worker) onExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Jo if err != nil { return ver, errors.Trace(err) } - - if !nt.ExchangePartitionInfo.ExchangePartitionFlag { - if withValidation { - err = checkExchangePartitionRecordValidation(w, pt, index, ntDbInfo.Name, nt.Name) - if err != nil { - job.State = model.JobStateCancelled - return ver, errors.Trace(err) - } + if nt.ExchangePartitionInfo == nil || !nt.ExchangePartitionInfo.ExchangePartitionFlag { + nt.ExchangePartitionInfo = &model.ExchangePartitionInfo{ + ExchangePartitionFlag: true, + ExchangePartitionId: ptID, + ExchangePartitionDefId: defID, } - nt.ExchangePartitionInfo.ExchangePartitionFlag = true - nt.ExchangePartitionInfo.ExchangePartitionId = ptID - nt.ExchangePartitionInfo.ExchangePartitionDefId = defID return updateVersionAndTableInfoWithCheck(d, t, job, nt, true) } @@ -1628,8 +1622,6 @@ func checkExchangePartitionRecordValidation(w *worker, pt *model.TableInfo, inde case model.PartitionTypeList: if len(pi.Columns) == 0 { sql, paramList = buildCheckSQLForListPartition(pi, index, schemaName, tableName) - } else if len(pi.Columns) == 1 { - sql, paramList = buildCheckSQLForListColumnsPartition(pi, index, schemaName, tableName) } else { sql, paramList = buildCheckSQLForListColumnsPartition(pi, index, schemaName, tableName) } diff --git a/executor/insert_common.go b/executor/insert_common.go index 139fc4e978179..e2698015ecafc 100644 --- a/executor/insert_common.go +++ b/executor/insert_common.go @@ -645,7 +645,7 @@ func (e *InsertValues) fillRow(ctx context.Context, row []types.Datum, hasValue } tbl := e.Table.Meta() // Handle exchange partition - if tbl.ExchangePartitionInfo.ExchangePartitionFlag { + if tbl.ExchangePartitionInfo != nil && tbl.ExchangePartitionInfo.ExchangePartitionFlag { is := e.ctx.GetDomainInfoSchema().(infoschema.InfoSchema) pt, tableFound := is.TableByID(tbl.ExchangePartitionInfo.ExchangePartitionId) if !tableFound { diff --git a/executor/write.go b/executor/write.go index 9aa38936df301..f0849c2d40ea0 100644 --- a/executor/write.go +++ b/executor/write.go @@ -83,7 +83,7 @@ func updateRecord(ctx context.Context, sctx sessionctx.Context, h kv.Handle, old // Handle exchange partition tbl := t.Meta() - if tbl.ExchangePartitionInfo.ExchangePartitionFlag { + if tbl.ExchangePartitionInfo != nil && tbl.ExchangePartitionInfo.ExchangePartitionFlag { is := sctx.GetDomainInfoSchema().(infoschema.InfoSchema) pt, tableFound := is.TableByID(tbl.ExchangePartitionInfo.ExchangePartitionId) if !tableFound { From 583848d91a6e59a1258bbaf8c9dc495a590e6ba0 Mon Sep 17 00:00:00 2001 From: littlelittlehorse Date: Mon, 18 Jul 2022 12:00:15 +0800 Subject: [PATCH 20/23] fix --- ddl/partition.go | 4 ++-- executor/insert_common.go | 4 ++-- executor/write.go | 4 ++-- parser/model/model.go | 5 +++-- 4 files changed, 9 insertions(+), 8 deletions(-) diff --git a/ddl/partition.go b/ddl/partition.go index bf6b07259016b..fd42e46abb12e 100644 --- a/ddl/partition.go +++ b/ddl/partition.go @@ -1391,8 +1391,8 @@ func (w *worker) onExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Jo if nt.ExchangePartitionInfo == nil || !nt.ExchangePartitionInfo.ExchangePartitionFlag { nt.ExchangePartitionInfo = &model.ExchangePartitionInfo{ ExchangePartitionFlag: true, - ExchangePartitionId: ptID, - ExchangePartitionDefId: defID, + ExchangePartitionID: ptID, + ExchangePartitionDefID: defID, } return updateVersionAndTableInfoWithCheck(d, t, job, nt, true) } diff --git a/executor/insert_common.go b/executor/insert_common.go index e2698015ecafc..dee087b13ae91 100644 --- a/executor/insert_common.go +++ b/executor/insert_common.go @@ -647,7 +647,7 @@ func (e *InsertValues) fillRow(ctx context.Context, row []types.Datum, hasValue // Handle exchange partition if tbl.ExchangePartitionInfo != nil && tbl.ExchangePartitionInfo.ExchangePartitionFlag { is := e.ctx.GetDomainInfoSchema().(infoschema.InfoSchema) - pt, tableFound := is.TableByID(tbl.ExchangePartitionInfo.ExchangePartitionId) + pt, tableFound := is.TableByID(tbl.ExchangePartitionInfo.ExchangePartitionID) if !tableFound { return nil, errors.Errorf("exchange partition process table by id failed") } @@ -655,7 +655,7 @@ func (e *InsertValues) fillRow(ctx context.Context, row []types.Datum, hasValue if !ok { return nil, errors.Errorf("exchange partition process assert table partition failed") } - err := p.CheckForExchangePartition(e.ctx, pt.Meta().Partition, row, tbl.ExchangePartitionInfo.ExchangePartitionDefId) + err := p.CheckForExchangePartition(e.ctx, pt.Meta().Partition, row, tbl.ExchangePartitionInfo.ExchangePartitionDefID) if err != nil { return nil, err } diff --git a/executor/write.go b/executor/write.go index f0849c2d40ea0..2c9c8fe4107fa 100644 --- a/executor/write.go +++ b/executor/write.go @@ -85,7 +85,7 @@ func updateRecord(ctx context.Context, sctx sessionctx.Context, h kv.Handle, old tbl := t.Meta() if tbl.ExchangePartitionInfo != nil && tbl.ExchangePartitionInfo.ExchangePartitionFlag { is := sctx.GetDomainInfoSchema().(infoschema.InfoSchema) - pt, tableFound := is.TableByID(tbl.ExchangePartitionInfo.ExchangePartitionId) + pt, tableFound := is.TableByID(tbl.ExchangePartitionInfo.ExchangePartitionID) if !tableFound { return false, errors.Errorf("exchange partition process table by id failed") } @@ -93,7 +93,7 @@ func updateRecord(ctx context.Context, sctx sessionctx.Context, h kv.Handle, old if !ok { return false, errors.Errorf("exchange partition process assert table partition failed") } - err := p.CheckForExchangePartition(sctx, pt.Meta().Partition, newData, tbl.ExchangePartitionInfo.ExchangePartitionDefId) + err := p.CheckForExchangePartition(sctx, pt.Meta().Partition, newData, tbl.ExchangePartitionInfo.ExchangePartitionDefID) if err != nil { return false, err } diff --git a/parser/model/model.go b/parser/model/model.go index 514895e273c8a..f4981d36f6aa4 100644 --- a/parser/model/model.go +++ b/parser/model/model.go @@ -1042,10 +1042,11 @@ func (p PartitionType) String() string { } +// ExchangePartitionInfo provides exchange partition info. type ExchangePartitionInfo struct { ExchangePartitionFlag bool `json:"exchange_partition_flag"` - ExchangePartitionId int64 `json:"exchange_partition_id"` - ExchangePartitionDefId int64 `json:"exchange_partition_def_id"` + ExchangePartitionID int64 `json:"exchange_partition_id"` + ExchangePartitionDefID int64 `json:"exchange_partition_def_id"` } // PartitionInfo provides table partition info. From 2b632d6cf92e9d14590cc18fc91a390e0dc8d2db Mon Sep 17 00:00:00 2001 From: littlelittlehorse Date: Wed, 20 Jul 2022 11:24:43 +0800 Subject: [PATCH 21/23] fix --- ddl/partition.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ddl/partition.go b/ddl/partition.go index fd42e46abb12e..13bd9c9eac485 100644 --- a/ddl/partition.go +++ b/ddl/partition.go @@ -1539,7 +1539,7 @@ func (w *worker) onExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Jo return ver, errors.Wrapf(err, "failed to notify PD the label rules") } - nt.ExchangePartitionInfo.ExchangePartitionFlag = false + nt.ExchangePartitionInfo = nil ver, err = updateVersionAndTableInfoWithCheck(d, t, job, nt, true) if err != nil { return ver, errors.Trace(err) From e29d29b213e8ff4f8d5d7a88a80896df43bbb6c1 Mon Sep 17 00:00:00 2001 From: littlelittlehorse Date: Wed, 20 Jul 2022 19:34:37 +0800 Subject: [PATCH 22/23] add Privilege for exchange partition --- privilege/privileges/privileges_test.go | 31 +++++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/privilege/privileges/privileges_test.go b/privilege/privileges/privileges_test.go index 9b159309c0795..72c0bc2573ab7 100644 --- a/privilege/privileges/privileges_test.go +++ b/privilege/privileges/privileges_test.go @@ -95,6 +95,37 @@ func TestCheckPointGetDBPrivilege(t *testing.T) { require.True(t, terror.ErrorEqual(err, core.ErrTableaccessDenied)) } +func TestCheckExchangePartitionDBPrivilege(t *testing.T) { + store, clean := createStoreAndPrepareDB(t) + defer clean() + rootTk := testkit.NewTestKit(t, store) + + rootTk.MustExec(`CREATE USER 'tester'@'localhost';`) + rootTk.MustExec(`GRANT SELECT ON test.* TO 'tester'@'localhost';`) + rootTk.MustExec("use test") + rootTk.MustExec(`create table pt (a varchar(3)) partition by range columns (a) ( + partition p0 values less than ('3'), + partition p1 values less than ('6') + );`) + rootTk.MustExec(`create table nt (a varchar(3));`) + + tk := testkit.NewTestKit(t, store) + require.True(t, tk.Session().Auth(&auth.UserIdentity{Username: "tester", Hostname: "localhost"}, nil, nil)) + tk.MustExec("use test") + + rootTk.MustExec(`GRANT CREATE ON test.* TO 'tester'@'localhost';`) + tk.MustGetErrCode("alter table pt exchange partition p0 with table nt", mysql.ErrTableaccessDenied) + + rootTk.MustExec(`GRANT ALTER ON test.* TO 'tester'@'localhost';`) + tk.MustGetErrCode("alter table pt exchange partition p0 with table nt", mysql.ErrTableaccessDenied) + + rootTk.MustExec(`GRANT INSERT ON test.* TO 'tester'@'localhost';`) + tk.MustGetErrCode("alter table pt exchange partition p0 with table nt", mysql.ErrTableaccessDenied) + + rootTk.MustExec(`GRANT DROP ON test.* TO 'tester'@'localhost';`) + tk.MustExec("alter table pt exchange partition p0 with table nt") +} + func TestIssue22946(t *testing.T) { store, clean := createStoreAndPrepareDB(t) defer clean() From 2f4237e36baaf5a56609f24428c1ec9d49ef3be0 Mon Sep 17 00:00:00 2001 From: littlelittlehorse Date: Wed, 20 Jul 2022 22:29:10 +0800 Subject: [PATCH 23/23] set the more larger limitation --- executor/seqtest/seq_executor_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/executor/seqtest/seq_executor_test.go b/executor/seqtest/seq_executor_test.go index 16b556cde9ca9..51cc1717f5b76 100644 --- a/executor/seqtest/seq_executor_test.go +++ b/executor/seqtest/seq_executor_test.go @@ -941,7 +941,7 @@ func TestBatchInsertDelete(t *testing.T) { atomic.StoreUint64(&kv.TxnTotalSizeLimit, originLimit) }() // Set the limitation to a small value, make it easier to reach the limitation. - atomic.StoreUint64(&kv.TxnTotalSizeLimit, 5600) + atomic.StoreUint64(&kv.TxnTotalSizeLimit, 5700) tk := testkit.NewTestKit(t, store) tk.MustExec("use test")