Skip to content

Commit

Permalink
*: Fixed double writing to correct hash/key partition during reorg (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
mjonss authored May 18, 2023
1 parent ca62944 commit 181f3ed
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 23 deletions.
5 changes: 3 additions & 2 deletions planner/core/rule_partition_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,7 @@ func (s *partitionProcessor) getUsedKeyPartitions(ctx sessionctx.Context,
pi := tbl.Meta().Partition
partExpr := tbl.(partitionTable).PartitionExpr()
partCols, colLen := partExpr.GetPartColumnsForKeyPartition(columns)
pe := &tables.ForKeyPruning{KeyPartCols: partCols}
detachedResult, err := ranger.DetachCondAndBuildRangeForPartition(ctx, conds, partCols, colLen, ctx.GetSessionVars().RangeMaxSize)
if err != nil {
return nil, nil, err
Expand All @@ -263,7 +264,7 @@ func (s *partitionProcessor) getUsedKeyPartitions(ctx sessionctx.Context,

colVals := make([]types.Datum, 0, len(r.HighVal))
colVals = append(colVals, r.HighVal...)
idx, err := partExpr.LocateKeyPartition(pi, partCols, colVals)
idx, err := pe.LocateKeyPartition(pi.Num, colVals)
if err != nil {
// If we failed to get the point position, we can just skip and ignore it.
continue
Expand Down Expand Up @@ -305,7 +306,7 @@ func (s *partitionProcessor) getUsedKeyPartitions(ctx sessionctx.Context,
if rangeScalar < float64(pi.Num) && !highIsNull && !lowIsNull {
for i := posLow; i <= posHigh; i++ {
d := types.NewIntDatum(i)
idx, err := partExpr.LocateKeyPartition(pi, partCols, []types.Datum{d})
idx, err := pe.LocateKeyPartition(pi.Num, []types.Datum{d})
if err != nil {
// If we failed to get the point position, we can just skip and ignore it.
continue
Expand Down
46 changes: 25 additions & 21 deletions table/tables/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,14 +288,14 @@ func (pe *PartitionExpr) LocateKeyPartitionWithSPC(pi *model.PartitionInfo,
col := &expression.Column{}
*col = *pe.KeyPartCols[0]
col.Index = 0
return pe.LocateKeyPartition(pi, []*expression.Column{col}, r)
kp := &ForKeyPruning{KeyPartCols: []*expression.Column{col}}
return kp.LocateKeyPartition(pi.Num, r)
}

// LocateKeyPartition is the common interface used to locate the destination partition
func (pe *PartitionExpr) LocateKeyPartition(pi *model.PartitionInfo,
cols []*expression.Column, r []types.Datum) (int, error) {
func (kp *ForKeyPruning) LocateKeyPartition(numParts uint64, r []types.Datum) (int, error) {
h := crc32.NewIEEE()
for _, col := range cols {
for _, col := range kp.KeyPartCols {
val := r[col.Index]
if val.Kind() == types.KindNull {
h.Write([]byte{0})
Expand All @@ -307,7 +307,7 @@ func (pe *PartitionExpr) LocateKeyPartition(pi *model.PartitionInfo,
h.Write(data)
}
}
return int(h.Sum32() % uint32(pi.Num)), nil
return int(h.Sum32() % uint32(numParts)), nil
}

func initEvalBufferType(t *partitionedTable) {
Expand Down Expand Up @@ -1218,7 +1218,7 @@ func (t *partitionedTable) CheckForExchangePartition(ctx sessionctx.Context, pi
}

// locatePartitionCommon returns the partition idx of the input record.
func (t *partitionedTable) locatePartitionCommon(ctx sessionctx.Context, pi *model.PartitionInfo, partitionExpr *PartitionExpr, r []types.Datum) (int, error) {
func (t *partitionedTable) locatePartitionCommon(ctx sessionctx.Context, pi *model.PartitionInfo, partitionExpr *PartitionExpr, num uint64, r []types.Datum) (int, error) {
var err error
var idx int
switch t.meta.Partition.Type {
Expand All @@ -1230,10 +1230,9 @@ func (t *partitionedTable) locatePartitionCommon(ctx sessionctx.Context, pi *mod
}
case model.PartitionTypeHash:
// Note that only LIST and RANGE supports REORGANIZE PARTITION
// TODO: Add support for ADD PARTITION and COALESCE PARTITION for HASH
idx, err = t.locateHashPartition(ctx, pi, r)
idx, err = t.locateHashPartition(ctx, partitionExpr, num, r)
case model.PartitionTypeKey:
idx, err = t.locateKeyPartition(pi, r)
idx, err = partitionExpr.LocateKeyPartition(num, r)
case model.PartitionTypeList:
idx, err = t.locateListPartition(ctx, partitionExpr, r)
}
Expand All @@ -1245,7 +1244,7 @@ func (t *partitionedTable) locatePartitionCommon(ctx sessionctx.Context, pi *mod

func (t *partitionedTable) locatePartition(ctx sessionctx.Context, r []types.Datum) (int64, error) {
pi := t.Meta().GetPartitionInfo()
idx, err := t.locatePartitionCommon(ctx, pi, t.partitionExpr, r)
idx, err := t.locatePartitionCommon(ctx, pi, t.partitionExpr, pi.Num, r)
if err != nil {
return 0, errors.Trace(err)
}
Expand All @@ -1254,7 +1253,17 @@ func (t *partitionedTable) locatePartition(ctx sessionctx.Context, r []types.Dat

func (t *partitionedTable) locateReorgPartition(ctx sessionctx.Context, r []types.Datum) (int64, error) {
pi := t.Meta().GetPartitionInfo()
idx, err := t.locatePartitionCommon(ctx, pi, t.reorgPartitionExpr, r)
// Note that for KEY/HASH partitioning, since we do not support LINEAR,
// all partitions will be reorganized,
// so we can use the number in Dropping or AddingDefinitions,
// depending on current state.
var numParts uint64
if pi.DDLState == model.StateDeleteReorganization {
numParts = uint64(len(pi.DroppingDefinitions))
} else {
numParts = uint64(len(pi.AddingDefinitions))
}
idx, err := t.locatePartitionCommon(ctx, pi, t.reorgPartitionExpr, numParts, r)
if err != nil {
return 0, errors.Trace(err)
}
Expand Down Expand Up @@ -1369,8 +1378,8 @@ func (t *partitionedTable) locateRangePartition(ctx sessionctx.Context, partitio
}

// TODO: supports linear hashing
func (t *partitionedTable) locateHashPartition(ctx sessionctx.Context, pi *model.PartitionInfo, r []types.Datum) (int, error) {
if col, ok := t.partitionExpr.Expr.(*expression.Column); ok {
func (t *partitionedTable) locateHashPartition(ctx sessionctx.Context, partExpr *PartitionExpr, numParts uint64, r []types.Datum) (int, error) {
if col, ok := partExpr.Expr.(*expression.Column); ok {
var data types.Datum
switch r[col.Index].Kind() {
case types.KindInt64, types.KindUint64:
Expand All @@ -1383,7 +1392,7 @@ func (t *partitionedTable) locateHashPartition(ctx sessionctx.Context, pi *model
}
}
ret := data.GetInt64()
ret = ret % int64(t.meta.Partition.Num)
ret = ret % int64(numParts)
if ret < 0 {
ret = -ret
}
Expand All @@ -1392,25 +1401,20 @@ func (t *partitionedTable) locateHashPartition(ctx sessionctx.Context, pi *model
evalBuffer := t.evalBufferPool.Get().(*chunk.MutRow)
defer t.evalBufferPool.Put(evalBuffer)
evalBuffer.SetDatums(r...)
ret, isNull, err := t.partitionExpr.Expr.EvalInt(ctx, evalBuffer.ToRow())
ret, isNull, err := partExpr.Expr.EvalInt(ctx, evalBuffer.ToRow())
if err != nil {
return 0, err
}
if isNull {
return 0, nil
}
ret = ret % int64(t.meta.Partition.Num)
ret = ret % int64(numParts)
if ret < 0 {
ret = -ret
}
return int(ret), nil
}

// TODO: supports linear hashing
func (t *partitionedTable) locateKeyPartition(pi *model.PartitionInfo, r []types.Datum) (int, error) {
return t.partitionExpr.LocateKeyPartition(pi, t.partitionExpr.KeyPartCols, r)
}

// GetPartition returns a Table, which is actually a partition.
func (t *partitionedTable) GetPartition(pid int64) table.PhysicalTable {
// Attention, can't simply use `return t.partitions[pid]` here.
Expand Down
78 changes: 78 additions & 0 deletions table/tables/partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -709,6 +709,84 @@ func TestIssue31629(t *testing.T) {
}
}

func TestAddKeyPartitionStates(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
dbName := "partSchemaVer"
tk.MustExec("create database " + dbName)
tk.MustExec("use " + dbName)
tk.MustExec(`set @@global.tidb_enable_metadata_lock = ON`)
tk2 := testkit.NewTestKit(t, store)
tk2.MustExec("use " + dbName)
tk3 := testkit.NewTestKit(t, store)
tk3.MustExec("use " + dbName)
tk4 := testkit.NewTestKit(t, store)
tk4.MustExec("use " + dbName)
tk.MustExec(`create table t (a int primary key, b varchar(255), key (b)) partition by hash (a) partitions 3`)
tk.MustExec(`insert into t values (1, "1")`)
tk.MustExec(`analyze table t`)
tk.MustExec("BEGIN")
tk.MustQuery(`select * from t`).Check(testkit.Rows("1 1"))
tk.MustExec(`insert into t values (2, "2")`)
syncChan := make(chan bool)
go func() {
tk2.MustExec(`alter table t add partition partitions 1`)
syncChan <- true
}()
waitFor := func(i int, s string) {
for {
res := tk4.MustQuery(`admin show ddl jobs where db_name = '` + strings.ToLower(dbName) + `' and table_name = 't' and job_type like 'alter table%'`).Rows()
if len(res) == 1 && res[0][i] == s {
break
}
gotime.Sleep(10 * gotime.Millisecond)
}
}
waitFor(4, "delete only")
tk3.MustExec(`BEGIN`)
tk3.MustQuery(`select * from t`).Sort().Check(testkit.Rows("1 1"))
tk3.MustExec(`insert into t values (3,"3")`)

tk.MustExec(`COMMIT`)
waitFor(4, "write only")
tk.MustExec(`BEGIN`)
tk.MustQuery(`select * from t`).Sort().Check(testkit.Rows("1 1", "2 2"))
tk.MustExec(`insert into t values (4,"4")`)

tk3.MustExec(`COMMIT`)
waitFor(4, "write reorganization")
tk3.MustExec(`BEGIN`)
tk3.MustQuery(`show create table t`).Check(testkit.Rows("" +
"t CREATE TABLE `t` (\n" +
" `a` int(11) NOT NULL,\n" +
" `b` varchar(255) DEFAULT NULL,\n" +
" PRIMARY KEY (`a`) /*T![clustered_index] CLUSTERED */,\n" +
" KEY `b` (`b`)\n" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin\n" +
"PARTITION BY HASH (`a`) PARTITIONS 3"))
tk3.MustQuery(`select * from t`).Sort().Check(testkit.Rows("1 1", "2 2", "3 3"))
tk3.MustExec(`insert into t values (5,"5")`)

tk.MustExec(`COMMIT`)
waitFor(4, "delete reorganization")
tk.MustExec(`BEGIN`)
tk.MustQuery(`show create table t`).Check(testkit.Rows("" +
"t CREATE TABLE `t` (\n" +
" `a` int(11) NOT NULL,\n" +
" `b` varchar(255) DEFAULT NULL,\n" +
" PRIMARY KEY (`a`) /*T![clustered_index] CLUSTERED */,\n" +
" KEY `b` (`b`)\n" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin\n" +
"PARTITION BY HASH (`a`) PARTITIONS 4"))
tk.MustQuery(`select * from t`).Sort().Check(testkit.Rows("1 1", "2 2", "3 3", "4 4"))
tk.MustExec(`insert into t values (6,"6")`)

tk3.MustExec(`COMMIT`)
tk.MustExec(`COMMIT`)
<-syncChan
tk.MustQuery(`select * from t`).Sort().Check(testkit.Rows("1 1", "2 2", "3 3", "4 4", "5 5", "6 6"))
}

type compoundSQL struct {
selectSQL string
point bool
Expand Down

0 comments on commit 181f3ed

Please sign in to comment.