From a84506cdefff75fd09164d41c224e004edc4c778 Mon Sep 17 00:00:00 2001 From: Zixiong Liu Date: Mon, 7 Jun 2021 13:31:36 +0800 Subject: [PATCH] modify unified sorter IO error handling --- cdc/puller/sorter/file_backend.go | 58 ++++++++++++++--------- cdc/puller/sorter/file_backend_test.go | 64 ++++++++++++++++++++++++++ pkg/errors/errors.go | 2 +- 3 files changed, 101 insertions(+), 23 deletions(-) create mode 100644 cdc/puller/sorter/file_backend_test.go diff --git a/cdc/puller/sorter/file_backend.go b/cdc/puller/sorter/file_backend.go index e56735f0e1a..6ded3d1aaa1 100644 --- a/cdc/puller/sorter/file_backend.go +++ b/cdc/puller/sorter/file_backend.go @@ -1,4 +1,4 @@ -// Copyright 2020 PingCAP, Inc. +// Copyright 2021 PingCAP, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -24,6 +24,7 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/log" "github.com/pingcap/ticdc/cdc/model" + cerrors "github.com/pingcap/ticdc/pkg/errors" "go.uber.org/zap" ) @@ -46,12 +47,12 @@ type fileBackEnd struct { func newFileBackEnd(fileName string, serde serializerDeserializer) (*fileBackEnd, error) { f, err := os.Create(fileName) if err != nil { - return nil, errors.Trace(err) + return nil, errors.Trace(wrapIOError(err)) } err = f.Close() if err != nil { - return nil, errors.Trace(err) + return nil, errors.Trace(wrapIOError(err)) } log.Debug("new FileSorterBackEnd created", zap.String("filename", fileName)) @@ -65,7 +66,7 @@ func newFileBackEnd(fileName string, serde serializerDeserializer) (*fileBackEnd func (f *fileBackEnd) reader() (backEndReader, error) { fd, err := os.OpenFile(f.fileName, os.O_RDWR, 0o644) if err != nil { - return nil, errors.Trace(err) + return nil, errors.Trace(wrapIOError(err)) } atomic.AddInt64(&openFDCount, 1) @@ -74,7 +75,7 @@ func (f *fileBackEnd) reader() (backEndReader, error) { failpoint.Inject("sorterDebug", func() { info, err := fd.Stat() if err 
!= nil { - failpoint.Return(nil, errors.Trace(err)) + failpoint.Return(nil, errors.Trace(wrapIOError(err))) } totalSize = info.Size() }) @@ -94,7 +95,7 @@ func (f *fileBackEnd) reader() (backEndReader, error) { err = ret.readHeader() if err != nil { - return nil, errors.Trace(err) + return nil, errors.Trace(wrapIOError(err)) } return ret, nil @@ -103,7 +104,7 @@ func (f *fileBackEnd) reader() (backEndReader, error) { func (f *fileBackEnd) writer() (backEndWriter, error) { fd, err := os.OpenFile(f.fileName, os.O_TRUNC|os.O_RDWR, 0o644) if err != nil { - return nil, errors.Trace(err) + return nil, errors.Trace(wrapIOError(err)) } atomic.AddInt64(&openFDCount, 1) @@ -122,7 +123,7 @@ func (f *fileBackEnd) writer() (backEndWriter, error) { err = ret.writeFileHeader() if err != nil { - return nil, errors.Trace(err) + return nil, errors.Trace(wrapIOError(err)) } return ret, nil @@ -142,10 +143,10 @@ func (f *fileBackEnd) free() error { err := os.Remove(f.fileName) if err != nil { failpoint.Inject("sorterDebug", func() { - failpoint.Return(errors.Trace(err)) + failpoint.Return(errors.Trace(wrapIOError(err))) }) // ignore this error in production to provide some resilience - log.Warn("fileBackEnd: failed to remove file", zap.Error(err)) + log.Warn("fileBackEnd: failed to remove file", zap.Error(wrapIOError(err))) } return nil @@ -223,7 +224,7 @@ func (r *fileBackEndReader) readNext() (*model.PolymorphicEvent, error) { } return nil, nil } - return nil, errors.Trace(err) + return nil, errors.Trace(wrapIOError(err)) } if m != blockMagic { @@ -233,7 +234,7 @@ func (r *fileBackEndReader) readNext() (*model.PolymorphicEvent, error) { var size uint32 err = binary.Read(r.reader, binary.LittleEndian, &size) if err != nil { - return nil, errors.Trace(err) + return nil, errors.Trace(wrapIOError(err)) } if cap(r.rawBytesBuf) < int(size) { @@ -245,7 +246,7 @@ func (r *fileBackEndReader) readNext() (*model.PolymorphicEvent, error) { // short reads are possible with bufio, hence the need 
for io.ReadFull n, err := io.ReadFull(r.reader, r.rawBytesBuf) if err != nil { - return nil, errors.Trace(err) + return nil, errors.Trace(wrapIOError(err)) } if n != int(size) { @@ -296,7 +297,7 @@ func (r *fileBackEndReader) resetAndClose() error { failpoint.Inject("sorterDebug", func() { info, err1 := r.f.Stat() if err1 != nil { - failpoint.Return(errors.Trace(err)) + failpoint.Return(errors.Trace(wrapIOError(err))) } log.Info("file debug info", zap.String("filename", info.Name()), @@ -350,7 +351,7 @@ func (w *fileBackEndWriter) writeNext(event *model.PolymorphicEvent) error { var err error w.rawBytesBuf, err = w.backEnd.serde.marshal(event, w.rawBytesBuf) if err != nil { - return errors.Trace(err) + return errors.Trace(wrapIOError(err)) } size := len(w.rawBytesBuf) @@ -360,12 +361,12 @@ func (w *fileBackEndWriter) writeNext(event *model.PolymorphicEvent) error { err = binary.Write(w.writer, binary.LittleEndian, uint32(blockMagic)) if err != nil { - return errors.Trace(err) + return errors.Trace(wrapIOError(err)) } err = binary.Write(w.writer, binary.LittleEndian, uint32(size)) if err != nil { - return errors.Trace(err) + return errors.Trace(wrapIOError(err)) } // short writes are possible with bufio @@ -373,7 +374,7 @@ func (w *fileBackEndWriter) writeNext(event *model.PolymorphicEvent) error { for offset < size { n, err := w.writer.Write(w.rawBytesBuf[offset:]) if err != nil { - return errors.Trace(err) + return errors.Trace(wrapIOError(err)) } offset += n } @@ -402,24 +403,24 @@ func (w *fileBackEndWriter) flushAndClose() error { err := w.writer.Flush() if err != nil { - return errors.Trace(err) + return errors.Trace(wrapIOError(err)) } _, err = w.f.Seek(numFileEntriesOffset, 0 /* relative to the beginning of the file */) if err != nil { - return errors.Trace(err) + return errors.Trace(wrapIOError(err)) } // write the total number of entries in the file to the header err = binary.Write(w.f, binary.LittleEndian, uint64(w.eventsWritten)) if err != nil { - return 
errors.Trace(err) + return errors.Trace(wrapIOError(err)) } err = w.f.Close() if err != nil { failpoint.Inject("sorterDebug", func() { - failpoint.Return(errors.Trace(err)) + failpoint.Return(errors.Trace(wrapIOError(err))) }) log.Warn("fileBackEndReader: could not close file", zap.Error(err)) return nil @@ -435,3 +436,16 @@ func (w *fileBackEndWriter) flushAndClose() error { return nil } + +// wrapIOError should be called when the error is to be returned to a caller outside this file and +// if the error could be caused by a filesystem-related error. Errors whose cause is not an *os.PathError are returned unchanged. +func wrapIOError(err error) error { + cause := errors.Cause(err) + switch cause.(type) { + case *os.PathError: + // We don't generate a stack trace in this helper function to avoid confusion. + return cerrors.ErrUnifiedSorterIOError.FastGenByArgs(err.Error()) + default: + return err + } +} diff --git a/cdc/puller/sorter/file_backend_test.go b/cdc/puller/sorter/file_backend_test.go new file mode 100644 index 00000000000..c1035058446 --- /dev/null +++ b/cdc/puller/sorter/file_backend_test.go @@ -0,0 +1,64 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package sorter + +import ( + "io" + "os" + + "github.com/pingcap/check" + "github.com/pingcap/ticdc/cdc/model" + cerrors "github.com/pingcap/ticdc/pkg/errors" + "github.com/pingcap/ticdc/pkg/util/testleak" +) + +type fileBackendSuite struct{} + +var _ = check.SerialSuites(&fileBackendSuite{}) + +func (s *fileBackendSuite) TestWrapIOError(c *check.C) { + defer testleak.AfterTest(c)() + + fullFile, err := os.OpenFile("/dev/full", os.O_RDWR, 0) + c.Assert(err, check.IsNil) + defer fullFile.Close() //nolint:errcheck + _, err = fullFile.WriteString("test") + wrapped := wrapIOError(err) + // The wrapped message must carry both the sort-dir guidance and the underlying "no space" detail. + c.Assert(wrapped, check.ErrorMatches, ".*review the settings.*no space.*") + + eof := wrapIOError(io.EOF) + // io.EOF is not an *os.PathError, so wrapIOError must return it unchanged. + c.Assert(eof, check.Equals, io.EOF) +} + +func (s *fileBackendSuite) TestNoSpace(c *check.C) { + defer testleak.AfterTest(c)() + + fb := &fileBackEnd{ + fileName: "/dev/full", + serde: &msgPackGenSerde{}, + } + w, err := fb.writer() + c.Assert(err, check.IsNil) + + err = w.writeNext(model.NewPolymorphicEvent(generateMockRawKV(0))) + if err == nil { + // Due to write buffering, `writeNext` might not return an error when the filesystem is full; flushing forces the write. 
+ err = w.flushAndClose() + } + + c.Assert(err, check.ErrorMatches, ".*review the settings.*no space.*") + c.Assert(cerrors.ErrUnifiedSorterIOError.Equal(err), check.IsTrue) +} diff --git a/pkg/errors/errors.go b/pkg/errors/errors.go index 7050662ce32..a9fe78a52a6 100644 --- a/pkg/errors/errors.go +++ b/pkg/errors/errors.go @@ -208,7 +208,7 @@ var ( ErrUnifiedSorterBackendTerminating = errors.Normalize("unified sorter backend is terminating", errors.RFCCodeText("CDC:ErrUnifiedSorterBackendTerminating")) ErrIllegalUnifiedSorterParameter = errors.Normalize("illegal parameter for unified sorter: %s", errors.RFCCodeText("CDC:ErrIllegalUnifiedSorterParameter")) ErrAsyncIOCancelled = errors.Normalize("asynchronous IO operation is cancelled. Internal use only, report a bug if seen in log", errors.RFCCodeText("CDC:ErrAsyncIOCancelled")) - ErrUnifiedSorterIOError = errors.Normalize("unified sorter IO error", errors.RFCCodeText("CDC:ErrUnifiedSorterIOError")) + ErrUnifiedSorterIOError = errors.Normalize("unified sorter IO error. Make sure your sort-dir is configured correctly by passing a valid argument or toml file to `cdc server`, or if you use TiUP, review the settings in `tiup cluster edit-config`. Details: %s", errors.RFCCodeText("CDC:ErrUnifiedSorterIOError")) ErrConflictingFileLocks = errors.Normalize("file lock conflict: %s", errors.RFCCodeText("ErrConflictingFileLocks")) ErrSortDirLockError = errors.Normalize("error encountered when locking sort-dir", errors.RFCCodeText("ErrSortDirLockError"))