ddl: Regenerating AutoIDs for _tidb_rowid during Reorganize Partition (
ti-chi-bot authored Jul 12, 2024
1 parent 112a821 commit 20d69a7
Showing 5 changed files with 106 additions and 8 deletions.
30 changes: 27 additions & 3 deletions pkg/ddl/partition.go
@@ -2609,6 +2609,8 @@ func (w *worker) onExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Jo
	})

	// Set both tables to the maximum auto IDs between normal table and partitioned table.
+	// TODO: Fix the issue of big transactions during EXCHANGE PARTITION with AutoID.
+	// Similar to https://github.com/pingcap/tidb/issues/46904
	newAutoIDs := meta.AutoIDGroup{
		RowID: mathutil.Max(ptAutoIDs.RowID, ntAutoIDs.RowID),
		IncrementID: mathutil.Max(ptAutoIDs.IncrementID, ntAutoIDs.IncrementID),
@@ -3250,9 +3252,31 @@ func (w *reorgPartitionWorker) fetchRowColVals(txn kv.Transaction, taskRange reo
	if err != nil {
		return false, errors.Trace(err)
	}
-	pid := p.GetPhysicalID()
-	newKey := tablecodec.EncodeTablePrefix(pid)
-	newKey = append(newKey, recordKey[len(newKey):]...)
+	var newKey kv.Key
+	if w.reorgedTbl.Meta().PKIsHandle || w.reorgedTbl.Meta().IsCommonHandle {
+		pid := p.GetPhysicalID()
+		newKey = tablecodec.EncodeTablePrefix(pid)
+		newKey = append(newKey, recordKey[len(newKey):]...)
+	} else {
+		// Non-clustered table / not unique _tidb_rowid for the whole table
+		// Generate new _tidb_rowid if exists.
+		// Due to EXCHANGE PARTITION, the existing _tidb_rowid may collide between partitions!
+		stmtCtx := w.sessCtx.GetSessionVars().StmtCtx
+		if stmtCtx.BaseRowID >= stmtCtx.MaxRowID {
+			// TODO: Which autoid allocator to use?
+			ids := uint64(max(1, w.batchCnt-len(w.rowRecords)))
+			// Keep using the original table's allocator
+			stmtCtx.BaseRowID, stmtCtx.MaxRowID, err = tables.AllocHandleIDs(w.ctx, w.sessCtx, w.reorgedTbl, ids)
+			if err != nil {
+				return false, errors.Trace(err)
+			}
+		}
+		recordID, err := tables.AllocHandle(w.ctx, w.sessCtx, w.reorgedTbl)
+		if err != nil {
+			return false, errors.Trace(err)
+		}
+		newKey = tablecodec.EncodeRecordKey(p.RecordPrefix(), recordID)
+	}
	w.rowRecords = append(w.rowRecords, &rowRecord{
		key: newKey, vals: rawRow,
	})
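
The new branch above encodes the key-rewriting rule for the reorg copy: tables with a clustered primary key (PKIsHandle or IsCommonHandle) keep their handle and only have the physical table prefix of the record key swapped, while non-clustered tables get a freshly allocated _tidb_rowid, since EXCHANGE PARTITION may have left colliding row IDs in different partitions. A hedged, standalone sketch of that rule follows; the helper name, signature, and package are illustrative and not part of the commit, but the tablecodec calls are the ones used in the diff.

package example

import (
	"github.com/pingcap/tidb/pkg/kv"
	"github.com/pingcap/tidb/pkg/tablecodec"
)

// rewriteRecordKey maps an old record key from the source partition to its key in
// the destination physical table. For clustered tables the handle is the primary
// key and remains unique across the whole table, so only the table-ID prefix
// changes; for non-clustered tables the caller must supply a freshly allocated
// handle (the regenerated _tidb_rowid).
func rewriteRecordKey(clustered bool, newPhysicalID int64, oldKey kv.Key, recordPrefix kv.Key, newHandle kv.Handle) kv.Key {
	if clustered {
		// The table prefix has a fixed length, so the remainder of the old key
		// (the encoded handle) can be reused as-is.
		newKey := tablecodec.EncodeTablePrefix(newPhysicalID)
		return append(newKey, oldKey[len(newKey):]...)
	}
	// Non-clustered: build the key from the destination record prefix and the
	// fresh handle to avoid collisions introduced by EXCHANGE PARTITION.
	return tablecodec.EncodeRecordKey(recordPrefix, newHandle)
}

Splitting the decision out this way separates the allocation concern (where the fresh handle comes from) from the key encoding itself.
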
2 changes: 1 addition & 1 deletion pkg/ddl/tests/partition/db_partition_test.go
@@ -3638,7 +3638,7 @@ func TestRemovePartitioningAutoIDs(t *testing.T) {
	//waitFor(4, "t", "public")
	//tk2.MustExec(`commit`)
	// TODO: Investigate and fix, but it is also related to https://github.com/pingcap/tidb/issues/46904
-	require.ErrorContains(t, <-alterChan, "[kv:1062]Duplicate entry '26' for key 't.PRIMARY'")
+	require.ErrorContains(t, <-alterChan, "[kv:1062]Duplicate entry '31' for key 't.PRIMARY'")
	tk3.MustQuery(`select _tidb_rowid, a, b from t`).Sort().Check(testkit.Rows(
		"13 11 11", "14 2 2", "15 12 12", "17 16 18",
		"19 18 4", "21 20 5", "23 22 6", "25 24 7", "27 26 8", "30 29 9",
8 changes: 5 additions & 3 deletions pkg/table/tables/tables.go
@@ -886,7 +886,7 @@ func (t *TableCommon) AddRecord(sctx sessionctx.Context, r []types.Datum, opts .
	// following AddRecord() operation.
	// Make the IDs continuous benefit for the performance of TiKV.
	stmtCtx := sctx.GetSessionVars().StmtCtx
-	stmtCtx.BaseRowID, stmtCtx.MaxRowID, err = allocHandleIDs(ctx, sctx, t, uint64(opt.ReserveAutoID))
+	stmtCtx.BaseRowID, stmtCtx.MaxRowID, err = AllocHandleIDs(ctx, sctx, t, uint64(opt.ReserveAutoID))
	if err != nil {
		return nil, err
	}
@@ -1684,11 +1684,13 @@ func AllocHandle(ctx context.Context, sctx sessionctx.Context, t table.Table) (k
		}
	}

-	_, rowID, err := allocHandleIDs(ctx, sctx, t, 1)
+	_, rowID, err := AllocHandleIDs(ctx, sctx, t, 1)
	return kv.IntHandle(rowID), err
}

-func allocHandleIDs(ctx context.Context, sctx sessionctx.Context, t table.Table, n uint64) (int64, int64, error) {
+// AllocHandleIDs allocates n handle ids (_tidb_rowid), and caches the range
+// in the session context.
+func AllocHandleIDs(ctx context.Context, sctx sessionctx.Context, t table.Table, n uint64) (int64, int64, error) {
	meta := t.Meta()
	base, maxID, err := t.Allocators(sctx).Get(autoid.RowIDAllocType).Alloc(ctx, n, 1, 1)
	if err != nil {
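
Exporting allocHandleIDs as AllocHandleIDs lets code outside pkg/table/tables, here the reorg partition worker, reserve a batch of _tidb_rowid values up front and then draw them one at a time through AllocHandle, which consumes the (BaseRowID, MaxRowID] range cached in the statement context. A hedged usage sketch of that pattern, mirroring the worker code above; the wrapper function, its name, and the loop are illustrative only:

package example

import (
	"context"

	"github.com/pingcap/errors"
	"github.com/pingcap/tidb/pkg/kv"
	"github.com/pingcap/tidb/pkg/sessionctx"
	"github.com/pingcap/tidb/pkg/table"
	"github.com/pingcap/tidb/pkg/table/tables"
)

// allocRowIDs hands out `rows` fresh _tidb_rowid handles, refilling the range
// cached in the statement context only when it is exhausted.
func allocRowIDs(ctx context.Context, sctx sessionctx.Context, tbl table.Table, rows int) ([]kv.Handle, error) {
	stmtCtx := sctx.GetSessionVars().StmtCtx
	handles := make([]kv.Handle, 0, rows)
	for i := 0; i < rows; i++ {
		if stmtCtx.BaseRowID >= stmtCtx.MaxRowID {
			// Reserve enough IDs for the remaining rows in one allocator call.
			base, maxID, err := tables.AllocHandleIDs(ctx, sctx, tbl, uint64(rows-i))
			if err != nil {
				return nil, errors.Trace(err)
			}
			stmtCtx.BaseRowID, stmtCtx.MaxRowID = base, maxID
		}
		// AllocHandle draws from the cached range when one is available.
		h, err := tables.AllocHandle(ctx, sctx, tbl)
		if err != nil {
			return nil, errors.Trace(err)
		}
		handles = append(handles, h)
	}
	return handles, nil
}

Reserving roughly one batch per copy chunk keeps the allocated IDs contiguous, the same motivation stated for the ReserveAutoID path in AddRecord above.
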
41 changes: 41 additions & 0 deletions tests/integrationtest/r/ddl/db_partition.result
@@ -3291,3 +3291,44 @@ partition by range(unix_timestamp(time_recorded)) (
partition p1 values less than (1559192604)
);
set @@session.tidb_enable_table_partition = default;
+# 53385
+drop table if exists t,t1;
+create table t (id int not null, store_id int not null ) partition by range (store_id) (partition p0 values less than (6), partition p1 values less than (11), partition p2 values less than (16), partition p3 values less than (21));
+create table t1(id int not null, store_id int not null);
+insert into t values (1, 1);
+insert into t values (2, 17);
+insert into t1 values (0, 18);
+alter table t exchange partition p3 with table t1;
+alter table t remove partitioning;
+select * from t;
+id store_id
+0 18
+1 1
+select *,_tidb_rowid from t;
+id store_id _tidb_rowid
+0 18 30257
+1 1 30001
+drop table t, t1;
+create table t (id int not null, store_id int not null ) partition by range (store_id) (partition p0 values less than (6), partition p1 values less than (11), partition p2 values less than (16), partition p3 values less than (21));
+create table t1(id int not null, store_id int not null);
+insert into t values (1, 1);
+insert into t values (2, 17);
+insert into t1 values (0, 18);
+alter table t exchange partition p3 with table t1;
+select *, _tidb_rowid from t;
+id store_id _tidb_rowid
+0 18 1
+1 1 1
+select *, _tidb_rowid from t1;
+id store_id _tidb_rowid
+2 17 2
+alter table t reorganize partition p0, p1, p2, p3 into (partition pMax values less than (maxvalue));
+select * from t;
+id store_id
+0 18
+1 1
+select *,_tidb_rowid from t;
+id store_id _tidb_rowid
+0 18 30257
+1 1 30001
+drop table t, t1;
33 changes: 32 additions & 1 deletion tests/integrationtest/t/ddl/db_partition.test
@@ -2209,4 +2209,35 @@ create table t5 ( time_recorded timestamp )
partition by range(unix_timestamp(time_recorded)) (
partition p1 values less than (1559192604)
);
-set @@session.tidb_enable_table_partition = default;
+set @@session.tidb_enable_table_partition = default;
+
+--echo # 53385
+drop table if exists t,t1;
+create table t (id int not null, store_id int not null ) partition by range (store_id) (partition p0 values less than (6), partition p1 values less than (11), partition p2 values less than (16), partition p3 values less than (21));
+create table t1(id int not null, store_id int not null);
+insert into t values (1, 1);
+insert into t values (2, 17);
+insert into t1 values (0, 18);
+alter table t exchange partition p3 with table t1;
+alter table t remove partitioning;
+--sorted_result
+select * from t;
+--sorted_result
+select *,_tidb_rowid from t;
+drop table t, t1;
+create table t (id int not null, store_id int not null ) partition by range (store_id) (partition p0 values less than (6), partition p1 values less than (11), partition p2 values less than (16), partition p3 values less than (21));
+create table t1(id int not null, store_id int not null);
+insert into t values (1, 1);
+insert into t values (2, 17);
+insert into t1 values (0, 18);
+alter table t exchange partition p3 with table t1;
+--sorted_result
+select *, _tidb_rowid from t;
+--sorted_result
+select *, _tidb_rowid from t1;
+alter table t reorganize partition p0, p1, p2, p3 into (partition pMax values less than (maxvalue));
+--sorted_result
+select * from t;
+--sorted_result
+select *,_tidb_rowid from t;
+drop table t, t1;
