Skip to content

Commit

Permalink
This is an automated cherry-pick of #8955
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
  • Loading branch information
CharlesCheung96 authored and ti-chi-bot committed May 15, 2023
1 parent 0287295 commit 908a56d
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 18 deletions.
41 changes: 26 additions & 15 deletions cdc/entry/schema/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ func (s *Snapshot) PreTableInfo(job *timodel.Job) (*model.TableInfo, error) {
case timodel.ActionRenameTables:
// DDL on multiple tables, ignore pre table info
return nil, nil
case timodel.ActionExchangeTablePartition:
// get the table will be exchanged
table, _, err := s.inner.getSourceTable(job.BinlogInfo.TableInfo)
return table, err
default:
binlogInfo := job.BinlogInfo
if binlogInfo == nil {
Expand Down Expand Up @@ -907,26 +911,21 @@ func (s *snapshot) updatePartition(tbInfo *model.TableInfo, currentTs uint64) er
return nil
}

// exchangePartition find the partition's id in the old table info of targetTable,
// and find the sourceTable's id in the new table info of targetTable.
// Then set sourceTable's id to the partition's id, which make the exchange happen in snapshot.
// Finally, update both the targetTable's info and the sourceTable's info in snapshot.
func (s *snapshot) exchangePartition(targetTable *model.TableInfo, currentTS uint64) error {
var sourceTable *model.TableInfo
func (s *snapshot) getSourceTable(targetTable *timodel.TableInfo) (*model.TableInfo, int64, error) {
oldTable, ok := s.physicalTableByID(targetTable.ID)
if !ok {
return cerror.ErrSnapshotTableNotFound.GenWithStackByArgs(targetTable.ID)
return nil, 0, cerror.ErrSnapshotTableNotFound.GenWithStackByArgs(targetTable.ID)
}

oldPartitions := oldTable.GetPartitionInfo()
if oldPartitions == nil {
return cerror.ErrSnapshotTableNotFound.
return nil, 0, cerror.ErrSnapshotTableNotFound.
GenWithStack("table %d is not a partitioned table", oldTable.ID)
}

newPartitions := targetTable.GetPartitionInfo()
if newPartitions == nil {
return cerror.ErrSnapshotTableNotFound.
return nil, 0, cerror.ErrSnapshotTableNotFound.
GenWithStack("table %d is not a partitioned table", targetTable.ID)
}

Expand All @@ -948,14 +947,13 @@ func (s *snapshot) exchangePartition(targetTable *model.TableInfo, currentTS uin
}
}
if len(diff) != 1 {
return cerror.ErrExchangePartition.
return nil, 0, cerror.ErrExchangePartition.
GenWithStackByArgs(fmt.Sprintf("The exchanged source table number must be 1, but found %v", diff))
}
sourceTable, ok = s.physicalTableByID(diff[0])
sourceTable, ok := s.physicalTableByID(diff[0])
if !ok {
return cerror.ErrSnapshotTableNotFound.GenWithStackByArgs(diff[0])
return nil, 0, cerror.ErrSnapshotTableNotFound.GenWithStackByArgs(diff[0])
}

// 3.find the exchanged partition info
diff = diff[:0]
for id := range oldIDs {
Expand All @@ -964,13 +962,26 @@ func (s *snapshot) exchangePartition(targetTable *model.TableInfo, currentTS uin
}
}
if len(diff) != 1 {
return cerror.ErrExchangePartition.
return nil, 0, cerror.ErrExchangePartition.
GenWithStackByArgs(fmt.Sprintf("The exchanged source table number must be 1, but found %v", diff))
}

exchangedPartitionID := diff[0]
return sourceTable, exchangedPartitionID, nil
}

// exchangePartition find the partition's id in the old table info of targetTable,
// and find the sourceTable's id in the new table info of targetTable.
// Then set sourceTable's id to the partition's id, which make the exchange happen in snapshot.
// Finally, update both the targetTable's info and the sourceTable's info in snapshot.
func (s *snapshot) exchangePartition(targetTable *model.TableInfo, currentTS uint64) error {
var sourceTable *model.TableInfo
sourceTable, exchangedPartitionID, err := s.getSourceTable(targetTable.TableInfo)
if err != nil {
return errors.Trace(err)
}
// 4.update the targetTable
err := s.updatePartition(targetTable, currentTS)
err = s.updatePartition(targetTable, currentTS)
if err != nil {
return errors.Trace(err)
}
Expand Down
41 changes: 38 additions & 3 deletions cdc/sinkv2/ddlsink/cloudstorage/cloud_storage_ddl_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/tidb/br/pkg/storage"
timodel "github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tiflow/cdc/contextutil"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/sinkv2/ddlsink"
Expand Down Expand Up @@ -58,6 +59,7 @@ func NewCloudStorageDDLSink(ctx context.Context, sinkURI *url.URL) (*ddlSink, er
return d, nil
}

<<<<<<< HEAD:cdc/sinkv2/ddlsink/cloudstorage/cloud_storage_ddl_sink.go
func generateSchemaPath(def cloudstorage.TableDefinition) string {
return fmt.Sprintf("%s/%s/%d/schema.json", def.Schema, def.Table, def.TableVersion)
}
Expand All @@ -80,12 +82,45 @@ func (d *ddlSink) WriteDDLEvent(ctx context.Context, ddl *model.DDLEvent) error
err1 := d.storage.WriteFile(ctx, path, encodedDef)
if err1 != nil {
return err1
=======
// WriteDDLEvent writes the ddl event to the cloud storage.
func (d *DDLSink) WriteDDLEvent(ctx context.Context, ddl *model.DDLEvent) error {
writeFile := func(def cloudstorage.TableDefinition) error {
encodedDef, err := def.MarshalWithQuery()
if err != nil {
return errors.Trace(err)
>>>>>>> 03285b8660 (schema, sink(ticdc): fix exchange partition (#8955)):cdc/sink/ddlsink/cloudstorage/cloud_storage_ddl_sink.go
}

return nil
})
path, err := def.GenerateSchemaFilePath()
if err != nil {
return errors.Trace(err)
}
log.Debug("write ddl event to external storage",
zap.String("path", path), zap.Any("ddl", ddl))
return d.statistics.RecordDDLExecution(func() error {
err1 := d.storage.WriteFile(ctx, path, encodedDef)
if err1 != nil {
return err1
}

return nil
})
}

return errors.Trace(err)
var def cloudstorage.TableDefinition
def.FromDDLEvent(ddl)
if err := writeFile(def); err != nil {
return errors.Trace(err)
}

if ddl.Type == timodel.ActionExchangeTablePartition {
// For exchange partition, we need to write the schema of the source table.
var sourceTableDef cloudstorage.TableDefinition
sourceTableDef.FromTableInfo(ddl.PreTableInfo, ddl.TableInfo.Version)
return writeFile(sourceTableDef)
}
return nil
}

func (d *ddlSink) WriteCheckpointTs(ctx context.Context,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ alter table t1 drop partition p1;
insert into t1 values (7),(8),(9);
update t1 set a=a+10 where a=9;

<<<<<<< HEAD
=======
/* TODO: add more test for EXCHANGE PARTITION, ref: https://github.com/pingcap/tiflow/issues/8956 */
>>>>>>> 03285b8660 (schema, sink(ticdc): fix exchange partition (#8955))
create table t2 (a int primary key);
ALTER TABLE t1 EXCHANGE PARTITION p3 WITH TABLE t2;
insert into t2 values (100),(101),(102),(103),(104),(105); /*these values will be replicated to in downstream t2*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ alter table t1 drop partition p1;
insert into t1 values (7),(8),(9);
-- update t1 set a=a+10 where a=9;

<<<<<<< HEAD
=======
/* TODO: add more test for EXCHANGE PARTITION, ref: https://github.com/pingcap/tiflow/issues/8956 */
>>>>>>> 03285b8660 (schema, sink(ticdc): fix exchange partition (#8955))
create table t2 (a int primary key);
ALTER TABLE t1 EXCHANGE PARTITION p3 WITH TABLE t2;
insert into t2 values (100),(101),(102),(103),(104),(105); /*these values will be replicated to in downstream t2*/
Expand Down

0 comments on commit 908a56d

Please sign in to comment.