From 1bfc6c5086841b3614a009135db29814b0052d23 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Fri, 17 Nov 2023 11:13:29 +0100 Subject: [PATCH] rbdeal repair: Wire up carlog repair reader --- rbdeal/deal_repair.go | 27 ++++++++++++++++++++++----- ributil/repair_car.go | 35 ++++++++++++++++++++++++++++++----- 2 files changed, 52 insertions(+), 10 deletions(-) diff --git a/rbdeal/deal_repair.go b/rbdeal/deal_repair.go index 91f1435..86aec7a 100644 --- a/rbdeal/deal_repair.go +++ b/rbdeal/deal_repair.go @@ -3,8 +3,10 @@ package rbdeal import ( "context" "fmt" + "github.com/ipfs/go-cid" ribs2 "github.com/lotus-web3/ribs" "github.com/lotus-web3/ribs/ributil" + "github.com/multiformats/go-multihash" "golang.org/x/xerrors" "io" "os" @@ -204,7 +206,7 @@ func (r *ribs) fetchGroupHttp(ctx context.Context, workerID int, group ribs2.Gro // make the request!! - rc := ributil.RobustGet(reqUrl.String(), gm.CarSize, func() *ributil.RateCounter { + robustReqReader := ributil.RobustGet(reqUrl.String(), gm.CarSize, func() *ributil.RateCounter { return r.repairFetchCounters.Get(group) }) @@ -242,25 +244,40 @@ func (r *ribs) fetchGroupHttp(ctx context.Context, workerID int, group ribs2.Gro } }() + repairReader, err := ributil.NewCarRepairReader(robustReqReader, gm.RootCid, func(b cid.Cid) ([]byte, error) { + var outData []byte + return outData, r.retrProv.FetchBlocks(ctx, group, []multihash.Multihash{b.Hash()}, func(cidx int, data []byte) { + outData = make([]byte, len(data)) + copy(outData, data) + }) + }) + if err != nil { + _ = f.Close() + _ = os.Remove(groupFile) + _ = robustReqReader.Close() + log.Errorw("failed to create repair reader", "err", err, "group", group, "provider", candidate.Provider, "url", reqUrl.String()) + continue + } + cc := new(ributil.DataCidWriter) - commdReader := io.TeeReader(rc, cc) + commdReader := io.TeeReader(repairReader, cc) _, err = io.Copy(f, commdReader) done() if err != nil { _ = f.Close() _ = os.Remove(groupFile) - _ = rc.Close() + _ = robustReqReader.Close() log.Errorw("failed to copy response body", "err", err, "group", group, "provider", candidate.Provider, "url", reqUrl.String()) continue } if err := f.Close(); err != nil { - _ = rc.Close() + _ = robustReqReader.Close() return xerrors.Errorf("close group file: %w", err) } - if err := rc.Close(); err != nil { + if err := robustReqReader.Close(); err != nil { return xerrors.Errorf("close response body: %w", err) } diff --git a/ributil/repair_car.go b/ributil/repair_car.go index 7a12b92..10fc5f8 100644 --- a/ributil/repair_car.go +++ b/ributil/repair_car.go @@ -21,8 +21,13 @@ type RepairCarLog struct { readBuf []byte repairBlock func(cid.Cid) ([]byte, error) + + totalRead, bad int64 } +var minReadDecision int64 = 100 // blocks +var maxCorruptPct int64 = 10 // percent + func NewCarRepairReader(source io.Reader, root cid.Cid, repair func(cid.Cid) ([]byte, error)) (*RepairCarLog, error) { br := bufio.NewReaderSize(source, int(carutil.MaxAllowedSectionSize)) @@ -67,6 +72,19 @@ func (r *RepairCarLog) Read(p []byte) (n int, err error) { return } + r.totalRead++ + wasCorrupt := false + defer func() { + if wasCorrupt { + r.bad++ + } + + if r.totalRead > minReadDecision && r.bad*100/r.totalRead > maxCorruptPct { + log.Errorw("too many corrupt blocks, aborting", "total", r.totalRead, "bad", r.bad) + err = xerrors.Errorf("too many corrupt blocks, aborting") + } + }() + if len(r.expectCidStack) == 0 { return 0, io.EOF } @@ -111,7 +129,8 @@ func (r *RepairCarLog) Read(p []byte) (n int, err error) { vEntLen, n := binary.Uvarint(cidLenEnt[:varintLen]) if n <= 0 || vEntLen > uint64(carutil.MaxAllowedSectionSize) { // varint len is probably corrupted - log.Errorw("bad varint or header is bigger than util.MaxAllowedSectionSize, varint len is probably corrupted, will try repair", "expected", firstCidInLayer, "actual", vEntLen) + log.Errorw("repairRead bad varint or header is bigger than util.MaxAllowedSectionSize, varint len is probably corrupted, will try repair", "expected", firstCidInLayer, "actual", vEntLen) + wasCorrupt = true goodData, err := r.repairBlock(firstCidInLayer) if err != nil { @@ -132,7 +151,8 @@ func (r *RepairCarLog) Read(p []byte) (n int, err error) { if err != nil { if err == io.EOF { // length was probably corrupted - log.Errorw("read entry eof, varint len is probably corrupted, will try repair", "expected", firstCidInLayer, "actual", vEntLen) + log.Errorw("repairRead read entry eof, varint len is probably corrupted, will try repair", "expected", firstCidInLayer, "actual", vEntLen) + wasCorrupt = true goodData, err := r.repairBlock(firstCidInLayer) if err != nil { @@ -153,7 +173,9 @@ func (r *RepairCarLog) Read(p []byte) (n int, err error) { } if len(ent) < len(expCidBytes) { - log.Errorw("entry shorter than cid, will attempt repair", "expected", firstCidInLayer, "actual", ent) + log.Errorw("repairRead entry shorter than cid, will attempt repair", "expected", firstCidInLayer, "actual", ent) + wasCorrupt = true + goodData, err := r.repairBlock(firstCidInLayer) if err != nil { return 0, xerrors.Errorf("repair block %s: %w", firstCidInLayer, err) @@ -168,7 +190,8 @@ func (r *RepairCarLog) Read(p []byte) (n int, err error) { } if !bytes.Equal(ent[:len(expCidBytes)], expCidBytes) { - log.Errorw("cid mismatch in car stream, will attempt repair", "expected", firstCidInLayer, "actual", ent[:len(expCidBytes)]) + log.Errorw("repairRead cid mismatch in car stream, will attempt repair", "expected", firstCidInLayer, "actual", ent[:len(expCidBytes)]) + wasCorrupt = true // repair here is really just copying the right cid into the entry copy(ent[:len(expCidBytes)], expCidBytes) @@ -179,7 +202,8 @@ func (r *RepairCarLog) Read(p []byte) (n int, err error) { return 0, xerrors.Errorf("hash data: %w", err) } if !hash.Equals(firstCidInLayer) { - log.Errorw("data hash mismatch in car stream, will attempt repair", "expected", firstCidInLayer, "actual", hash) + log.Errorw("repairRead data hash mismatch in car stream, will attempt repair", "expected", firstCidInLayer, "actual", hash) + wasCorrupt = true // block data repair goodData, err := r.repairBlock(firstCidInLayer) @@ -246,6 +270,7 @@ func match32Bytes(pattern []byte, buf []byte) (off int, err error) { for i := 0; i < 4; i++ { overlap := b32overlap(pattern, buf[i:]) if overlap == 32*8 { + // happy path return i, nil } if overlap > maxOverlap {