Skip to content

Commit

Permalink
redo(ticdc): set default timeout for accessing external storage to 15 minutes (#8181) (#8205)
Browse files Browse the repository at this point in the history

close #8089
  • Loading branch information
ti-chi-bot authored Mar 27, 2023
1 parent e85be56 commit 13f462b
Show file tree
Hide file tree
Showing 6 changed files with 260 additions and 113 deletions.
16 changes: 8 additions & 8 deletions cdc/redo/writer/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,7 @@ const (
// pageBytes is the alignment for flushing records to the backing Writer.
// It should be a multiple of the minimum sector size so that log can safely
// distinguish between torn writes and ordinary data corruption.
pageBytes = 8 * redo.MinSectorSize
defaultS3Timeout = 15 * time.Second
pageBytes = 8 * redo.MinSectorSize
)

var (
Expand Down Expand Up @@ -299,7 +298,9 @@ func (w *Writer) Close() error {
common.RedoWriteBytesGauge.
DeleteLabelValues(w.cfg.ChangeFeedID.Namespace, w.cfg.ChangeFeedID.ID)

return w.close()
ctx, cancel := context.WithTimeout(context.Background(), redo.CloseTimeout)
defer cancel()
return w.close(ctx)
}

// IsRunning implement IsRunning interface
Expand All @@ -311,7 +312,7 @@ func (w *Writer) isGCRunning() bool {
return w.gcRunning.Load()
}

func (w *Writer) close() error {
func (w *Writer) close(ctx context.Context) error {
if w.file == nil {
return nil
}
Expand Down Expand Up @@ -359,9 +360,6 @@ func (w *Writer) close() error {
// We only write content to S3 before closing the local file.
// By this way, we no longer need renaming object in S3.
if w.cfg.UseExternalStorage {
ctx, cancel := context.WithTimeout(context.Background(), defaultS3Timeout)
defer cancel()

err = w.writeToS3(ctx, w.ongoingFilePath)
if err != nil {
w.file.Close()
Expand Down Expand Up @@ -449,7 +447,9 @@ func (w *Writer) newPageWriter() error {
}

func (w *Writer) rotate() error {
if err := w.close(); err != nil {
ctx, cancel := context.WithTimeout(context.Background(), redo.DefaultTimeout)
defer cancel()
if err := w.close(ctx); err != nil {
return err
}
return w.openNew()
Expand Down
93 changes: 41 additions & 52 deletions cdc/redo/writer/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,15 @@ import (
"sync"
"time"

"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tiflow/cdc/contextutil"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/redo/common"
"github.com/pingcap/tiflow/pkg/config"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/redo"
"github.com/pingcap/tiflow/pkg/util"
"github.com/pingcap/tiflow/pkg/uuid"
"go.uber.org/multierr"
"go.uber.org/zap"
Expand Down Expand Up @@ -84,7 +82,7 @@ func NewRedoLogWriter(

scheme := uri.Scheme
if !redo.IsValidConsistentStorage(scheme) {
return nil, cerror.ErrConsistentStorage.GenWithStackByArgs(scheme)
return nil, errors.ErrConsistentStorage.GenWithStackByArgs(scheme)
}
if redo.IsBlackholeStorage(scheme) {
return NewBlackHoleWriter(), nil
Expand Down Expand Up @@ -149,7 +147,8 @@ func newLogWriter(
ctx context.Context, cfg *logWriterConfig, opts ...Option,
) (lw *logWriter, err error) {
if cfg == nil {
return nil, cerror.WrapError(cerror.ErrRedoConfigInvalid, errors.New("LogWriterConfig can not be nil"))
err := errors.New("LogWriterConfig can not be nil")
return nil, errors.WrapError(errors.ErrRedoConfigInvalid, err)
}

lw = &logWriter{cfg: cfg}
Expand Down Expand Up @@ -222,7 +221,7 @@ func newLogWriter(
func (l *logWriter) preCleanUpS3(ctx context.Context) error {
ret, err := l.extStorage.FileExists(ctx, l.getDeletedChangefeedMarker())
if err != nil {
return cerror.WrapError(cerror.ErrExternalStorageAPI, err)
return errors.WrapError(errors.ErrExternalStorageAPI, err)
}
if !ret {
return nil
Expand All @@ -244,8 +243,8 @@ func (l *logWriter) preCleanUpS3(ctx context.Context) error {
return err
}
err = l.extStorage.DeleteFile(ctx, l.getDeletedChangefeedMarker())
if !isNotExistInS3(err) {
return cerror.WrapError(cerror.ErrExternalStorageAPI, err)
if !util.IsNotExistInExtStorage(err) {
return errors.WrapError(errors.ErrExternalStorageAPI, err)
}

return nil
Expand All @@ -265,12 +264,13 @@ func (l *logWriter) initMeta(ctx context.Context) error {
if os.IsNotExist(err) {
return nil
}
return cerror.WrapError(cerror.ErrRedoMetaInitialize, errors.Annotate(err, "read meta file fail"))
err = errors.Annotate(err, "read meta file fail")
return errors.WrapError(errors.ErrRedoMetaInitialize, err)
}

_, err = l.meta.UnmarshalMsg(data)
if err != nil {
return cerror.WrapError(cerror.ErrRedoMetaInitialize, err)
return errors.WrapError(errors.ErrRedoMetaInitialize, err)
}

return nil
Expand Down Expand Up @@ -299,7 +299,7 @@ func (l *logWriter) WriteLog(
}

if l.isStopped() {
return cerror.ErrRedoWriterStopped.GenWithStackByArgs()
return errors.ErrRedoWriterStopped.GenWithStackByArgs()
}
if len(rows) == 0 {
return nil
Expand All @@ -316,7 +316,7 @@ func (l *logWriter) WriteLog(
rl.Type = model.RedoLogTypeRow
data, err := rl.MarshalMsg(nil)
if err != nil {
return cerror.WrapError(cerror.ErrMarshalFailed, err)
return errors.WrapError(errors.ErrMarshalFailed, err)
}

l.rowWriter.AdvanceTs(r.Row.CommitTs)
Expand All @@ -339,7 +339,7 @@ func (l *logWriter) SendDDL(ctx context.Context, ddl *model.RedoDDLEvent) error
}

if l.isStopped() {
return cerror.ErrRedoWriterStopped.GenWithStackByArgs()
return errors.ErrRedoWriterStopped.GenWithStackByArgs()
}
if ddl == nil || ddl.DDL == nil {
return nil
Expand All @@ -353,7 +353,7 @@ func (l *logWriter) SendDDL(ctx context.Context, ddl *model.RedoDDLEvent) error
rl.Type = model.RedoLogTypeDDL
data, err := rl.MarshalMsg(nil)
if err != nil {
return cerror.WrapError(cerror.ErrMarshalFailed, err)
return errors.WrapError(errors.ErrMarshalFailed, err)
}

l.ddlWriter.AdvanceTs(ddl.DDL.CommitTs)
Expand All @@ -370,10 +370,10 @@ func (l *logWriter) FlushLog(ctx context.Context, checkpointTs, resolvedTs model
}

if l.isStopped() {
return cerror.ErrRedoWriterStopped.GenWithStackByArgs()
return errors.ErrRedoWriterStopped.GenWithStackByArgs()
}

return l.flush(checkpointTs, resolvedTs)
return l.flush(ctx, checkpointTs, resolvedTs)
}

// GetMeta implement GetMeta api
Expand All @@ -396,7 +396,7 @@ func (l *logWriter) DeleteAllLogs(ctx context.Context) (err error) {
log.Warn("read removed log dir fail", zap.Error(err))
return nil
}
return cerror.WrapError(cerror.ErrRedoFileOp,
return errors.WrapError(errors.ErrRedoFileOp,
errors.Annotatef(err, "can't read log file directory: %s", l.cfg.Dir))
}

Expand All @@ -412,7 +412,7 @@ func (l *logWriter) DeleteAllLogs(ctx context.Context) (err error) {
log.Warn("removed log dir fail", zap.Error(err))
return nil
}
return cerror.WrapError(cerror.ErrRedoFileOp, err)
return errors.WrapError(errors.ErrRedoFileOp, err)
}
} else {
for _, file := range filteredFiles {
Expand All @@ -421,7 +421,7 @@ func (l *logWriter) DeleteAllLogs(ctx context.Context) (err error) {
log.Warn("removed log dir fail", zap.Error(err))
return nil
}
return cerror.WrapError(cerror.ErrRedoFileOp, err)
return errors.WrapError(errors.ErrRedoFileOp, err)
}
}
}
Expand Down Expand Up @@ -458,7 +458,7 @@ func (l *logWriter) getDeletedChangefeedMarker() string {
}

func (l *logWriter) writeDeletedMarkerToS3(ctx context.Context) error {
return cerror.WrapError(cerror.ErrExternalStorageAPI,
return errors.WrapError(errors.ErrExternalStorageAPI,
l.extStorage.WriteFile(ctx, l.getDeletedChangefeedMarker(), []byte("D")))
}

Expand All @@ -470,8 +470,8 @@ func (l *logWriter) deleteFilesInS3(ctx context.Context, files []string) error {
err := l.extStorage.DeleteFile(eCtx, name)
if err != nil {
// if it fails then retry; it may end up with a notExist error, ignore the error
if !isNotExistInS3(err) {
return cerror.WrapError(cerror.ErrExternalStorageAPI, err)
if !util.IsNotExistInExtStorage(err) {
return errors.WrapError(errors.ErrExternalStorageAPI, err)
}
}
return nil
Expand All @@ -480,27 +480,14 @@ func (l *logWriter) deleteFilesInS3(ctx context.Context, files []string) error {
return eg.Wait()
}

// isNotExistInS3 reports whether err (or its cause) is the S3 "no such key"
// error, i.e. the object was already absent.
// TODO: support other storage
func isNotExistInS3(err error) bool {
	if err == nil {
		return false
	}
	aerr, ok := errors.Cause(err).(awserr.Error) // nolint:errorlint
	if !ok {
		return false
	}
	return aerr.Code() == s3.ErrCodeNoSuchKey
}

var getAllFilesInS3 = func(ctx context.Context, l *logWriter) ([]string, error) {
files := []string{}
err := l.extStorage.WalkDir(ctx, &storage.WalkOption{}, func(path string, _ int64) error {
files = append(files, path)
return nil
})
if err != nil {
return nil, cerror.WrapError(cerror.ErrExternalStorageAPI, err)
return nil, errors.WrapError(errors.ErrExternalStorageAPI, err)
}

return files, nil
Expand All @@ -521,15 +508,15 @@ func (l *logWriter) Close() (err error) {
}

// flush flushes all the buffered data to the disk.
func (l *logWriter) flush(checkpointTs, resolvedTs model.Ts) (err error) {
func (l *logWriter) flush(ctx context.Context, checkpointTs, resolvedTs model.Ts) (err error) {
if l.cfg.EmitDDLEvents {
err = multierr.Append(err, l.ddlWriter.Flush())
}
if l.cfg.EmitRowEvents {
err = multierr.Append(err, l.rowWriter.Flush())
}
if l.cfg.EmitMeta {
err = multierr.Append(err, l.flushLogMeta(checkpointTs, resolvedTs))
err = multierr.Append(err, l.flushLogMeta(ctx, checkpointTs, resolvedTs))
}
return
}
Expand Down Expand Up @@ -575,12 +562,12 @@ func (l *logWriter) maybeUpdateMeta(checkpointTs, resolvedTs uint64) ([]byte, er

data, err := l.meta.MarshalMsg(nil)
if err != nil {
err = cerror.WrapError(cerror.ErrMarshalFailed, err)
err = errors.WrapError(errors.ErrMarshalFailed, err)
}
return data, err
}

func (l *logWriter) flushLogMeta(checkpointTs, resolvedTs uint64) error {
func (l *logWriter) flushLogMeta(ctx context.Context, checkpointTs, resolvedTs uint64) error {
data, err := l.maybeUpdateMeta(checkpointTs, resolvedTs)
if err != nil {
return err
Expand All @@ -592,34 +579,31 @@ func (l *logWriter) flushLogMeta(checkpointTs, resolvedTs uint64) error {
if !l.cfg.UseExternalStorage {
return l.flushMetaToLocal(data)
}

ctx, cancel := context.WithTimeout(context.Background(), defaultS3Timeout)
defer cancel()
return l.flushMetaToS3(ctx, data)
}

func (l *logWriter) flushMetaToLocal(data []byte) error {
if err := os.MkdirAll(l.cfg.Dir, redo.DefaultDirMode); err != nil {
e := errors.Annotate(err, "can't make dir for new redo logfile")
return cerror.WrapError(cerror.ErrRedoFileOp, e)
return errors.WrapError(errors.ErrRedoFileOp, e)
}

metaFile, err := openTruncFile(l.filePath())
if err != nil {
return cerror.WrapError(cerror.ErrRedoFileOp, err)
return errors.WrapError(errors.ErrRedoFileOp, err)
}
_, err = metaFile.Write(data)
if err != nil {
return cerror.WrapError(cerror.ErrRedoFileOp, err)
return errors.WrapError(errors.ErrRedoFileOp, err)
}
err = metaFile.Sync()
if err != nil {
return cerror.WrapError(cerror.ErrRedoFileOp, err)
return errors.WrapError(errors.ErrRedoFileOp, err)
}

if l.preMetaFile != "" {
if err := os.Remove(l.preMetaFile); err != nil && !os.IsNotExist(err) {
return cerror.WrapError(cerror.ErrRedoFileOp, err)
return errors.WrapError(errors.ErrRedoFileOp, err)
}
}
l.preMetaFile = metaFile.Name()
Expand All @@ -631,12 +615,17 @@ func (l *logWriter) flushMetaToS3(ctx context.Context, data []byte) error {
start := time.Now()
metaFile := l.getMetafileName()
if err := l.extStorage.WriteFile(ctx, metaFile, data); err != nil {
return cerror.WrapError(cerror.ErrExternalStorageAPI, err)
return errors.WrapError(errors.ErrExternalStorageAPI, err)
}

if l.preMetaFile != "" {
if err := l.extStorage.DeleteFile(ctx, l.preMetaFile); err != nil && !isNotExistInS3(err) {
return cerror.WrapError(cerror.ErrExternalStorageAPI, err)
if l.preMetaFile == metaFile {
// This should only happen when use a constant uuid generator in test.
return nil
}
err := l.extStorage.DeleteFile(ctx, l.preMetaFile)
if err != nil && !util.IsNotExistInExtStorage(err) {
return errors.WrapError(errors.ErrExternalStorageAPI, err)
}
}
l.preMetaFile = metaFile
Expand Down
Loading

0 comments on commit 13f462b

Please sign in to comment.