redo/(ticdc): Optimize redolog I/O performance (#6262)
close #5920
zhaoxinyu authored Jul 18, 2022
1 parent e351bac commit b2f5bd4
Showing 16 changed files with 690 additions and 150 deletions.
171 changes: 100 additions & 71 deletions cdc/redo/writer/file.go
@@ -37,19 +37,16 @@ import (
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/redo/common"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/fsutil"
"github.com/pingcap/tiflow/pkg/uuid"
)

const (
// pageBytes is the alignment for flushing records to the backing Writer.
// It should be a multiple of the minimum sector size so that the log can safely
// distinguish between torn writes and ordinary data corruption.
pageBytes = 8 * common.MinSectorSize
)

const (
defaultFlushIntervalInMs = 1000
defaultS3Timeout = 3 * time.Second
pageBytes = 8 * common.MinSectorSize
defaultS3Timeout = 3 * time.Second
)
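A toy illustration of the alignment rule described in the comment above (this helper is not part of the codebase, and it assumes common.MinSectorSize is 512 bytes):

// alignToPage rounds a byte count up to the next pageBytes boundary; flushing
// only whole, sector-aligned pages lets a reader treat a partially written
// page at the tail of the log as a torn write rather than mid-log corruption.
func alignToPage(n int64) int64 {
	const minSectorSize = 512           // assumed value of common.MinSectorSize
	const pageBytes = 8 * minSectorSize // matches the constant defined above
	return (n + pageBytes - 1) / pageBytes * pageBytes
}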

var (
@@ -83,10 +80,9 @@ type FileWriterConfig struct {
FileType string
CreateTime time.Time
// MaxLogSize is the maximum size of a log file in megabytes; it defaults to defaultMaxLogSize.
MaxLogSize int64
FlushIntervalInMs int64
S3Storage bool
S3URI url.URL
MaxLogSize int64
S3Storage bool
S3URI url.URL
}

// Option defines the writerOptions
@@ -136,6 +132,7 @@ type Writer struct {
storage storage.ExternalStorage
sync.RWMutex
uuidGenerator uuid.Generator
allocator *fsutil.FileAllocator

metricFsyncDuration prometheus.Observer
metricFlushAllDuration prometheus.Observer
@@ -148,9 +145,6 @@ func NewWriter(ctx context.Context, cfg *FileWriterConfig, opts ...Option) (*Wri
return nil, cerror.WrapError(cerror.ErrRedoConfigInvalid, errors.New("FileWriterConfig can not be nil"))
}

if cfg.FlushIntervalInMs == 0 {
cfg.FlushIntervalInMs = defaultFlushIntervalInMs
}
cfg.MaxLogSize *= megabyte
if cfg.MaxLogSize == 0 {
cfg.MaxLogSize = defaultMaxLogSize
@@ -168,6 +162,7 @@ func NewWriter(ctx context.Context, cfg *FileWriterConfig, opts ...Option) (*Wri
for _, opt := range opts {
opt(op)
}

w := &Writer{
cfg: cfg,
op: op,
@@ -187,6 +182,13 @@ func NewWriter(ctx context.Context, cfg *FileWriterConfig, opts ...Option) (*Wri
w.uuidGenerator = uuid.NewGenerator()
}

// if we use S3 as the remote storage, a file allocator can be leveraged to
// pre-allocate files for us.
// TODO: test whether this improvement can also be applied to NFS.
if cfg.S3Storage {
w.allocator = fsutil.NewFileAllocator(cfg.Dir, cfg.FileType, defaultMaxLogSize)
}

w.running.Store(true)
return w, nil
}
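The fsutil.FileAllocator used above lives in pkg/fsutil and is not part of this file's diff. The following is only a rough sketch of the pre-allocation idea; the type name, fields, and background-goroutine design are assumptions, not the actual tiflow implementation:

import (
	"errors"
	"fmt"
	"os"
	"path/filepath"
)

// fileAllocator keeps one log file created and sized ahead of time, so the
// writer's hot path only has to pick it up instead of creating and growing a
// file while redo events are being written.
type fileAllocator struct {
	dir, prefix string
	size        int64
	next        chan *os.File // pre-created file waiting to be consumed
	done        chan struct{}
}

func newFileAllocator(dir, prefix string, size int64) *fileAllocator {
	fa := &fileAllocator{
		dir:    dir,
		prefix: prefix,
		size:   size,
		next:   make(chan *os.File, 1),
		done:   make(chan struct{}),
	}
	go fa.prealloc()
	return fa
}

func (fa *fileAllocator) prealloc() {
	defer close(fa.next)
	for seq := 0; ; seq++ {
		name := filepath.Join(fa.dir, fmt.Sprintf("%s_prealloc_%d.tmp", fa.prefix, seq))
		f, err := os.OpenFile(name, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0o600)
		if err != nil {
			return
		}
		// Reserve the space up front; this keeps block allocation (and the
		// write amplification it causes) off the write path.
		if err := f.Truncate(fa.size); err != nil {
			f.Close()
			return
		}
		select {
		case fa.next <- f: // parked here until Open picks the file up
		case <-fa.done:
			f.Close()
			os.Remove(name)
			return
		}
	}
}

// Open hands out the pre-created file, analogous to allocator.Open() below.
func (fa *fileAllocator) Open() (*os.File, error) {
	f, ok := <-fa.next
	if !ok {
		return nil, errors.New("file allocator is closed")
	}
	return f, nil
}

func (fa *fileAllocator) Close() { close(fa.done) }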
@@ -203,7 +205,7 @@ func (w *Writer) Write(rawData []byte) (int, error) {
}

if w.file == nil {
if err := w.openOrNew(len(rawData)); err != nil {
if err := w.openNew(); err != nil {
return 0, err
}
}
@@ -213,6 +215,7 @@ func (w *Writer) Write(rawData []byte) (int, error) {
return 0, err
}
}

if w.maxCommitTS.Load() < w.eventCommitTS.Load() {
w.maxCommitTS.Store(w.eventCommitTS.Load())
}
@@ -227,8 +230,12 @@ func (w *Writer) Write(rawData []byte) (int, error) {
}

n, err := w.bw.Write(rawData)
if err != nil {
return 0, err
}
w.metricWriteBytes.Add(float64(n))
w.size += int64(n)

return n, err
}

@@ -264,6 +271,11 @@ func (w *Writer) Close() error {
// always set to false when closed, since an error here may not be fixed simply by retrying
defer w.running.Store(false)

if w.allocator != nil {
w.allocator.Close()
w.allocator = nil
}

if !w.IsRunning() {
return nil
}
@@ -291,25 +303,46 @@ func (w *Writer) close() error {
if w.file == nil {
return nil
}
err := w.flushAll()
if err != nil {

if err := w.flush(); err != nil {
return err
}

if w.cfg.S3Storage {
off, err := w.file.Seek(0, io.SeekCurrent)
if err != nil {
return err
}
// an offset of 0 means nothing has been written to the current file,
// so we can simply return
if off == 0 {
return nil
}
// a file created by a file allocator needs to be truncated
// to save disk space and network bandwidth.
if err := w.file.Truncate(off); err != nil {
return err
}
}

// rename the file from commitTs.log.tmp to maxCommitTS.log if it is closed safely;
// after the rename, the file name can be used for search, since the ts is the max commit ts of all events in the file.
w.commitTS.Store(w.maxCommitTS.Load())
err = os.Rename(w.file.Name(), w.filePath())
err := os.Rename(w.file.Name(), w.filePath())
if err != nil {
return cerror.WrapError(cerror.ErrRedoFileOp, err)
}

// We only write the content to S3 right before closing the local file.
// This way, we no longer need to rename objects in S3.
if w.cfg.S3Storage {
ctx, cancel := context.WithTimeout(context.Background(), defaultS3Timeout)
defer cancel()

err = w.renameInS3(ctx, w.file.Name(), w.ongoingFilePath)
err = w.writeToS3(ctx, w.ongoingFilePath)
if err != nil {
w.file.Close()
w.file = nil
return cerror.WrapError(cerror.ErrS3StorageAPI, err)
}
}
@@ -319,14 +352,6 @@ func (w *Writer) close() error {
return cerror.WrapError(cerror.ErrRedoFileOp, err)
}
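The close path above combines the truncate-and-rename steps with a single upload. A condensed, illustrative sketch of that sequence, assuming f is a pre-allocated log file and upload stands in for the S3 write (both helper names are hypothetical):

import (
	"context"
	"io"
	"os"
)

// closeAndUploadPreallocated trims a pre-allocated log file back to the bytes
// actually written, gives it its final (max commit ts based) name, and then
// uploads it exactly once.
func closeAndUploadPreallocated(ctx context.Context, f *os.File, finalPath string,
	upload func(ctx context.Context, path string) error,
) error {
	off, err := f.Seek(0, io.SeekCurrent) // current offset == bytes written so far
	if err != nil {
		return err
	}
	if off == 0 {
		// nothing was written into this pre-allocated file; no rename or upload needed
		return f.Close()
	}
	// drop the unused zero-filled tail so neither the local disk nor S3 carries it
	if err := f.Truncate(off); err != nil {
		return err
	}
	if err := f.Close(); err != nil {
		return err
	}
	if err := os.Rename(f.Name(), finalPath); err != nil {
		return err
	}
	// the object is uploaded only after the local file is complete and renamed,
	// so no rename or delete round-trip is needed on the S3 side
	return upload(ctx, finalPath)
}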

func (w *Writer) renameInS3(ctx context.Context, oldPath, newPath string) error {
err := w.writeToS3(ctx, newPath)
if err != nil {
return cerror.WrapError(cerror.ErrS3StorageAPI, err)
}
return cerror.WrapError(cerror.ErrS3StorageAPI, w.storage.DeleteFile(ctx, filepath.Base(oldPath)))
}

func (w *Writer) getLogFileName() string {
if w.op != nil && w.op.getLogFileName != nil {
return w.op.getLogFileName()
@@ -357,16 +382,29 @@ func openTruncFile(name string) (*os.File, error) {
func (w *Writer) openNew() error {
err := os.MkdirAll(w.cfg.Dir, common.DefaultDirMode)
if err != nil {
return cerror.WrapError(cerror.ErrRedoFileOp, errors.Annotatef(err, "can't make dir: %s for new redo logfile", w.cfg.Dir))
return cerror.WrapError(cerror.ErrRedoFileOp,
errors.Annotatef(err, "can't make dir: %s for new redo logfile", w.cfg.Dir))
}

// reset ts used in file name when new file
w.commitTS.Store(w.eventCommitTS.Load())
w.maxCommitTS.Store(w.eventCommitTS.Load())
path := w.filePath() + common.TmpEXT
f, err := openTruncFile(path)
if err != nil {
return cerror.WrapError(cerror.ErrRedoFileOp, errors.Annotate(err, "can't open new redo logfile"))
var f *os.File
if w.allocator == nil {
w.commitTS.Store(w.eventCommitTS.Load())
w.maxCommitTS.Store(w.eventCommitTS.Load())
path := w.filePath() + common.TmpEXT
f, err = openTruncFile(path)
if err != nil {
return cerror.WrapError(cerror.ErrRedoFileOp,
errors.Annotate(err, "can't open new redolog file"))
}
} else {
// if there is a file allocator, we use the pre-created file
// supplied by the allocator to boost performance
f, err = w.allocator.Open()
if err != nil {
return cerror.WrapError(cerror.ErrRedoFileOp,
errors.Annotate(err, "can't open new redolog file with file allocator"))
}
}
w.file = f
w.size = 0
@@ -377,38 +415,6 @@ func (w *Writer) openNew() error {
return nil
}

func (w *Writer) openOrNew(writeLen int) error {
path := w.ongoingFilePath
if path == "" {
return w.openNew()
}
info, err := os.Stat(path)
if os.IsNotExist(err) {
return w.openNew()
}
if err != nil {
return cerror.WrapError(cerror.ErrRedoFileOp, errors.Annotate(err, "error getting log file info"))
}

if info.Size()+int64(writeLen) >= w.cfg.MaxLogSize {
return w.rotate()
}

file, err := os.OpenFile(path, os.O_APPEND|os.O_WRONLY, common.DefaultFileMode)
if err != nil {
// return err let the caller decide next move
return cerror.WrapError(cerror.ErrRedoFileOp, err)
}

w.file = file
w.size = info.Size()
err = w.newPageWriter()
if err != nil {
return err
}
return nil
}

func (w *Writer) newPageWriter() error {
offset, err := w.file.Seek(0, io.SeekCurrent)
if err != nil {
@@ -518,7 +524,8 @@ func (w *Writer) getShouldRemovedFiles(checkPointTs uint64) ([]os.FileInfo, erro
return logFiles, nil
}

func (w *Writer) flushAll() error {
// flushAndRotateFile flushes the file to disk and rotates it if S3 storage is used.
func (w *Writer) flushAndRotateFile() error {
if w.file == nil {
return nil
}
@@ -528,14 +535,23 @@ func (w *Writer) flushAll() error {
if err != nil {
return err
}

if !w.cfg.S3Storage {
return nil
}

ctx, cancel := context.WithTimeout(context.Background(), defaultS3Timeout)
defer cancel()
if w.size == 0 {
return nil
}

// for S3 storage, when the file is flushed to disk, we need an immediate
// file rotation. Otherwise, the existing file content would be written to S3 repeatedly,
// wasting considerable network bandwidth.
err = w.rotate()
if err != nil {
return nil
}

err = w.writeToS3(ctx, w.file.Name())
w.metricFlushAllDuration.Observe(time.Since(start).Seconds())

return err
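To see why the rotation on flush matters for bandwidth: if the same growing file were re-uploaded on every flush, the bytes sent to S3 would grow roughly quadratically with the number of flushes, whereas rotating means each byte is uploaded only once. A back-of-the-envelope illustration with made-up numbers:

// uploadVolumesMiB compares the bytes sent to S3 with and without an immediate
// rotation on flush, assuming 10 flushes that each add 4 MiB of new redo events.
func uploadVolumesMiB() (reuploadWholeFile, rotateThenUpload int64) {
	const flushes = 10
	const deltaMiB = int64(4)
	var fileSize int64
	for i := 0; i < flushes; i++ {
		fileSize += deltaMiB
		reuploadWholeFile += fileSize // re-sending the growing file: 4+8+...+40 = 220 MiB
		rotateThenUpload += deltaMiB  // each rotated file sent once: 10 * 4 = 40 MiB
	}
	return reuploadWholeFile, rotateThenUpload
}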
@@ -546,7 +562,7 @@ func (w *Writer) Flush() error {
w.Lock()
defer w.Unlock()

return w.flushAll()
return w.flushAndRotateFile()
}

func (w *Writer) flush() error {
@@ -574,5 +590,18 @@ func (w *Writer) writeToS3(ctx context.Context, name string) error {
}

// Key in s3: aws.String(rs.options.Prefix + name), prefix should be changefeed name
return cerror.WrapError(cerror.ErrS3StorageAPI, w.storage.WriteFile(ctx, filepath.Base(name), fileData))
err = w.storage.WriteFile(ctx, filepath.Base(name), fileData)
if err != nil {
return cerror.WrapError(cerror.ErrS3StorageAPI, err)
}

// in case piled-up page cache triggers OS memory reclaim, which may cause
// I/O latency spikes, we forcibly drop the page cache of the file once it has been
// successfully written to S3.
err = fsutil.DropPageCache(name)
if err != nil {
return cerror.WrapError(cerror.ErrRedoFileOp, err)
}

return nil
}
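fsutil.DropPageCache itself is not shown in this diff; on Linux the usual way to implement the technique is posix_fadvise with POSIX_FADV_DONTNEED, roughly as sketched below. This is a sketch of the general approach, not the tiflow implementation, and it assumes the file has already been fsynced (the writer flushes before reaching this point), since dirty pages cannot be discarded.

import (
	"os"

	"golang.org/x/sys/unix"
)

// dropPageCache advises the kernel that the cached pages of the named file are
// no longer needed, so a large redo file that was just uploaded does not pile
// up in the page cache and later trigger latency-inducing memory reclaim.
// Linux-only sketch.
func dropPageCache(name string) error {
	f, err := os.Open(name)
	if err != nil {
		return err
	}
	defer f.Close()
	// The pages must already be clean (fsynced) for the kernel to discard them;
	// offset 0 with length 0 means "from the start to the end of the file".
	return unix.Fadvise(int(f.Fd()), 0, 0, unix.FADV_DONTNEED)
}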