Skip to content

Commit

Permalink
partition: reorganized the execution order of partition table update …
Browse files Browse the repository at this point in the history
…operators (#52442)

close #49998, close #52380
  • Loading branch information
Defined2014 authored Apr 10, 2024
1 parent a2cb0e1 commit e540f0f
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 32 deletions.
10 changes: 0 additions & 10 deletions pkg/table/tables/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package tables

import (
"bytes"
"context"
"sync"
"time"
Expand Down Expand Up @@ -355,15 +354,6 @@ func (c *index) Create(sctx table.MutateContext, txn kv.Transaction, indexedValu
}
continue
}
if c.idxInfo.Global && len(value) != 0 && !bytes.Equal(value, idxVal) {
val := idxVal
err = txn.GetMemBuffer().Set(key, val)
if err != nil {
return nil, err
}
continue
}

if keyIsTempIdxKey && !tempIdxVal.IsEmpty() {
value = tempIdxVal.Current().Value
}
Expand Down
53 changes: 31 additions & 22 deletions pkg/table/tables/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -1763,24 +1763,27 @@ func partitionedTableUpdateRecord(gctx context.Context, ctx table.MutateContext,
}
}

txn, err := ctx.Txn(true)
if err != nil {
return errors.Trace(err)
}
memBuffer := txn.GetMemBuffer()
sh := memBuffer.Staging()
defer memBuffer.Cleanup(sh)

// The old and new data locate in different partitions.
// Remove record from old partition and add record to new partition.
if from != to {
_, err = t.GetPartition(to).AddRecord(ctx, newData)
err = t.GetPartition(from).RemoveRecord(ctx, h, currData)
if err != nil {
return errors.Trace(err)
}
// UpdateRecord should be side effect free, but there're two steps here.
// What would happen if step1 succeed but step2 meets error? It's hard
// to rollback.
// So this special order is chosen: add record first, errors such as
// 'Key Already Exists' will generally happen during step1, errors are
// unlikely to happen in step2.
err = t.GetPartition(from).RemoveRecord(ctx, h, currData)

_, err = t.GetPartition(to).AddRecord(ctx, newData)
if err != nil {
logutil.BgLogger().Error("update partition record fails", zap.String("message", "new record inserted while old record is not removed"), zap.Error(err))
return errors.Trace(err)
}

newTo, newFrom := int64(0), int64(0)
if _, ok := t.reorganizePartitions[to]; ok {
newTo, err = t.locateReorgPartition(ectx, newData)
Expand All @@ -1798,24 +1801,28 @@ func partitionedTableUpdateRecord(gctx context.Context, ctx table.MutateContext,
}
if newTo == newFrom && newTo != 0 {
// Update needs to be done in StateDeleteOnly as well
tbl := t.GetPartition(newTo)
return tbl.UpdateRecord(gctx, ctx, h, currData, newData, touched)
}
if newTo != 0 && t.Meta().GetPartitionInfo().DDLState != model.StateDeleteOnly {
tbl := t.GetPartition(newTo)
_, err = tbl.AddRecord(ctx, newData)
err = t.GetPartition(newTo).UpdateRecord(gctx, ctx, h, currData, newData, touched)
if err != nil {
return errors.Trace(err)
}
memBuffer.Release(sh)
return nil
}

if newFrom != 0 {
tbl := t.GetPartition(newFrom)
err = tbl.RemoveRecord(ctx, h, currData)
err = t.GetPartition(newFrom).RemoveRecord(ctx, h, currData)
// TODO: Can this happen? When the data is not yet backfilled?
if err != nil {
return errors.Trace(err)
}
}
if newTo != 0 && t.Meta().GetPartitionInfo().DDLState != model.StateDeleteOnly {
_, err = t.GetPartition(newTo).AddRecord(ctx, newData)
if err != nil {
return errors.Trace(err)
}
}
memBuffer.Release(sh)
return nil
}
tbl := t.GetPartition(to)
Expand Down Expand Up @@ -1844,21 +1851,23 @@ func partitionedTableUpdateRecord(gctx context.Context, ctx table.MutateContext,
if err != nil {
return errors.Trace(err)
}
memBuffer.Release(sh)
return nil
}
tbl = t.GetPartition(newFrom)
err = tbl.RemoveRecord(ctx, h, currData)
if err != nil {
return errors.Trace(err)
}
if t.Meta().GetPartitionInfo().DDLState != model.StateDeleteOnly {
tbl = t.GetPartition(newTo)
_, err = tbl.AddRecord(ctx, newData)
if err != nil {
return errors.Trace(err)
}
}
tbl = t.GetPartition(newFrom)
err = tbl.RemoveRecord(ctx, h, currData)
if err != nil {
return errors.Trace(err)
}
}
memBuffer.Release(sh)
return nil
}

Expand Down
28 changes: 28 additions & 0 deletions tests/integrationtest/r/globalindex/update.result
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
set tidb_enable_global_index=true;
drop table if exists t;
CREATE TABLE `t` (
`a` int(11) NOT NULL,
`b` int(11) DEFAULT NULL,
`c` int(11) DEFAULT NULL,
PRIMARY KEY (`a`) /*T![clustered_index] CLUSTERED */,
UNIQUE KEY `idx1` (`b`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin
PARTITION BY HASH (`a`) PARTITIONS 5;
begin;
insert into t values (1, 2, 3);
insert into t values (2, 2, 3);
Error 1062 (23000): Duplicate entry '2' for key 't.idx1'
rollback;
drop table if exists t;
create table t ( a int, b int, c int, unique key idx(b))
partition by range( a ) (
partition p1 values less than (10),
partition p2 values less than (20),
partition p3 values less than (30)
);
begin;
insert into t values (1, 1, 1), (8, 8, 8), (11, 11, 11), (12, 12, 12);
update t set a = 2, b = 12 where a = 11;
Error 1062 (23000): Duplicate entry '12' for key 't.idx'
rollback;
set tidb_enable_global_index=default;
31 changes: 31 additions & 0 deletions tests/integrationtest/t/globalindex/update.test
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
set tidb_enable_global_index=true;
drop table if exists t;
CREATE TABLE `t` (
`a` int(11) NOT NULL,
`b` int(11) DEFAULT NULL,
`c` int(11) DEFAULT NULL,
PRIMARY KEY (`a`) /*T![clustered_index] CLUSTERED */,
UNIQUE KEY `idx1` (`b`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin
PARTITION BY HASH (`a`) PARTITIONS 5;

begin;
insert into t values (1, 2, 3);
--error 1062
insert into t values (2, 2, 3);
rollback;

drop table if exists t;
create table t ( a int, b int, c int, unique key idx(b))
partition by range( a ) (
partition p1 values less than (10),
partition p2 values less than (20),
partition p3 values less than (30)
);
begin;
insert into t values (1, 1, 1), (8, 8, 8), (11, 11, 11), (12, 12, 12);
--error 1062
update t set a = 2, b = 12 where a = 11;
rollback;

set tidb_enable_global_index=default;

0 comments on commit e540f0f

Please sign in to comment.