diff --git a/cdc/redo/common/util.go b/cdc/redo/common/util.go index 5ba7321583a..fdfd4478104 100644 --- a/cdc/redo/common/util.go +++ b/cdc/redo/common/util.go @@ -26,6 +26,15 @@ import ( cerror "github.com/pingcap/tiflow/pkg/errors" ) +const ( + // RedoLogFileFormatV1 was used before v6.1.0, which doesn't contain namespace information + // layout: captureID_changefeedID_fileType_maxEventCommitTs_uuid.fileExtName + RedoLogFileFormatV1 = "%s_%s_%s_%d_%s%s" + // RedoLogFileFormatV2 is available since v6.1.0, which contains namespace information + // layout: captureID_namespace_changefeedID_fileType_maxEventCommitTs_uuid.fileExtName + RedoLogFileFormatV2 = "%s_%s_%s_%s_%d_%s%s" +) + // InitS3storage init a storage used for s3, // s3URI should be like s3URI="s3://logbucket/test-changefeed?endpoint=http://$S3_ENDPOINT/" var InitS3storage = func(ctx context.Context, uri url.URL) (storage.ExternalStorage, error) { @@ -58,6 +67,13 @@ var InitS3storage = func(ctx context.Context, uri url.URL) (storage.ExternalStor return s3storage, nil } +// logFormat2ParseFormat converts redo log file name format to the space separated +// format, which can be read and parsed by sscanf. Besides remove the suffix `%s` +// which is used as file name extension, since we will parse extension first. +func logFormat2ParseFormat(fmtStr string) string { + return strings.TrimSuffix(strings.ReplaceAll(fmtStr, "_", " "), "%s") +} + // ParseLogFileName extract the commitTs, fileType from log fileName func ParseLogFileName(name string) (uint64, string, error) { ext := filepath.Ext(name) @@ -66,7 +82,9 @@ func ParseLogFileName(name string) (uint64, string, error) { } // if .sort, the name should be like - // fmt.Sprintf("%s_%s_%d_%s_%d%s", w.cfg.captureID, w.cfg.changeFeedID, w.cfg.createTime.Unix(), w.cfg.fileType, w.commitTS.Load(), LogEXT)+SortLogEXT + // fmt.Sprintf("%s_%s_%s_%d_%s_%d%s", w.cfg.captureID, + // w.cfg.changeFeedID.Namespace,w.cfg.changeFeedID.ID, + // w.cfg.fileType, w.commitTS.Load(), uuid, LogEXT)+SortLogEXT if ext == SortLogEXT { name = strings.TrimSuffix(name, SortLogEXT) ext = filepath.Ext(name) @@ -75,15 +93,12 @@ func ParseLogFileName(name string) (uint64, string, error) { return 0, "", nil } - var commitTs, d1 uint64 - var s1, s2, fileType string + var commitTs uint64 + var s1, s2, fileType, uid string // the log looks like: fmt.Sprintf("%s_%s_%d_%s_%d%s", w.cfg.captureID, w.cfg.changeFeedID, w.cfg.createTime.Unix(), w.cfg.fileType, w.commitTS.Load(), redo.LogEXT) - formatStr := "%s %s %d %s %d" + LogEXT - if ext == TmpEXT { - formatStr += TmpEXT - } + formatStr := logFormat2ParseFormat(RedoLogFileFormatV1) name = strings.ReplaceAll(name, "_", " ") - _, err := fmt.Sscanf(name, formatStr, &s1, &s2, &d1, &fileType, &commitTs) + _, err := fmt.Sscanf(name, formatStr, &s1, &s2, &fileType, &commitTs, &uid) if err != nil { return 0, "", errors.Annotatef(err, "bad log name: %s", name) } diff --git a/cdc/redo/common/util_test.go b/cdc/redo/common/util_test.go index acd9d92b4b8..d1aaadc98d5 100644 --- a/cdc/redo/common/util_test.go +++ b/cdc/redo/common/util_test.go @@ -16,8 +16,8 @@ package common import ( "fmt" "testing" - "time" + "github.com/google/uuid" "github.com/stretchr/testify/require" ) @@ -25,7 +25,6 @@ func TestParseLogFileName(t *testing.T) { type arg struct { name string } - // the log looks like: fmt.Sprintf("%s_%s_%d_%s_%d%s", w.cfg.captureID, w.cfg.changeFeedID, w.cfg.createTime.Unix(), w.cfg.fileType, w.commitTS.Load(), redo.LogEXT) tests := []struct { name string args arg @@ -36,7 +35,29 @@ func TestParseLogFileName(t *testing.T) { { name: "happy row .log", args: arg{ - name: fmt.Sprintf("%s_%s_%d_%s_%d%s", "cp", "test", time.Now().Unix(), DefaultRowLogFileType, 1, LogEXT), + name: fmt.Sprintf(RedoLogFileFormatV1, "cp", + "test", + DefaultRowLogFileType, 1, uuid.New().String(), LogEXT), + }, + wantTs: 1, + wantFileType: DefaultRowLogFileType, + }, + { + name: "happy row .log", + args: arg{ + name: fmt.Sprintf(RedoLogFileFormatV1, "cp", + "test", + DefaultRowLogFileType, 1, uuid.New().String(), LogEXT), + }, + wantTs: 1, + wantFileType: DefaultRowLogFileType, + }, + { + name: "happy row .tmp", + args: arg{ + name: fmt.Sprintf(RedoLogFileFormatV1, "cp", + "test", + DefaultRowLogFileType, 1, uuid.New().String(), LogEXT) + TmpEXT, }, wantTs: 1, wantFileType: DefaultRowLogFileType, @@ -44,7 +65,9 @@ func TestParseLogFileName(t *testing.T) { { name: "happy row .tmp", args: arg{ - name: fmt.Sprintf("%s_%s_%d_%s_%d%s", "cp", "test", time.Now().Unix(), DefaultRowLogFileType, 1, LogEXT) + TmpEXT, + name: fmt.Sprintf(RedoLogFileFormatV1, "cp", + "test", + DefaultRowLogFileType, 1, uuid.New().String(), LogEXT) + TmpEXT, }, wantTs: 1, wantFileType: DefaultRowLogFileType, @@ -52,7 +75,19 @@ func TestParseLogFileName(t *testing.T) { { name: "happy ddl .log", args: arg{ - name: fmt.Sprintf("%s_%s_%d_%s_%d%s", "cp", "test", time.Now().Unix(), DefaultDDLLogFileType, 1, LogEXT), + name: fmt.Sprintf(RedoLogFileFormatV1, "cp", + "test", + DefaultDDLLogFileType, 1, uuid.New().String(), LogEXT), + }, + wantTs: 1, + wantFileType: DefaultDDLLogFileType, + }, + { + name: "happy ddl .log", + args: arg{ + name: fmt.Sprintf(RedoLogFileFormatV1, "cp", + "test", + DefaultDDLLogFileType, 1, uuid.New().String(), LogEXT), }, wantTs: 1, wantFileType: DefaultDDLLogFileType, @@ -60,7 +95,29 @@ func TestParseLogFileName(t *testing.T) { { name: "happy ddl .sort", args: arg{ - name: fmt.Sprintf("%s_%s_%d_%s_%d%s", "cp", "test", time.Now().Unix(), DefaultDDLLogFileType, 1, LogEXT) + SortLogEXT, + name: fmt.Sprintf(RedoLogFileFormatV1, "cp", + "test", + DefaultDDLLogFileType, 1, uuid.New().String(), LogEXT) + SortLogEXT, + }, + wantTs: 1, + wantFileType: DefaultDDLLogFileType, + }, + { + name: "happy ddl .sort", + args: arg{ + name: fmt.Sprintf(RedoLogFileFormatV1, "cp", + "test", + DefaultDDLLogFileType, 1, uuid.New().String(), LogEXT) + SortLogEXT, + }, + wantTs: 1, + wantFileType: DefaultDDLLogFileType, + }, + { + name: "happy ddl .tmp", + args: arg{ + name: fmt.Sprintf(RedoLogFileFormatV1, "cp", + "test", + DefaultDDLLogFileType, 1, uuid.New().String(), LogEXT) + TmpEXT, }, wantTs: 1, wantFileType: DefaultDDLLogFileType, @@ -68,7 +125,9 @@ func TestParseLogFileName(t *testing.T) { { name: "happy ddl .tmp", args: arg{ - name: fmt.Sprintf("%s_%s_%d_%s_%d%s", "cp", "test", time.Now().Unix(), DefaultDDLLogFileType, 1, LogEXT) + TmpEXT, + name: fmt.Sprintf(RedoLogFileFormatV1, "cp", + "test", + DefaultDDLLogFileType, 1, uuid.New().String(), LogEXT) + TmpEXT, }, wantTs: 1, wantFileType: DefaultDDLLogFileType, @@ -90,7 +149,9 @@ func TestParseLogFileName(t *testing.T) { { name: "err wrong format ddl .tmp", args: arg{ - name: fmt.Sprintf("%s_%s_%d_%s%d%s", "cp", "test", time.Now().Unix(), DefaultDDLLogFileType, 1, LogEXT) + TmpEXT, + name: fmt.Sprintf("%s_%s_%s_%d%s%s", /* a wrong format */ + "cp", "test", + DefaultDDLLogFileType, 1, uuid.New().String(), LogEXT) + TmpEXT, }, wantErr: ".*bad log name*.", }, diff --git a/cdc/redo/reader/file_test.go b/cdc/redo/reader/file_test.go index 47c55dbf303..dd04b1f7e7a 100644 --- a/cdc/redo/reader/file_test.go +++ b/cdc/redo/reader/file_test.go @@ -27,6 +27,7 @@ import ( "github.com/pingcap/tiflow/cdc/redo/common" "github.com/pingcap/tiflow/cdc/redo/writer" "github.com/pingcap/tiflow/pkg/leakutil" + "github.com/pingcap/tiflow/pkg/uuid" "github.com/stretchr/testify/require" "golang.org/x/net/context" ) @@ -62,7 +63,10 @@ func TestReaderRead(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - w, err := writer.NewWriter(ctx, cfg) + uuidGen := uuid.NewConstGenerator("const-uuid") + w, err := writer.NewWriter(ctx, cfg, + writer.WithUUIDGenerator(func() uuid.Generator { return uuidGen }), + ) require.Nil(t, err) log := &model.RedoLog{ RedoRow: &model.RedoRowChangedEvent{Row: &model.RowChangedEvent{CommitTs: 1123}}, @@ -75,7 +79,9 @@ func TestReaderRead(t *testing.T) { err = w.Close() require.Nil(t, err) require.True(t, !w.IsRunning()) - fileName := fmt.Sprintf("%s_%s_%d_%s_%d%s", cfg.CaptureID, cfg.ChangeFeedID, cfg.CreateTime.Unix(), cfg.FileType, 11, common.LogEXT) + fileName := fmt.Sprintf(common.RedoLogFileFormatV1, cfg.CaptureID, + cfg.ChangeFeedID, + cfg.FileType, 11, uuidGen.NewString(), common.LogEXT) path := filepath.Join(cfg.Dir, fileName) info, err := os.Stat(path) require.Nil(t, err) @@ -108,7 +114,10 @@ func TestReaderOpenSelectedFiles(t *testing.T) { MaxLogSize: 100000, Dir: dir, } - fileName := fmt.Sprintf("%s_%s_%d_%s_%d%s", "cp", "test-cf", time.Now().Unix(), common.DefaultDDLLogFileType, 11, common.LogEXT+common.TmpEXT) + uuidGen := uuid.NewGenerator() + fileName := fmt.Sprintf(common.RedoLogFileFormatV1, "cp", + "test-cf", common.DefaultDDLLogFileType, 11, + uuidGen.NewString(), common.LogEXT+common.TmpEXT) w, err := writer.NewWriter(ctx, cfg, writer.WithLogFileName(func() string { return fileName })) @@ -134,13 +143,17 @@ func TestReaderOpenSelectedFiles(t *testing.T) { require.Nil(t, err) // no data, wil not open - fileName = fmt.Sprintf("%s_%s_%d_%s_%d%s", "cp", "test-cf11", time.Now().Unix(), common.DefaultDDLLogFileType, 10, common.LogEXT) + fileName = fmt.Sprintf(common.RedoLogFileFormatV1, "cp", + "test-cf11", common.DefaultDDLLogFileType, 10, + uuidGen.NewString(), common.LogEXT) path = filepath.Join(dir, fileName) _, err = os.Create(path) require.Nil(t, err) // SortLogEXT, wil open - fileName = fmt.Sprintf("%s_%s_%d_%s_%d%s", "cp", "test-cf111", time.Now().Unix(), common.DefaultDDLLogFileType, 10, common.LogEXT) + common.SortLogEXT + fileName = fmt.Sprintf(common.RedoLogFileFormatV1, "cp", + "test-cf111", common.DefaultDDLLogFileType, 10, uuidGen.NewString(), + common.LogEXT) + common.SortLogEXT path = filepath.Join(dir, fileName) f1, err := os.Create(path) require.Nil(t, err) @@ -148,7 +161,8 @@ func TestReaderOpenSelectedFiles(t *testing.T) { dir1, err := ioutil.TempDir("", "redo-openSelectedFiles1") require.Nil(t, err) defer os.RemoveAll(dir1) //nolint:errcheck - fileName = fmt.Sprintf("%s_%s_%d_%s_%d%s", "cp", "test-cf", time.Now().Unix(), common.DefaultDDLLogFileType, 11, common.LogEXT+"test") + fileName = fmt.Sprintf(common.RedoLogFileFormatV1, "cp", "test-cf", + common.DefaultDDLLogFileType, 11, uuidGen.NewString(), common.LogEXT+"test") path = filepath.Join(dir1, fileName) _, err = os.Create(path) require.Nil(t, err) diff --git a/cdc/redo/reader/reader_test.go b/cdc/redo/reader/reader_test.go index dd963d7cf55..0c637197c92 100644 --- a/cdc/redo/reader/reader_test.go +++ b/cdc/redo/reader/reader_test.go @@ -25,6 +25,7 @@ import ( "time" "github.com/golang/mock/gomock" + "github.com/google/uuid" "github.com/pingcap/errors" mockstorage "github.com/pingcap/tidb/br/pkg/mock/storage" "github.com/pingcap/tidb/br/pkg/storage" @@ -84,7 +85,9 @@ func TestLogReaderResetReader(t *testing.T) { MaxLogSize: 100000, Dir: dir, } - fileName := fmt.Sprintf("%s_%s_%d_%s_%d%s", "cp", "test-cf100", time.Now().Unix(), common.DefaultDDLLogFileType, 100, common.LogEXT) + fileName := fmt.Sprintf(common.RedoLogFileFormatV1, "cp", + "test-cf100", + common.DefaultDDLLogFileType, 100, uuid.New().String(), common.LogEXT) w, err := writer.NewWriter(ctx, cfg, writer.WithLogFileName(func() string { return fileName })) @@ -103,7 +106,9 @@ func TestLogReaderResetReader(t *testing.T) { f, err := os.Open(path) require.Nil(t, err) - fileName = fmt.Sprintf("%s_%s_%d_%s_%d%s", "cp", "test-cf10", time.Now().Unix(), common.DefaultRowLogFileType, 10, common.LogEXT) + fileName = fmt.Sprintf(common.RedoLogFileFormatV1, "cp", + "test-cf10", + common.DefaultRowLogFileType, 10, uuid.New().String(), common.LogEXT) w, err = writer.NewWriter(ctx, cfg, writer.WithLogFileName(func() string { return fileName })) diff --git a/cdc/redo/writer/file.go b/cdc/redo/writer/file.go index 0e55d7d89a3..eb846c3c161 100644 --- a/cdc/redo/writer/file.go +++ b/cdc/redo/writer/file.go @@ -29,13 +29,15 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/log" "github.com/pingcap/tidb/br/pkg/storage" - "github.com/pingcap/tiflow/cdc/redo/common" - cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/uber-go/atomic" pioutil "go.etcd.io/etcd/pkg/ioutil" "go.uber.org/multierr" "go.uber.org/zap" + + "github.com/pingcap/tiflow/cdc/redo/common" + cerror "github.com/pingcap/tiflow/pkg/errors" + "github.com/pingcap/tiflow/pkg/uuid" ) const ( @@ -91,7 +93,8 @@ type FileWriterConfig struct { type Option func(writer *writerOptions) type writerOptions struct { - getLogFileName func() string + getLogFileName func() string + getUUIDGenerator func() uuid.Generator } // WithLogFileName provide the Option for fileName @@ -103,6 +106,15 @@ func WithLogFileName(f func() string) Option { } } +// WithUUIDGenerator provides the Option for uuid generator +func WithUUIDGenerator(f func() uuid.Generator) Option { + return func(o *writerOptions) { + if f != nil { + o.getUUIDGenerator = f + } + } +} + // Writer is a redo log event Writer which writes redo log events to a file. type Writer struct { cfg *FileWriterConfig @@ -117,10 +129,13 @@ type Writer struct { gcRunning atomic.Bool size int64 file *os.File - bw *pioutil.PageWriter - uint64buf []byte - storage storage.ExternalStorage + // record the filepath that is being written, and has not been flushed + ongoingFilePath string + bw *pioutil.PageWriter + uint64buf []byte + storage storage.ExternalStorage sync.RWMutex + uuidGenerator uuid.Generator metricFsyncDuration prometheus.Observer metricFlushAllDuration prometheus.Observer @@ -163,6 +178,11 @@ func NewWriter(ctx context.Context, cfg *FileWriterConfig, opts ...Option) (*Wri metricFlushAllDuration: redoFlushAllDurationHistogram.WithLabelValues(cfg.CaptureID, cfg.ChangeFeedID), metricWriteBytes: redoWriteBytesGauge.WithLabelValues(cfg.CaptureID, cfg.ChangeFeedID), } + if w.op.getUUIDGenerator != nil { + w.uuidGenerator = w.op.getUUIDGenerator() + } else { + w.uuidGenerator = uuid.NewGenerator() + } w.running.Store(true) go w.runFlushToDisk(ctx, cfg.FlushIntervalInMs) @@ -308,7 +328,7 @@ func (w *Writer) close() error { ctx, cancel := context.WithTimeout(context.Background(), defaultS3Timeout) defer cancel() - err = w.renameInS3(ctx, w.file.Name(), w.filePath()) + err = w.renameInS3(ctx, w.file.Name(), w.ongoingFilePath) if err != nil { return cerror.WrapError(cerror.ErrS3StorageAPI, err) } @@ -331,11 +351,18 @@ func (w *Writer) getLogFileName() string { if w.op != nil && w.op.getLogFileName != nil { return w.op.getLogFileName() } - return fmt.Sprintf("%s_%s_%d_%s_%d%s", w.cfg.CaptureID, w.cfg.ChangeFeedID, w.cfg.CreateTime.Unix(), w.cfg.FileType, w.commitTS.Load(), common.LogEXT) + uid := w.uuidGenerator.NewString() + return fmt.Sprintf(common.RedoLogFileFormatV1, + w.cfg.CaptureID, w.cfg.ChangeFeedID, w.cfg.FileType, + w.commitTS.Load(), uid, common.LogEXT) } +// filePath always creates a new, unique file path, note this function is not +// thread-safe, writer needs to ensure lock is acquired when calling it. func (w *Writer) filePath() string { - return filepath.Join(w.cfg.Dir, w.getLogFileName()) + fp := filepath.Join(w.cfg.Dir, w.getLogFileName()) + w.ongoingFilePath = fp + return fp } func openTruncFile(name string) (*os.File, error) { @@ -366,7 +393,10 @@ func (w *Writer) openNew() error { } func (w *Writer) openOrNew(writeLen int) error { - path := w.filePath() + path := w.ongoingFilePath + if path == "" { + return w.openNew() + } info, err := os.Stat(path) if os.IsNotExist(err) { return w.openNew() diff --git a/cdc/redo/writer/file_test.go b/cdc/redo/writer/file_test.go index e22fdf430bf..9527daa07f5 100644 --- a/cdc/redo/writer/file_test.go +++ b/cdc/redo/writer/file_test.go @@ -28,6 +28,7 @@ import ( mockstorage "github.com/pingcap/tidb/br/pkg/mock/storage" "github.com/pingcap/tiflow/cdc/redo/common" "github.com/pingcap/tiflow/pkg/leakutil" + "github.com/pingcap/tiflow/pkg/uuid" "github.com/stretchr/testify/require" "github.com/uber-go/atomic" ) @@ -47,6 +48,7 @@ func TestWriterWrite(t *testing.T) { require.Nil(t, err) defer os.RemoveAll(dir) + uuidGen := uuid.NewConstGenerator("const-uuid") w := &Writer{ cfg: &FileWriterConfig{ MaxLogSize: 10, @@ -61,13 +63,17 @@ func TestWriterWrite(t *testing.T) { metricWriteBytes: redoWriteBytesGauge.WithLabelValues("cp", "test-cf"), metricFsyncDuration: redoFsyncDurationHistogram.WithLabelValues("cp", "test-cf"), metricFlushAllDuration: redoFlushAllDurationHistogram.WithLabelValues("cp", "test-cf"), + uuidGenerator: uuidGen, } w.eventCommitTS.Store(1) _, err = w.Write([]byte("tes1t11111")) require.Nil(t, err) + var fileName string // create a .tmp file - fileName := fmt.Sprintf("%s_%s_%d_%s_%d%s", w.cfg.CaptureID, w.cfg.ChangeFeedID, w.cfg.CreateTime.Unix(), w.cfg.FileType, 1, common.LogEXT) + common.TmpEXT + fileName = fmt.Sprintf(common.RedoLogFileFormatV1, w.cfg.CaptureID, + w.cfg.ChangeFeedID, + w.cfg.FileType, 1, uuidGen.NewString(), common.LogEXT) + common.TmpEXT path := filepath.Join(w.cfg.Dir, fileName) info, err := os.Stat(path) require.Nil(t, err) @@ -81,13 +87,17 @@ func TestWriterWrite(t *testing.T) { require.Nil(t, err) // after rotate, rename to .log - fileName = fmt.Sprintf("%s_%s_%d_%s_%d%s", w.cfg.CaptureID, w.cfg.ChangeFeedID, w.cfg.CreateTime.Unix(), w.cfg.FileType, 1, common.LogEXT) + fileName = fmt.Sprintf(common.RedoLogFileFormatV1, w.cfg.CaptureID, + w.cfg.ChangeFeedID, + w.cfg.FileType, 1, uuidGen.NewString(), common.LogEXT) path = filepath.Join(w.cfg.Dir, fileName) info, err = os.Stat(path) require.Nil(t, err) require.Equal(t, fileName, info.Name()) // create a .tmp file with first eventCommitTS as name - fileName = fmt.Sprintf("%s_%s_%d_%s_%d%s", w.cfg.CaptureID, w.cfg.ChangeFeedID, w.cfg.CreateTime.Unix(), w.cfg.FileType, 12, common.LogEXT) + common.TmpEXT + fileName = fmt.Sprintf(common.RedoLogFileFormatV1, w.cfg.CaptureID, + w.cfg.ChangeFeedID, + w.cfg.FileType, 12, uuidGen.NewString(), common.LogEXT) + common.TmpEXT path = filepath.Join(w.cfg.Dir, fileName) info, err = os.Stat(path) require.Nil(t, err) @@ -96,7 +106,9 @@ func TestWriterWrite(t *testing.T) { require.Nil(t, err) require.False(t, w.IsRunning()) // safe close, rename to .log with max eventCommitTS as name - fileName = fmt.Sprintf("%s_%s_%d_%s_%d%s", w.cfg.CaptureID, w.cfg.ChangeFeedID, w.cfg.CreateTime.Unix(), w.cfg.FileType, 22, common.LogEXT) + fileName = fmt.Sprintf(common.RedoLogFileFormatV1, w.cfg.CaptureID, + w.cfg.ChangeFeedID, + w.cfg.FileType, 22, uuidGen.NewString(), common.LogEXT) path = filepath.Join(w.cfg.Dir, fileName) info, err = os.Stat(path) require.Nil(t, err) @@ -111,18 +123,24 @@ func TestWriterWrite(t *testing.T) { FileType: common.DefaultRowLogFileType, CreateTime: time.Date(2000, 1, 1, 1, 1, 1, 1, &time.Location{}), }, - uint64buf: make([]byte, 8), - running: *atomic.NewBool(true), - metricWriteBytes: redoWriteBytesGauge.WithLabelValues("cp", "test-cf11"), - metricFsyncDuration: redoFsyncDurationHistogram.WithLabelValues("cp", "test-cf11"), - metricFlushAllDuration: redoFlushAllDurationHistogram.WithLabelValues("cp", "test-cf11"), + uint64buf: make([]byte, 8), + running: *atomic.NewBool(true), + metricWriteBytes: redoWriteBytesGauge. + WithLabelValues("default", "test-cf11"), + metricFsyncDuration: redoFsyncDurationHistogram. + WithLabelValues("default", "test-cf11"), + metricFlushAllDuration: redoFlushAllDurationHistogram. + WithLabelValues("default", "test-cf11"), + uuidGenerator: uuidGen, } w1.eventCommitTS.Store(1) _, err = w1.Write([]byte("tes1t11111")) require.Nil(t, err) // create a .tmp file - fileName = fmt.Sprintf("%s_%s_%d_%s_%d%s", w1.cfg.CaptureID, w1.cfg.ChangeFeedID, w1.cfg.CreateTime.Unix(), w1.cfg.FileType, 1, common.LogEXT) + common.TmpEXT + fileName = fmt.Sprintf(common.RedoLogFileFormatV1, w1.cfg.CaptureID, + w1.cfg.ChangeFeedID, + w1.cfg.FileType, 1, uuidGen.NewString(), common.LogEXT) + common.TmpEXT path = filepath.Join(w1.cfg.Dir, fileName) info, err = os.Stat(path) require.Nil(t, err) @@ -141,22 +159,34 @@ func TestWriterGC(t *testing.T) { require.Nil(t, err) defer os.RemoveAll(dir) + uuidGen := uuid.NewConstGenerator("const-uuid") controller := gomock.NewController(t) mockStorage := mockstorage.NewMockExternalStorage(controller) - mockStorage.EXPECT().WriteFile(gomock.Any(), "cp_test_946688461_row_1.log.tmp", gomock.Any()).Return(nil).Times(1) - mockStorage.EXPECT().WriteFile(gomock.Any(), "cp_test_946688461_row_1.log", gomock.Any()).Return(nil).Times(1) - mockStorage.EXPECT().DeleteFile(gomock.Any(), "cp_test_946688461_row_1.log.tmp").Return(nil).Times(1) + mockStorage.EXPECT().WriteFile(gomock.Any(), "cp_test_row_1_const-uuid.log.tmp", + gomock.Any()).Return(nil).Times(1) + mockStorage.EXPECT().WriteFile(gomock.Any(), "cp_test_row_1_const-uuid.log", + gomock.Any()).Return(nil).Times(1) + mockStorage.EXPECT().DeleteFile(gomock.Any(), "cp_test_row_1_const-uuid.log.tmp"). + Return(nil).Times(1) - mockStorage.EXPECT().WriteFile(gomock.Any(), "cp_test_946688461_row_2.log.tmp", gomock.Any()).Return(nil).Times(1) - mockStorage.EXPECT().WriteFile(gomock.Any(), "cp_test_946688461_row_2.log", gomock.Any()).Return(nil).Times(1) - mockStorage.EXPECT().DeleteFile(gomock.Any(), "cp_test_946688461_row_2.log.tmp").Return(nil).Times(1) + mockStorage.EXPECT().WriteFile(gomock.Any(), "cp_test_row_2_const-uuid.log.tmp", + gomock.Any()).Return(nil).Times(1) + mockStorage.EXPECT().WriteFile(gomock.Any(), "cp_test_row_2_const-uuid.log", + gomock.Any()).Return(nil).Times(1) + mockStorage.EXPECT().DeleteFile(gomock.Any(), "cp_test_row_2_const-uuid.log.tmp"). + Return(nil).Times(1) - mockStorage.EXPECT().WriteFile(gomock.Any(), "cp_test_946688461_row_3.log.tmp", gomock.Any()).Return(nil).Times(1) - mockStorage.EXPECT().WriteFile(gomock.Any(), "cp_test_946688461_row_3.log", gomock.Any()).Return(nil).Times(1) - mockStorage.EXPECT().DeleteFile(gomock.Any(), "cp_test_946688461_row_3.log.tmp").Return(nil).Times(1) + mockStorage.EXPECT().WriteFile(gomock.Any(), "cp_test_row_3_const-uuid.log.tmp", + gomock.Any()).Return(nil).Times(1) + mockStorage.EXPECT().WriteFile(gomock.Any(), "cp_test_row_3_const-uuid.log", + gomock.Any()).Return(nil).Times(1) + mockStorage.EXPECT().DeleteFile(gomock.Any(), "cp_test_row_3_const-uuid.log.tmp"). + Return(nil).Times(1) - mockStorage.EXPECT().DeleteFile(gomock.Any(), "cp_test_946688461_row_1.log").Return(errors.New("ignore err")).Times(1) - mockStorage.EXPECT().DeleteFile(gomock.Any(), "cp_test_946688461_row_2.log").Return(errors.New("ignore err")).Times(1) + mockStorage.EXPECT().DeleteFile(gomock.Any(), "cp_test_row_1_const-uuid.log"). + Return(errors.New("ignore err")).Times(1) + mockStorage.EXPECT().DeleteFile(gomock.Any(), "cp_test_row_2_const-uuid.log"). + Return(errors.New("ignore err")).Times(1) megabyte = 1 cfg := &FileWriterConfig{ @@ -170,12 +200,16 @@ func TestWriterGC(t *testing.T) { S3Storage: true, } w := &Writer{ - cfg: cfg, - uint64buf: make([]byte, 8), - storage: mockStorage, - metricWriteBytes: redoWriteBytesGauge.WithLabelValues(cfg.CaptureID, cfg.ChangeFeedID), - metricFsyncDuration: redoFsyncDurationHistogram.WithLabelValues(cfg.CaptureID, cfg.ChangeFeedID), - metricFlushAllDuration: redoFlushAllDurationHistogram.WithLabelValues(cfg.CaptureID, cfg.ChangeFeedID), + cfg: cfg, + uint64buf: make([]byte, 8), + storage: mockStorage, + metricWriteBytes: redoWriteBytesGauge. + WithLabelValues(cfg.CaptureID, cfg.ChangeFeedID), + metricFsyncDuration: redoFsyncDurationHistogram. + WithLabelValues(cfg.CaptureID, cfg.ChangeFeedID), + metricFlushAllDuration: redoFlushAllDurationHistogram. + WithLabelValues(cfg.CaptureID, cfg.ChangeFeedID), + uuidGenerator: uuidGen, } w.running.Store(true) w.eventCommitTS.Store(1) @@ -236,11 +270,14 @@ func TestNewWriter(t *testing.T) { require.Nil(t, err) defer os.RemoveAll(dir) + uuidGen := uuid.NewConstGenerator("const-uuid") w, err := NewWriter(context.Background(), &FileWriterConfig{ Dir: "sdfsf", S3Storage: true, S3URI: *s3URI, - }) + }, + WithUUIDGenerator(func() uuid.Generator { return uuidGen }), + ) require.Nil(t, err) time.Sleep(time.Duration(defaultFlushIntervalInMs+1) * time.Millisecond) err = w.Close() @@ -249,9 +286,12 @@ func TestNewWriter(t *testing.T) { controller := gomock.NewController(t) mockStorage := mockstorage.NewMockExternalStorage(controller) - mockStorage.EXPECT().WriteFile(gomock.Any(), "cp_test_946688461_ddl_0.log.tmp", gomock.Any()).Return(nil).Times(2) - mockStorage.EXPECT().WriteFile(gomock.Any(), "cp_test_946688461_ddl_0.log", gomock.Any()).Return(nil).Times(1) - mockStorage.EXPECT().DeleteFile(gomock.Any(), "cp_test_946688461_ddl_0.log.tmp").Return(nil).Times(1) + mockStorage.EXPECT().WriteFile(gomock.Any(), "cp_test_ddl_0_const-uuid.log.tmp", + gomock.Any()).Return(nil).Times(2) + mockStorage.EXPECT().WriteFile(gomock.Any(), "cp_test_ddl_0_const-uuid.log", + gomock.Any()).Return(nil).Times(1) + mockStorage.EXPECT().DeleteFile(gomock.Any(), "cp_test_ddl_0_const-uuid.log.tmp"). + Return(nil).Times(1) w = &Writer{ cfg: &FileWriterConfig{ @@ -263,11 +303,15 @@ func TestNewWriter(t *testing.T) { S3Storage: true, MaxLogSize: defaultMaxLogSize, }, - uint64buf: make([]byte, 8), - storage: mockStorage, - metricWriteBytes: redoWriteBytesGauge.WithLabelValues("cp", "test"), - metricFsyncDuration: redoFsyncDurationHistogram.WithLabelValues("cp", "test"), - metricFlushAllDuration: redoFlushAllDurationHistogram.WithLabelValues("cp", "test"), + uint64buf: make([]byte, 8), + storage: mockStorage, + metricWriteBytes: redoWriteBytesGauge. + WithLabelValues("default", "test"), + metricFsyncDuration: redoFsyncDurationHistogram. + WithLabelValues("default", "test"), + metricFlushAllDuration: redoFlushAllDurationHistogram. + WithLabelValues("default", "test"), + uuidGenerator: uuidGen, } w.running.Store(true) _, err = w.Write([]byte("test")) @@ -281,3 +325,68 @@ func TestNewWriter(t *testing.T) { require.Equal(t, w.running.Load(), false) time.Sleep(time.Duration(defaultFlushIntervalInMs+1) * time.Millisecond) } + +func TestRotateFile(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + _, err := NewWriter(ctx, nil) + require.NotNil(t, err) + + controller := gomock.NewController(t) + mockStorage := mockstorage.NewMockExternalStorage(controller) + + mockStorage.EXPECT().WriteFile(gomock.Any(), "cp_test_row_0_uuid-1.log.tmp", + gomock.Any()).Return(nil).Times(1) + mockStorage.EXPECT().WriteFile(gomock.Any(), "cp_test_row_0_uuid-2.log", + gomock.Any()).Return(nil).Times(1) + mockStorage.EXPECT().DeleteFile(gomock.Any(), "cp_test_row_0_uuid-1.log.tmp"). + Return(nil).Times(1) + + mockStorage.EXPECT().WriteFile(gomock.Any(), "cp_test_row_0_uuid-3.log.tmp", + gomock.Any()).Return(nil).Times(1) + mockStorage.EXPECT().WriteFile(gomock.Any(), "cp_test_row_100_uuid-4.log", + gomock.Any()).Return(nil).Times(1) + mockStorage.EXPECT().DeleteFile(gomock.Any(), "cp_test_row_0_uuid-3.log.tmp"). + Return(nil).Times(1) + + dir := t.TempDir() + uuidGen := uuid.NewMock() + uuidGen.Push("uuid-1") + uuidGen.Push("uuid-2") + uuidGen.Push("uuid-3") + uuidGen.Push("uuid-4") + uuidGen.Push("uuid-5") + w := &Writer{ + cfg: &FileWriterConfig{ + Dir: dir, + CaptureID: "cp", + ChangeFeedID: "test", + FileType: common.DefaultRowLogFileType, + CreateTime: time.Date(2000, 1, 1, 1, 1, 1, 1, &time.Location{}), + S3Storage: true, + MaxLogSize: defaultMaxLogSize, + }, + uint64buf: make([]byte, 8), + metricWriteBytes: redoWriteBytesGauge. + WithLabelValues("default", "test"), + metricFsyncDuration: redoFsyncDurationHistogram. + WithLabelValues("default", "test"), + metricFlushAllDuration: redoFlushAllDurationHistogram. + WithLabelValues("default", "test"), + storage: mockStorage, + uuidGenerator: uuidGen, + } + + w.running.Store(true) + _, err = w.Write([]byte("test")) + require.Nil(t, err) + + err = w.rotate() + require.Nil(t, err) + + w.AdvanceTs(100) + _, err = w.Write([]byte("test")) + require.Nil(t, err) + err = w.rotate() + require.Nil(t, err) +} diff --git a/cdc/redo/writer/writer.go b/cdc/redo/writer/writer.go index 5c104b4040d..a2ffc7b18c8 100644 --- a/cdc/redo/writer/writer.go +++ b/cdc/redo/writer/writer.go @@ -108,7 +108,9 @@ type LogWriter struct { } // NewLogWriter creates a LogWriter instance. It is guaranteed only one LogWriter per changefeed -func NewLogWriter(ctx context.Context, cfg *LogWriterConfig) (*LogWriter, error) { +func NewLogWriter( + ctx context.Context, cfg *LogWriterConfig, opts ...Option, +) (*LogWriter, error) { if cfg == nil { return nil, cerror.WrapError(cerror.ErrRedoConfigInvalid, errors.New("LogWriterConfig can not be nil")) } @@ -150,11 +152,11 @@ func NewLogWriter(ctx context.Context, cfg *LogWriterConfig) (*LogWriter, error) logWriter = &LogWriter{ cfg: cfg, } - logWriter.rowWriter, err = NewWriter(ctx, rowCfg) + logWriter.rowWriter, err = NewWriter(ctx, rowCfg, opts...) if err != nil { return nil, err } - logWriter.ddlWriter, err = NewWriter(ctx, ddlCfg) + logWriter.ddlWriter, err = NewWriter(ctx, ddlCfg, opts...) if err != nil { return nil, err } diff --git a/cdc/redo/writer/writer_test.go b/cdc/redo/writer/writer_test.go index d071ca30474..15c59f1e4b8 100644 --- a/cdc/redo/writer/writer_test.go +++ b/cdc/redo/writer/writer_test.go @@ -34,6 +34,7 @@ import ( "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/redo/common" cerror "github.com/pingcap/tiflow/pkg/errors" + "github.com/pingcap/tiflow/pkg/uuid" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "go.uber.org/multierr" @@ -618,7 +619,10 @@ func TestNewLogWriter(t *testing.T) { CreateTime: time.Date(2000, 1, 1, 1, 1, 1, 1, &time.Location{}), FlushIntervalInMs: 5, } - ll, err := NewLogWriter(ctx, cfg) + uuidGen := uuid.NewConstGenerator("const-uuid") + ll, err := NewLogWriter(ctx, cfg, + WithUUIDGenerator(func() uuid.Generator { return uuidGen }), + ) require.Nil(t, err) time.Sleep(time.Duration(defaultGCIntervalInMs+1) * time.Millisecond) require.Equal(t, map[int64]uint64{}, ll.meta.ResolvedTsList) diff --git a/pkg/uuid/mock.go b/pkg/uuid/mock.go new file mode 100644 index 00000000000..c3055a8e1d7 --- /dev/null +++ b/pkg/uuid/mock.go @@ -0,0 +1,56 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package uuid + +import "github.com/pingcap/log" + +// MockGenerator is a mocked uuid generator +type MockGenerator struct { + list []string +} + +// NewMock creates a new MockGenerator instance +func NewMock() *MockGenerator { + return &MockGenerator{} +} + +// NewString implements Generator.NewString +func (g *MockGenerator) NewString() (ret string) { + if len(g.list) == 0 { + log.L().Panic("Empty uuid list. Please use Push() to add a uuid to the list.") + } + + ret, g.list = g.list[0], g.list[1:] + return +} + +// Push adds a candidate uuid in FIFO list +func (g *MockGenerator) Push(uuid string) { + g.list = append(g.list, uuid) +} + +// ConstGenerator is a mocked uuid generator, which always generate a pre defined uuid +type ConstGenerator struct { + uid string +} + +// NewConstGenerator creates a new ConstGenerator instance +func NewConstGenerator(uid string) *ConstGenerator { + return &ConstGenerator{uid: uid} +} + +// NewString implements Generator.NewString +func (g *ConstGenerator) NewString() string { + return g.uid +} diff --git a/pkg/uuid/uuid.go b/pkg/uuid/uuid.go new file mode 100644 index 00000000000..f542ff87282 --- /dev/null +++ b/pkg/uuid/uuid.go @@ -0,0 +1,32 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package uuid + +import guuid "github.com/google/uuid" + +// Generator defines an interface that can generate a uuid +type Generator interface { + NewString() string +} + +type generatorImpl struct{} + +func (g *generatorImpl) NewString() string { + return guuid.New().String() +} + +// NewGenerator creates a new generatorImpl instance +func NewGenerator() Generator { + return &generatorImpl{} +} diff --git a/pkg/uuid/uuid_test.go b/pkg/uuid/uuid_test.go new file mode 100644 index 00000000000..1513c67185a --- /dev/null +++ b/pkg/uuid/uuid_test.go @@ -0,0 +1,53 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. +package uuid + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestGenerator(t *testing.T) { + t.Parallel() + + gen := NewGenerator() + uuid1 := gen.NewString() + uuid2 := gen.NewString() + require.NotEqual(t, uuid1, uuid2) +} + +func TestMockGenerator(t *testing.T) { + t.Parallel() + + gen := NewMock() + require.Panics(t, func() { gen.NewString() }) + + uuids := []string{"uuid1", "uuid2", "uuid3"} + for _, uid := range uuids { + gen.Push(uid) + } + for _, uid := range uuids { + require.Equal(t, uid, gen.NewString()) + } +} + +func TestConstGenerator(t *testing.T) { + t.Parallel() + + uid := "const-uuid" + gen := NewConstGenerator(uid) + for i := 0; i < 3; i++ { + require.Equal(t, uid, gen.NewString()) + } +}