Skip to content

Commit

Permalink
carlog repair fixez, fuzz tests
Browse files Browse the repository at this point in the history
  • Loading branch information
magik6k committed Nov 16, 2023
1 parent 000d727 commit fcd1c8a
Show file tree
Hide file tree
Showing 2 changed files with 248 additions and 46 deletions.
133 changes: 128 additions & 5 deletions ributil/repair_car.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
carutil "github.com/ipld/go-car/util"
"golang.org/x/xerrors"
"io"
"math/bits"
)

type RepairCarLog struct {
Expand All @@ -23,13 +24,17 @@ type RepairCarLog struct {
}

func NewCarRepairReader(source io.Reader, root cid.Cid, repair func(cid.Cid) ([]byte, error)) (*RepairCarLog, error) {
br := bufio.NewReader(source)
br := bufio.NewReaderSize(source, int(carutil.MaxAllowedSectionSize))

h, err := car.ReadHeader(br)
if err != nil {
return nil, xerrors.Errorf("read car header: %w", err)
}

if h.Version != 1 {
return nil, xerrors.Errorf("unsupported car version: %d", h.Version)
}

if len(h.Roots) != 1 {
return nil, xerrors.Errorf("expected 1 root, got %d", len(h.Roots))
}
Expand Down Expand Up @@ -82,13 +87,84 @@ func (r *RepairCarLog) Read(p []byte) (n int, err error) {
expCidBytes := firstCidInLayer.Bytes()

// length header read
ent, err := carutil.LdRead(r.source)

maxExpectedCIDLen := 4 // car max extry size is 32MB, so 4 bytes is enough for varint length

cidLenEnt, err := r.source.Peek(firstCidInLayer.ByteLen() + maxExpectedCIDLen)
if err != nil {
return 0, xerrors.Errorf("read entry: %w", err)
return 0, xerrors.Errorf("peek entry: %w", err)
}
cidOff, err := match32Bytes(firstCidInLayer.Bytes()[:32], cidLenEnt[1:]) // at least 1 byte for varint length
if err != nil {
return 0, xerrors.Errorf("match cid pos: %w", err)
}

varintLen := cidOff + 1

if _, err := r.source.Discard(varintLen); err != nil {
return 0, xerrors.Errorf("discard varint len bytes: %w", err)
}

// vEntLen contains the length claimed by the varint. It will, most of the time
// be ok, but sometimes it may contain bitflips, so don't always trust it
var ent []byte
vEntLen, n := binary.Uvarint(cidLenEnt[:varintLen])
if n <= 0 || vEntLen > uint64(carutil.MaxAllowedSectionSize) {
// varint len is probably corrupted
log.Errorw("bad varint or header is bigger than util.MaxAllowedSectionSize, varint len is probably corrupted, will try repair", "expected", firstCidInLayer, "actual", vEntLen)

goodData, err := r.repairBlock(firstCidInLayer)
if err != nil {
return 0, xerrors.Errorf("repair block %s: %w", firstCidInLayer, err)
}

// make ent the correct length
ent = make([]byte, len(firstCidInLayer.Bytes())+len(goodData))

// now reconstruct correct entry for next steps
copy(ent[:firstCidInLayer.ByteLen()], firstCidInLayer.Bytes())
copy(ent[firstCidInLayer.ByteLen():], goodData)
}

if len(ent) == 0 {
// wasn't repaired above, so just read from source stream
ent, err = r.source.Peek(int(vEntLen))
if err != nil {
if err == io.EOF {
// length was probably corrupted
log.Errorw("read entry eof, varint len is probably corrupted, will try repair", "expected", firstCidInLayer, "actual", vEntLen)

goodData, err := r.repairBlock(firstCidInLayer)
if err != nil {
return 0, xerrors.Errorf("repair block %s: %w", firstCidInLayer, err)
}

// make ent the correct length
ent = make([]byte, len(firstCidInLayer.Bytes())+len(goodData))

// now reconstruct correct entry for next steps
copy(ent[:firstCidInLayer.ByteLen()], firstCidInLayer.Bytes())
copy(ent[firstCidInLayer.ByteLen():], goodData)
} else {
return 0, xerrors.Errorf("peek entry: %w", err)
}

}
}

if len(ent) < len(expCidBytes) {
return 0, xerrors.Errorf("read expected cid: short read")
log.Errorw("entry shorter than cid, will attempt repair", "expected", firstCidInLayer, "actual", ent)
goodData, err := r.repairBlock(firstCidInLayer)
if err != nil {
return 0, xerrors.Errorf("repair block %s: %w", firstCidInLayer, err)
}

// make ent the correct length
ent = make([]byte, len(firstCidInLayer.Bytes())+len(goodData))

// now reconstruct correct entry for next steps
copy(ent[:firstCidInLayer.ByteLen()], firstCidInLayer.Bytes())
copy(ent[firstCidInLayer.ByteLen():], goodData)
}

if !bytes.Equal(ent[:len(expCidBytes)], expCidBytes) {
Expand All @@ -112,7 +188,10 @@ func (r *RepairCarLog) Read(p []byte) (n int, err error) {
}

if len(goodData) != len(ent[len(expCidBytes):]) {
return 0, xerrors.Errorf("repair block %s: data length mismatch %d != %d", firstCidInLayer, len(goodData), len(ent[len(expCidBytes):]))
// resize ent to the correct length
ent = make([]byte, len(expCidBytes)+len(goodData))
// copy in cid bytes again..
copy(ent[:len(expCidBytes)], expCidBytes)
}

copy(ent[len(expCidBytes):], goodData)
Expand All @@ -138,10 +217,54 @@ func (r *RepairCarLog) Read(p []byte) (n int, err error) {
r.expectCidStack = append(r.expectCidStack, links)
}

// advance the source by the correct amount
if _, err := r.source.Discard(len(ent)); err != nil {
return 0, xerrors.Errorf("discard entry: %w", err)
}

// now perform the real read
n = copy(p, r.readBuf)
r.readBuf = r.readBuf[n:]
return
}

// finds pattern in buf
func match32Bytes(pattern []byte, buf []byte) (off int, err error) {
// data might be corrupted, so we can't use bytes.Index
// we count matching bits at offsets 0,1,2,3, and select highest overlap

if len(pattern) != 32 {
return 0, xerrors.Errorf("pattern must be 32 bytes")
}
if len(buf) < 4+32 {
return 0, xerrors.Errorf("buf must be at least 36 bytes")
}

var maxOverlap int
var maxOverlapOff int

for i := 0; i < 4; i++ {
overlap := b32overlap(pattern, buf[i:])
if overlap == 32*8 {
return i, nil
}
if overlap > maxOverlap {
maxOverlap = overlap
maxOverlapOff = i
}
}

return maxOverlapOff, nil
}

func b32overlap(patt, b []byte) (overlap int) {
var matchingBits int

for i, pb := range patt {
matchingBits += 8 - bits.OnesCount8(pb^b[i])
}

return
}

var _ io.Reader = (*RepairCarLog)(nil)
161 changes: 120 additions & 41 deletions ributil/repair_car_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,11 @@ package ributil

import (
"bytes"
"context"
"github.com/filecoin-project/lotus/blockstore"
"github.com/ipfs/go-cid"
"github.com/ipld/go-car"
"github.com/samber/lo"
"github.com/stretchr/testify/require"
"golang.org/x/xerrors"
"io"
Expand Down Expand Up @@ -78,7 +82,18 @@ var testCar = []byte{
0x72, 0x63, 0x62, 0x64, 0x2e, 0x70, 0x79, 0x0a,
}

func TestRepairCarLog(t *testing.T) {
var testCarBs = func() blockstore.Blockstore {
bstore := blockstore.NewMemory()

_, err := car.LoadCar(context.Background(), bstore, bytes.NewReader(testCar))
if err != nil {
panic(err)
}

return bstore
}()

func TestRepairCarLogHappyPath(t *testing.T) {
rc, err := cid.Parse("bafyreig67dpkzct5dlv6bopobeti72tttybwtyg63xh25qoan3t7i7aj2a")
if err != nil {
t.Fatal(err)
Expand All @@ -94,80 +109,144 @@ func TestRepairCarLog(t *testing.T) {
require.Equal(t, testCar, d)
}

func TestRepairCarBitFlipData(t *testing.T) {
func TestRepairCar(t *testing.T) {
rc, err := cid.Parse("bafyreig67dpkzct5dlv6bopobeti72tttybwtyg63xh25qoan3t7i7aj2a")
if err != nil {
t.Fatal(err)
}

tcCopy := make([]byte, len(testCar))
copy(tcCopy, testCar)
tcCopy[len(tcCopy)-1] ^= 0x01

rr, err := NewCarRepairReader(bytes.NewReader(tcCopy), rc, func(c cid.Cid) ([]byte, error) {
if c.String() != "bafkreifgh655vcevj45q7uqpftwzy7ubfqkpwvh77enfrnaxac5t4gro5q" {
return nil, xerrors.Errorf("unexpected cid: %s", c)
}

return []byte("crp.ydrcbd.py\n"), nil
})
if err != nil {
t.Fatal(err)
testCases := []struct {
Name string
CorruptOffset int
CorruptCallback func(byte, int) byte
}{
{
Name: "BitFlipData",
CorruptOffset: len(testCar) - 1,
CorruptCallback: func(b byte, i int) byte {
return b ^ 0x01
},
},
{
Name: "BitFlipCID",
CorruptOffset: len(testCar) - len("crp.ydrcbd.py\n") - 10,
CorruptCallback: func(b byte, i int) byte {
return b ^ 0x01
},
},
{
Name: "BitFlipVarintToEOF",
CorruptOffset: len(testCar) - len("crp.ydrcbd.py\n") - rc.ByteLen() - 1,
CorruptCallback: func(b byte, i int) byte {
return b ^ 0x01
},
},
{
Name: "BitFlipVarintDecodeFail",
CorruptOffset: len(testCar) - len("crp.ydrcbd.py\n") - rc.ByteLen() - 1,
CorruptCallback: func(b byte, i int) byte {
return b ^ 0x80
},
},
{
Name: "BitFlipDataShort",
CorruptOffset: len(testCar) - len("crp.ydrcbd.py\n") - rc.ByteLen() - 1,
CorruptCallback: func(b byte, i int) byte {
return b - 1
},
},
{
Name: "BitFlipDataShortMidCID",
CorruptOffset: len(testCar) - len("crp.ydrcbd.py\n") - rc.ByteLen() - 1,
CorruptCallback: func(b byte, i int) byte {
return 12
},
},
}

d, err := io.ReadAll(rr)
require.NoError(t, err)
require.Equal(t, testCar, d)
for _, tc := range testCases {
t.Run(tc.Name, func(t *testing.T) {
testRepairCarWithCorruption(t, false, []int{tc.CorruptOffset}, tc.CorruptCallback)
})
}
}

func TestRepairCarBitFlipCID(t *testing.T) {
rc, err := cid.Parse("bafyreig67dpkzct5dlv6bopobeti72tttybwtyg63xh25qoan3t7i7aj2a")
if err != nil {
t.Fatal(err)
func fuzzRepairFunc(t *testing.T, corruptOffset []int) {
// Define the CorruptCallback function using the byte 'b'
// This function can be randomized or you can create several and choose one based on 'b'
corruptCallback := func(b byte, ci int) byte {
// Example corruption function, replace with actual logic
return b ^ byte(corruptOffset[ci]&0xff)
}

lastBlkOff := len("crp.ydrcbd.py\n") + 10
coffs := lo.Map(corruptOffset, func(i int, v int) int {
co := v & 0x7fffffff
co >>= 8
co = co % len(testCar)
if co == 0 {
co = 1 // don't corrupt first byte, too we won't bother with handling that
}
return co
})

tcCopy := make([]byte, len(testCar))
copy(tcCopy, testCar)
tcCopy[len(tcCopy)-lastBlkOff] ^= 0x01
testRepairCarWithCorruption(t, true, coffs, corruptCallback)
}

rr, err := NewCarRepairReader(bytes.NewReader(tcCopy), rc, func(c cid.Cid) ([]byte, error) {
if c.String() != "bafkreifgh655vcevj45q7uqpftwzy7ubfqkpwvh77enfrnaxac5t4gro5q" {
return nil, xerrors.Errorf("unexpected cid: %s", c)
}
func FuzzRepairCar(f *testing.F) {
f.Add(int(0), byte(0))
f.Fuzz(func(t *testing.T, c0 int, b0 byte) {
fuzzRepairFunc(t, []int{(c0 << 8) | int(b0)})
})
}

return []byte("crp.ydrcbd.py\n"), nil
func FuzzRepairCar2c(f *testing.F) {
f.Add(int(0), byte(0), int(0), byte(0))
f.Fuzz(func(t *testing.T, c0 int, b0 byte, c1 int, b1 byte) {
fuzzRepairFunc(t, []int{(c0 << 8) | int(b0), (c1 << 8) | int(b1)})
})
if err != nil {
t.Fatal(err)
}
}

d, err := io.ReadAll(rr)
require.NoError(t, err)
require.Equal(t, testCar, d)
func FuzzRepairCar3c(f *testing.F) {
f.Add(int(0), byte(0), int(0), byte(0), int(0), byte(0))
f.Fuzz(func(t *testing.T, c0 int, b0 byte, c1 int, b1 byte, c2 int, b2 byte) {
fuzzRepairFunc(t, []int{(c0 << 8) | int(b0), (c1 << 8) | int(b1), (c2 << 8) | int(b2)})
})
}

func TestRepairCarBitFlipLen(t *testing.T) {
func testRepairCarWithCorruption(t *testing.T, fuzz bool, corruptOffset []int, corruptCallback func(byte, int) byte) {
rc, err := cid.Parse("bafyreig67dpkzct5dlv6bopobeti72tttybwtyg63xh25qoan3t7i7aj2a")
if err != nil {
t.Fatal(err)
}

lastBlkOff := len("crp.ydrcbd.py\n") + rc.ByteLen() + 1

// Create a copy of testCar and apply the corruption
tcCopy := make([]byte, len(testCar))
copy(tcCopy, testCar)
tcCopy[len(tcCopy)-lastBlkOff] ^= 0x01

for ci, off := range corruptOffset {
tcCopy[off] = corruptCallback(tcCopy[off], ci)
}

rr, err := NewCarRepairReader(bytes.NewReader(tcCopy), rc, func(c cid.Cid) ([]byte, error) {
if fuzz {
// fuzz can break any block
b, err := testCarBs.Get(context.Background(), c)
if err != nil {
return nil, xerrors.Errorf("get test blk: %w", err)
}
return b.RawData(), nil
}

if c.String() != "bafkreifgh655vcevj45q7uqpftwzy7ubfqkpwvh77enfrnaxac5t4gro5q" {
return nil, xerrors.Errorf("unexpected cid: %s", c)
}

return []byte("crp.ydrcbd.py\n"), nil
})
if err != nil {
if fuzz {
return // Ignore errors here when fuzzing
}
t.Fatal(err)
}

Expand Down

0 comments on commit fcd1c8a

Please sign in to comment.