Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pump/*: Refine some log in storage/ #607

Merged
merged 13 commits into from
May 20, 2019
2 changes: 1 addition & 1 deletion pump/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ func (s *Server) writeBinlog(ctx context.Context, in *binlog.WriteBinlogReq, isF

errHandle:
lossBinlogCacheCounter.Add(1)
log.Errorf("write binlog error %v", err)
log.Errorf("write binlog error %+v", err)
ret.Errmsg = err.Error()
return ret, err
}
Expand Down
23 changes: 9 additions & 14 deletions pump/storage/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,12 @@ func encodeRecord(writer io.Writer, payload []byte) (int, error) {

n, err := writer.Write(header)
if err != nil {
return n, errors.Trace(err)
return n, errors.Annotate(err, "write header failed")
}

n, err = writer.Write(payload)
if err != nil {
return int(headerLength) + n, errors.Trace(err)
return int(headerLength) + n, errors.Annotate(err, "write payload failed")
}

return int(headerLength) + len(payload), nil
Expand Down Expand Up @@ -108,14 +108,12 @@ func (r *Record) isValid() bool {
func newLogFile(fid uint32, name string) (lf *logFile, err error) {
fd, err := os.OpenFile(name, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666)
if err != nil {
err = errors.Trace(err)
return
return nil, errors.Annotatef(err, "open file %s failed", name)
july2993 marked this conversation as resolved.
Show resolved Hide resolved
}

info, err := fd.Stat()
if err != nil {
err = errors.Trace(err)
return
return nil, errors.Annotatef(err, "stat file %s failed", name)
}

logReporter := func(bytes int, reason error) {
Expand Down Expand Up @@ -200,8 +198,8 @@ func (lf *logFile) finalize() error {
return errors.Trace(lf.fdatasync())
}

func (lf *logFile) close() {
lf.fd.Close()
func (lf *logFile) close() error {
return lf.fd.Close()
}

// recover scan all the record get the state like maxTS which only saved when the file is finalized
Expand Down Expand Up @@ -272,8 +270,7 @@ func readRecord(reader io.Reader) (record *Record, err error) {
record = new(Record)
err = record.readHeader(reader)
if err != nil {
err = errors.Trace(err)
return
return nil, errors.Annotate(err, "read header failed")
}

if record.magic != recordMagic {
Expand All @@ -286,8 +283,7 @@ func readRecord(reader io.Reader) (record *Record, err error) {
record.payload = make([]byte, record.length)
_, err = io.ReadFull(reader, record.payload)
if err != nil {
err = errors.Trace(err)
return
return nil, errors.Annotate(err, "read failed")
july2993 marked this conversation as resolved.
Show resolved Hide resolved
}
} else {
buf := new(bytes.Buffer)
Expand All @@ -300,8 +296,7 @@ func readRecord(reader io.Reader) (record *Record, err error) {
}

if !record.isValid() {
err = errors.New("checksum mismatch")
return
return nil, errors.New("checksum mismatch")
}

return
Expand Down
23 changes: 17 additions & 6 deletions pump/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -525,6 +525,7 @@ func (a *Append) GCTS(ts int64) {
}

func (a *Append) doGCTS(ts int64) {
log.Info("start gc ts: ", ts)
irange := &util.Range{
Start: encodeTSKey(0),
Limit: encodeTSKey(ts + 1),
Expand All @@ -550,6 +551,7 @@ func (a *Append) doGCTS(ts int64) {
}

a.vlog.gcTS(ts)
log.Info("finish gc ts: ", ts)
}

// MaxCommitTS implement Storage.MaxCommitTS
Expand Down Expand Up @@ -820,23 +822,23 @@ func (a *Append) feedPreWriteValue(cbinlog *pb.Binlog) error {

vpData, err := a.metadata.Get(encodeTSKey(cbinlog.StartTs), nil)
if err != nil {
errors.Trace(err)
return errors.Annotatef(err, "get value pointer failed ts(%d)", cbinlog.StartTs)
july2993 marked this conversation as resolved.
Show resolved Hide resolved
}

err = vp.UnmarshalBinary(vpData)
if err != nil {
errors.Trace(err)
return errors.Trace(err)
}

pvalue, err := a.vlog.readValue(vp)
if err != nil {
errors.Trace(err)
return errors.Annotatef(err, "read value failed, vp: %+v", vp)
july2993 marked this conversation as resolved.
Show resolved Hide resolved
}

pbinlog := new(pb.Binlog)
err = pbinlog.Unmarshal(pvalue)
if err != nil {
errors.Trace(err)
return errors.Trace(err)
}

cbinlog.StartTs = pbinlog.StartTs
Expand Down Expand Up @@ -895,7 +897,7 @@ func (a *Append) PullCommitBinlog(ctx context.Context, last int64) <-chan []byte

value, err := a.vlog.readValue(vp)
if err != nil {
log.Error(err)
log.Error(errors.ErrorStack(err))
iter.Release()
errorCount.WithLabelValues("read_value").Add(1.0)
return
Expand All @@ -904,7 +906,7 @@ func (a *Append) PullCommitBinlog(ctx context.Context, last int64) <-chan []byte
binlog := new(pb.Binlog)
err = binlog.Unmarshal(value)
if err != nil {
log.Error(err)
log.Error(errors.ErrorStack(err))
iter.Release()
return
}
Expand All @@ -919,6 +921,15 @@ func (a *Append) PullCommitBinlog(ctx context.Context, last int64) <-chan []byte
} else {
err = a.feedPreWriteValue(binlog)
if err != nil {
if errors.Cause(err) == leveldb.ErrNotFound {
// In pump-client, a C-binlog should always be sent to the same pump instance as the matching P-binlog.
// But in some older versions of pump-client, writing of C-binlog would fallback to some other instances when the correct one is unavailable.
// When this error occurs, we may assume that the matching P-binlog is on a different pump instance.
// And it would query TiKV for the matching C-binlog. So it should be OK to ignore the error here.
log.Error("Matching P-binlog not found", binlog)
continue
}

errorCount.WithLabelValues("feed_pre_write_value").Add(1.0)
log.Error(err)
iter.Release()
Expand Down
29 changes: 16 additions & 13 deletions pump/storage/vlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,16 +254,15 @@ func (vlog *valueLog) close() error {
if vlog.writableOffset() >= finalizeFileSizeAtClose {
err = curFile.finalize()
if err != nil {
return errors.Trace(err)
return errors.Annotatef(err, "finalize file %s failed", curFile.path)
}
}

for _, logFile := range vlog.filesMap {
err = logFile.fd.Close()
err = logFile.close()
if err != nil {
return err
return errors.Annotatef(err, "close %s failed", logFile.path)
}

}

return nil
Expand All @@ -272,16 +271,16 @@ func (vlog *valueLog) close() error {
func (vlog *valueLog) readValue(vp valuePointer) ([]byte, error) {
logFile, err := vlog.getFileRLocked(vp.Fid)
if err != nil {
return nil, errors.Trace(err)
return nil, errors.Annotatef(err, "get file failed: %d", vp.Fid)
july2993 marked this conversation as resolved.
Show resolved Hide resolved
}

defer logFile.lock.RUnlock()

record, err := logFile.readRecord(vp.Offset)
if err != nil {
return nil, errors.Trace(err)
return nil, errors.Annotatef(err, "read failed at: %+v", vp)
july2993 marked this conversation as resolved.
Show resolved Hide resolved
}

logFile.lock.RUnlock()

return record.payload, nil
}

Expand All @@ -305,7 +304,7 @@ func (vlog *valueLog) write(reqs []*request) error {
if vlog.sync {
err = curFile.fdatasync()
if err != nil {
return errors.Trace(err)
return errors.Annotate(err, "fdatasync failed")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we add file name?

}
}

Expand All @@ -320,13 +319,13 @@ func (vlog *valueLog) write(reqs []*request) error {
if vlog.writableOffset() > vlog.opt.ValueLogFileSize {
err := curFile.finalize()
if err != nil {
return errors.Trace(err)
return errors.Annotate(err, "finalize file failed")
}

id := atomic.AddUint32(&vlog.maxFid, 1)
curFile, err = vlog.createLogFile(id)
if err != nil {
return errors.Trace(err)
return errors.Annotate(err, "create file failed")
}
}
return nil
Expand All @@ -346,7 +345,7 @@ func (vlog *valueLog) write(reqs []*request) error {

if writeNow {
if err := toDisk(); err != nil {
return err
return errors.Annotate(err, "write to disk failed")
}
}
}
Expand Down Expand Up @@ -415,7 +414,11 @@ func (vlog *valueLog) gcTS(gcTS int64) {

for _, logFile := range toDeleteFiles {
logFile.lock.Lock()
err := os.Remove(logFile.path)
err := logFile.close()
if err != nil {
log.Errorf("close file %s err: %+v", logFile.path, err)
}
err = os.Remove(logFile.path)
if err != nil {
log.Errorf("remove file %s err: %v", logFile.path, err)
}
Expand Down