Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cdc: support remove and add partitioning #9670

Merged
merged 6 commits into from
Sep 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 33 additions & 6 deletions cdc/entry/schema/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func (s *Snapshot) PreTableInfo(job *timodel.Job) (*model.TableInfo, error) {
case timodel.ActionCreateTable, timodel.ActionCreateView, timodel.ActionRecoverTable:
// no pre table info
return nil, nil
case timodel.ActionRenameTable, timodel.ActionDropTable, timodel.ActionDropView, timodel.ActionTruncateTable:
case timodel.ActionRenameTable, timodel.ActionDropTable, timodel.ActionDropView, timodel.ActionTruncateTable, timodel.ActionAlterTablePartitioning, timodel.ActionRemovePartitioning:
// get the table will be dropped
table, ok := s.PhysicalTableByID(job.TableID)
if !ok {
Expand Down Expand Up @@ -400,17 +400,18 @@ func (s *Snapshot) Drop() {
s.inner.drop()
}

func getWrapTableInfo(job *timodel.Job) *model.TableInfo {
return model.WrapTableInfo(job.SchemaID, job.SchemaName,
job.BinlogInfo.FinishedTS,
job.BinlogInfo.TableInfo)
}

// DoHandleDDL is like HandleDDL but doesn't fill schema name into job.
// NOTE: it's public because some tests in the upper package need this.
func (s *Snapshot) DoHandleDDL(job *timodel.Job) error {
s.rwlock.Lock()
defer s.rwlock.Unlock()

getWrapTableInfo := func(job *timodel.Job) *model.TableInfo {
return model.WrapTableInfo(job.SchemaID, job.SchemaName,
job.BinlogInfo.FinishedTS,
job.BinlogInfo.TableInfo)
}
switch job.Type {
case timodel.ActionCreateSchema:
// get the DBInfo from job rawArgs
Expand Down Expand Up @@ -480,6 +481,11 @@ func (s *Snapshot) DoHandleDDL(job *timodel.Job) error {
if err != nil {
return errors.Trace(err)
}
case timodel.ActionRemovePartitioning, timodel.ActionAlterTablePartitioning:
err := s.inner.alterPartitioning(job)
if err != nil {
return errors.Trace(err)
}
default:
binlogInfo := job.BinlogInfo
if binlogInfo == nil {
Expand Down Expand Up @@ -1004,6 +1010,27 @@ func (s *snapshot) exchangePartition(targetTable *model.TableInfo, currentTS uin
return nil
}

// alterPartitioning changes the table id and updates the TableInfo (including the partitioning info)
func (s *snapshot) alterPartitioning(job *timodel.Job) error {
// first drop the table (will work with both partitioned and non-partitioned tables
err := s.dropTable(job.TableID, job.BinlogInfo.FinishedTS)
if err != nil {
return errors.Trace(err)
}
// (re)create table, again will work with both partitioned and non-paritioned tables
// it uses the model.TableInfo written to the job.BinlogInfo, which is the final one
err = s.createTable(getWrapTableInfo(job), job.BinlogInfo.FinishedTS)
if err != nil {
return errors.Trace(err)
}

log.Info("handle alter partitioning success",
zap.Int64("OldID", job.TableID),
zap.Int64("NewID", job.BinlogInfo.TableInfo.ID),
zap.String("Name", job.TableName))
return nil
}

func (s *snapshot) renameTables(job *timodel.Job, currentTs uint64) error {
var oldSchemaIDs, newSchemaIDs, oldTableIDs []int64
var newTableNames, oldSchemaNames []*timodel.CIStr
Expand Down
4 changes: 4 additions & 0 deletions cdc/owner/ddl_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ var nonGlobalDDLs = map[timodel.ActionType]struct{}{
timodel.ActionReorganizePartition: {},
timodel.ActionAlterTTLInfo: {},
timodel.ActionAlterTTLRemove: {},
timodel.ActionAlterTablePartitioning: {},
timodel.ActionRemovePartitioning: {},
}

var redoBarrierDDLs = map[timodel.ActionType]struct{}{
Expand All @@ -86,6 +88,8 @@ var redoBarrierDDLs = map[timodel.ActionType]struct{}{
timodel.ActionTruncateTablePartition: {},
timodel.ActionRecoverTable: {},
timodel.ActionReorganizePartition: {},
timodel.ActionAlterTablePartitioning: {},
timodel.ActionRemovePartitioning: {},
}

// ddlManager holds the pending DDL events of all tables and responsible for
Expand Down
2 changes: 2 additions & 0 deletions pkg/filter/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ var allowDDLList = []timodel.ActionType{
timodel.ActionReorganizePartition,
timodel.ActionAlterTTLInfo,
timodel.ActionAlterTTLRemove,
timodel.ActionAlterTablePartitioning,
timodel.ActionRemovePartitioning,
}

// Filter are safe for concurrent use.
Expand Down
1 change: 1 addition & 0 deletions pkg/sink/codec/canal/canal_entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,7 @@ func convertDdlEventType(e *model.DDLEvent) canal.EventType {
mm.ActionSetDefaultValue, mm.ActionModifyTableComment, mm.ActionRenameIndex, mm.ActionAddTablePartition,
mm.ActionDropTablePartition, mm.ActionModifyTableCharsetAndCollate, mm.ActionTruncateTablePartition,
mm.ActionAlterIndexVisibility, mm.ActionMultiSchemaChange, mm.ActionReorganizePartition,
mm.ActionAlterTablePartitioning, mm.ActionRemovePartitioning,
// AddColumns and DropColumns are removed in TiDB v6.2.0, see https://github.com/pingcap/tidb/pull/35862.
mm.ActionAddColumns, mm.ActionDropColumns:
return canal.EventType_ALTER
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ check-struct-only = false

target-instance = "tidb0"

target-check-tables = ["partition_table.?*"]
target-check-tables = ["partition_table*.*"]

[data-sources]
[data-sources.mysql1]
Expand Down
13 changes: 12 additions & 1 deletion tests/integration_tests/partition_table/data/prepare.sql
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
drop database if exists `partition_table`;
set @@global.tidb_enable_exchange_partition=on;
drop database if exists `partition_table2`;
create database `partition_table`;
use `partition_table`;

Expand All @@ -21,6 +21,12 @@ alter table t1 drop partition p1;
insert into t1 values (7),(8),(9);
update t1 set a=a+10 where a=9;

/* Remove partitioning + add partitioning back again */
alter table t remove partitioning;
insert into t values (20),(21),(22),(23),(24),(25);
alter table t partition by hash (a) partitions 5;
insert into t values (30),(31),(32),(33),(34),(35);

/* exchange partition case 1: source table and target table in same database */
create table t2 (a int primary key);
ALTER TABLE t1 EXCHANGE PARTITION p3 WITH TABLE t2;
Expand All @@ -43,4 +49,9 @@ insert into t1 values (-3),(5),(14),(22),(30),(100);
update t1 set a=a-16 where a=12;
delete from t1 where a = 29;

/* Change partitioning to key based and then back to range */
alter table t1 partition by key(a) partitions 7;
insert into t1 values (-2001),(2001),(2002),(-2002),(-2003),(2003),(-2004),(2004),(-2005),(2005),(2006),(-2006),(2007),(-2007);
ALTER TABLE t1 partition by range(a) (partition p0 values less than (5), PARTITION p2 VALUES LESS THAN (20), PARTITION p3 VALUES LESS THAN (26), PARTITION p4 VALUES LESS THAN (35), PARTITION pMax VALUES LESS THAN (MAXVALUE));

create table finish_mark (a int primary key);