Skip to content

Commit

Permalink
redo(ticdc): use uuid in s3 log file to avoid name conflict (#5595) (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Jun 15, 2022
1 parent 74ac962 commit 97e9cde
Show file tree
Hide file tree
Showing 11 changed files with 455 additions and 74 deletions.
31 changes: 23 additions & 8 deletions cdc/redo/common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,15 @@ import (
cerror "github.com/pingcap/tiflow/pkg/errors"
)

// File name layouts for redo logs. Both are fmt.Sprintf-style formats whose
// fields are joined by "_"; the final verb is filled with the file extension
// and is appended directly after the uuid, without a separator.
const (
	// RedoLogFileFormatV1 is the layout used before v6.1.0. It carries no
	// namespace field.
	// Fields, in order: captureID, changefeedID, fileType, maxEventCommitTs,
	// uuid, fileExtName.
	RedoLogFileFormatV1 = "%s_%s_%s_%d_%s%s"
	// RedoLogFileFormatV2 is the layout available since v6.1.0. It adds the
	// namespace field.
	// Fields, in order: captureID, namespace, changefeedID, fileType,
	// maxEventCommitTs, uuid, fileExtName.
	RedoLogFileFormatV2 = "%s_%s_%s_%s_%d_%s%s"
)

// InitS3storage init a storage used for s3,
// s3URI should be like s3URI="s3://logbucket/test-changefeed?endpoint=http://$S3_ENDPOINT/"
var InitS3storage = func(ctx context.Context, uri url.URL) (storage.ExternalStorage, error) {
Expand Down Expand Up @@ -58,6 +67,13 @@ var InitS3storage = func(ctx context.Context, uri url.URL) (storage.ExternalStor
return s3storage, nil
}

// logFormat2ParseFormat turns a redo log file name format into the pattern
// used to scan a file name back into its fields.
//
// Every "_" separator in fmtStr becomes a single space, so that fmt.Sscanf can
// split the fields. A trailing "%s" (the file extension verb) is then dropped,
// because the caller handles the extension before scanning the rest of the name.
// A format that does not end in "%s" is returned with only the separators replaced.
func logFormat2ParseFormat(fmtStr string) string {
	const extVerb = "%s"

	parseFmt := strings.Replace(fmtStr, "_", " ", -1)
	if strings.HasSuffix(parseFmt, extVerb) {
		parseFmt = parseFmt[:len(parseFmt)-len(extVerb)]
	}
	return parseFmt
}

// ParseLogFileName extract the commitTs, fileType from log fileName
func ParseLogFileName(name string) (uint64, string, error) {
ext := filepath.Ext(name)
Expand All @@ -66,7 +82,9 @@ func ParseLogFileName(name string) (uint64, string, error) {
}

// if .sort, the name should be like
// fmt.Sprintf("%s_%s_%d_%s_%d%s", w.cfg.captureID, w.cfg.changeFeedID, w.cfg.createTime.Unix(), w.cfg.fileType, w.commitTS.Load(), LogEXT)+SortLogEXT
// fmt.Sprintf("%s_%s_%s_%s_%d_%s%s", w.cfg.captureID,
// w.cfg.changeFeedID.Namespace,w.cfg.changeFeedID.ID,
// w.cfg.fileType, w.commitTS.Load(), uuid, LogEXT)+SortLogEXT
if ext == SortLogEXT {
name = strings.TrimSuffix(name, SortLogEXT)
ext = filepath.Ext(name)
Expand All @@ -75,15 +93,12 @@ func ParseLogFileName(name string) (uint64, string, error) {
return 0, "", nil
}

var commitTs, d1 uint64
var s1, s2, fileType string
var commitTs uint64
var s1, s2, fileType, uid string
// the log looks like: fmt.Sprintf(RedoLogFileFormatV1, captureID, changeFeedID, fileType, maxEventCommitTs, uuid, LogEXT)
formatStr := "%s %s %d %s %d" + LogEXT
if ext == TmpEXT {
formatStr += TmpEXT
}
formatStr := logFormat2ParseFormat(RedoLogFileFormatV1)
name = strings.ReplaceAll(name, "_", " ")
_, err := fmt.Sscanf(name, formatStr, &s1, &s2, &d1, &fileType, &commitTs)
_, err := fmt.Sscanf(name, formatStr, &s1, &s2, &fileType, &commitTs, &uid)
if err != nil {
return 0, "", errors.Annotatef(err, "bad log name: %s", name)
}
Expand Down
77 changes: 69 additions & 8 deletions cdc/redo/common/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,15 @@ package common
import (
"fmt"
"testing"
"time"

"github.com/google/uuid"
"github.com/stretchr/testify/require"
)

func TestParseLogFileName(t *testing.T) {
type arg struct {
name string
}
// the log looks like: fmt.Sprintf("%s_%s_%d_%s_%d%s", w.cfg.captureID, w.cfg.changeFeedID, w.cfg.createTime.Unix(), w.cfg.fileType, w.commitTS.Load(), redo.LogEXT)
tests := []struct {
name string
args arg
Expand All @@ -36,39 +35,99 @@ func TestParseLogFileName(t *testing.T) {
{
name: "happy row .log",
args: arg{
name: fmt.Sprintf("%s_%s_%d_%s_%d%s", "cp", "test", time.Now().Unix(), DefaultRowLogFileType, 1, LogEXT),
name: fmt.Sprintf(RedoLogFileFormatV1, "cp",
"test",
DefaultRowLogFileType, 1, uuid.New().String(), LogEXT),
},
wantTs: 1,
wantFileType: DefaultRowLogFileType,
},
{
name: "happy row .log",
args: arg{
name: fmt.Sprintf(RedoLogFileFormatV1, "cp",
"test",
DefaultRowLogFileType, 1, uuid.New().String(), LogEXT),
},
wantTs: 1,
wantFileType: DefaultRowLogFileType,
},
{
name: "happy row .tmp",
args: arg{
name: fmt.Sprintf(RedoLogFileFormatV1, "cp",
"test",
DefaultRowLogFileType, 1, uuid.New().String(), LogEXT) + TmpEXT,
},
wantTs: 1,
wantFileType: DefaultRowLogFileType,
},
{
name: "happy row .tmp",
args: arg{
name: fmt.Sprintf("%s_%s_%d_%s_%d%s", "cp", "test", time.Now().Unix(), DefaultRowLogFileType, 1, LogEXT) + TmpEXT,
name: fmt.Sprintf(RedoLogFileFormatV1, "cp",
"test",
DefaultRowLogFileType, 1, uuid.New().String(), LogEXT) + TmpEXT,
},
wantTs: 1,
wantFileType: DefaultRowLogFileType,
},
{
name: "happy ddl .log",
args: arg{
name: fmt.Sprintf("%s_%s_%d_%s_%d%s", "cp", "test", time.Now().Unix(), DefaultDDLLogFileType, 1, LogEXT),
name: fmt.Sprintf(RedoLogFileFormatV1, "cp",
"test",
DefaultDDLLogFileType, 1, uuid.New().String(), LogEXT),
},
wantTs: 1,
wantFileType: DefaultDDLLogFileType,
},
{
name: "happy ddl .log",
args: arg{
name: fmt.Sprintf(RedoLogFileFormatV1, "cp",
"test",
DefaultDDLLogFileType, 1, uuid.New().String(), LogEXT),
},
wantTs: 1,
wantFileType: DefaultDDLLogFileType,
},
{
name: "happy ddl .sort",
args: arg{
name: fmt.Sprintf("%s_%s_%d_%s_%d%s", "cp", "test", time.Now().Unix(), DefaultDDLLogFileType, 1, LogEXT) + SortLogEXT,
name: fmt.Sprintf(RedoLogFileFormatV1, "cp",
"test",
DefaultDDLLogFileType, 1, uuid.New().String(), LogEXT) + SortLogEXT,
},
wantTs: 1,
wantFileType: DefaultDDLLogFileType,
},
{
name: "happy ddl .sort",
args: arg{
name: fmt.Sprintf(RedoLogFileFormatV1, "cp",
"test",
DefaultDDLLogFileType, 1, uuid.New().String(), LogEXT) + SortLogEXT,
},
wantTs: 1,
wantFileType: DefaultDDLLogFileType,
},
{
name: "happy ddl .tmp",
args: arg{
name: fmt.Sprintf(RedoLogFileFormatV1, "cp",
"test",
DefaultDDLLogFileType, 1, uuid.New().String(), LogEXT) + TmpEXT,
},
wantTs: 1,
wantFileType: DefaultDDLLogFileType,
},
{
name: "happy ddl .tmp",
args: arg{
name: fmt.Sprintf("%s_%s_%d_%s_%d%s", "cp", "test", time.Now().Unix(), DefaultDDLLogFileType, 1, LogEXT) + TmpEXT,
name: fmt.Sprintf(RedoLogFileFormatV1, "cp",
"test",
DefaultDDLLogFileType, 1, uuid.New().String(), LogEXT) + TmpEXT,
},
wantTs: 1,
wantFileType: DefaultDDLLogFileType,
Expand All @@ -90,7 +149,9 @@ func TestParseLogFileName(t *testing.T) {
{
name: "err wrong format ddl .tmp",
args: arg{
name: fmt.Sprintf("%s_%s_%d_%s%d%s", "cp", "test", time.Now().Unix(), DefaultDDLLogFileType, 1, LogEXT) + TmpEXT,
name: fmt.Sprintf("%s_%s_%s_%d%s%s", /* a wrong format */
"cp", "test",
DefaultDDLLogFileType, 1, uuid.New().String(), LogEXT) + TmpEXT,
},
wantErr: ".*bad log name*.",
},
Expand Down
26 changes: 20 additions & 6 deletions cdc/redo/reader/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/pingcap/tiflow/cdc/redo/common"
"github.com/pingcap/tiflow/cdc/redo/writer"
"github.com/pingcap/tiflow/pkg/leakutil"
"github.com/pingcap/tiflow/pkg/uuid"
"github.com/stretchr/testify/require"
"golang.org/x/net/context"
)
Expand Down Expand Up @@ -62,7 +63,10 @@ func TestReaderRead(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

w, err := writer.NewWriter(ctx, cfg)
uuidGen := uuid.NewConstGenerator("const-uuid")
w, err := writer.NewWriter(ctx, cfg,
writer.WithUUIDGenerator(func() uuid.Generator { return uuidGen }),
)
require.Nil(t, err)
log := &model.RedoLog{
RedoRow: &model.RedoRowChangedEvent{Row: &model.RowChangedEvent{CommitTs: 1123}},
Expand All @@ -75,7 +79,9 @@ func TestReaderRead(t *testing.T) {
err = w.Close()
require.Nil(t, err)
require.True(t, !w.IsRunning())
fileName := fmt.Sprintf("%s_%s_%d_%s_%d%s", cfg.CaptureID, cfg.ChangeFeedID, cfg.CreateTime.Unix(), cfg.FileType, 11, common.LogEXT)
fileName := fmt.Sprintf(common.RedoLogFileFormatV1, cfg.CaptureID,
cfg.ChangeFeedID,
cfg.FileType, 11, uuidGen.NewString(), common.LogEXT)
path := filepath.Join(cfg.Dir, fileName)
info, err := os.Stat(path)
require.Nil(t, err)
Expand Down Expand Up @@ -108,7 +114,10 @@ func TestReaderOpenSelectedFiles(t *testing.T) {
MaxLogSize: 100000,
Dir: dir,
}
fileName := fmt.Sprintf("%s_%s_%d_%s_%d%s", "cp", "test-cf", time.Now().Unix(), common.DefaultDDLLogFileType, 11, common.LogEXT+common.TmpEXT)
uuidGen := uuid.NewGenerator()
fileName := fmt.Sprintf(common.RedoLogFileFormatV1, "cp",
"test-cf", common.DefaultDDLLogFileType, 11,
uuidGen.NewString(), common.LogEXT+common.TmpEXT)
w, err := writer.NewWriter(ctx, cfg, writer.WithLogFileName(func() string {
return fileName
}))
Expand All @@ -134,21 +143,26 @@ func TestReaderOpenSelectedFiles(t *testing.T) {
require.Nil(t, err)

// no data, will not open
fileName = fmt.Sprintf("%s_%s_%d_%s_%d%s", "cp", "test-cf11", time.Now().Unix(), common.DefaultDDLLogFileType, 10, common.LogEXT)
fileName = fmt.Sprintf(common.RedoLogFileFormatV1, "cp",
"test-cf11", common.DefaultDDLLogFileType, 10,
uuidGen.NewString(), common.LogEXT)
path = filepath.Join(dir, fileName)
_, err = os.Create(path)
require.Nil(t, err)

// SortLogEXT, will open
fileName = fmt.Sprintf("%s_%s_%d_%s_%d%s", "cp", "test-cf111", time.Now().Unix(), common.DefaultDDLLogFileType, 10, common.LogEXT) + common.SortLogEXT
fileName = fmt.Sprintf(common.RedoLogFileFormatV1, "cp",
"test-cf111", common.DefaultDDLLogFileType, 10, uuidGen.NewString(),
common.LogEXT) + common.SortLogEXT
path = filepath.Join(dir, fileName)
f1, err := os.Create(path)
require.Nil(t, err)

dir1, err := ioutil.TempDir("", "redo-openSelectedFiles1")
require.Nil(t, err)
defer os.RemoveAll(dir1) //nolint:errcheck
fileName = fmt.Sprintf("%s_%s_%d_%s_%d%s", "cp", "test-cf", time.Now().Unix(), common.DefaultDDLLogFileType, 11, common.LogEXT+"test")
fileName = fmt.Sprintf(common.RedoLogFileFormatV1, "cp", "test-cf",
common.DefaultDDLLogFileType, 11, uuidGen.NewString(), common.LogEXT+"test")
path = filepath.Join(dir1, fileName)
_, err = os.Create(path)
require.Nil(t, err)
Expand Down
9 changes: 7 additions & 2 deletions cdc/redo/reader/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"time"

"github.com/golang/mock/gomock"
"github.com/google/uuid"
"github.com/pingcap/errors"
mockstorage "github.com/pingcap/tidb/br/pkg/mock/storage"
"github.com/pingcap/tidb/br/pkg/storage"
Expand Down Expand Up @@ -84,7 +85,9 @@ func TestLogReaderResetReader(t *testing.T) {
MaxLogSize: 100000,
Dir: dir,
}
fileName := fmt.Sprintf("%s_%s_%d_%s_%d%s", "cp", "test-cf100", time.Now().Unix(), common.DefaultDDLLogFileType, 100, common.LogEXT)
fileName := fmt.Sprintf(common.RedoLogFileFormatV1, "cp",
"test-cf100",
common.DefaultDDLLogFileType, 100, uuid.New().String(), common.LogEXT)
w, err := writer.NewWriter(ctx, cfg, writer.WithLogFileName(func() string {
return fileName
}))
Expand All @@ -103,7 +106,9 @@ func TestLogReaderResetReader(t *testing.T) {
f, err := os.Open(path)
require.Nil(t, err)

fileName = fmt.Sprintf("%s_%s_%d_%s_%d%s", "cp", "test-cf10", time.Now().Unix(), common.DefaultRowLogFileType, 10, common.LogEXT)
fileName = fmt.Sprintf(common.RedoLogFileFormatV1, "cp",
"test-cf10",
common.DefaultRowLogFileType, 10, uuid.New().String(), common.LogEXT)
w, err = writer.NewWriter(ctx, cfg, writer.WithLogFileName(func() string {
return fileName
}))
Expand Down
Loading

0 comments on commit 97e9cde

Please sign in to comment.