Skip to content

Commit

Permalink
br/stream: Added toolkit for managing migrations (pingcap#55665)
Browse files Browse the repository at this point in the history
  • Loading branch information
YuJuncen authored Oct 21, 2024
1 parent 73584bb commit 64c8d31
Show file tree
Hide file tree
Showing 11 changed files with 1,946 additions and 156 deletions.
4 changes: 4 additions & 0 deletions br/pkg/storage/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go_library(
name = "storage",
srcs = [
"azblob.go",
"batch.go",
"compress.go",
"flags.go",
"gcs.go",
Expand All @@ -27,6 +28,7 @@ go_library(
deps = [
"//br/pkg/errors",
"//br/pkg/logutil",
"//br/pkg/utils/iter",
"//pkg/lightning/log",
"//pkg/sessionctx/variable",
"//pkg/util",
Expand Down Expand Up @@ -76,6 +78,7 @@ go_library(
"@org_golang_x_net//http2",
"@org_golang_x_oauth2//google",
"@org_uber_go_atomic//:atomic",
"@org_uber_go_multierr//:multierr",
"@org_uber_go_zap//:zap",
],
)
Expand All @@ -85,6 +88,7 @@ go_test(
timeout = "short",
srcs = [
"azblob_test.go",
"batch_test.go",
"compress_test.go",
"gcs_test.go",
"local_test.go",
Expand Down
164 changes: 164 additions & 0 deletions br/pkg/storage/batch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
package storage

import (
"context"
"encoding/json"
"fmt"
"io"
"os"
"sync"

"github.com/pingcap/errors"
berrors "github.com/pingcap/tidb/br/pkg/errors"
"go.uber.org/multierr"
)

// Effect is an side effect that happens in the batch storage.
type Effect any

// EffPut is the side effect of a call to `WriteFile`.
type EffPut struct {
File string `json:"file"`
Content []byte `json:"content"`
}

// EffDeleteFiles is the side effect of a call to `DeleteFiles`.
type EffDeleteFiles struct {
Files []string `json:"files"`
}

// EffDeleteFile is the side effect of a call to `DeleteFile`.
type EffDeleteFile string

// EffRename is the side effect of a call to `Rename`.
type EffRename struct {
From string `json:"from"`
To string `json:"to"`
}

// JSONEffects converts a slices of effects into json.
// The json will be a tagged union: `{"type": $go_type_name, "effect": $effect}`
func JSONEffects(es []Effect, output io.Writer) error {
type Typed struct {
Type string `json:"type"`
Eff Effect `json:"effect"`
}

out := make([]Typed, 0, len(es))
for _, eff := range es {
out = append(out, Typed{
Type: fmt.Sprintf("%T", eff),
Eff: eff,
})
}

return json.NewEncoder(output).Encode(out)
}

func SaveJSONEffectsToTmp(es []Effect) (string, error) {
// Save the json to a subdir so user can redirect the output path by symlinking...
tmp, err := os.CreateTemp(os.TempDir(), "br-effects-*.json")
if err != nil {
return "", err
}
if err := JSONEffects(es, tmp); err != nil {
return "", err
}
return tmp.Name(), nil
}

// Batched is a wrapper of an external storage that suspends all write operations ("effects").
// If `Close()` without calling `Commit()`, nothing will happen in the underlying external storage.
// In that case, we have done a "dry run".
//
// You may use `ReadOnlyEffects()` to get the history of the effects.
// But don't modify the returned slice!
//
// You may use `Commit()` to execute all suspended effects.
type Batched struct {
ExternalStorage
effectsMu sync.Mutex
// It will be one of:
// EffPut, EffDeleteFiles, EffDeleteFile, EffRename
effects []Effect
}

// Batch wraps an external storage instance to a batched version.
func Batch(s ExternalStorage) *Batched {
return &Batched{ExternalStorage: s}
}

// Fetch all effects from the batched storage.
//
// **The returned slice should not be modified.**
func (d *Batched) ReadOnlyEffects() []Effect {
d.effectsMu.Lock()
defer d.effectsMu.Unlock()
return d.effects
}

// CleanEffects cleans all suspended effects.
func (d *Batched) CleanEffects() {
d.effectsMu.Lock()
defer d.effectsMu.Unlock()
d.effects = nil
}

func (d *Batched) DeleteFiles(ctx context.Context, names []string) error {
d.effectsMu.Lock()
defer d.effectsMu.Unlock()
d.effects = append(d.effects, EffDeleteFiles{Files: names})
return nil
}

func (d *Batched) DeleteFile(ctx context.Context, name string) error {
d.effectsMu.Lock()
defer d.effectsMu.Unlock()
d.effects = append(d.effects, EffDeleteFile(name))
return nil
}

func (d *Batched) WriteFile(ctx context.Context, name string, data []byte) error {
d.effectsMu.Lock()
defer d.effectsMu.Unlock()
d.effects = append(d.effects, EffPut{File: name, Content: data})
return nil
}

func (d *Batched) Rename(ctx context.Context, oldName, newName string) error {
d.effectsMu.Lock()
defer d.effectsMu.Unlock()
d.effects = append(d.effects, EffRename{From: oldName, To: newName})
return nil
}

func (d *Batched) Create(ctx context.Context, path string, option *WriterOption) (ExternalFileWriter, error) {
return nil, errors.Annotatef(berrors.ErrStorageUnknown, "ExternalStorage.Create isn't allowed in batch mode for now.")
}

// Commit performs all effects recorded so long in the REAL external storage.
// This will cleanup all of the suspended effects.
func (d *Batched) Commit(ctx context.Context) error {
d.effectsMu.Lock()
defer d.effectsMu.Unlock()

var err error
for _, eff := range d.effects {
switch e := eff.(type) {
case EffPut:
err = multierr.Combine(d.ExternalStorage.WriteFile(ctx, e.File, e.Content), err)
case EffDeleteFiles:
err = multierr.Combine(d.ExternalStorage.DeleteFiles(ctx, e.Files), err)
case EffDeleteFile:
err = multierr.Combine(d.ExternalStorage.DeleteFile(ctx, string(e)), err)
case EffRename:
err = multierr.Combine(d.ExternalStorage.Rename(ctx, e.From, e.To), err)
default:
return errors.Annotatef(berrors.ErrStorageUnknown, "Unknown effect type %T", eff)
}
}

d.effects = nil

return nil
}
108 changes: 108 additions & 0 deletions br/pkg/storage/batch_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package storage_test

import (
"context"
"io"
"os"
"testing"

. "github.com/pingcap/tidb/br/pkg/storage"
"github.com/stretchr/testify/require"
)

func TestBatched(t *testing.T) {
ctx := context.Background()
bat := Batch(nil) // Passing nil as we don't need actual storage operations

// Test operations
operations := []struct {
name string
op func() error
expected []Effect
}{
{
name: "DeleteFiles",
op: func() error {
return bat.DeleteFiles(ctx, []string{"file1.txt", "file2.txt"})
},
expected: []Effect{EffDeleteFiles{Files: []string{"file1.txt", "file2.txt"}}},
},
{
name: "DeleteFile",
op: func() error {
return bat.DeleteFile(ctx, "file3.txt")
},
expected: []Effect{EffDeleteFile("file3.txt")},
},
{
name: "WriteFile",
op: func() error {
return bat.WriteFile(ctx, "file4.txt", []byte("content"))
},
expected: []Effect{EffPut{File: "file4.txt", Content: []byte("content")}},
},
{
name: "Rename",
op: func() error {
return bat.Rename(ctx, "oldName.txt", "newName.txt")
},
expected: []Effect{EffRename{From: "oldName.txt", To: "newName.txt"}},
},
{
name: "SequenceOfOperations",
op: func() error {
if err := bat.DeleteFile(ctx, "file5.txt"); err != nil {
return err
}
if err := bat.WriteFile(ctx, "file6.txt", []byte("new content")); err != nil {
return err
}
return bat.Rename(ctx, "file6.txt", "fileRenamed.txt")
},
expected: []Effect{
EffDeleteFile("file5.txt"),
EffPut{File: "file6.txt", Content: []byte("new content")},
EffRename{From: "file6.txt", To: "fileRenamed.txt"},
}},
}

for _, op := range operations {
t.Run(op.name, func(t *testing.T) {
require.NoError(t, op.op())

effects := bat.ReadOnlyEffects()
require.Equal(t, len(op.expected), len(effects))
for i, effect := range effects {
require.Equal(t, op.expected[i], effect)
}

// Reset effects for the next test
bat.CleanEffects()
})
}
}

func TestJSONEffects(t *testing.T) {
effects := []Effect{
EffPut{File: "example.txt", Content: []byte("Hello, world")},
EffDeleteFiles{Files: []string{"old_file.txt", "temp.txt"}},
EffDeleteFile("obsolete.txt"),
EffRename{From: "old_name.txt", To: "new_name.txt"},
}

tmp, err := SaveJSONEffectsToTmp(effects)
require.NoError(t, err)
f, err := os.Open(tmp)
require.NoError(t, err)
buf, err := io.ReadAll(f)
require.NoError(t, err)

expectedJSON := `[
{"type":"storage.EffPut","effect":{"file":"example.txt","content":"SGVsbG8sIHdvcmxk"}},
{"type":"storage.EffDeleteFiles","effect":{"files":["old_file.txt","temp.txt"]}},
{"type":"storage.EffDeleteFile","effect":"obsolete.txt"},
{"type":"storage.EffRename","effect":{"from":"old_name.txt","to":"new_name.txt"}}
]`

require.JSONEq(t, expectedJSON, string(buf), "Output JSON should match expected JSON")
}
49 changes: 49 additions & 0 deletions br/pkg/storage/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"context"
"sync/atomic"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/br/pkg/utils/iter"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
)

Expand Down Expand Up @@ -37,3 +39,50 @@ var activeUploadWorkerCnt atomic.Int64
func GetActiveUploadWorkerCount() int64 {
return activeUploadWorkerCnt.Load()
}

// UnmarshalDir iterates over a prefix, then "unmarshal" the content of each file it met with the unmarshal function.
// Returning an iterator that yields the unmarshaled content.
// The "unmarshal" function should put the result of unmarshalling to the `target` argument.
func UnmarshalDir[T any](ctx context.Context, walkOpt *WalkOption, s ExternalStorage, unmarshal func(target *T, name string, content []byte) error) iter.TryNextor[*T] {
ch := make(chan *T)
errCh := make(chan error, 1)
reader := func() {
defer close(ch)
err := s.WalkDir(ctx, walkOpt, func(path string, size int64) error {
metaBytes, err := s.ReadFile(ctx, path)
if err != nil {
return errors.Annotatef(err, "failed during reading file %s", path)
}
var meta T
if err := unmarshal(&meta, path, metaBytes); err != nil {
return errors.Annotatef(err, "failed to parse subcompaction meta of file %s", path)
}
select {
case ch <- &meta:
case <-ctx.Done():
return ctx.Err()
}
return nil
})
if err != nil {
select {
case errCh <- err:
case <-ctx.Done():
}
}
}
go reader()
return iter.Func(func(ctx context.Context) iter.IterResult[*T] {
select {
case <-ctx.Done():
return iter.Throw[*T](ctx.Err())
case err := <-errCh:
return iter.Throw[*T](err)
case meta, ok := <-ch:
if !ok {
return iter.Done[*T]()
}
return iter.Emit(meta)
}
})
}
Loading

0 comments on commit 64c8d31

Please sign in to comment.