Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

external_storage: implement locking #56597

Merged
merged 7 commits into from
Nov 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
253 changes: 224 additions & 29 deletions br/pkg/storage/locking.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,131 @@
package storage

import (
"bytes"
"context"
"encoding/hex"
"encoding/json"
"fmt"
"math/rand"
"os"
"path"
"time"

"github.com/google/uuid"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/logutil"
"go.uber.org/multierr"
"go.uber.org/zap"
)

// conditionalPut is a write that in a strong consistency storage.
//
// It provides a `Verify` hook and a `VerifyWriteContext`, you may check
// the conditions you wanting there.
//
// if the write is success and the file wasn't deleted, no other `conditionalPut`
// over the same file was success.
//
// For more details, check docs/design/2024-10-11-put-and-verify-transactions-for-external-storages.md.
type conditionalPut struct {
// Target is the target file of this txn.
// There shouldn't be other files shares this prefix with this file, or the txn will fail.
Target string
// Content is the content that needed to be written to that file.
Content func(txnID uuid.UUID) []byte
// Verify allows you add other preconditions to the write.
// This will be called when the write is allowed and about to be performed.
// If `Verify()` returns an error, the write will be aborted.
Verify func(ctx VerifyWriteContext) error
}

type VerifyWriteContext struct {
context.Context
Target string
Storage ExternalStorage
TxnID uuid.UUID
}

func (cx *VerifyWriteContext) IntentFileName() string {
return fmt.Sprintf("%s.INTENT.%s", cx.Target, hex.EncodeToString(cx.TxnID[:]))
}

// CommitTo commits the write to the external storage.
// It contains two phases:
// - Intention phase, it will write an "intention" file named "$Target_$TxnID".
// - Put phase, here it actually write the "$Target" down.
//
// In each phase, before writing, it will verify whether the storage is suitable for writing, that is:
// - There shouldn't be any other intention files.
// - Verify() returns no error. (If there is one.)
func (w conditionalPut) CommitTo(ctx context.Context, s ExternalStorage) (uuid.UUID, error) {
txnID := uuid.New()
cx := VerifyWriteContext{
Context: ctx,
Target: w.Target,
Storage: s,
TxnID: txnID,
}
intentFileName := cx.IntentFileName()
checkConflict := func() error {
var err error
if w.Verify != nil {
err = multierr.Append(err, w.Verify(cx))
}
return multierr.Append(err, cx.assertOnlyMyIntent())
}

if err := checkConflict(); err != nil {
return uuid.UUID{}, errors.Annotate(err, "during initial check")
}
failpoint.Inject("exclusive-write-commit-to-1", func() {})

if err := s.WriteFile(cx, intentFileName, []byte{}); err != nil {
return uuid.UUID{}, errors.Annotate(err, "during writing intention file")
}

deleteIntentionFile := func() {
if err := s.DeleteFile(cx, intentFileName); err != nil {
log.Warn("Cannot delete the intention file, you may delete it manually.", zap.String("file", intentFileName), logutil.ShortError(err))
}
}
defer deleteIntentionFile()
if err := checkConflict(); err != nil {
return uuid.UUID{}, errors.Annotate(err, "during checking whether there are other intentions")
}
failpoint.Inject("exclusive-write-commit-to-2", func() {})

return txnID, s.WriteFile(cx, w.Target, w.Content(txnID))
}

// assertNoOtherOfPrefixExpect asserts that there is no other file with the same prefix than the expect file.
func (cx VerifyWriteContext) assertNoOtherOfPrefixExpect(pfx string, expect string) error {
fileName := path.Base(pfx)
dirName := path.Dir(pfx)
return cx.Storage.WalkDir(cx, &WalkOption{
SubDir: dirName,
ObjPrefix: fileName,
}, func(path string, size int64) error {
if path != expect {
return fmt.Errorf("there is conflict file %s", path)
}
return nil
})
}

// assertOnlyMyIntent asserts that there is no other intention file than our intention file.
func (cx VerifyWriteContext) assertOnlyMyIntent() error {
return cx.assertNoOtherOfPrefixExpect(cx.Target, cx.IntentFileName())
}

// LockMeta is the meta information of a lock.
type LockMeta struct {
LockedAt time.Time `json:"locked_at"`
LockerHost string `json:"locker_host"`
LockerPID int `json:"locker_pid"`
TxnID []byte `json:"txn_id"`
Hint string `json:"hint"`
}

Expand Down Expand Up @@ -67,58 +175,145 @@ func readLockMeta(ctx context.Context, storage ExternalStorage, path string) (Lo
return meta, nil
}

func putLockMeta(ctx context.Context, storage ExternalStorage, path string, meta LockMeta) error {
file, err := json.Marshal(meta)
if err != nil {
return errors.Annotatef(err, "failed to marshal lock meta %s", path)
}
err = storage.WriteFile(ctx, path, file)
type RemoteLock struct {
txnID uuid.UUID
storage ExternalStorage
path string
}

func tryFetchRemoteLock(ctx context.Context, storage ExternalStorage, path string) error {
meta, err := readLockMeta(ctx, storage, path)
if err != nil {
return errors.Annotatef(err, "failed to write lock meta at %s", path)
return err
}
return nil
return ErrLocked{Meta: meta}
}

// TryLockRemote tries to create a "lock file" at the external storage.
// If success, we will create a file at the path provided. So others may not access the file then.
// Will return a `ErrLocked` if there is another process already creates the lock file.
// This isn't a strict lock like flock in linux: that means, the lock might be forced removed by
// manually deleting the "lock file" in external storage.
func TryLockRemote(ctx context.Context, storage ExternalStorage, path, hint string) (err error) {
defer func() {
log.Info("Trying lock remote file.", zap.String("path", path), zap.String("hint", hint), logutil.ShortError(err))
}()
exists, err := storage.FileExists(ctx, path)
if err != nil {
return errors.Annotatef(err, "failed to check lock file %s exists", path)
}
if exists {
meta, err := readLockMeta(ctx, storage, path)
if err != nil {
return err
}
return ErrLocked{Meta: meta}
func TryLockRemote(ctx context.Context, storage ExternalStorage, path, hint string) (lock RemoteLock, err error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not use TryLockRemoteWrite instead of TryLockRemote?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just for compatibility. Write locks requires an extra suffix.

writer := conditionalPut{
Target: path,
Content: func(txnID uuid.UUID) []byte {
meta := MakeLockMeta(hint)
meta.TxnID = txnID[:]
res, err := json.Marshal(meta)
if err != nil {
log.Panic(
"Unreachable: a trivial object cannot be marshaled to JSON.",
zap.String("path", path),
logutil.ShortError(err),
)
}
return res
},
}

meta := MakeLockMeta(hint)
return putLockMeta(ctx, storage, path, meta)
lock.storage = storage
lock.path = path
lock.txnID, err = writer.CommitTo(ctx, storage)
if err != nil {
err = errors.Annotatef(err, "there is something about the lock: %s", tryFetchRemoteLock(ctx, storage, path))
}
return
}

// UnlockRemote removes the lock file at the specified path.
// Removing that file will release the lock.
func UnlockRemote(ctx context.Context, storage ExternalStorage, path string) error {
meta, err := readLockMeta(ctx, storage, path)
func (l RemoteLock) Unlock(ctx context.Context) error {
meta, err := readLockMeta(ctx, l.storage, l.path)
if err != nil {
return err
}
// NOTE: this is for debug usage. For now, there isn't an Compare-And-Swap
// operation in our ExternalStorage abstraction.
// So, once our lock has been overwritten or we are overwriting other's lock,
// this information will be useful for troubleshooting.
log.Info("Releasing lock.", zap.Stringer("meta", meta), zap.String("path", path))
err = storage.DeleteFile(ctx, path)
if !bytes.Equal(l.txnID[:], meta.TxnID) {
return errors.Errorf("Txn ID mismatch: remote is %v, our is %v", meta.TxnID, l.txnID)
}

log.Info("Releasing lock.", zap.Stringer("meta", meta), zap.String("path", l.path))
err = l.storage.DeleteFile(ctx, l.path)
if err != nil {
return errors.Annotatef(err, "failed to delete lock file %s", path)
return errors.Annotatef(err, "failed to delete lock file %s", l.path)
}
return nil
}

func writeLockName(path string) string {
return fmt.Sprintf("%s.WRIT", path)
}

func newReadLockName(path string) string {
readID := rand.Int63()
return fmt.Sprintf("%s.READ.%016x", path, readID)
}

func TryLockRemoteWrite(ctx context.Context, storage ExternalStorage, path, hint string) (lock RemoteLock, err error) {
target := writeLockName(path)
writer := conditionalPut{
Target: target,
Content: func(txnID uuid.UUID) []byte {
meta := MakeLockMeta(hint)
meta.TxnID = txnID[:]
res, err := json.Marshal(meta)
if err != nil {
log.Panic(
"Unreachable: a plain object cannot be marshaled to JSON.",
zap.String("path", path),
logutil.ShortError(err),
)
}
return res
},
Verify: func(ctx VerifyWriteContext) error {
return ctx.assertNoOtherOfPrefixExpect(path, ctx.IntentFileName())
},
}

lock.storage = storage
lock.path = target
lock.txnID, err = writer.CommitTo(ctx, storage)
if err != nil {
err = errors.Annotatef(err, "there is something about the lock: %s", tryFetchRemoteLock(ctx, storage, target))
}
return
}

func TryLockRemoteRead(ctx context.Context, storage ExternalStorage, path, hint string) (lock RemoteLock, err error) {
target := newReadLockName(path)
writeLock := writeLockName(path)
writer := conditionalPut{
Target: target,
Content: func(txnID uuid.UUID) []byte {
meta := MakeLockMeta(hint)
meta.TxnID = txnID[:]
res, err := json.Marshal(meta)
if err != nil {
log.Panic(
"Unreachable: a trivial object cannot be marshaled to JSON.",
zap.String("path", path),
logutil.ShortError(err),
)
}
return res
},
Verify: func(ctx VerifyWriteContext) error {
return ctx.assertNoOtherOfPrefixExpect(writeLock, "")
},
}

lock.storage = storage
lock.path = target
lock.txnID, err = writer.CommitTo(ctx, storage)
if err != nil {
err = errors.Annotatef(err, "failed to commit the lock due to existing lock: "+
"there is something about the lock: %s", tryFetchRemoteLock(ctx, storage, writeLock))
}

return
}
66 changes: 60 additions & 6 deletions br/pkg/storage/locking_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"path/filepath"
"testing"

"github.com/pingcap/failpoint"
backup "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -39,23 +40,76 @@ func requireFileNotExists(t *testing.T, path string) {
func TestTryLockRemote(t *testing.T) {
ctx := context.Background()
strg, pth := createMockStorage(t)
err := storage.TryLockRemote(ctx, strg, "test.lock", "This file is mine!")
lock, err := storage.TryLockRemote(ctx, strg, "test.lock", "This file is mine!")
require.NoError(t, err)
requireFileExists(t, filepath.Join(pth, "test.lock"))
err = storage.UnlockRemote(ctx, strg, "test.lock")
err = lock.Unlock(ctx)
require.NoError(t, err)
requireFileNotExists(t, filepath.Join(pth, "test.lock"))
}

func TestConflictLock(t *testing.T) {
ctx := context.Background()
strg, pth := createMockStorage(t)
err := storage.TryLockRemote(ctx, strg, "test.lock", "This file is mine!")
lock, err := storage.TryLockRemote(ctx, strg, "test.lock", "This file is mine!")
require.NoError(t, err)
err = storage.TryLockRemote(ctx, strg, "test.lock", "This file is mine!")
require.ErrorContains(t, err, "locked, meta = Locked")
_, err = storage.TryLockRemote(ctx, strg, "test.lock", "This file is mine!")
require.ErrorContains(t, err, "conflict file test.lock")
requireFileExists(t, filepath.Join(pth, "test.lock"))
err = storage.UnlockRemote(ctx, strg, "test.lock")
err = lock.Unlock(ctx)
require.NoError(t, err)
requireFileNotExists(t, filepath.Join(pth, "test.lock"))
}

func TestRWLock(t *testing.T) {
ctx := context.Background()
strg, path := createMockStorage(t)
lock, err := storage.TryLockRemoteRead(ctx, strg, "test.lock", "I wanna read it!")
require.NoError(t, err)
lock2, err := storage.TryLockRemoteRead(ctx, strg, "test.lock", "I wanna read it too!")
require.NoError(t, err)
_, err = storage.TryLockRemoteWrite(ctx, strg, "test.lock", "I wanna write it, you get out!")
require.Error(t, err)
require.NoError(t, lock.Unlock(ctx))
require.NoError(t, lock2.Unlock(ctx))
l, err := storage.TryLockRemoteWrite(ctx, strg, "test.lock", "Can I have a write lock?")
require.NoError(t, err)
requireFileExists(t, filepath.Join(path, "test.lock.WRIT"))
require.NoError(t, l.Unlock(ctx))
requireFileNotExists(t, filepath.Join(path, "test.lock.WRIT"))
}

func TestConcurrentLock(t *testing.T) {
ctx := context.Background()
strg, path := createMockStorage(t)

errChA := make(chan error, 1)
errChB := make(chan error, 1)

require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/br/pkg/storage/exclusive-write-commit-to-1", "1*pause"))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/br/pkg/storage/exclusive-write-commit-to-2", "1*pause"))

go func() {
_, err := storage.TryLockRemote(ctx, strg, "test.lock", "I wanna read it, but I hesitated before send my intention!")
errChA <- err
}()

go func() {
_, err := storage.TryLockRemote(ctx, strg, "test.lock", "I wanna read it too, but I hesitated before committing!")
errChB <- err
}()

failpoint.Disable("github.com/pingcap/tidb/br/pkg/storage/exclusive-write-commit-to-1")
failpoint.Disable("github.com/pingcap/tidb/br/pkg/storage/exclusive-write-commit-to-2")

// There is exactly one error.
errA := <-errChA
errB := <-errChB
if errA == nil {
require.Error(t, errB)
} else {
require.NoError(t, errB)
}

requireFileExists(t, filepath.Join(path, "test.lock"))
}
Loading
Loading