Skip to content

Commit

Permalink
rbdeal repair: Wire up carlog repair reader
Browse files Browse the repository at this point in the history
  • Loading branch information
magik6k committed Nov 17, 2023
1 parent 09fbb8f commit 1bfc6c5
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 10 deletions.
27 changes: 22 additions & 5 deletions rbdeal/deal_repair.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ package rbdeal
import (
"context"
"fmt"
"github.com/ipfs/go-cid"
ribs2 "github.com/lotus-web3/ribs"
"github.com/lotus-web3/ribs/ributil"
"github.com/multiformats/go-multihash"
"golang.org/x/xerrors"
"io"
"os"
Expand Down Expand Up @@ -204,7 +206,7 @@ func (r *ribs) fetchGroupHttp(ctx context.Context, workerID int, group ribs2.Gro

// make the request!!

rc := ributil.RobustGet(reqUrl.String(), gm.CarSize, func() *ributil.RateCounter {
robustReqReader := ributil.RobustGet(reqUrl.String(), gm.CarSize, func() *ributil.RateCounter {
return r.repairFetchCounters.Get(group)
})

Expand Down Expand Up @@ -242,25 +244,40 @@ func (r *ribs) fetchGroupHttp(ctx context.Context, workerID int, group ribs2.Gro
}
}()

repairReader, err := ributil.NewCarRepairReader(robustReqReader, gm.RootCid, func(b cid.Cid) ([]byte, error) {
var outData []byte
return outData, r.retrProv.FetchBlocks(ctx, group, []multihash.Multihash{b.Hash()}, func(cidx int, data []byte) {
outData = make([]byte, len(data))
copy(outData, data)
})
})
if err != nil {
_ = f.Close()
_ = os.Remove(groupFile)
_ = robustReqReader.Close()
log.Errorw("failed to create repair reader", "err", err, "group", group, "provider", candidate.Provider, "url", reqUrl.String())
continue
}

cc := new(ributil.DataCidWriter)
commdReader := io.TeeReader(rc, cc)
commdReader := io.TeeReader(repairReader, cc)

_, err = io.Copy(f, commdReader)
done()
if err != nil {
_ = f.Close()
_ = os.Remove(groupFile)
_ = rc.Close()
_ = robustReqReader.Close()
log.Errorw("failed to copy response body", "err", err, "group", group, "provider", candidate.Provider, "url", reqUrl.String())
continue
}

if err := f.Close(); err != nil {
_ = rc.Close()
_ = robustReqReader.Close()
return xerrors.Errorf("close group file: %w", err)
}

if err := rc.Close(); err != nil {
if err := robustReqReader.Close(); err != nil {
return xerrors.Errorf("close response body: %w", err)
}

Expand Down
35 changes: 30 additions & 5 deletions ributil/repair_car.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,13 @@ type RepairCarLog struct {
readBuf []byte

repairBlock func(cid.Cid) ([]byte, error)

totalRead, bad int64
}

// Thresholds used by RepairCarLog.Read to decide when a source stream is too
// corrupt to keep repairing.
var (
	// minReadDecision is the number of counted reads (blocks) that must be
	// exceeded before the corruption ratio is evaluated at all.
	minReadDecision int64 = 100

	// maxCorruptPct is the percentage of counted reads that may be corrupt;
	// once the ratio goes above this value the read is aborted with an error.
	maxCorruptPct int64 = 10
)

func NewCarRepairReader(source io.Reader, root cid.Cid, repair func(cid.Cid) ([]byte, error)) (*RepairCarLog, error) {
br := bufio.NewReaderSize(source, int(carutil.MaxAllowedSectionSize))

Expand Down Expand Up @@ -67,6 +72,19 @@ func (r *RepairCarLog) Read(p []byte) (n int, err error) {
return
}

r.totalRead++
wasCorrupt := false
defer func() {
if wasCorrupt {
r.bad++
}

if r.totalRead > minReadDecision && r.bad*100/r.totalRead > maxCorruptPct {
log.Errorw("too many corrupt blocks, aborting", "total", r.totalRead, "bad", r.bad)
err = xerrors.Errorf("too many corrupt blocks, aborting")
}
}()

if len(r.expectCidStack) == 0 {
return 0, io.EOF
}
Expand Down Expand Up @@ -111,7 +129,8 @@ func (r *RepairCarLog) Read(p []byte) (n int, err error) {
vEntLen, n := binary.Uvarint(cidLenEnt[:varintLen])
if n <= 0 || vEntLen > uint64(carutil.MaxAllowedSectionSize) {
// varint len is probably corrupted
log.Errorw("bad varint or header is bigger than util.MaxAllowedSectionSize, varint len is probably corrupted, will try repair", "expected", firstCidInLayer, "actual", vEntLen)
log.Errorw("repairRead bad varint or header is bigger than util.MaxAllowedSectionSize, varint len is probably corrupted, will try repair", "expected", firstCidInLayer, "actual", vEntLen)
wasCorrupt = true

goodData, err := r.repairBlock(firstCidInLayer)
if err != nil {
Expand All @@ -132,7 +151,8 @@ func (r *RepairCarLog) Read(p []byte) (n int, err error) {
if err != nil {
if err == io.EOF {
// length was probably corrupted
log.Errorw("read entry eof, varint len is probably corrupted, will try repair", "expected", firstCidInLayer, "actual", vEntLen)
log.Errorw("repairRead read entry eof, varint len is probably corrupted, will try repair", "expected", firstCidInLayer, "actual", vEntLen)
wasCorrupt = true

goodData, err := r.repairBlock(firstCidInLayer)
if err != nil {
Expand All @@ -153,7 +173,9 @@ func (r *RepairCarLog) Read(p []byte) (n int, err error) {
}

if len(ent) < len(expCidBytes) {
log.Errorw("entry shorter than cid, will attempt repair", "expected", firstCidInLayer, "actual", ent)
log.Errorw("repairRead entry shorter than cid, will attempt repair", "expected", firstCidInLayer, "actual", ent)
wasCorrupt = true

goodData, err := r.repairBlock(firstCidInLayer)
if err != nil {
return 0, xerrors.Errorf("repair block %s: %w", firstCidInLayer, err)
Expand All @@ -168,7 +190,8 @@ func (r *RepairCarLog) Read(p []byte) (n int, err error) {
}

if !bytes.Equal(ent[:len(expCidBytes)], expCidBytes) {
log.Errorw("cid mismatch in car stream, will attempt repair", "expected", firstCidInLayer, "actual", ent[:len(expCidBytes)])
log.Errorw("repairRead cid mismatch in car stream, will attempt repair", "expected", firstCidInLayer, "actual", ent[:len(expCidBytes)])
wasCorrupt = true

// repair here is really just copying the right cid into the entry
copy(ent[:len(expCidBytes)], expCidBytes)
Expand All @@ -179,7 +202,8 @@ func (r *RepairCarLog) Read(p []byte) (n int, err error) {
return 0, xerrors.Errorf("hash data: %w", err)
}
if !hash.Equals(firstCidInLayer) {
log.Errorw("data hash mismatch in car stream, will attempt repair", "expected", firstCidInLayer, "actual", hash)
log.Errorw("repairRead data hash mismatch in car stream, will attempt repair", "expected", firstCidInLayer, "actual", hash)
wasCorrupt = true

// block data repair
goodData, err := r.repairBlock(firstCidInLayer)
Expand Down Expand Up @@ -246,6 +270,7 @@ func match32Bytes(pattern []byte, buf []byte) (off int, err error) {
for i := 0; i < 4; i++ {
overlap := b32overlap(pattern, buf[i:])
if overlap == 32*8 {
// happy path
return i, nil
}
if overlap > maxOverlap {
Expand Down

0 comments on commit 1bfc6c5

Please sign in to comment.