From ca9c4e2b9680d54eeaeacdd4d844b4427211b64b Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Tue, 16 May 2023 10:47:17 +0800 Subject: [PATCH] schema, sink(ticdc): fix exchange partition (#8955) (#8962) close pingcap/tiflow#8914 --- cdc/entry/schema/snapshot.go | 41 ++++++++++------ .../cloudstorage/cloud_storage_ddl_sink.go | 47 ++++++++++++------- .../data/prepare.sql | 10 ++-- .../data/prepare.sql | 10 ++-- 4 files changed, 66 insertions(+), 42 deletions(-) diff --git a/cdc/entry/schema/snapshot.go b/cdc/entry/schema/snapshot.go index e923480aaae..264dbc14dea 100644 --- a/cdc/entry/schema/snapshot.go +++ b/cdc/entry/schema/snapshot.go @@ -55,6 +55,10 @@ func (s *Snapshot) PreTableInfo(job *timodel.Job) (*model.TableInfo, error) { case timodel.ActionRenameTables: // DDL on multiple tables, ignore pre table info return nil, nil + case timodel.ActionExchangeTablePartition: + // get the table will be exchanged + table, _, err := s.inner.getSourceTable(job.BinlogInfo.TableInfo) + return table, err default: binlogInfo := job.BinlogInfo if binlogInfo == nil { @@ -885,26 +889,21 @@ func (s *snapshot) updatePartition(tbInfo *model.TableInfo, currentTs uint64) er return nil } -// exchangePartition find the partition's id in the old table info of targetTable, -// and find the sourceTable's id in the new table info of targetTable. -// Then set sourceTable's id to the partition's id, which make the exchange happen in snapshot. -// Finally, update both the targetTable's info and the sourceTable's info in snapshot. -func (s *snapshot) exchangePartition(targetTable *model.TableInfo, currentTS uint64) error { - var sourceTable *model.TableInfo +func (s *snapshot) getSourceTable(targetTable *timodel.TableInfo) (*model.TableInfo, int64, error) { oldTable, ok := s.physicalTableByID(targetTable.ID) if !ok { - return cerror.ErrSnapshotTableNotFound.GenWithStackByArgs(targetTable.ID) + return nil, 0, cerror.ErrSnapshotTableNotFound.GenWithStackByArgs(targetTable.ID) } oldPartitions := oldTable.GetPartitionInfo() if oldPartitions == nil { - return cerror.ErrSnapshotTableNotFound. + return nil, 0, cerror.ErrSnapshotTableNotFound. GenWithStack("table %d is not a partitioned table", oldTable.ID) } newPartitions := targetTable.GetPartitionInfo() if newPartitions == nil { - return cerror.ErrSnapshotTableNotFound. + return nil, 0, cerror.ErrSnapshotTableNotFound. GenWithStack("table %d is not a partitioned table", targetTable.ID) } @@ -926,14 +925,13 @@ func (s *snapshot) exchangePartition(targetTable *model.TableInfo, currentTS uin } } if len(diff) != 1 { - return cerror.ErrExchangePartition. + return nil, 0, cerror.ErrExchangePartition. GenWithStackByArgs(fmt.Sprintf("The exchanged source table number must be 1, but found %v", diff)) } - sourceTable, ok = s.physicalTableByID(diff[0]) + sourceTable, ok := s.physicalTableByID(diff[0]) if !ok { - return cerror.ErrSnapshotTableNotFound.GenWithStackByArgs(diff[0]) + return nil, 0, cerror.ErrSnapshotTableNotFound.GenWithStackByArgs(diff[0]) } - // 3.find the exchanged partition info diff = diff[:0] for id := range oldIDs { @@ -942,13 +940,26 @@ func (s *snapshot) exchangePartition(targetTable *model.TableInfo, currentTS uin } } if len(diff) != 1 { - return cerror.ErrExchangePartition. + return nil, 0, cerror.ErrExchangePartition. GenWithStackByArgs(fmt.Sprintf("The exchanged source table number must be 1, but found %v", diff)) } exchangedPartitionID := diff[0] + return sourceTable, exchangedPartitionID, nil +} + +// exchangePartition find the partition's id in the old table info of targetTable, +// and find the sourceTable's id in the new table info of targetTable. +// Then set sourceTable's id to the partition's id, which make the exchange happen in snapshot. +// Finally, update both the targetTable's info and the sourceTable's info in snapshot. +func (s *snapshot) exchangePartition(targetTable *model.TableInfo, currentTS uint64) error { + var sourceTable *model.TableInfo + sourceTable, exchangedPartitionID, err := s.getSourceTable(targetTable.TableInfo) + if err != nil { + return errors.Trace(err) + } // 4.update the targetTable - err := s.updatePartition(targetTable, currentTS) + err = s.updatePartition(targetTable, currentTS) if err != nil { return errors.Trace(err) } diff --git a/cdc/sink/ddlsink/cloudstorage/cloud_storage_ddl_sink.go b/cdc/sink/ddlsink/cloudstorage/cloud_storage_ddl_sink.go index 3deab248e4f..82959002319 100644 --- a/cdc/sink/ddlsink/cloudstorage/cloud_storage_ddl_sink.go +++ b/cdc/sink/ddlsink/cloudstorage/cloud_storage_ddl_sink.go @@ -21,6 +21,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/log" "github.com/pingcap/tidb/br/pkg/storage" + timodel "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tiflow/cdc/contextutil" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/sink/ddlsink" @@ -62,29 +63,41 @@ func NewDDLSink(ctx context.Context, sinkURI *url.URL) (*DDLSink, error) { // WriteDDLEvent writes the ddl event to the cloud storage. func (d *DDLSink) WriteDDLEvent(ctx context.Context, ddl *model.DDLEvent) error { + writeFile := func(def cloudstorage.TableDefinition) error { + encodedDef, err := def.MarshalWithQuery() + if err != nil { + return errors.Trace(err) + } + + path, err := def.GenerateSchemaFilePath() + if err != nil { + return errors.Trace(err) + } + log.Debug("write ddl event to external storage", + zap.String("path", path), zap.Any("ddl", ddl)) + return d.statistics.RecordDDLExecution(func() error { + err1 := d.storage.WriteFile(ctx, path, encodedDef) + if err1 != nil { + return err1 + } + + return nil + }) + } + var def cloudstorage.TableDefinition def.FromDDLEvent(ddl) - encodedDef, err := def.MarshalWithQuery() - if err != nil { + if err := writeFile(def); err != nil { return errors.Trace(err) } - path, err := def.GenerateSchemaFilePath() - if err != nil { - return errors.Trace(err) + if ddl.Type == timodel.ActionExchangeTablePartition { + // For exchange partition, we need to write the schema of the source table. + var sourceTableDef cloudstorage.TableDefinition + sourceTableDef.FromTableInfo(ddl.PreTableInfo, ddl.TableInfo.Version) + return writeFile(sourceTableDef) } - log.Debug("write ddl event to external storage", - zap.String("path", path), zap.Any("ddl", ddl)) - err = d.statistics.RecordDDLExecution(func() error { - err1 := d.storage.WriteFile(ctx, path, encodedDef) - if err1 != nil { - return err1 - } - - return nil - }) - - return errors.Trace(err) + return nil } // WriteCheckpointTs writes the checkpoint ts to the cloud storage. diff --git a/tests/integration_tests/canal_json_storage_partition_table/data/prepare.sql b/tests/integration_tests/canal_json_storage_partition_table/data/prepare.sql index 9c1ca0aa229..3f2632e1c48 100644 --- a/tests/integration_tests/canal_json_storage_partition_table/data/prepare.sql +++ b/tests/integration_tests/canal_json_storage_partition_table/data/prepare.sql @@ -21,11 +21,11 @@ alter table t1 drop partition p1; insert into t1 values (7),(8),(9); update t1 set a=a+10 where a=9; -/* TODO(CharlesCheung): EXCHANGE PARTITION will be supported in the future */ --- create table t2 (a int primary key); --- ALTER TABLE t1 EXCHANGE PARTITION p3 WITH TABLE t2; --- insert into t2 values (100),(101),(102),(103),(104),(105); /*these values will be replicated to in downstream t2*/ --- insert into t1 values (25),(29); /*these values will be replicated to in downstream t1.p3*/ +/* TODO: add more test for EXCHANGE PARTITION, ref: https://github.com/pingcap/tiflow/issues/8956 */ +create table t2 (a int primary key); +ALTER TABLE t1 EXCHANGE PARTITION p3 WITH TABLE t2; +insert into t2 values (100),(101),(102),(103),(104),(105); /*these values will be replicated to in downstream t2*/ +insert into t1 values (25),(29); /*these values will be replicated to in downstream t1.p3*/ ALTER TABLE t1 REORGANIZE PARTITION p0,p2 INTO (PARTITION p0 VALUES LESS THAN (5), PARTITION p1 VALUES LESS THAN (10), PARTITION p2 VALUES LESS THAN (21)); insert into t1 values (-1),(6),(13); diff --git a/tests/integration_tests/csv_storage_partition_table/data/prepare.sql b/tests/integration_tests/csv_storage_partition_table/data/prepare.sql index d696ed60114..5dcbc6f96a3 100644 --- a/tests/integration_tests/csv_storage_partition_table/data/prepare.sql +++ b/tests/integration_tests/csv_storage_partition_table/data/prepare.sql @@ -21,11 +21,11 @@ alter table t1 drop partition p1; insert into t1 values (7),(8),(9); -- update t1 set a=a+10 where a=9; -/* TODO(CharlesCheung): EXCHANGE PARTITION will be supported in the future */ --- create table t2 (a int primary key); --- ALTER TABLE t1 EXCHANGE PARTITION p3 WITH TABLE t2; --- insert into t2 values (100),(101),(102),(103),(104),(105); /*these values will be replicated to in downstream t2*/ --- insert into t1 values (25),(29); /*these values will be replicated to in downstream t1.p3*/ +/* TODO: add more test for EXCHANGE PARTITION, ref: https://github.com/pingcap/tiflow/issues/8956 */ +create table t2 (a int primary key); +ALTER TABLE t1 EXCHANGE PARTITION p3 WITH TABLE t2; +insert into t2 values (100),(101),(102),(103),(104),(105); /*these values will be replicated to in downstream t2*/ +insert into t1 values (25),(29); /*these values will be replicated to in downstream t1.p3*/ ALTER TABLE t1 REORGANIZE PARTITION p0,p2 INTO (PARTITION p0 VALUES LESS THAN (5), PARTITION p1 VALUES LESS THAN (10), PARTITION p2 VALUES LESS THAN (21)); insert into t1 values (-1),(6),(13);