Skip to content

Commit

Permalink
log_backup: add lock for truncating (#49469) (#49710)
Browse files Browse the repository at this point in the history
close #49414
  • Loading branch information
ti-chi-bot authored Feb 27, 2024
1 parent 26e9a3a commit cba3c91
Show file tree
Hide file tree
Showing 8 changed files with 263 additions and 10 deletions.
5 changes: 3 additions & 2 deletions br/pkg/restore/stream_metas.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,13 +222,13 @@ func (ms *StreamMetadataSet) removeDataFilesAndUpdateMetadata(ctx context.Contex
num = int64(len(removed))

if ms.DryRun {
log.Debug("dry run, skip deletion ...")
log.Info("dry run, skip deletion ...")
return num, notDeleted, nil
}

// remove data file groups
for _, f := range removed {
log.Debug("Deleting file", zap.String("path", f.Path))
log.Info("Deleting file", zap.String("path", f.Path))
if err := storage.DeleteFile(ctx, f.Path); err != nil {
log.Warn("File not deleted.", zap.String("path", f.Path), logutil.ShortError(err))
notDeleted = append(notDeleted, f.Path)
Expand All @@ -249,6 +249,7 @@ func (ms *StreamMetadataSet) removeDataFilesAndUpdateMetadata(ctx context.Contex
ReplaceMetadata(meta, remainedDataFiles)

if ms.BeforeDoWriteBack != nil && ms.BeforeDoWriteBack(metaPath, meta) {
log.Info("Skipped writeback meta by the hook.", zap.String("meta", metaPath))
return num, notDeleted, nil
}

Expand Down
4 changes: 3 additions & 1 deletion br/pkg/storage/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ go_library(
"local.go",
"local_unix.go",
"local_windows.go",
"locking.go",
"memstore.go",
"noop.go",
"parse.go",
Expand Down Expand Up @@ -64,6 +65,7 @@ go_test(
"compress_test.go",
"gcs_test.go",
"local_test.go",
"locking_test.go",
"memstore_test.go",
"parse_test.go",
"s3_test.go",
Expand All @@ -72,7 +74,7 @@ go_test(
],
embed = [":storage"],
flaky = True,
shard_count = 47,
shard_count = 49,
deps = [
"//br/pkg/mock",
"@com_github_aws_aws_sdk_go//aws",
Expand Down
124 changes: 124 additions & 0 deletions br/pkg/storage/locking.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
// Copyright 2023 PingCAP, Inc. Licensed under Apache-2.0.

package storage

import (
"context"
"encoding/json"
"fmt"
"os"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/logutil"
"go.uber.org/zap"
)

// LockMeta describes who is holding a remote lock and why.
// It is encoded as JSON and stored as the content of the lock file.
type LockMeta struct {
	// LockedAt is the time at which the lock was taken.
	LockedAt time.Time `json:"locked_at"`
	// LockerHost is the hostname of the machine that took the lock.
	LockerHost string `json:"locker_host"`
	// LockerPID is the ID of the process that took the lock.
	LockerPID int `json:"locker_pid"`
	// Hint is a free-form message recorded by the locker.
	Hint string `json:"hint"`
}

// String renders the lock meta in a single human-readable line.
// The timestamp is formatted with the time.DateTime layout.
func (l LockMeta) String() string {
	lockedAt := l.LockedAt.Format(time.DateTime)
	return fmt.Sprintf(
		"Locked(at: %s, host: %s, pid: %d, hint: %s)",
		lockedAt, l.LockerHost, l.LockerPID, l.Hint,
	)
}

// ErrLocked reports that the lock file already exists, i.e. the lock is held
// by someone else. Meta carries what the holder recorded in the lock file.
type ErrLocked struct {
	Meta LockMeta
}

// Error implements the error interface; the message embeds the holder's meta.
func (e ErrLocked) Error() string {
	holder := e.Meta
	return fmt.Sprintf("locked, meta = %s", holder)
}

// MakeLockMeta builds a LockMeta describing the current process: the current
// time, the local hostname, the process ID and the caller-supplied hint.
// If the hostname cannot be determined, a placeholder that embeds the error
// is recorded instead, so this function never fails.
func MakeLockMeta(hint string) LockMeta {
	host, err := os.Hostname()
	if err != nil {
		host = fmt.Sprintf("UnknownHost(err=%s)", err)
	}
	return LockMeta{
		LockedAt:   time.Now(),
		LockerHost: host,
		LockerPID:  os.Getpid(),
		Hint:       hint,
	}
}

// readLockMeta loads the lock file at path from storage and decodes its JSON
// content into a LockMeta.
//
// On any failure (the file cannot be read, or its content is not valid JSON
// for a LockMeta) the zero LockMeta is returned together with an annotated
// error, so callers never observe a half-decoded value.
func readLockMeta(ctx context.Context, storage ExternalStorage, path string) (LockMeta, error) {
	content, err := storage.ReadFile(ctx, path)
	if err != nil {
		return LockMeta{}, errors.Annotatef(err, "failed to read existed lock file %s", path)
	}

	var meta LockMeta
	if err := json.Unmarshal(content, &meta); err != nil {
		// json.Unmarshal may have filled some fields before failing;
		// do not leak that partial state to the caller.
		return LockMeta{}, errors.Annotatef(err, "failed to parse lock file %s", path)
	}
	return meta, nil
}

// putLockMeta encodes meta as JSON and writes it to path in storage.
// Both the encoding failure and the write failure are returned annotated
// with the lock path.
func putLockMeta(ctx context.Context, storage ExternalStorage, path string, meta LockMeta) error {
	payload, err := json.Marshal(meta)
	if err != nil {
		return errors.Annotatef(err, "failed to marshal lock meta %s", path)
	}
	if err := storage.WriteFile(ctx, path, payload); err != nil {
		return errors.Annotatef(err, "failed to write lock meta at %s", path)
	}
	return nil
}

// TryLockRemote tries to take a lock by creating a "lock file" at path in the
// external storage.
//
// If no file exists at path, a new lock file describing the current process
// (see MakeLockMeta) is written and nil is returned. If a file already exists,
// its content is read and returned wrapped in an ErrLocked.
//
// This is not a strict lock like flock on Linux: the existence check and the
// write are two separate storage calls, and the lock can be forcibly released
// by manually deleting the lock file from the external storage.
//
// The outcome of every attempt, including its error, is logged on return.
func TryLockRemote(ctx context.Context, storage ExternalStorage, path, hint string) (err error) {
	defer func() {
		log.Info("Trying lock remote file.", zap.String("path", path), zap.String("hint", hint), logutil.ShortError(err))
	}()

	locked, err := storage.FileExists(ctx, path)
	if err != nil {
		return errors.Annotatef(err, "failed to check lock file %s exists", path)
	}
	if !locked {
		// Nobody holds the lock: claim it by writing our own meta.
		return putLockMeta(ctx, storage, path, MakeLockMeta(hint))
	}

	// Somebody else holds the lock: report who, if the file is readable.
	holder, readErr := readLockMeta(ctx, storage, path)
	if readErr != nil {
		return readErr
	}
	return ErrLocked{Meta: holder}
}

// UnlockRemote releases the lock by deleting the lock file at path.
//
// The lock file is read first; if it cannot be read or parsed, the error is
// returned and nothing is deleted.
func UnlockRemote(ctx context.Context, storage ExternalStorage, path string) error {
	holder, err := readLockMeta(ctx, storage, path)
	if err != nil {
		return err
	}

	// NOTE: the meta is logged for debugging only. The ExternalStorage
	// abstraction has no Compare-And-Swap operation for now, so if our lock was
	// overwritten, or we are removing somebody else's lock, this log line is
	// what troubleshooting will rely on.
	log.Info("Releasing lock.", zap.Stringer("meta", holder), zap.String("path", path))

	if err := storage.DeleteFile(ctx, path); err != nil {
		return errors.Annotatef(err, "failed to delete lock file %s", path)
	}
	return nil
}
61 changes: 61 additions & 0 deletions br/pkg/storage/locking_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Copyright 2023 PingCAP, Inc. Licensed under Apache-2.0.

package storage_test

import (
"context"
"os"
"path/filepath"
"testing"

backup "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/stretchr/testify/require"
)

// createMockStorage builds a local-backend ExternalStorage rooted at a fresh
// per-test temporary directory and returns it together with that directory's
// path. The test fails immediately if the storage cannot be created.
func createMockStorage(t *testing.T) (storage.ExternalStorage, string) {
	// Attribute failures to the calling test rather than to this helper.
	t.Helper()

	tempdir := t.TempDir()
	backend := &backup.StorageBackend{
		Backend: &backup.StorageBackend_Local{
			Local: &backup.Local{
				Path: tempdir,
			},
		},
	}
	// Named strg so the local variable does not shadow the storage package.
	strg, err := storage.New(context.Background(), backend, nil)
	require.NoError(t, err)
	return strg, tempdir
}

// requireFileExists fails the test unless os.Stat succeeds on path.
func requireFileExists(t *testing.T, path string) {
	// Attribute failures to the calling test rather than to this helper.
	t.Helper()
	_, err := os.Stat(path)
	require.NoError(t, err)
}

// requireFileNotExists fails the test unless os.Stat on path reports that the
// file does not exist.
func requireFileNotExists(t *testing.T, path string) {
	// Attribute failures to the calling test rather than to this helper.
	t.Helper()
	_, err := os.Stat(path)
	require.True(t, os.IsNotExist(err))
}

// TestTryLockRemote checks the basic life cycle of a remote lock: taking it
// creates the lock file, releasing it removes the file again.
func TestTryLockRemote(t *testing.T) {
	ctx := context.Background()
	strg, dir := createMockStorage(t)
	lockFile := filepath.Join(dir, "test.lock")

	require.NoError(t, storage.TryLockRemote(ctx, strg, "test.lock", "This file is mine!"))
	requireFileExists(t, lockFile)

	require.NoError(t, storage.UnlockRemote(ctx, strg, "test.lock"))
	requireFileNotExists(t, lockFile)
}

// TestConflictLock checks that a second attempt to take an already-held lock
// is rejected with a "locked" error, leaves the lock file in place, and that
// the lock can still be released afterwards.
func TestConflictLock(t *testing.T) {
	ctx := context.Background()
	strg, dir := createMockStorage(t)
	lockFile := filepath.Join(dir, "test.lock")

	// The first locker wins.
	require.NoError(t, storage.TryLockRemote(ctx, strg, "test.lock", "This file is mine!"))

	// The second locker must be turned away without disturbing the lock file.
	conflict := storage.TryLockRemote(ctx, strg, "test.lock", "This file is mine!")
	require.ErrorContains(t, conflict, "locked, meta = Locked")
	requireFileExists(t, lockFile)

	require.NoError(t, storage.UnlockRemote(ctx, strg, "test.lock"))
	requireFileNotExists(t, lockFile)
}
23 changes: 17 additions & 6 deletions br/pkg/task/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,10 @@ const (
flagStreamStartTS = "start-ts"
flagStreamEndTS = "end-ts"
flagGCSafePointTTS = "gc-ttl"

truncateLockPath = "truncating.lock"
hintOnTruncateLock = "There might be another truncate task running, or a truncate task that didn't exit properly. " +
"You may check the metadata and continue by wait other task finish or manually delete the lock file " + truncateLockPath + " at the external storage."
)

var (
Expand Down Expand Up @@ -951,7 +955,7 @@ func RunStreamStatus(
}

// RunStreamTruncate truncates the log that belong to (0, until-ts)
func RunStreamTruncate(c context.Context, g glue.Glue, cmdName string, cfg *StreamConfig) error {
func RunStreamTruncate(c context.Context, g glue.Glue, cmdName string, cfg *StreamConfig) (err error) {
console := glue.GetConsole(g)
em := color.New(color.Bold).SprintFunc()
warn := color.New(color.Bold, color.FgHiRed).SprintFunc()
Expand All @@ -965,12 +969,19 @@ func RunStreamTruncate(c context.Context, g glue.Glue, cmdName string, cfg *Stre
ctx, cancelFn := context.WithCancel(c)
defer cancelFn()

storage, err := cfg.makeStorage(ctx)
extStorage, err := cfg.makeStorage(ctx)
if err != nil {
return err
}
if err := storage.TryLockRemote(ctx, extStorage, truncateLockPath, hintOnTruncateLock); err != nil {
return err
}
defer utils.WithCleanUp(&err, 10*time.Second, func(ctx context.Context) error {
//nolint:all_revive
return storage.UnlockRemote(ctx, extStorage, truncateLockPath)
})

sp, err := restore.GetTSFromFile(ctx, storage, restore.TruncateSafePointFileName)
sp, err := restore.GetTSFromFile(ctx, extStorage, restore.TruncateSafePointFileName)
if err != nil {
return err
}
Expand All @@ -988,7 +999,7 @@ func RunStreamTruncate(c context.Context, g glue.Glue, cmdName string, cfg *Stre
Helper: stream.NewMetadataHelper(),
DryRun: cfg.DryRun,
}
shiftUntilTS, err := metas.LoadUntilAndCalculateShiftTS(ctx, storage, cfg.Until)
shiftUntilTS, err := metas.LoadUntilAndCalculateShiftTS(ctx, extStorage, cfg.Until)
if err != nil {
return err
}
Expand Down Expand Up @@ -1016,7 +1027,7 @@ func RunStreamTruncate(c context.Context, g glue.Glue, cmdName string, cfg *Stre

if cfg.Until > sp && !cfg.DryRun {
if err := restore.SetTSToFile(
ctx, storage, cfg.Until, restore.TruncateSafePointFileName); err != nil {
ctx, extStorage, cfg.Until, restore.TruncateSafePointFileName); err != nil {
return err
}
}
Expand All @@ -1030,7 +1041,7 @@ func RunStreamTruncate(c context.Context, g glue.Glue, cmdName string, cfg *Stre
)
defer p.Close()

notDeleted, err := metas.RemoveDataFilesAndUpdateMetadataInBatch(ctx, shiftUntilTS, storage, p.IncBy)
notDeleted, err := metas.RemoveDataFilesAndUpdateMetadataInBatch(ctx, shiftUntilTS, extStorage, p.IncBy)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/utils/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ go_test(
],
embed = [":utils"],
flaky = True,
shard_count = 36,
shard_count = 37,
deps = [
"//br/pkg/errors",
"//br/pkg/metautil",
Expand Down
18 changes: 18 additions & 0 deletions br/pkg/utils/misc.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/parser/types"
"go.uber.org/multierr"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
Expand Down Expand Up @@ -131,3 +132,20 @@ func CheckStoreLiveness(s *metapb.Store) error {
}
return nil
}

// WithCleanUp calls fn with a context that expires after timeout and merges
// the error fn returns, if any, into *errOut.
//
// The context is derived from context.Background(), not from any caller
// context. It is meant for clean-up code that must run but may itself fail,
// typically from a defer on a function with a named error result:
//
//	func foo() (err error) {
//		defer WithCleanUp(&err, time.Second, func(ctx context.Context) error {
//			// do something
//			return nil
//		})
//	}
func WithCleanUp(errOut *error, timeout time.Duration, fn func(context.Context) error) {
	cleanupCtx, stop := context.WithTimeout(context.Background(), timeout)
	defer stop()

	// Run fn to completion before reading *errOut.
	cleanupErr := fn(cleanupCtx)
	*errOut = multierr.Combine(cleanupErr, *errOut)
}
36 changes: 36 additions & 0 deletions br/pkg/utils/misc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,15 @@
package utils

import (
"context"
"testing"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/parser/types"
"github.com/stretchr/testify/require"
"go.uber.org/multierr"
)

func TestIsTypeCompatible(t *testing.T) {
Expand Down Expand Up @@ -136,3 +140,35 @@ func TestIsTypeCompatible(t *testing.T) {
require.True(t, IsTypeCompatible(*src, *target))
}
}

// TestWithCleanUp checks how WithCleanUp merges the clean-up error with the
// error returned by the surrounding function.
func TestWithCleanUp(t *testing.T) {
	cleanupErr := errors.New("meow?")
	bodyErr := errors.New("nya?")

	// run mimics a function that returns ret while its deferred clean-up
	// returns cleanup.
	run := func(cleanup, ret error) (err error) {
		defer WithCleanUp(&err, time.Second, func(context.Context) error {
			return cleanup
		})
		return ret
	}

	// Only the clean-up fails: its error is what the caller sees.
	require.ErrorIs(t, run(cleanupErr, nil), cleanupErr)

	// Both fail: both errors are reported.
	require.ElementsMatch(t, []error{cleanupErr, bodyErr}, multierr.Errors(run(cleanupErr, bodyErr)))

	// Neither fails: no error is produced.
	require.NoError(t, run(nil, nil))
}

0 comments on commit cba3c91

Please sign in to comment.