Skip to content

Commit

Permalink
Merge branch 'master' into migrate_executor_test
Browse files Browse the repository at this point in the history
  • Loading branch information
ichn-hu authored Dec 2, 2021
2 parents 22beaa3 + 450d72f commit 2036050
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 7 deletions.
32 changes: 25 additions & 7 deletions br/pkg/backup/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,14 +180,9 @@ func (bc *Client) SetStorage(ctx context.Context, backend *backuppb.StorageBacke
"there may be some backup files in the path already, "+
"please specify a correct backup directory!", bc.storage.URI()+"/"+metautil.MetaFile)
}
exist, err = bc.storage.FileExists(ctx, metautil.LockFile)
err = CheckBackupStorageIsLocked(ctx, bc.storage)
if err != nil {
return errors.Annotatef(err, "error occurred when checking %s file", metautil.LockFile)
}
if exist {
return errors.Annotatef(berrors.ErrInvalidArgument, "backup lock file exists in %v, "+
"there may be some backup files in the path already, "+
"please specify a correct backup directory!", bc.storage.URI()+"/"+metautil.LockFile)
return err
}
bc.backend = backend
return nil
Expand All @@ -198,6 +193,29 @@ func (bc *Client) GetClusterID() uint64 {
return bc.clusterID
}

// CheckBackupStorageIsLocked checks whether backups is locked.
// which means we found other backup progress already write
// some data files into the same backup directory or cloud prefix.
func CheckBackupStorageIsLocked(ctx context.Context, s storage.ExternalStorage) error {
exist, err := s.FileExists(ctx, metautil.LockFile)
if err != nil {
return errors.Annotatef(err, "error occurred when checking %s file", metautil.LockFile)
}
if exist {
err = s.WalkDir(ctx, &storage.WalkOption{}, func(path string, size int64) error {
// should return error to break the walkDir when found lock file and other .sst files.
if strings.HasSuffix(path, ".sst") {
return errors.Annotatef(berrors.ErrInvalidArgument, "backup lock file and sst file exist in %v, "+
"there are some backup files in the path already, "+
"please specify a correct backup directory!", s.URI()+"/"+metautil.LockFile)
}
return nil
})
return err
}
return nil
}

// BuildTableRanges returns the key ranges encompassing the entire table,
// and its partitions if exists.
func BuildTableRanges(tbl *model.TableInfo) ([]kv.KeyRange, error) {
Expand Down
34 changes: 34 additions & 0 deletions br/pkg/backup/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,13 @@ func (r *testBackup) SetUpSuite(c *C) {

}

func (r *testBackup) resetStorage(c *C) {
var err error
base := c.MkDir()
r.storage, err = storage.NewLocalStorage(base)
c.Assert(err, IsNil)
}

func (r *testBackup) TestGetTS(c *C) {
var (
err error
Expand Down Expand Up @@ -335,3 +342,30 @@ func (r *testBackup) TestskipUnsupportedDDLJob(c *C) {
c.Assert(err, IsNil)
c.Assert(len(allDDLJobs), Equals, 8)
}

func (r *testBackup) TestCheckBackupIsLocked(c *C) {
ctx := context.Background()

r.resetStorage(c)
// check passed with an empty storage
err := backup.CheckBackupStorageIsLocked(ctx, r.storage)
c.Assert(err, IsNil)

// check passed with only a lock file
err = r.storage.WriteFile(ctx, metautil.LockFile, nil)
c.Assert(err, IsNil)
err = backup.CheckBackupStorageIsLocked(ctx, r.storage)
c.Assert(err, IsNil)

// check passed with a lock file and other non-sst files.
err = r.storage.WriteFile(ctx, "1.txt", nil)
c.Assert(err, IsNil)
err = backup.CheckBackupStorageIsLocked(ctx, r.storage)
c.Assert(err, IsNil)

// check failed
err = r.storage.WriteFile(ctx, "1.sst", nil)
c.Assert(err, IsNil)
err = backup.CheckBackupStorageIsLocked(ctx, r.storage)
c.Assert(err, ErrorMatches, "backup lock file and sst file exist in(.+)")
}
5 changes: 5 additions & 0 deletions executor/insert_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -682,6 +682,11 @@ func setDatumAutoIDAndCast(ctx sessionctx.Context, d *types.Datum, id int64, col
d.SetAutoID(id, col.Flag)
var err error
*d, err = table.CastValue(ctx, *d, col.ToInfo(), false, false)
if err == nil && d.GetInt64() < id {
// Auto ID is out of range, the truncated ID is possible to duplicate with an existing ID.
// To prevent updating unrelated rows in the REPLACE statement, it is better to throw an error.
return autoid.ErrAutoincReadFailed
}
return err
}

Expand Down
15 changes: 15 additions & 0 deletions executor/insert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1820,3 +1820,18 @@ func (s *testAutoRandomSuite) TestInsertIssue29892(c *C) {
c.Assert(err, NotNil)
c.Assert(strings.Contains(err.Error(), "Duplicate entry"), Equals, true)
}

// https://github.com/pingcap/tidb/issues/29483.
func (s *testSuite13) TestReplaceAllocatingAutoID(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("drop database if exists replace_auto_id;")
tk.MustExec("create database replace_auto_id;")
tk.MustExec(`use replace_auto_id;`)

tk.MustExec("SET sql_mode='NO_ENGINE_SUBSTITUTION';")
tk.MustExec("DROP TABLE IF EXISTS t1;")
tk.MustExec("CREATE TABLE t1 (a tinyint not null auto_increment primary key, b char(20));")
tk.MustExec("INSERT INTO t1 VALUES (127,'maxvalue');")
// Note that this error is different from MySQL's duplicated primary key error.
tk.MustGetErrCode("REPLACE INTO t1 VALUES (0,'newmaxvalue');", errno.ErrAutoincReadFailed)
}

0 comments on commit 2036050

Please sign in to comment.