Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

backup: use history iterator to scan ddl jobs (#54100) #56896

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion br/pkg/backup/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ go_test(
embed = [":backup"],
flaky = True,
race = "on",
shard_count = 12,
shard_count = 13,
deps = [
"//br/pkg/conn",
"//br/pkg/gluetidb",
Expand Down
94 changes: 66 additions & 28 deletions br/pkg/backup/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
"io"
"math/rand"
"os"
"sort"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -786,8 +787,37 @@
if err != nil {
return errors.Trace(err)
}

// determine whether the jobs need to be appended to `allJobs`
appendJobsFn := func(jobs []*model.Job) ([]*model.Job, bool) {
appendJobs := make([]*model.Job, 0, len(jobs))
for _, job := range jobs {
if skipUnsupportedDDLJob(job) {
continue
}
if job.BinlogInfo != nil && job.BinlogInfo.SchemaVersion <= lastSchemaVersion {
// early exit to stop unnecessary scanning
return appendJobs, true
}

if (job.State == model.JobStateDone || job.State == model.JobStateSynced) &&
(job.BinlogInfo != nil && job.BinlogInfo.SchemaVersion > lastSchemaVersion && job.BinlogInfo.SchemaVersion <= backupSchemaVersion) {
if job.BinlogInfo.DBInfo != nil {
// ignore all placement policy info during incremental backup for now.
job.BinlogInfo.DBInfo.PlacementPolicyRef = nil
}
if job.BinlogInfo.TableInfo != nil {
// ignore all placement policy info during incremental backup for now.
job.BinlogInfo.TableInfo.ClearPlacement()
}
appendJobs = append(appendJobs, job)
}
}
return appendJobs, false
}

newestMeta := meta.NewSnapshotMeta(store.GetSnapshot(kv.NewVersion(version.Ver)))
allJobs := make([]*model.Job, 0)
var allJobs []*model.Job
err = g.UseOneShotSession(store, !needDomain, func(se glue.Session) error {
allJobs, err = ddl.GetAllDDLJobs(se.GetSessionCtx(), newestMeta)
if err != nil {
Expand All @@ -800,41 +830,49 @@
return errors.Trace(err)
}

historyJobs, err := ddl.GetAllHistoryDDLJobs(newestMeta)
// filter out the jobs
allJobs, _ = appendJobsFn(allJobs)

historyJobsIter, err := ddl.GetLastHistoryDDLJobsIterator(newestMeta)
if err != nil {
return errors.Trace(err)
}
log.Debug("get history jobs", zap.Int("jobs", len(historyJobs)))
allJobs = append(allJobs, historyJobs...)

count := 0
count := len(allJobs)

cacheJobs := make([]*model.Job, 0, ddl.DefNumHistoryJobs)
for {
cacheJobs, err = historyJobsIter.GetLastJobs(ddl.DefNumHistoryJobs, cacheJobs)
if err != nil {
return errors.Trace(err)
}

Check warning on line 848 in br/pkg/backup/client.go

View check run for this annotation

Codecov / codecov/patch

br/pkg/backup/client.go#L847-L848

Added lines #L847 - L848 were not covered by tests
if len(cacheJobs) == 0 {
// no more jobs
break

Check warning on line 851 in br/pkg/backup/client.go

View check run for this annotation

Codecov / codecov/patch

br/pkg/backup/client.go#L850-L851

Added lines #L850 - L851 were not covered by tests
}
jobs, finished := appendJobsFn(cacheJobs)
count += len(jobs)
allJobs = append(allJobs, jobs...)
if finished {
// no more jobs between [LastTS, ts]
break
}
}
log.Debug("get complete jobs", zap.Int("jobs", count))
// sort by job ID in ascending order
sort.Slice(allJobs, func(i, j int) bool {
return allJobs[i].ID < allJobs[j].ID
})
for _, job := range allJobs {
if skipUnsupportedDDLJob(job) {
continue
jobBytes, err := json.Marshal(job)
if err != nil {
return errors.Trace(err)

Check warning on line 869 in br/pkg/backup/client.go

View check run for this annotation

Codecov / codecov/patch

br/pkg/backup/client.go#L869

Added line #L869 was not covered by tests
}

if (job.State == model.JobStateDone || job.State == model.JobStateSynced) &&
(job.BinlogInfo != nil && job.BinlogInfo.SchemaVersion > lastSchemaVersion && job.BinlogInfo.SchemaVersion <= backupSchemaVersion) {
if job.BinlogInfo.DBInfo != nil {
// ignore all placement policy info during incremental backup for now.
job.BinlogInfo.DBInfo.PlacementPolicyRef = nil
}
if job.BinlogInfo.TableInfo != nil {
// ignore all placement policy info during incremental backup for now.
job.BinlogInfo.TableInfo.ClearPlacement()
}
jobBytes, err := json.Marshal(job)
if err != nil {
return errors.Trace(err)
}
err = metaWriter.Send(jobBytes, metautil.AppendDDL)
if err != nil {
return errors.Trace(err)
}
count++
err = metaWriter.Send(jobBytes, metautil.AppendDDL)
if err != nil {
return errors.Trace(err)

Check warning on line 873 in br/pkg/backup/client.go

View check run for this annotation

Codecov / codecov/patch

br/pkg/backup/client.go#L873

Added line #L873 was not covered by tests
}
}
log.Debug("get completed jobs", zap.Int("jobs", count))
return nil
}

Expand Down
59 changes: 59 additions & 0 deletions br/pkg/backup/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,65 @@ func TestOnBackupRegionErrorResponse(t *testing.T) {
}
}

// TestGetHistoryDDLJobs verifies that WriteBackupDDLJobs collects exactly the
// DDL jobs whose schema version falls inside a given (lastTS, ts] window, by
// replaying a fixed sequence of DDL statements and checking the job count for
// several different windows.
func TestGetHistoryDDLJobs(t *testing.T) {
	s := createBackupSuite(t)

	tk := testkit.NewTestKit(t, s.cluster.Storage)

	// Timestamp before any DDL has run: every job below is newer than this.
	tsInitial, err := s.cluster.GetOracle().GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
	require.NoErrorf(t, err, "Error get last ts: %s", err)
	tk.MustExec("CREATE DATABASE IF NOT EXISTS test_db;")
	tk.MustExec("CREATE TABLE IF NOT EXISTS test_db.test_table (c1 INT);")

	// Timestamp after the first two DDLs.
	tsAfterCreate, err := s.cluster.GetOracle().GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
	require.NoErrorf(t, err, "Error get last ts: %s", err)
	tk.MustExec("RENAME TABLE test_db.test_table to test_db.test_table1;")
	tk.MustExec("DROP TABLE test_db.test_table1;")
	tk.MustExec("DROP DATABASE test_db;")
	tk.MustExec("CREATE DATABASE test_db;")
	tk.MustExec("USE test_db;")
	tk.MustExec("CREATE TABLE test_table1 (c2 CHAR(255));")
	tk.MustExec("RENAME TABLE test_table1 to test_table;")
	tk.MustExec("RENAME TABLE test_table to test_table2;")
	tk.MustExec("RENAME TABLE test_table2 to test_table;")

	// Timestamp just before the final TRUNCATE.
	tsBeforeTruncate, err := s.cluster.GetOracle().GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
	require.NoErrorf(t, err, "Error get last ts: %s", err)
	tk.MustExec("TRUNCATE TABLE test_table;")

	// Timestamp after all DDL has completed.
	tsFinal, err := s.cluster.GetOracle().GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
	require.NoErrorf(t, err, "Error get last ts: %s", err)

	// assertJobCount backs up the DDL jobs in (lowerTS, upperTS], reads them
	// back from the generated backup meta, and checks the job count.
	assertJobCount := func(lowerTS uint64, upperTS uint64, wantJobs int) {
		ctx := context.Background()
		cipher := backuppb.CipherInfo{CipherType: encryptionpb.EncryptionMethod_PLAINTEXT}
		metaWriter := metautil.NewMetaWriter(s.storage, metautil.MetaFileSize, false, "", &cipher)
		metaWriter.StartWriteMetasAsync(ctx, metautil.AppendDDL)
		s.mockGlue.SetSession(tk.Session())
		err = backup.WriteBackupDDLJobs(metaWriter, s.mockGlue, s.cluster.Storage, lowerTS, upperTS, false)
		require.NoErrorf(t, err, "Error get ddl jobs: %s", err)
		err = metaWriter.FinishWriteMetas(ctx, metautil.AppendDDL)
		require.NoError(t, err, "Flush failed", err)
		err = metaWriter.FlushBackupMeta(ctx)
		require.NoError(t, err, "Finally flush backup meta failed", err)

		// Read the meta file back and decode the stored DDL jobs.
		rawMeta, err := s.storage.ReadFile(ctx, metautil.MetaFile)
		require.NoError(t, err)
		decodedMeta := &backuppb.BackupMeta{}
		err = proto.Unmarshal(rawMeta, decodedMeta)
		require.NoError(t, err)
		// check the schema version
		metaReader := metautil.NewMetaReader(decodedMeta, s.storage, &cipher)
		ddlBytes, err := metaReader.ReadDDLs(ctx)
		require.NoError(t, err)
		var jobs []*model.Job
		err = json.Unmarshal(ddlBytes, &jobs)
		require.NoError(t, err)
		require.Len(t, jobs, wantJobs)
	}

	assertJobCount(tsInitial, tsFinal, 11)
	assertJobCount(tsAfterCreate, tsFinal, 9)
	assertJobCount(tsInitial, tsAfterCreate, 2)
	assertJobCount(tsBeforeTruncate, tsFinal, 1)
}

func TestSkipUnsupportedDDLJob(t *testing.T) {
s := createBackupSuite(t)

Expand Down