From 34791401ae3f8c17ea78ef28bb98dcec0d7142c6 Mon Sep 17 00:00:00 2001 From: Jiahao Huang Date: Mon, 20 May 2019 10:30:42 +0800 Subject: [PATCH 01/11] pump/*: Refine some log in storage/ --- pump/server.go | 2 +- pump/storage/log.go | 23 +++++++++-------------- pump/storage/storage.go | 21 +++++++++++++++------ pump/storage/vlog.go | 29 ++++++++++++++++------------- 4 files changed, 41 insertions(+), 34 deletions(-) diff --git a/pump/server.go b/pump/server.go index 496835fae..83467d3fd 100644 --- a/pump/server.go +++ b/pump/server.go @@ -240,7 +240,7 @@ func (s *Server) writeBinlog(ctx context.Context, in *binlog.WriteBinlogReq, isF errHandle: lossBinlogCacheCounter.Add(1) - log.Errorf("write binlog error %v", err) + log.Errorf("write binlog error %+v", err) ret.Errmsg = err.Error() return ret, err } diff --git a/pump/storage/log.go b/pump/storage/log.go index 4c191bd66..fd999da7f 100644 --- a/pump/storage/log.go +++ b/pump/storage/log.go @@ -73,12 +73,12 @@ func encodeRecord(writer io.Writer, payload []byte) (int, error) { n, err := writer.Write(header) if err != nil { - return n, errors.Trace(err) + return n, errors.Annotate(err, "write header failed") } n, err = writer.Write(payload) if err != nil { - return int(headerLength) + n, errors.Trace(err) + return int(headerLength) + n, errors.Annotate(err, "write payload failed") } return int(headerLength) + len(payload), nil @@ -108,14 +108,12 @@ func (r *Record) isValid() bool { func newLogFile(fid uint32, name string) (lf *logFile, err error) { fd, err := os.OpenFile(name, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666) if err != nil { - err = errors.Trace(err) - return + return nil, errors.Annotatef(err, "open file %s failed", name) } info, err := fd.Stat() if err != nil { - err = errors.Trace(err) - return + return nil, errors.Annotatef(err, "stat file %s failed", name) } logReporter := func(bytes int, reason error) { @@ -200,8 +198,8 @@ func (lf *logFile) finalize() error { return errors.Trace(lf.fdatasync()) } -func (lf *logFile) close() { - lf.fd.Close() +func (lf *logFile) close() error { + return lf.fd.Close() } // recover scan all the record get the state like maxTS which only saved when the file is finalized @@ -272,8 +270,7 @@ func readRecord(reader io.Reader) (record *Record, err error) { record = new(Record) err = record.readHeader(reader) if err != nil { - err = errors.Trace(err) - return + return nil, errors.Annotate(err, "read header failed") } if record.magic != recordMagic { @@ -286,8 +283,7 @@ func readRecord(reader io.Reader) (record *Record, err error) { record.payload = make([]byte, record.length) _, err = io.ReadFull(reader, record.payload) if err != nil { - err = errors.Trace(err) - return + return nil, errors.Annotate(err, "read failed") } } else { buf := new(bytes.Buffer) @@ -300,8 +296,7 @@ func readRecord(reader io.Reader) (record *Record, err error) { } if !record.isValid() { - err = errors.New("checksum mismatch") - return + return nil, errors.New("checksum mismatch") } return diff --git a/pump/storage/storage.go b/pump/storage/storage.go index 911868266..ccef44326 100644 --- a/pump/storage/storage.go +++ b/pump/storage/storage.go @@ -525,6 +525,7 @@ func (a *Append) GCTS(ts int64) { } func (a *Append) doGCTS(ts int64) { + log.Info("start gc ts: ", ts) irange := &util.Range{ Start: encodeTSKey(0), Limit: encodeTSKey(ts + 1), @@ -550,6 +551,7 @@ func (a *Append) doGCTS(ts int64) { } a.vlog.gcTS(ts) + log.Info("finish gc ts: ", ts) } // MaxCommitTS implement Storage.MaxCommitTS @@ -820,23 +822,23 @@ func (a *Append) feedPreWriteValue(cbinlog *pb.Binlog) error { vpData, err := a.metadata.Get(encodeTSKey(cbinlog.StartTs), nil) if err != nil { - errors.Trace(err) + return errors.Annotatef(err, "get value pointer failed ts(%d)", cbinlog.StartTs) } err = vp.UnmarshalBinary(vpData) if err != nil { - errors.Trace(err) + return errors.Trace(err) } pvalue, err := a.vlog.readValue(vp) if err != nil { - errors.Trace(err) + return errors.Annotatef(err, "read value failed, vp: %+v", vp) } pbinlog := new(pb.Binlog) err = pbinlog.Unmarshal(pvalue) if err != nil { - errors.Trace(err) + return errors.Trace(err) } cbinlog.StartTs = pbinlog.StartTs @@ -895,7 +897,7 @@ func (a *Append) PullCommitBinlog(ctx context.Context, last int64) <-chan []byte value, err := a.vlog.readValue(vp) if err != nil { - log.Error(err) + log.Error(errors.ErrorStack(err)) iter.Release() errorCount.WithLabelValues("read_value").Add(1.0) return @@ -904,7 +906,7 @@ func (a *Append) PullCommitBinlog(ctx context.Context, last int64) <-chan []byte binlog := new(pb.Binlog) err = binlog.Unmarshal(value) if err != nil { - log.Error(err) + log.Error(errors.ErrorStack(err)) iter.Release() return } @@ -919,6 +921,13 @@ func (a *Append) PullCommitBinlog(ctx context.Context, last int64) <-chan []byte } else { err = a.feedPreWriteValue(binlog) if err != nil { + if errors.Cause(err) == leveldb.ErrNotFound { + // prevent the pump client write C-binlog to not the according P-binlog of pump instance. + // the old version client indeed write C-binlog to random instance once write failed. + log.Error("feedPreWriteValue not found: %+v", binlog) + continue + } + errorCount.WithLabelValues("feed_pre_write_value").Add(1.0) log.Error(err) iter.Release() diff --git a/pump/storage/vlog.go b/pump/storage/vlog.go index 75c8f88fa..28979f335 100644 --- a/pump/storage/vlog.go +++ b/pump/storage/vlog.go @@ -254,16 +254,15 @@ func (vlog *valueLog) close() error { if vlog.writableOffset() >= finalizeFileSizeAtClose { err = curFile.finalize() if err != nil { - return errors.Trace(err) + return errors.Annotatef(err, "finalize file %s failed", curFile.path) } } for _, logFile := range vlog.filesMap { - err = logFile.fd.Close() + err = logFile.close() if err != nil { - return err + return errors.Annotatef(err, "close %s failed", logFile.path) } - } return nil @@ -272,16 +271,16 @@ func (vlog *valueLog) close() error { func (vlog *valueLog) readValue(vp valuePointer) ([]byte, error) { logFile, err := vlog.getFileRLocked(vp.Fid) if err != nil { - return nil, errors.Trace(err) + return nil, errors.Annotatef(err, "get file failed: %d", vp.Fid) } + defer logFile.lock.RUnlock() + record, err := logFile.readRecord(vp.Offset) if err != nil { - return nil, errors.Trace(err) + return nil, errors.Annotatef(err, "read failed at: %+v", vp) } - logFile.lock.RUnlock() - return record.payload, nil } @@ -305,7 +304,7 @@ func (vlog *valueLog) write(reqs []*request) error { if vlog.sync { err = curFile.fdatasync() if err != nil { - return errors.Trace(err) + return errors.Annotate(err, "fdatasync failed") } } @@ -320,13 +319,13 @@ func (vlog *valueLog) write(reqs []*request) error { if vlog.writableOffset() > vlog.opt.ValueLogFileSize { err := curFile.finalize() if err != nil { - return errors.Trace(err) + return errors.Annotate(err, "finalize file failed") } id := atomic.AddUint32(&vlog.maxFid, 1) curFile, err = vlog.createLogFile(id) if err != nil { - return errors.Trace(err) + return errors.Annotate(err, "create file failed") } } return nil @@ -346,7 +345,7 @@ func (vlog *valueLog) write(reqs []*request) error { if writeNow { if err := toDisk(); err != nil { - return err + return errors.Annotate(err, "write to disk failed") } } } @@ -415,7 +414,11 @@ func (vlog *valueLog) gcTS(gcTS int64) { for _, logFile := range toDeleteFiles { logFile.lock.Lock() - err := os.Remove(logFile.path) + err := logFile.close() + if err != nil { + log.Errorf("close file %s err: %+v", logFile.path, err) + } + err = os.Remove(logFile.path) if err != nil { log.Errorf("remove file %s err: %v", logFile.path, err) } From 601f02e55b6cb28b02293e86eb4ff8185b08dba4 Mon Sep 17 00:00:00 2001 From: Jiahao Huang Date: Mon, 20 May 2019 13:06:01 +0800 Subject: [PATCH 02/11] refine comment --- pump/storage/storage.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/pump/storage/storage.go b/pump/storage/storage.go index ccef44326..9064f17aa 100644 --- a/pump/storage/storage.go +++ b/pump/storage/storage.go @@ -922,9 +922,11 @@ func (a *Append) PullCommitBinlog(ctx context.Context, last int64) <-chan []byte err = a.feedPreWriteValue(binlog) if err != nil { if errors.Cause(err) == leveldb.ErrNotFound { - // prevent the pump client write C-binlog to not the according P-binlog of pump instance. - // the old version client indeed write C-binlog to random instance once write failed. - log.Error("feedPreWriteValue not found: %+v", binlog) + // In pump-client, a C-binlog should always be sent to the same pump instance as the matching P-binlog. + // But in some older versions of pump-client, writing of C-binlog would fallback to some other instances when the correct one is unavailable. + // When this error occurs, we may assume that the matching P-binlog is on a different pump instance. + // And it would query TiKV for the matching C-binlog. So it should be OK to ignore the error here. + log.Error("Matching P-binlog not found", binlog) continue } From 469e7339e4a646bdbab2bd4600367af69ac38603 Mon Sep 17 00:00:00 2001 From: Jiahao Huang Date: Mon, 20 May 2019 15:30:57 +0800 Subject: [PATCH 03/11] Update pump/storage/log.go Co-Authored-By: Ian --- pump/storage/log.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pump/storage/log.go b/pump/storage/log.go index fd999da7f..b8cd57be7 100644 --- a/pump/storage/log.go +++ b/pump/storage/log.go @@ -283,7 +283,7 @@ func readRecord(reader io.Reader) (record *Record, err error) { record.payload = make([]byte, record.length) _, err = io.ReadFull(reader, record.payload) if err != nil { - return nil, errors.Annotate(err, "read failed") + return nil, errors.Annotate(err, "read payload failed") } } else { buf := new(bytes.Buffer) From 93dbef6f5771f98abd0fb98b5376b558479578d0 Mon Sep 17 00:00:00 2001 From: Jiahao Huang Date: Mon, 20 May 2019 15:31:17 +0800 Subject: [PATCH 04/11] Update pump/storage/storage.go Co-Authored-By: Ian --- pump/storage/storage.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pump/storage/storage.go b/pump/storage/storage.go index 9064f17aa..89335da7b 100644 --- a/pump/storage/storage.go +++ b/pump/storage/storage.go @@ -832,7 +832,7 @@ func (a *Append) feedPreWriteValue(cbinlog *pb.Binlog) error { pvalue, err := a.vlog.readValue(vp) if err != nil { - return errors.Annotatef(err, "read value failed, vp: %+v", vp) + return errors.Annotatef(err, "read P-Binlog value failed, vp: %+v", vp) } pbinlog := new(pb.Binlog) From aed8bbaa66d901337f7deb2cb633a75ee309f1b7 Mon Sep 17 00:00:00 2001 From: Jiahao Huang Date: Mon, 20 May 2019 15:31:46 +0800 Subject: [PATCH 05/11] Update pump/storage/storage.go Co-Authored-By: Ian --- pump/storage/storage.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pump/storage/storage.go b/pump/storage/storage.go index 89335da7b..36cb0ba74 100644 --- a/pump/storage/storage.go +++ b/pump/storage/storage.go @@ -822,7 +822,7 @@ func (a *Append) feedPreWriteValue(cbinlog *pb.Binlog) error { vpData, err := a.metadata.Get(encodeTSKey(cbinlog.StartTs), nil) if err != nil { - return errors.Annotatef(err, "get value pointer failed ts(%d)", cbinlog.StartTs) + return errors.Annotatef(err, "get pointer of P-Binlog(ts: %d) failed", cbinlog.StartTs) } err = vp.UnmarshalBinary(vpData) From 3d6d8b4108b9bcba8e865eb7e4d309dede8a79bd Mon Sep 17 00:00:00 2001 From: Jiahao Huang Date: Mon, 20 May 2019 15:41:17 +0800 Subject: [PATCH 06/11] log.go: no annotatef --- pump/storage/log.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pump/storage/log.go b/pump/storage/log.go index b8cd57be7..a50e1f673 100644 --- a/pump/storage/log.go +++ b/pump/storage/log.go @@ -108,7 +108,7 @@ func (r *Record) isValid() bool { func newLogFile(fid uint32, name string) (lf *logFile, err error) { fd, err := os.OpenFile(name, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666) if err != nil { - return nil, errors.Annotatef(err, "open file %s failed", name) + return nil, errors.Trace(err) } info, err := fd.Stat() From 96ceadc5bde87a5308c0e30d2861214d6074baab Mon Sep 17 00:00:00 2001 From: Jiahao Huang Date: Mon, 20 May 2019 15:53:36 +0800 Subject: [PATCH 07/11] Update pump/storage/vlog.go Co-Authored-By: Ian --- pump/storage/vlog.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pump/storage/vlog.go b/pump/storage/vlog.go index 6ed05aeb9..13827f5bd 100644 --- a/pump/storage/vlog.go +++ b/pump/storage/vlog.go @@ -271,7 +271,7 @@ func (vlog *valueLog) close() error { func (vlog *valueLog) readValue(vp valuePointer) ([]byte, error) { logFile, err := vlog.getFileRLocked(vp.Fid) if err != nil { - return nil, errors.Annotatef(err, "get file failed: %d", vp.Fid) + return nil, errors.Annotatef(err, "get file(id: %d) failed", vp.Fid) } defer logFile.lock.RUnlock() From 1e44e4bef18c05a0e78896874778da8b8eec8cde Mon Sep 17 00:00:00 2001 From: Jiahao Huang Date: Mon, 20 May 2019 15:54:25 +0800 Subject: [PATCH 08/11] Apply suggestions from code review Co-Authored-By: Ian --- pump/storage/vlog.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pump/storage/vlog.go b/pump/storage/vlog.go index 13827f5bd..ca95759ab 100644 --- a/pump/storage/vlog.go +++ b/pump/storage/vlog.go @@ -278,7 +278,7 @@ func (vlog *valueLog) readValue(vp valuePointer) ([]byte, error) { record, err := logFile.readRecord(vp.Offset) if err != nil { - return nil, errors.Annotatef(err, "read failed at: %+v", vp) + return nil, errors.Annotatef(err, "read record at %+v failed", vp) } return record.payload, nil From 5e22f47215b5e72d6ee4a3bbb71242c52d38e2e1 Mon Sep 17 00:00:00 2001 From: Jiahao Huang Date: Mon, 20 May 2019 15:56:17 +0800 Subject: [PATCH 09/11] refine log --- pump/storage/vlog.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pump/storage/vlog.go b/pump/storage/vlog.go index ca95759ab..c2062dd79 100644 --- a/pump/storage/vlog.go +++ b/pump/storage/vlog.go @@ -304,7 +304,7 @@ func (vlog *valueLog) write(reqs []*request) error { if vlog.sync { err = curFile.fdatasync() if err != nil { - return errors.Annotate(err, "fdatasync failed") + return errors.Annotate(err, "fdatasync file %s failed", curFile.path) } } From cf209c896b789c275011263cfc06f47eda32f821 Mon Sep 17 00:00:00 2001 From: Jiahao Huang Date: Mon, 20 May 2019 15:58:24 +0800 Subject: [PATCH 10/11] fix format --- pump/storage/vlog.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pump/storage/vlog.go b/pump/storage/vlog.go index c2062dd79..35c1da7d6 100644 --- a/pump/storage/vlog.go +++ b/pump/storage/vlog.go @@ -304,7 +304,7 @@ func (vlog *valueLog) write(reqs []*request) error { if vlog.sync { err = curFile.fdatasync() if err != nil { - return errors.Annotate(err, "fdatasync file %s failed", curFile.path) + return errors.Annotatef(err, "fdatasync file %s failed", curFile.path) } } From 433bc14178af81cda91be782ab68d5f9fd1459a9 Mon Sep 17 00:00:00 2001 From: Jiahao Huang Date: Mon, 20 May 2019 16:23:31 +0800 Subject: [PATCH 11/11] vlog.go: refine log --- pump/storage/vlog.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pump/storage/vlog.go b/pump/storage/vlog.go index 35c1da7d6..0bd544f58 100644 --- a/pump/storage/vlog.go +++ b/pump/storage/vlog.go @@ -319,13 +319,13 @@ func (vlog *valueLog) write(reqs []*request) error { if vlog.writableOffset() > vlog.opt.ValueLogFileSize { err := curFile.finalize() if err != nil { - return errors.Annotate(err, "finalize file failed") + return errors.Annotatef(err, "finalize file %s failed", curFile.path) } id := atomic.AddUint32(&vlog.maxFid, 1) curFile, err = vlog.createLogFile(id) if err != nil { - return errors.Annotate(err, "create file failed") + return errors.Annotatef(err, "create file id %d failed", id) } } return nil