From b4259d5cedca38dd4d4d03f1f5ef10bceba34262 Mon Sep 17 00:00:00 2001 From: Jianyuan Jiang Date: Fri, 8 Jul 2022 19:07:03 +0800 Subject: [PATCH] migration(ticdc): fix backup key issue (#6232) ref pingcap/tiflow#5301 --- pkg/etcd/etcd.go | 3 +++ pkg/etcd/etcd_test.go | 7 +++++++ pkg/migrate/migrate.go | 14 +++++++------- 3 files changed, 17 insertions(+), 7 deletions(-) diff --git a/pkg/etcd/etcd.go b/pkg/etcd/etcd.go index 08851a69352..0edf4d02966 100644 --- a/pkg/etcd/etcd.go +++ b/pkg/etcd/etcd.go @@ -91,6 +91,9 @@ func GetEtcdKeyJob(clusterID string, changeFeedID model.ChangeFeedID) string { // MigrateBackupKey is the key of backup data during a migration. func MigrateBackupKey(version int, backupKey string) string { + if strings.HasPrefix(backupKey, "/") { + return fmt.Sprintf("%s/%d%s", migrateBackupPrefix, version, backupKey) + } return fmt.Sprintf("%s/%d/%s", migrateBackupPrefix, version, backupKey) } diff --git a/pkg/etcd/etcd_test.go b/pkg/etcd/etcd_test.go index 57513015595..5b02717b97b 100644 --- a/pkg/etcd/etcd_test.go +++ b/pkg/etcd/etcd_test.go @@ -432,3 +432,10 @@ func TestExtractKeySuffix(t *testing.T) { } } } + +func TestMigrateBackupKey(t *testing.T) { + key := MigrateBackupKey(1, "/tidb/cdc/capture/abcd") + require.Equal(t, "/tidb/cdc/__backup__/1/tidb/cdc/capture/abcd", key) + key = MigrateBackupKey(1, "abcdc") + require.Equal(t, "/tidb/cdc/__backup__/1/abcdc", key) +} diff --git a/pkg/migrate/migrate.go b/pkg/migrate/migrate.go index 4232d7e7256..940c7d49878 100644 --- a/pkg/migrate/migrate.go +++ b/pkg/migrate/migrate.go @@ -156,7 +156,7 @@ func createPDClient(ctx context.Context, // 1. check and put metaVersion // 2. campaign old owner // 3. update keys -// 4. check meta data consistency +// 4. check metadata consistency // 5. update metaVersion func (m *migrator) migrate(ctx context.Context, etcdNoMetaVersion bool, oldVersion int) error { pdClient, err := m.createPDClientFunc(ctx, @@ -292,7 +292,7 @@ func cleanOldData(ctx context.Context, client *etcd.Client) { if strings.HasPrefix(key, oldChangefeedPrefix) { value = maskChangefeedInfo(kvPair.Value) } - // 0 is the backup verion. For now, we only support verion 0 + // 0 is the backup version. For now, we only support version 0 newKey := etcd.MigrateBackupKey(0, key) log.Info("renaming old etcd data", zap.String("key", key), @@ -312,7 +312,7 @@ func cleanOldData(ctx context.Context, client *etcd.Client) { } } -// old key prefix that should be remove +// old key prefix that should be removed var oldKeyPrefix = []string{ "/tidb/cdc/changefeed/info", "/tidb/cdc/job", @@ -469,7 +469,7 @@ func (m *migrator) Migrate(ctx context.Context) error { return m.migrate(ctx, version == noMetaVersion, oldVersion) } -// ShouldMigrate checks if we should migrate etcd meta data +// ShouldMigrate checks if we should migrate etcd metadata func (m *migrator) ShouldMigrate(ctx context.Context) (bool, error) { version, err := getMetaVersion(ctx, m.cli.Client, m.cli.ClusterID) if err != nil { @@ -568,17 +568,17 @@ func getMetaVersion(ctx context.Context, cli *etcd.Client, clusterID string) (in type NoOpMigrator struct{} // ShouldMigrate checks if we need to migrate metadata -func (f *NoOpMigrator) ShouldMigrate(ctx context.Context) (bool, error) { +func (f *NoOpMigrator) ShouldMigrate(_ context.Context) (bool, error) { return false, nil } // Migrate migrates the cdc metadata -func (f *NoOpMigrator) Migrate(ctx context.Context) error { +func (f *NoOpMigrator) Migrate(_ context.Context) error { return nil } // WaitMetaVersionMatched wait util migration is done -func (f *NoOpMigrator) WaitMetaVersionMatched(ctx context.Context) error { +func (f *NoOpMigrator) WaitMetaVersionMatched(_ context.Context) error { return nil }