Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

br: Remove statement from closure to prevent concurrent operation during backup system table #29730

Merged
merged 9 commits into from
Nov 15, 2021
9 changes: 6 additions & 3 deletions br/pkg/backup/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,13 @@ func (ss *Schemas) BackupSchemas(
metaWriter.StartWriteMetasAsync(ctx, op)
for _, s := range ss.schemas {
schema := s
// Because schema.dbInfo is a pointer that many tables point to.
// Remove "add Temporary-prefix into dbName" from closure to prevent concurrent operations.
if utils.IsSysDB(schema.dbInfo.Name.L) {
schema.dbInfo.Name = utils.TemporaryDBName(schema.dbInfo.Name.O)
}

workerPool.ApplyOnErrorGroup(errg, func() error {
if utils.IsSysDB(schema.dbInfo.Name.L) {
schema.dbInfo.Name = utils.TemporaryDBName(schema.dbInfo.Name.O)
}
logger := log.With(
zap.String("db", schema.dbInfo.Name.O),
zap.String("table", schema.tableInfo.Name.O),
Expand Down
42 changes: 42 additions & 0 deletions br/pkg/backup/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ package backup_test

import (
"context"
"fmt"
"math"
"strings"
"sync/atomic"

"github.com/golang/protobuf/proto"
Expand All @@ -16,6 +18,7 @@ import (
"github.com/pingcap/tidb/br/pkg/metautil"
"github.com/pingcap/tidb/br/pkg/mock"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/util/testkit"
"github.com/pingcap/tidb/util/testleak"
Expand Down Expand Up @@ -260,3 +263,42 @@ func (s *testBackupSchemaSuite) TestBuildBackupRangeAndSchemaWithBrokenStats(c *
c.Assert(schemas2[0].Info, DeepEquals, schemas[0].Info)
c.Assert(schemas2[0].DB, DeepEquals, schemas[0].DB)
}

func (s *testBackupSchemaSuite) TestBackupSchemasForSystemTable(c *C) {
tk := testkit.NewTestKit(c, s.mock.Storage)
es2 := s.GetRandomStorage(c)

systemTablesCount := 32
tablePrefix := "systable"
tk.MustExec("use mysql")
for i := 1; i <= systemTablesCount; i++ {
query := fmt.Sprintf("create table %s%d (a char(1));", tablePrefix, i)
tk.MustExec(query)
}

f, err := filter.Parse([]string{"mysql.systable*"})
c.Assert(err, IsNil)
_, backupSchemas, err := backup.BuildBackupRangeAndSchema(s.mock.Storage, f, math.MaxUint64)
c.Assert(err, IsNil)
c.Assert(backupSchemas.Len(), Equals, systemTablesCount)

ctx := context.Background()
cipher := backuppb.CipherInfo{
CipherType: encryptionpb.EncryptionMethod_PLAINTEXT,
}
updateCh := new(simpleProgress)

metaWriter2 := metautil.NewMetaWriter(es2, metautil.MetaFileSize, false, &cipher)
err = backupSchemas.BackupSchemas(ctx, metaWriter2, s.mock.Storage, nil,
math.MaxUint64, 1, variable.DefChecksumTableConcurrency, true, updateCh)
c.Assert(err, IsNil)
err = metaWriter2.FlushBackupMeta(ctx)
c.Assert(err, IsNil)

schemas2 := s.GetSchemasFromMeta(c, es2)
c.Assert(schemas2, HasLen, systemTablesCount)
for _, schema := range schemas2 {
c.Assert(schema.DB.Name, Equals, utils.TemporaryDBName("mysql"))
c.Assert(strings.HasPrefix(schema.Info.Name.O, tablePrefix), Equals, true)
}
}