Skip to content

Commit

Permalink
backup: use history iterator to scan ddl jobs (pingcap#54100) (pingca…
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Nov 11, 2024
1 parent 29b4499 commit b51e9e7
Show file tree
Hide file tree
Showing 3 changed files with 126 additions and 29 deletions.
2 changes: 1 addition & 1 deletion br/pkg/backup/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ go_test(
embed = [":backup"],
flaky = True,
race = "on",
shard_count = 12,
shard_count = 13,
deps = [
"//br/pkg/conn",
"//br/pkg/gluetidb",
Expand Down
94 changes: 66 additions & 28 deletions br/pkg/backup/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"io"
"math/rand"
"os"
"sort"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -786,8 +787,37 @@ func WriteBackupDDLJobs(metaWriter *metautil.MetaWriter, g glue.Glue, store kv.S
if err != nil {
return errors.Trace(err)
}

// appendJobsFn filters one batch of DDL jobs and decides which of them need to
// be appended into `allJobs`.
//
// It returns the jobs that passed the filter and a flag that is true when the
// caller can stop scanning further batches.
// NOTE(review): the early exit presumably relies on the batch being ordered from
// the newest job to the oldest — confirm against the history jobs iterator.
appendJobsFn := func(jobs []*model.Job) ([]*model.Job, bool) {
appendJobs := make([]*model.Job, 0, len(jobs))
for _, job := range jobs {
// jobs rejected by skipUnsupportedDDLJob are never recorded.
if skipUnsupportedDDLJob(job) {
continue
}
if job.BinlogInfo != nil && job.BinlogInfo.SchemaVersion <= lastSchemaVersion {
// early exit to stop unnecessary scanning: this job is not newer than
// lastSchemaVersion, so report "finished" with what was collected so far.
return appendJobs, true
}

// keep only jobs in the Done or Synced state whose schema version lies in
// (lastSchemaVersion, backupSchemaVersion].
if (job.State == model.JobStateDone || job.State == model.JobStateSynced) &&
(job.BinlogInfo != nil && job.BinlogInfo.SchemaVersion > lastSchemaVersion && job.BinlogInfo.SchemaVersion <= backupSchemaVersion) {
if job.BinlogInfo.DBInfo != nil {
// ignore all placement policy info during incremental backup for now.
// This mutates the job in place.
job.BinlogInfo.DBInfo.PlacementPolicyRef = nil
}
if job.BinlogInfo.TableInfo != nil {
// ignore all placement policy info during incremental backup for now.
// This mutates the job in place.
job.BinlogInfo.TableInfo.ClearPlacement()
}
appendJobs = append(appendJobs, job)
}
}
// the whole batch was examined without reaching the early-exit condition.
return appendJobs, false
}

newestMeta := meta.NewSnapshotMeta(store.GetSnapshot(kv.NewVersion(version.Ver)))
allJobs := make([]*model.Job, 0)
var allJobs []*model.Job
err = g.UseOneShotSession(store, !needDomain, func(se glue.Session) error {
allJobs, err = ddl.GetAllDDLJobs(se.GetSessionCtx(), newestMeta)
if err != nil {
Expand All @@ -800,41 +830,49 @@ func WriteBackupDDLJobs(metaWriter *metautil.MetaWriter, g glue.Glue, store kv.S
return errors.Trace(err)
}

historyJobs, err := ddl.GetAllHistoryDDLJobs(newestMeta)
// filter out the jobs
allJobs, _ = appendJobsFn(allJobs)

historyJobsIter, err := ddl.GetLastHistoryDDLJobsIterator(newestMeta)
if err != nil {
return errors.Trace(err)
}
log.Debug("get history jobs", zap.Int("jobs", len(historyJobs)))
allJobs = append(allJobs, historyJobs...)

count := 0
count := len(allJobs)

cacheJobs := make([]*model.Job, 0, ddl.DefNumHistoryJobs)
for {
cacheJobs, err = historyJobsIter.GetLastJobs(ddl.DefNumHistoryJobs, cacheJobs)
if err != nil {
return errors.Trace(err)
}
if len(cacheJobs) == 0 {
// no more jobs
break
}
jobs, finished := appendJobsFn(cacheJobs)
count += len(jobs)
allJobs = append(allJobs, jobs...)
if finished {
// no more jobs between [LastTS, ts]
break
}
}
log.Debug("get complete jobs", zap.Int("jobs", count))
// sort by job id with ascend order
sort.Slice(allJobs, func(i, j int) bool {
return allJobs[i].ID < allJobs[j].ID
})
for _, job := range allJobs {
if skipUnsupportedDDLJob(job) {
continue
jobBytes, err := json.Marshal(job)
if err != nil {
return errors.Trace(err)
}

if (job.State == model.JobStateDone || job.State == model.JobStateSynced) &&
(job.BinlogInfo != nil && job.BinlogInfo.SchemaVersion > lastSchemaVersion && job.BinlogInfo.SchemaVersion <= backupSchemaVersion) {
if job.BinlogInfo.DBInfo != nil {
// ignore all placement policy info during incremental backup for now.
job.BinlogInfo.DBInfo.PlacementPolicyRef = nil
}
if job.BinlogInfo.TableInfo != nil {
// ignore all placement policy info during incremental backup for now.
job.BinlogInfo.TableInfo.ClearPlacement()
}
jobBytes, err := json.Marshal(job)
if err != nil {
return errors.Trace(err)
}
err = metaWriter.Send(jobBytes, metautil.AppendDDL)
if err != nil {
return errors.Trace(err)
}
count++
err = metaWriter.Send(jobBytes, metautil.AppendDDL)
if err != nil {
return errors.Trace(err)
}
}
log.Debug("get completed jobs", zap.Int("jobs", count))
return nil
}

Expand Down
59 changes: 59 additions & 0 deletions br/pkg/backup/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,65 @@ func TestOnBackupRegionErrorResponse(t *testing.T) {
}
}

// TestGetHistoryDDLJobs runs a fixed sequence of DDL statements, sampling a
// timestamp at several points in between, and then checks that
// backup.WriteBackupDDLJobs called with a (lastTS, ts) pair writes the expected
// number of DDL jobs into the backup meta.
func TestGetHistoryDDLJobs(t *testing.T) {
	s := createBackupSuite(t)
	tk := testkit.NewTestKit(t, s.cluster.Storage)

	// currentTS fetches a fresh timestamp from the cluster's oracle and fails
	// the test immediately if the oracle returns an error.
	currentTS := func() uint64 {
		ts, err := s.cluster.GetOracle().GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
		require.NoErrorf(t, err, "Error get last ts: %s", err)
		return ts
	}

	lastTS1 := currentTS()
	// 2 DDL jobs between lastTS1 and lastTS2.
	tk.MustExec("CREATE DATABASE IF NOT EXISTS test_db;")
	tk.MustExec("CREATE TABLE IF NOT EXISTS test_db.test_table (c1 INT);")
	lastTS2 := currentTS()
	// 8 DDL statements between lastTS2 and lastTS3 (the USE statement is not one of them).
	tk.MustExec("RENAME TABLE test_db.test_table to test_db.test_table1;")
	tk.MustExec("DROP TABLE test_db.test_table1;")
	tk.MustExec("DROP DATABASE test_db;")
	tk.MustExec("CREATE DATABASE test_db;")
	tk.MustExec("USE test_db;")
	tk.MustExec("CREATE TABLE test_table1 (c2 CHAR(255));")
	tk.MustExec("RENAME TABLE test_table1 to test_table;")
	tk.MustExec("RENAME TABLE test_table to test_table2;")
	tk.MustExec("RENAME TABLE test_table2 to test_table;")
	lastTS3 := currentTS()
	// 1 DDL statement between lastTS3 and ts.
	tk.MustExec("TRUNCATE TABLE test_table;")
	ts := currentTS()

	// checkFn writes the DDL jobs for the given (lastTS, ts) pair into a fresh
	// backup meta, reads them back and asserts that exactly jobsCount jobs
	// were recorded.
	checkFn := func(lastTS uint64, ts uint64, jobsCount int) {
		cipher := backuppb.CipherInfo{CipherType: encryptionpb.EncryptionMethod_PLAINTEXT}
		metaWriter := metautil.NewMetaWriter(s.storage, metautil.MetaFileSize, false, "", &cipher)
		ctx := context.Background()
		metaWriter.StartWriteMetasAsync(ctx, metautil.AppendDDL)
		s.mockGlue.SetSession(tk.Session())

		// err is local to the closure so one check cannot leak state into
		// the enclosing test or into the next check.
		err := backup.WriteBackupDDLJobs(metaWriter, s.mockGlue, s.cluster.Storage, lastTS, ts, false)
		require.NoErrorf(t, err, "Error get ddl jobs: %s", err)
		err = metaWriter.FinishWriteMetas(ctx, metautil.AppendDDL)
		// require.NoError already reports the error itself; passing it again
		// without a format verb only garbles the message.
		require.NoError(t, err, "Flush failed")
		err = metaWriter.FlushBackupMeta(ctx)
		require.NoError(t, err, "Finally flush backup meta failed")

		metaBytes, err := s.storage.ReadFile(ctx, metautil.MetaFile)
		require.NoError(t, err)
		mockMeta := &backuppb.BackupMeta{}
		err = proto.Unmarshal(metaBytes, mockMeta)
		require.NoError(t, err)

		// check the number of DDL jobs recorded in the backup meta
		metaReader := metautil.NewMetaReader(mockMeta, s.storage, &cipher)
		allDDLJobsBytes, err := metaReader.ReadDDLs(ctx)
		require.NoError(t, err)
		var allDDLJobs []*model.Job
		err = json.Unmarshal(allDDLJobsBytes, &allDDLJobs)
		require.NoError(t, err)
		require.Len(t, allDDLJobs, jobsCount)
	}

	checkFn(lastTS1, ts, 11)
	checkFn(lastTS2, ts, 9)
	checkFn(lastTS1, lastTS2, 2)
	checkFn(lastTS3, ts, 1)
}

func TestSkipUnsupportedDDLJob(t *testing.T) {
s := createBackupSuite(t)

Expand Down

0 comments on commit b51e9e7

Please sign in to comment.