Skip to content

Commit

Permalink
br: fix flaky test TestConcurrentLock (#57591)
Browse files Browse the repository at this point in the history
ref #56523
  • Loading branch information
YuJuncen authored Dec 3, 2024
1 parent b273109 commit 5598c11
Show file tree
Hide file tree
Showing 7 changed files with 86 additions and 10 deletions.
4 changes: 4 additions & 0 deletions br/pkg/storage/azblob.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,10 @@ type AzureBlobStorage struct {
cpkInfo *blob.CPKInfo
}

// MarkStrongConsistency is an intentionally empty marker method: its presence
// makes *AzureBlobStorage satisfy the StrongConsisency marker interface.
func (*AzureBlobStorage) MarkStrongConsistency() {
	// See https://github.com/MicrosoftDocs/azure-docs/issues/105331#issuecomment-1450252384
}

func newAzureBlobStorage(ctx context.Context, options *backuppb.AzureBlobStorage, opts *ExternalStorageOptions) (*AzureBlobStorage, error) {
clientBuilder, err := getAzureServiceClientBuilder(options, opts)
if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions br/pkg/storage/gcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,10 @@ type GCSStorage struct {
clients []*storage.Client
}

// MarkStrongConsistency is an intentionally empty marker method: its presence
// makes *GCSStorage satisfy the StrongConsisency marker interface.
func (s *GCSStorage) MarkStrongConsistency() {
	// See https://cloud.google.com/storage/docs/consistency#strongly_consistent_operations
}

// GetBucketHandle gets the handle to the GCS API on the bucket.
func (s *GCSStorage) GetBucketHandle() *storage.BucketHandle {
i := s.idx.Inc() % int64(len(s.handles))
Expand Down
20 changes: 18 additions & 2 deletions br/pkg/storage/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/logutil"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -123,8 +124,23 @@ func (l *LocalStorage) WalkDir(_ context.Context, opt *WalkOption, fn func(strin
base := filepath.Join(l.base, opt.SubDir)
return filepath.Walk(base, func(path string, f os.FileInfo, err error) error {
if os.IsNotExist(err) {
// if path not exists, we should return nil to continue.
return nil
log.Info("Local Storage Hint: WalkDir yields a tomestone, a race may happen.", zap.String("path", path))
if !opt.IncludeTombstone {
// if the path doesn't exist and the client doesn't require its tombstone,
// we should return nil to continue.
return nil
}
path, err = filepath.Rel(l.base, path)
if err != nil {
log.Panic("filepath.Walk returns a path that isn't a subdir of the base dir.",
zap.String("path", path), zap.String("base", l.base), logutil.ShortError(err))
}
if !strings.HasPrefix(path, opt.ObjPrefix) {
return nil
}
// NOTE: This may cause a tombstone for the directory itself to be emitted
// to the caller when `Walk` is called on a non-existent directory.
return fn(path, TombstoneSize)
}
if err != nil {
return errors.Trace(err)
Expand Down
13 changes: 11 additions & 2 deletions br/pkg/storage/locking.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,12 @@ func (cx *VerifyWriteContext) IntentFileName() string {
// - There shouldn't be any other intention files.
// - Verify() returns no error. (If there is one.)
func (w conditionalPut) CommitTo(ctx context.Context, s ExternalStorage) (uuid.UUID, error) {
if _, ok := s.(StrongConsisency); !ok {
log.Warn("The external storage implementation doesn't provide a strong consistency guarantee. "+
"Please avoid concurrently accessing it if possible.",
zap.String("type", fmt.Sprintf("%T", s)))
}

txnID := uuid.New()
cx := VerifyWriteContext{
Context: ctx,
Expand All @@ -82,7 +88,7 @@ func (w conditionalPut) CommitTo(ctx context.Context, s ExternalStorage) (uuid.U
if err := checkConflict(); err != nil {
return uuid.UUID{}, errors.Annotate(err, "during initial check")
}
failpoint.Inject("exclusive-write-commit-to-1", func() {})
failpoint.InjectCall("exclusive-write-commit-to-1")

if err := s.WriteFile(cx, intentFileName, []byte{}); err != nil {
return uuid.UUID{}, errors.Annotate(err, "during writing intention file")
Expand All @@ -97,7 +103,7 @@ func (w conditionalPut) CommitTo(ctx context.Context, s ExternalStorage) (uuid.U
if err := checkConflict(); err != nil {
return uuid.UUID{}, errors.Annotate(err, "during checking whether there are other intentions")
}
failpoint.Inject("exclusive-write-commit-to-2", func() {})
failpoint.InjectCall("exclusive-write-commit-to-2")

return txnID, s.WriteFile(cx, w.Target, w.Content(txnID))
}
Expand All @@ -106,9 +112,12 @@ func (w conditionalPut) CommitTo(ctx context.Context, s ExternalStorage) (uuid.U
func (cx VerifyWriteContext) assertNoOtherOfPrefixExpect(pfx string, expect string) error {
fileName := path.Base(pfx)
dirName := path.Dir(pfx)

return cx.Storage.WalkDir(cx, &WalkOption{
SubDir: dirName,
ObjPrefix: fileName,
// We'd better read a deleted intention...
IncludeTombstone: true,
}, func(path string, size int64) error {
if path != expect {
return fmt.Errorf("there is conflict file %s", path)
Expand Down
34 changes: 29 additions & 5 deletions br/pkg/storage/locking_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"context"
"os"
"path/filepath"
"sync/atomic"
"testing"

"github.com/pingcap/failpoint"
Expand Down Expand Up @@ -86,8 +87,28 @@ func TestConcurrentLock(t *testing.T) {
errChA := make(chan error, 1)
errChB := make(chan error, 1)

require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/br/pkg/storage/exclusive-write-commit-to-1", "1*pause"))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/br/pkg/storage/exclusive-write-commit-to-2", "1*pause"))
waitRecvTwice := func(ch chan<- struct{}) func() {
return func() {
ch <- struct{}{}
ch <- struct{}{}
}
}

asyncOnceFunc := func(f func()) func() {
run := new(atomic.Bool)
return func() {
if run.CompareAndSwap(false, true) {
f()
}
}
}
chA := make(chan struct{})
onceA := asyncOnceFunc(waitRecvTwice(chA))
chB := make(chan struct{})
onceB := asyncOnceFunc(waitRecvTwice(chB))

require.NoError(t, failpoint.EnableCall("github.com/pingcap/tidb/br/pkg/storage/exclusive-write-commit-to-1", onceA))
require.NoError(t, failpoint.EnableCall("github.com/pingcap/tidb/br/pkg/storage/exclusive-write-commit-to-2", onceB))

go func() {
_, err := storage.TryLockRemote(ctx, strg, "test.lock", "I wanna read it, but I hesitated before send my intention!")
Expand All @@ -99,16 +120,19 @@ func TestConcurrentLock(t *testing.T) {
errChB <- err
}()

failpoint.Disable("github.com/pingcap/tidb/br/pkg/storage/exclusive-write-commit-to-1")
failpoint.Disable("github.com/pingcap/tidb/br/pkg/storage/exclusive-write-commit-to-2")
<-chA
<-chB

<-chB
<-chA

// There is exactly one error.
errA := <-errChA
errB := <-errChB
if errA == nil {
require.Error(t, errB)
} else {
require.NoError(t, errB)
require.NoError(t, errB, "%s", errA)
}

requireFileExists(t, filepath.Join(path, "test.lock"))
Expand Down
4 changes: 4 additions & 0 deletions br/pkg/storage/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ type S3Storage struct {
options *backuppb.S3
}

// MarkStrongConsistency is an intentionally empty marker method: its presence
// makes *S3Storage satisfy the StrongConsisency marker interface.
func (*S3Storage) MarkStrongConsistency() {
	// See https://aws.amazon.com/cn/s3/consistency/
}

// GetS3APIHandle gets the handle to the S3 API.
func (rs *S3Storage) GetS3APIHandle() s3iface.S3API {
return rs.svc
Expand Down
17 changes: 16 additions & 1 deletion br/pkg/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,12 @@ import (
// Permission represents the permission we need to check in create storage.
type Permission string

// StrongConsisency is a marker interface that indicates the storage is strongly
// consistent over its `Read`, `Write` and `WalkDir` APIs.
//
// NOTE(review): the identifier is spelled without the second "t"
// ("Consisency"). The spelling is kept here because other code refers to the
// type by this exact name; renaming it requires updating every user.
type StrongConsisency interface {
	// MarkStrongConsistency exists only so that a storage type can opt in to
	// this interface; the implementations visible in this change are empty.
	MarkStrongConsistency()
}

const (
// AccessBuckets represents bucket access permission
// it replace the origin skip-check-path.
Expand All @@ -33,7 +39,8 @@ const (
// we cannot check DeleteObject permission alone, so we use PutAndDeleteObject instead.
PutAndDeleteObject Permission = "PutAndDeleteObject"

DefaultRequestConcurrency uint = 128
DefaultRequestConcurrency uint = 128
TombstoneSize int64 = -1
)

// WalkOption is the option of storage.WalkDir.
Expand Down Expand Up @@ -62,6 +69,14 @@ type WalkOption struct {
// to reduce the possibility of timeout on an extremely slow connection, or
// perform testing.
ListCount int64
// IncludeTombstone will allow `Walk` to emit removed files during walking.
//
// In most cases, `Walk` runs over a snapshot; if a file in the snapshot
// is deleted during walking, the file is ignored. Setting this to `true`
// makes such files be sent to the callback.
//
// The size of a deleted file should be `TombstoneSize`.
IncludeTombstone bool
}

// ReadSeekCloser is the interface that groups the basic Read, Seek and Close methods.
Expand Down

0 comments on commit 5598c11

Please sign in to comment.