From 4a172ebeefe11edab31e83f1772fe848918f55f2 Mon Sep 17 00:00:00 2001 From: Mattias Jonsson Date: Wed, 17 May 2023 00:01:18 +0200 Subject: [PATCH 1/3] Fixed double writing to correct hash/key partition during reorg --- planner/core/point_get_plan.go | 2 +- planner/core/rule_partition_processor.go | 5 +- planner/core/util.go | 2 +- table/tables/partition.go | 53 +++++++--------- table/tables/partition_test.go | 81 ++++++++++++++++++++++++ 5 files changed, 109 insertions(+), 34 deletions(-) diff --git a/planner/core/point_get_plan.go b/planner/core/point_get_plan.go index 43241b68fde7a..d4aeb7c824bba 100644 --- a/planner/core/point_get_plan.go +++ b/planner/core/point_get_plan.go @@ -1818,7 +1818,7 @@ func getPartitionInfo(ctx sessionctx.Context, tbl *model.TableInfo, pairs []name if len(pi.Columns) == 1 { for i, pair := range pairs { if pi.Columns[0].L == pair.colName { - pos, err := partitionExpr.LocateKeyPartitionWithSPC(pi, []types.Datum{pair.value}) + pos, err := partitionExpr.LocateKeyPartition(pi.Num, []types.Datum{pair.value}) if err != nil { return nil, 0, 0, false } diff --git a/planner/core/rule_partition_processor.go b/planner/core/rule_partition_processor.go index 21e262b961601..0521dd7f69b2d 100644 --- a/planner/core/rule_partition_processor.go +++ b/planner/core/rule_partition_processor.go @@ -247,6 +247,7 @@ func (s *partitionProcessor) getUsedKeyPartitions(ctx sessionctx.Context, pi := tbl.Meta().Partition partExpr := tbl.(partitionTable).PartitionExpr() partCols, colLen := partExpr.GetPartColumnsForKeyPartition(columns) + pe := &tables.ForKeyPruning{KeyPartCols: partCols} detachedResult, err := ranger.DetachCondAndBuildRangeForPartition(ctx, conds, partCols, colLen, ctx.GetSessionVars().RangeMaxSize) if err != nil { return nil, nil, err @@ -263,7 +264,7 @@ func (s *partitionProcessor) getUsedKeyPartitions(ctx sessionctx.Context, colVals := make([]types.Datum, 0, len(r.HighVal)) colVals = append(colVals, r.HighVal...) - idx, err := partExpr.LocateKeyPartition(pi, partCols, colVals) + idx, err := pe.LocateKeyPartition(pi.Num, colVals) if err != nil { // If we failed to get the point position, we can just skip and ignore it. continue @@ -305,7 +306,7 @@ func (s *partitionProcessor) getUsedKeyPartitions(ctx sessionctx.Context, if rangeScalar < float64(pi.Num) && !highIsNull && !lowIsNull { for i := posLow; i <= posHigh; i++ { d := types.NewIntDatum(i) - idx, err := partExpr.LocateKeyPartition(pi, partCols, []types.Datum{d}) + idx, err := pe.LocateKeyPartition(pi.Num, []types.Datum{d}) if err != nil { // If we failed to get the point position, we can just skip and ignore it. continue diff --git a/planner/core/util.go b/planner/core/util.go index 3aff180018815..2b25aec4abe25 100644 --- a/planner/core/util.go +++ b/planner/core/util.go @@ -432,7 +432,7 @@ func GetPhysID(tblInfo *model.TableInfo, partitionExpr *tables.PartitionExpr, d if len(pi.Columns) > 1 { return 0, errors.Errorf("unsupported partition type in BatchGet") } - partIdx, err := partitionExpr.LocateKeyPartitionWithSPC(pi, []types.Datum{d}) + partIdx, err := partitionExpr.LocateKeyPartition(pi.Num, []types.Datum{d}) if err != nil { return 0, errors.Errorf("unsupported partition type in BatchGet") } diff --git a/table/tables/partition.go b/table/tables/partition.go index ae579a5b7ba95..94f65ebbcf479 100644 --- a/table/tables/partition.go +++ b/table/tables/partition.go @@ -281,21 +281,10 @@ func (pe *PartitionExpr) GetPartColumnsForKeyPartition(columns []*expression.Col return partCols, colLen } -// LocateKeyPartitionWithSPC is used to locate the destination partition for key -// partition table has single partition column(SPC). It's called in FastPlan process. -func (pe *PartitionExpr) LocateKeyPartitionWithSPC(pi *model.PartitionInfo, - r []types.Datum) (int, error) { - col := &expression.Column{} - *col = *pe.KeyPartCols[0] - col.Index = 0 - return pe.LocateKeyPartition(pi, []*expression.Column{col}, r) -} - // LocateKeyPartition is the common interface used to locate the destination partition -func (pe *PartitionExpr) LocateKeyPartition(pi *model.PartitionInfo, - cols []*expression.Column, r []types.Datum) (int, error) { +func (kp *ForKeyPruning) LocateKeyPartition(numParts uint64, r []types.Datum) (int, error) { h := crc32.NewIEEE() - for _, col := range cols { + for _, col := range kp.KeyPartCols { val := r[col.Index] if val.Kind() == types.KindNull { h.Write([]byte{0}) @@ -307,7 +296,7 @@ func (pe *PartitionExpr) LocateKeyPartition(pi *model.PartitionInfo, h.Write(data) } } - return int(h.Sum32() % uint32(pi.Num)), nil + return int(h.Sum32() % uint32(numParts)), nil } func initEvalBufferType(t *partitionedTable) { @@ -1218,7 +1207,7 @@ func (t *partitionedTable) CheckForExchangePartition(ctx sessionctx.Context, pi } // locatePartitionCommon returns the partition idx of the input record. -func (t *partitionedTable) locatePartitionCommon(ctx sessionctx.Context, pi *model.PartitionInfo, partitionExpr *PartitionExpr, r []types.Datum) (int, error) { +func (t *partitionedTable) locatePartitionCommon(ctx sessionctx.Context, pi *model.PartitionInfo, partitionExpr *PartitionExpr, num uint64, r []types.Datum) (int, error) { var err error var idx int switch t.meta.Partition.Type { @@ -1230,10 +1219,9 @@ func (t *partitionedTable) locatePartitionCommon(ctx sessionctx.Context, pi *mod } case model.PartitionTypeHash: // Note that only LIST and RANGE supports REORGANIZE PARTITION - // TODO: Add support for ADD PARTITION and COALESCE PARTITION for HASH - idx, err = t.locateHashPartition(ctx, pi, r) + idx, err = t.locateHashPartition(ctx, partitionExpr, num, r) case model.PartitionTypeKey: - idx, err = t.locateKeyPartition(pi, r) + idx, err = partitionExpr.LocateKeyPartition(num, r) case model.PartitionTypeList: idx, err = t.locateListPartition(ctx, partitionExpr, r) } @@ -1245,7 +1233,7 @@ func (t *partitionedTable) locatePartitionCommon(ctx sessionctx.Context, pi *mod func (t *partitionedTable) locatePartition(ctx sessionctx.Context, r []types.Datum) (int64, error) { pi := t.Meta().GetPartitionInfo() - idx, err := t.locatePartitionCommon(ctx, pi, t.partitionExpr, r) + idx, err := t.locatePartitionCommon(ctx, pi, t.partitionExpr, pi.Num, r) if err != nil { return 0, errors.Trace(err) } @@ -1254,7 +1242,17 @@ func (t *partitionedTable) locatePartition(ctx sessionctx.Context, r []types.Dat func (t *partitionedTable) locateReorgPartition(ctx sessionctx.Context, r []types.Datum) (int64, error) { pi := t.Meta().GetPartitionInfo() - idx, err := t.locatePartitionCommon(ctx, pi, t.reorgPartitionExpr, r) + // Note that for KEY/HASH partitioning, since we do not support LINEAR, + // all partitions will be reorganized, + // so we can use the number in Dropping or AddingDefinitions, + // depending on current state. + var numParts uint64 + if pi.DDLState == model.StateDeleteReorganization { + numParts = uint64(len(pi.DroppingDefinitions)) + } else { + numParts = uint64(len(pi.AddingDefinitions)) + } + idx, err := t.locatePartitionCommon(ctx, pi, t.reorgPartitionExpr, numParts, r) if err != nil { return 0, errors.Trace(err) } @@ -1369,8 +1367,8 @@ func (t *partitionedTable) locateRangePartition(ctx sessionctx.Context, partitio } // TODO: supports linear hashing -func (t *partitionedTable) locateHashPartition(ctx sessionctx.Context, pi *model.PartitionInfo, r []types.Datum) (int, error) { - if col, ok := t.partitionExpr.Expr.(*expression.Column); ok { +func (t *partitionedTable) locateHashPartition(ctx sessionctx.Context, partExpr *PartitionExpr, numParts uint64, r []types.Datum) (int, error) { + if col, ok := partExpr.Expr.(*expression.Column); ok { var data types.Datum switch r[col.Index].Kind() { case types.KindInt64, types.KindUint64: @@ -1383,7 +1381,7 @@ func (t *partitionedTable) locateHashPartition(ctx sessionctx.Context, pi *model } } ret := data.GetInt64() - ret = ret % int64(t.meta.Partition.Num) + ret = ret % int64(numParts) if ret < 0 { ret = -ret } @@ -1392,25 +1390,20 @@ func (t *partitionedTable) locateHashPartition(ctx sessionctx.Context, pi *model evalBuffer := t.evalBufferPool.Get().(*chunk.MutRow) defer t.evalBufferPool.Put(evalBuffer) evalBuffer.SetDatums(r...) - ret, isNull, err := t.partitionExpr.Expr.EvalInt(ctx, evalBuffer.ToRow()) + ret, isNull, err := partExpr.Expr.EvalInt(ctx, evalBuffer.ToRow()) if err != nil { return 0, err } if isNull { return 0, nil } - ret = ret % int64(t.meta.Partition.Num) + ret = ret % int64(numParts) if ret < 0 { ret = -ret } return int(ret), nil } -// TODO: supports linear hashing -func (t *partitionedTable) locateKeyPartition(pi *model.PartitionInfo, r []types.Datum) (int, error) { - return t.partitionExpr.LocateKeyPartition(pi, t.partitionExpr.KeyPartCols, r) -} - // GetPartition returns a Table, which is actually a partition. func (t *partitionedTable) GetPartition(pid int64) table.PhysicalTable { // Attention, can't simply use `return t.partitions[pid]` here. diff --git a/table/tables/partition_test.go b/table/tables/partition_test.go index b5ce43468493a..16ce149597396 100644 --- a/table/tables/partition_test.go +++ b/table/tables/partition_test.go @@ -709,6 +709,87 @@ func TestIssue31629(t *testing.T) { } } +func TestAddKeyPartitionStates(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + dbName := "partSchemaVer" + tk.MustExec("create database " + dbName) + tk.MustExec("use " + dbName) + tk.MustExec(`set @@global.tidb_enable_metadata_lock = ON`) + tk2 := testkit.NewTestKit(t, store) + tk2.MustExec("use " + dbName) + tk3 := testkit.NewTestKit(t, store) + tk3.MustExec("use " + dbName) + tk4 := testkit.NewTestKit(t, store) + tk4.MustExec("use " + dbName) + tk.MustExec(`create table t (a int primary key, b varchar(255), key (b)) partition by hash (a) partitions 3`) + tk.MustExec(`insert into t values (1, "1")`) + tk.MustExec(`analyze table t`) + tk.MustExec("BEGIN") + tk.MustQuery(`select * from t`).Check(testkit.Rows("1 1")) + tk.MustExec(`insert into t values (2, "2")`) + syncChan := make(chan bool) + go func() { + tk2.MustExec(`alter table t add partition partitions 1`) + syncChan <- true + }() + waitFor := func(i int, s string) { + for true { + //res := tk4.MustQuery(`admin show ddl jobs`).Rows() + //res := tk4.MustQuery(`admin show ddl jobs where db_name = '` + dbName + `'`).Rows() + res := tk4.MustQuery(`admin show ddl jobs where db_name = '` + strings.ToLower(dbName) + `' and table_name = 't' and job_type like 'alter table%'`).Rows() + if len(res) == 1 && res[0][i] == s { + break + } else { + gotime.Sleep(10 * gotime.Millisecond) + } + } + } + waitFor(4, "delete only") + tk3.MustExec(`BEGIN`) + tk3.MustQuery(`select * from t`).Sort().Check(testkit.Rows("1 1")) + tk3.MustExec(`insert into t values (3,"3")`) + + tk.MustExec(`COMMIT`) + waitFor(4, "write only") + tk.MustExec(`BEGIN`) + tk.MustQuery(`select * from t`).Sort().Check(testkit.Rows("1 1", "2 2")) + tk.MustExec(`insert into t values (4,"4")`) + + tk3.MustExec(`COMMIT`) + waitFor(4, "write reorganization") + tk3.MustExec(`BEGIN`) + tk3.MustQuery(`show create table t`).Check(testkit.Rows("" + + "t CREATE TABLE `t` (\n" + + " `a` int(11) NOT NULL,\n" + + " `b` varchar(255) DEFAULT NULL,\n" + + " PRIMARY KEY (`a`) /*T![clustered_index] CLUSTERED */,\n" + + " KEY `b` (`b`)\n" + + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin\n" + + "PARTITION BY HASH (`a`) PARTITIONS 3")) + tk3.MustQuery(`select * from t`).Sort().Check(testkit.Rows("1 1", "2 2", "3 3")) + tk3.MustExec(`insert into t values (5,"5")`) + + tk.MustExec(`COMMIT`) + waitFor(4, "delete reorganization") + tk.MustExec(`BEGIN`) + tk.MustQuery(`show create table t`).Check(testkit.Rows("" + + "t CREATE TABLE `t` (\n" + + " `a` int(11) NOT NULL,\n" + + " `b` varchar(255) DEFAULT NULL,\n" + + " PRIMARY KEY (`a`) /*T![clustered_index] CLUSTERED */,\n" + + " KEY `b` (`b`)\n" + + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin\n" + + "PARTITION BY HASH (`a`) PARTITIONS 4")) + tk.MustQuery(`select * from t`).Sort().Check(testkit.Rows("1 1", "2 2", "3 3", "4 4")) + tk.MustExec(`insert into t values (6,"6")`) + + tk3.MustExec(`COMMIT`) + tk.MustExec(`COMMIT`) + <-syncChan + tk.MustQuery(`select * from t`).Sort().Check(testkit.Rows("1 1", "2 2", "3 3", "4 4", "5 5", "6 6")) +} + type compoundSQL struct { selectSQL string point bool From 13afc5a872496ce433fabd694cf72d8c26184008 Mon Sep 17 00:00:00 2001 From: Mattias Jonsson Date: Wed, 17 May 2023 00:22:35 +0200 Subject: [PATCH 2/3] Linting --- table/tables/partition_test.go | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/table/tables/partition_test.go b/table/tables/partition_test.go index 16ce149597396..e0bc678057700 100644 --- a/table/tables/partition_test.go +++ b/table/tables/partition_test.go @@ -734,15 +734,12 @@ func TestAddKeyPartitionStates(t *testing.T) { syncChan <- true }() waitFor := func(i int, s string) { - for true { - //res := tk4.MustQuery(`admin show ddl jobs`).Rows() - //res := tk4.MustQuery(`admin show ddl jobs where db_name = '` + dbName + `'`).Rows() + for { res := tk4.MustQuery(`admin show ddl jobs where db_name = '` + strings.ToLower(dbName) + `' and table_name = 't' and job_type like 'alter table%'`).Rows() if len(res) == 1 && res[0][i] == s { break - } else { - gotime.Sleep(10 * gotime.Millisecond) } + gotime.Sleep(10 * gotime.Millisecond) } } waitFor(4, "delete only") From ebd22074a3bd525d91e24d7228fb0f25fd83d2d9 Mon Sep 17 00:00:00 2001 From: Mattias Jonsson Date: Wed, 17 May 2023 01:18:25 +0200 Subject: [PATCH 3/3] Reverted changes for LocateKeyPartitionWithSPC --- planner/core/point_get_plan.go | 2 +- planner/core/util.go | 2 +- table/tables/partition.go | 11 +++++++++++ 3 files changed, 13 insertions(+), 2 deletions(-) diff --git a/planner/core/point_get_plan.go b/planner/core/point_get_plan.go index d4aeb7c824bba..43241b68fde7a 100644 --- a/planner/core/point_get_plan.go +++ b/planner/core/point_get_plan.go @@ -1818,7 +1818,7 @@ func getPartitionInfo(ctx sessionctx.Context, tbl *model.TableInfo, pairs []name if len(pi.Columns) == 1 { for i, pair := range pairs { if pi.Columns[0].L == pair.colName { - pos, err := partitionExpr.LocateKeyPartition(pi.Num, []types.Datum{pair.value}) + pos, err := partitionExpr.LocateKeyPartitionWithSPC(pi, []types.Datum{pair.value}) if err != nil { return nil, 0, 0, false } diff --git a/planner/core/util.go b/planner/core/util.go index 2b25aec4abe25..3aff180018815 100644 --- a/planner/core/util.go +++ b/planner/core/util.go @@ -432,7 +432,7 @@ func GetPhysID(tblInfo *model.TableInfo, partitionExpr *tables.PartitionExpr, d if len(pi.Columns) > 1 { return 0, errors.Errorf("unsupported partition type in BatchGet") } - partIdx, err := partitionExpr.LocateKeyPartition(pi.Num, []types.Datum{d}) + partIdx, err := partitionExpr.LocateKeyPartitionWithSPC(pi, []types.Datum{d}) if err != nil { return 0, errors.Errorf("unsupported partition type in BatchGet") } diff --git a/table/tables/partition.go b/table/tables/partition.go index 94f65ebbcf479..7df9e36e74e3d 100644 --- a/table/tables/partition.go +++ b/table/tables/partition.go @@ -281,6 +281,17 @@ func (pe *PartitionExpr) GetPartColumnsForKeyPartition(columns []*expression.Col return partCols, colLen } +// LocateKeyPartitionWithSPC is used to locate the destination partition for key +// partition table has single partition column(SPC). It's called in FastPlan process. +func (pe *PartitionExpr) LocateKeyPartitionWithSPC(pi *model.PartitionInfo, + r []types.Datum) (int, error) { + col := &expression.Column{} + *col = *pe.KeyPartCols[0] + col.Index = 0 + kp := &ForKeyPruning{KeyPartCols: []*expression.Column{col}} + return kp.LocateKeyPartition(pi.Num, r) +} + // LocateKeyPartition is the common interface used to locate the destination partition func (kp *ForKeyPruning) LocateKeyPartition(numParts uint64, r []types.Datum) (int, error) { h := crc32.NewIEEE()