diff --git a/go.mod b/go.mod index fa864af8..764da3b9 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,7 @@ require ( github.com/hannahhoward/cbor-gen-for v0.0.0-20191218204337-9ab7b1bcc099 github.com/ipfs/go-block-format v0.0.2 github.com/ipfs/go-blockservice v0.1.3-0.20190908200855-f22eea50656c - github.com/ipfs/go-car v0.0.3-0.20191203022317-23b0a85fd1b1 + github.com/ipfs/go-car v0.0.3-0.20200124090545-1a340009d896 github.com/ipfs/go-cid v0.0.4 github.com/ipfs/go-datastore v0.1.1 github.com/ipfs/go-graphsync v0.0.4 diff --git a/go.sum b/go.sum index 3951c7de..b02c65be 100644 --- a/go.sum +++ b/go.sum @@ -140,8 +140,8 @@ github.com/ipfs/go-block-format v0.0.2/go.mod h1:AWR46JfpcObNfg3ok2JHDUfdiHRgWhJ github.com/ipfs/go-blockservice v0.1.0/go.mod h1:hzmMScl1kXHg3M2BjTymbVPjv627N7sYcvYaKbop39M= github.com/ipfs/go-blockservice v0.1.3-0.20190908200855-f22eea50656c h1:lN5IQA07VtLiTLAp/Scezp1ljFhXErC6yq4O1cu+yJ0= github.com/ipfs/go-blockservice v0.1.3-0.20190908200855-f22eea50656c/go.mod h1:t+411r7psEUhLueM8C7aPA7cxCclv4O3VsUVxt9kz2I= -github.com/ipfs/go-car v0.0.3-0.20191203022317-23b0a85fd1b1 h1:Nq8xEW+2KZq7IkRlkOh0rTEUI8FgunhMoLj5EMkJzbQ= -github.com/ipfs/go-car v0.0.3-0.20191203022317-23b0a85fd1b1/go.mod h1:rmd887mJxQRDfndfDEY3Liyx8gQVyfFFRSHdsnDSAlk= +github.com/ipfs/go-car v0.0.3-0.20200124090545-1a340009d896 h1:l8gnU1VBhftugMKzfh+n7nuDhOw3X1iqfrA33GVBMMY= +github.com/ipfs/go-car v0.0.3-0.20200124090545-1a340009d896/go.mod h1:rmd887mJxQRDfndfDEY3Liyx8gQVyfFFRSHdsnDSAlk= github.com/ipfs/go-cid v0.0.1/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM= github.com/ipfs/go-cid v0.0.2/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM= github.com/ipfs/go-cid v0.0.3/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM= diff --git a/piecestore/piecestore.go b/piecestore/piecestore.go index fc046f92..d2fed68e 100644 --- a/piecestore/piecestore.go +++ b/piecestore/piecestore.go @@ -1,87 +1,125 @@ package piecestore import ( + "bytes" "fmt" "github.com/filecoin-project/go-statestore" + "github.com/ipfs/go-cid" "github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore/namespace" ) -var DSPrefix = "/storagemarket/pieces" +// DSPiecePrefix is the name space for storing piece infos +var DSPiecePrefix = "/storagemarket/pieces" +// DSCIDPrefix is the name space for storing CID infos +var DSCIDPrefix = "/storagemarket/cid-infos" + +// NewPieceStore returns a new piecestore based on the given datastore func NewPieceStore(ds datastore.Batching) PieceStore { return &pieceStore{ - store: statestore.New(namespace.Wrap(ds, datastore.NewKey(DSPrefix))), + pieces: statestore.New(namespace.Wrap(ds, datastore.NewKey(DSPiecePrefix))), + cidInfos: statestore.New(namespace.Wrap(ds, datastore.NewKey(DSCIDPrefix))), } } type pieceStore struct { - store *statestore.StateStore + pieces *statestore.StateStore + cidInfos *statestore.StateStore } func (ps *pieceStore) AddDealForPiece(pieceCID []byte, dealInfo DealInfo) error { - // Do we need to de-dupe or anything here? return ps.mutatePieceInfo(pieceCID, func(pi *PieceInfo) error { + for _, di := range pi.Deals { + if di == dealInfo { + return nil + } + } pi.Deals = append(pi.Deals, dealInfo) return nil }) } -func (ps *pieceStore) AddBlockInfosToPiece(pieceCID []byte, blockInfos []BlockInfo) error { - // Do we need to de-dupe or anything here? - return ps.mutatePieceInfo(pieceCID, func(pi *PieceInfo) error { - pi.Blocks = blockInfos - return nil - }) +func (ps *pieceStore) AddPieceBlockLocations(pieceCID []byte, blockLocations map[cid.Cid]BlockLocation) error { + for c, blockLocation := range blockLocations { + err := ps.mutateCIDInfo(c, func(ci *CIDInfo) error { + for _, pbl := range ci.PieceBlockLocations { + if bytes.Equal(pbl.PieceCID, pieceCID) && pbl.BlockLocation == blockLocation { + return nil + } + } + ci.PieceBlockLocations = append(ci.PieceBlockLocations, PieceBlockLocation{blockLocation, pieceCID}) + return nil + }) + if err != nil { + return err + } + } + return nil } -func (ps *pieceStore) HasBlockInfo(pieceCID []byte) (bool, error) { - pi, err := ps.GetPieceInfo(pieceCID) - if err != nil { - return false, err +func (ps *pieceStore) GetPieceInfo(pieceCID []byte) (PieceInfo, error) { + var out PieceInfo + if err := ps.pieces.Get(newKey(pieceCID)).Get(&out); err != nil { + return PieceInfo{}, err } - - return len(pi.Blocks) > 0, err + return out, nil } -func (ps *pieceStore) HasDealInfo(pieceCID []byte) (bool, error) { - pi, err := ps.GetPieceInfo(pieceCID) - if err != nil { - return false, err +func (ps *pieceStore) GetCIDInfo(payloadCID cid.Cid) (CIDInfo, error) { + var out CIDInfo + if err := ps.cidInfos.Get(payloadCID).Get(&out); err != nil { + return CIDInfo{}, err } - - return len(pi.Deals) > 0, nil + return out, nil } -func (ps *pieceStore) GetPieceInfo(pieceCID []byte) (PieceInfo, error) { - var out PieceInfo - if err := ps.store.Get(newKey(pieceCID)).Get(&out); err != nil { - return PieceInfo{}, err +func (ps *pieceStore) ensurePieceInfo(pieceCID []byte) error { + has, err := ps.pieces.Has(newKey(pieceCID)) + + if err != nil { + return err } - return out, nil + if has { + return nil + } + + pieceInfo := PieceInfo{PieceCID: pieceCID} + return ps.pieces.Begin(newKey(pieceCID), &pieceInfo) } -func (ps *pieceStore) ensurePieceInfo(pieceCID []byte) (PieceInfo, error) { - pieceInfo, err := ps.GetPieceInfo(pieceCID) +func (ps *pieceStore) ensureCIDInfo(c cid.Cid) error { + has, err := ps.cidInfos.Has(c) - if err == nil { - return pieceInfo, nil + if err != nil { + return err } - pieceInfo = PieceInfo{PieceCID: pieceCID} - err = ps.store.Begin(newKey(pieceCID), &pieceInfo) + if has { + return nil + } - return pieceInfo, err + cidInfo := CIDInfo{CID: c} + return ps.cidInfos.Begin(c, &cidInfo) } func (ps *pieceStore) mutatePieceInfo(pieceCID []byte, mutator interface{}) error { - _, err := ps.ensurePieceInfo(pieceCID) + err := ps.ensurePieceInfo(pieceCID) + if err != nil { + return err + } + + return ps.pieces.Get(newKey(pieceCID)).Mutate(mutator) +} + +func (ps *pieceStore) mutateCIDInfo(c cid.Cid, mutator interface{}) error { + err := ps.ensureCIDInfo(c) if err != nil { return err } - return ps.store.Get(newKey(pieceCID)).Mutate(mutator) + return ps.cidInfos.Get(c).Mutate(mutator) } func newKey(pieceCID []byte) fmt.Stringer { diff --git a/piecestore/piecestore_test.go b/piecestore/piecestore_test.go index 84718c2c..e758b224 100644 --- a/piecestore/piecestore_test.go +++ b/piecestore/piecestore_test.go @@ -9,51 +9,164 @@ import ( "github.com/stretchr/testify/assert" "github.com/filecoin-project/go-fil-markets/piecestore" + "github.com/filecoin-project/go-fil-markets/shared_testutil" ) func TestStorePieceInfo(t *testing.T) { - ps := piecestore.NewPieceStore(datastore.NewMapDatastore()) + pieceCid := []byte{1, 2, 3, 4} + initializePieceStore := func(t *testing.T) piecestore.PieceStore { + ps := piecestore.NewPieceStore(datastore.NewMapDatastore()) + _, err := ps.GetPieceInfo(pieceCid) + assert.Error(t, err) + return ps + } + + // Add a deal info + t.Run("can add deals", func(t *testing.T) { + ps := initializePieceStore(t) + dealInfo := piecestore.DealInfo{ + DealID: rand.Uint64(), + SectorID: rand.Uint64(), + Offset: rand.Uint64(), + Length: rand.Uint64(), + } + err := ps.AddDealForPiece(pieceCid, dealInfo) + assert.NoError(t, err) + + pi, err := ps.GetPieceInfo(pieceCid) + assert.NoError(t, err) + assert.Len(t, pi.Deals, 1) + assert.Equal(t, pi.Deals[0], dealInfo) + }) + + t.Run("adding same deal twice does not dup", func(t *testing.T) { + ps := initializePieceStore(t) + dealInfo := piecestore.DealInfo{ + DealID: rand.Uint64(), + SectorID: rand.Uint64(), + Offset: rand.Uint64(), + Length: rand.Uint64(), + } + err := ps.AddDealForPiece(pieceCid, dealInfo) + assert.NoError(t, err) + + pi, err := ps.GetPieceInfo(pieceCid) + assert.NoError(t, err) + assert.Len(t, pi.Deals, 1) + assert.Equal(t, pi.Deals[0], dealInfo) + + err = ps.AddDealForPiece(pieceCid, dealInfo) + assert.NoError(t, err) + + pi, err = ps.GetPieceInfo(pieceCid) + assert.NoError(t, err) + assert.Len(t, pi.Deals, 1) + assert.Equal(t, pi.Deals[0], dealInfo) + }) +} + +func TestStoreCIDInfo(t *testing.T) { + + pieceCid1 := []byte{1, 2, 3, 4} + pieceCid2 := []byte{5, 6, 7, 8} + testCIDs := shared_testutil.GenerateCids(3) + blockLocations := make([]piecestore.BlockLocation, 0, 3) + for i := 0; i < 3; i++ { + blockLocations = append(blockLocations, piecestore.BlockLocation{ + RelOffset: rand.Uint64(), + BlockSize: rand.Uint64(), + }) + } - _, err := ps.GetPieceInfo(pieceCid) - assert.Error(t, err) - - // Add a PieceInfo and some state - testCid, err := cid.Decode("bafzbeigai3eoy2ccc7ybwjfz5r3rdxqrinwi4rwytly24tdbh6yk7zslrm") - assert.NoError(t, err) - blockInfos := []piecestore.BlockInfo{{testCid, 42, 43}} - - err = ps.AddBlockInfosToPiece(pieceCid, blockInfos) - assert.NoError(t, err) - has, err := ps.HasBlockInfo(pieceCid) - assert.True(t, has) - assert.NoError(t, err) - has, err = ps.HasDealInfo(pieceCid) - assert.False(t, has) - assert.NoError(t, err) - - pi, err := ps.GetPieceInfo(pieceCid) - assert.NoError(t, err) - assert.Len(t, pi.Blocks, 1) - assert.Equal(t, pi.Blocks[0], piecestore.BlockInfo{testCid, 42, 43}) - - dealInfo := piecestore.DealInfo{ - DealID: rand.Uint64(), - SectorID: rand.Uint64(), - Offset: rand.Uint64(), - Length: rand.Uint64(), + initializePieceStore := func(t *testing.T) piecestore.PieceStore { + ps := piecestore.NewPieceStore(datastore.NewMapDatastore()) + _, err := ps.GetCIDInfo(testCIDs[0]) + assert.Error(t, err) + return ps } - err = ps.AddDealForPiece(pieceCid, dealInfo) - assert.NoError(t, err) - - has, err = ps.HasBlockInfo(pieceCid) - assert.True(t, has) - assert.NoError(t, err) - has, err = ps.HasDealInfo(pieceCid) - assert.True(t, has) - assert.NoError(t, err) - pi, err = ps.GetPieceInfo(pieceCid) - assert.NoError(t, err) - assert.Len(t, pi.Deals, 1) - assert.Equal(t, pi.Deals[0], dealInfo) + + t.Run("can add piece block locations", func(t *testing.T) { + ps := initializePieceStore(t) + err := ps.AddPieceBlockLocations(pieceCid1, map[cid.Cid]piecestore.BlockLocation{ + testCIDs[0]: blockLocations[0], + testCIDs[1]: blockLocations[1], + testCIDs[2]: blockLocations[2], + }) + assert.NoError(t, err) + + ci, err := ps.GetCIDInfo(testCIDs[0]) + assert.NoError(t, err) + assert.Len(t, ci.PieceBlockLocations, 1) + assert.Equal(t, ci.PieceBlockLocations[0], piecestore.PieceBlockLocation{blockLocations[0], pieceCid1}) + + ci, err = ps.GetCIDInfo(testCIDs[1]) + assert.NoError(t, err) + assert.Len(t, ci.PieceBlockLocations, 1) + assert.Equal(t, ci.PieceBlockLocations[0], piecestore.PieceBlockLocation{blockLocations[1], pieceCid1}) + + ci, err = ps.GetCIDInfo(testCIDs[2]) + assert.NoError(t, err) + assert.Len(t, ci.PieceBlockLocations, 1) + assert.Equal(t, ci.PieceBlockLocations[0], piecestore.PieceBlockLocation{blockLocations[2], pieceCid1}) + }) + + t.Run("overlapping adds", func(t *testing.T) { + ps := initializePieceStore(t) + err := ps.AddPieceBlockLocations(pieceCid1, map[cid.Cid]piecestore.BlockLocation{ + testCIDs[0]: blockLocations[0], + testCIDs[1]: blockLocations[2], + }) + assert.NoError(t, err) + err = ps.AddPieceBlockLocations(pieceCid2, map[cid.Cid]piecestore.BlockLocation{ + testCIDs[1]: blockLocations[1], + testCIDs[2]: blockLocations[2], + }) + assert.NoError(t, err) + + ci, err := ps.GetCIDInfo(testCIDs[0]) + assert.NoError(t, err) + assert.Len(t, ci.PieceBlockLocations, 1) + assert.Equal(t, ci.PieceBlockLocations[0], piecestore.PieceBlockLocation{blockLocations[0], pieceCid1}) + + ci, err = ps.GetCIDInfo(testCIDs[1]) + assert.NoError(t, err) + assert.Len(t, ci.PieceBlockLocations, 2) + assert.Equal(t, ci.PieceBlockLocations[0], piecestore.PieceBlockLocation{blockLocations[2], pieceCid1}) + assert.Equal(t, ci.PieceBlockLocations[1], piecestore.PieceBlockLocation{blockLocations[1], pieceCid2}) + + ci, err = ps.GetCIDInfo(testCIDs[2]) + assert.NoError(t, err) + assert.Len(t, ci.PieceBlockLocations, 1) + assert.Equal(t, ci.PieceBlockLocations[0], piecestore.PieceBlockLocation{blockLocations[2], pieceCid2}) + }) + + t.Run("duplicate adds", func(t *testing.T) { + ps := initializePieceStore(t) + err := ps.AddPieceBlockLocations(pieceCid1, map[cid.Cid]piecestore.BlockLocation{ + testCIDs[0]: blockLocations[0], + testCIDs[1]: blockLocations[1], + }) + assert.NoError(t, err) + err = ps.AddPieceBlockLocations(pieceCid1, map[cid.Cid]piecestore.BlockLocation{ + testCIDs[1]: blockLocations[1], + testCIDs[2]: blockLocations[2], + }) + assert.NoError(t, err) + + ci, err := ps.GetCIDInfo(testCIDs[0]) + assert.NoError(t, err) + assert.Len(t, ci.PieceBlockLocations, 1) + assert.Equal(t, ci.PieceBlockLocations[0], piecestore.PieceBlockLocation{blockLocations[0], pieceCid1}) + + ci, err = ps.GetCIDInfo(testCIDs[1]) + assert.NoError(t, err) + assert.Len(t, ci.PieceBlockLocations, 1) + assert.Equal(t, ci.PieceBlockLocations[0], piecestore.PieceBlockLocation{blockLocations[1], pieceCid1}) + + ci, err = ps.GetCIDInfo(testCIDs[2]) + assert.NoError(t, err) + assert.Len(t, ci.PieceBlockLocations, 1) + assert.Equal(t, ci.PieceBlockLocations[0], piecestore.PieceBlockLocation{blockLocations[2], pieceCid1}) + }) } diff --git a/piecestore/types.go b/piecestore/types.go index e454339f..0858c4ed 100644 --- a/piecestore/types.go +++ b/piecestore/types.go @@ -1,8 +1,10 @@ package piecestore -import "github.com/ipfs/go-cid" +import ( + "github.com/ipfs/go-cid" +) -//go:generate cbor-gen-for PieceInfo DealInfo BlockInfo +//go:generate cbor-gen-for PieceInfo DealInfo BlockLocation PieceBlockLocation CIDInfo // DealInfo is information about a single deal for a give piece type DealInfo struct { @@ -12,20 +14,34 @@ type DealInfo struct { Length uint64 } -// BlockInfo is information about where a given block is within a piece -type BlockInfo struct { - CID cid.Cid +// BlockLocation is information about where a given block is relative to the overall piece +type BlockLocation struct { RelOffset uint64 BlockSize uint64 } +// PieceBlockLocation is block information along with the pieceCID of the piece the block +// is inside of +type PieceBlockLocation struct { + BlockLocation + PieceCID []byte +} + +// CIDInfo is information about where a given CID will live inside a piece +type CIDInfo struct { + CID cid.Cid + PieceBlockLocations []PieceBlockLocation +} + +// CIDInfoUndefined is cid info with no information +var CIDInfoUndefined = CIDInfo{} + // PieceInfo is metadata about a piece a provider may be storing based // on its PieceCID -- so that, given a pieceCID during retrieval, the miner // can determine how to unseal it if needed type PieceInfo struct { PieceCID []byte Deals []DealInfo - Blocks []BlockInfo } // PieceInfoUndefined is piece info with no information @@ -34,8 +50,7 @@ var PieceInfoUndefined = PieceInfo{} // PieceStore is a saved database of piece info that can be modified and queried type PieceStore interface { AddDealForPiece(pieceCID []byte, dealInfo DealInfo) error - AddBlockInfosToPiece(pieceCID []byte, blockInfos []BlockInfo) error - HasBlockInfo(pieceCID []byte) (bool, error) - HasDealInfo(pieceCID []byte) (bool, error) + AddPieceBlockLocations(pieceCID []byte, blockLocations map[cid.Cid]BlockLocation) error GetPieceInfo(pieceCID []byte) (PieceInfo, error) + GetCIDInfo(payloadCID cid.Cid) (CIDInfo, error) } diff --git a/piecestore/types_cbor_gen.go b/piecestore/types_cbor_gen.go index 0e0e5784..85008f74 100644 --- a/piecestore/types_cbor_gen.go +++ b/piecestore/types_cbor_gen.go @@ -17,7 +17,7 @@ func (t *PieceInfo) MarshalCBOR(w io.Writer) error { _, err := w.Write(cbg.CborNull) return err } - if _, err := w.Write([]byte{131}); err != nil { + if _, err := w.Write([]byte{130}); err != nil { return err } @@ -46,20 +46,6 @@ func (t *PieceInfo) MarshalCBOR(w io.Writer) error { return err } } - - // t.Blocks ([]piecestore.BlockInfo) (slice) - if len(t.Blocks) > cbg.MaxLength { - return xerrors.Errorf("Slice value in field t.Blocks was too long") - } - - if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajArray, uint64(len(t.Blocks)))); err != nil { - return err - } - for _, v := range t.Blocks { - if err := v.MarshalCBOR(w); err != nil { - return err - } - } return nil } @@ -74,7 +60,7 @@ func (t *PieceInfo) UnmarshalCBOR(r io.Reader) error { return fmt.Errorf("cbor input should be of type array") } - if extra != 3 { + if extra != 2 { return fmt.Errorf("cbor input had wrong number of fields") } @@ -122,33 +108,6 @@ func (t *PieceInfo) UnmarshalCBOR(r io.Reader) error { t.Deals[i] = v } - // t.Blocks ([]piecestore.BlockInfo) (slice) - - maj, extra, err = cbg.CborReadHeader(br) - if err != nil { - return err - } - - if extra > cbg.MaxLength { - return fmt.Errorf("t.Blocks: array too large (%d)", extra) - } - - if maj != cbg.MajArray { - return fmt.Errorf("expected cbor array") - } - if extra > 0 { - t.Blocks = make([]BlockInfo, extra) - } - for i := 0; i < int(extra); i++ { - - var v BlockInfo - if err := v.UnmarshalCBOR(br); err != nil { - return err - } - - t.Blocks[i] = v - } - return nil } @@ -241,12 +200,143 @@ func (t *DealInfo) UnmarshalCBOR(r io.Reader) error { return nil } -func (t *BlockInfo) MarshalCBOR(w io.Writer) error { +func (t *BlockLocation) MarshalCBOR(w io.Writer) error { + if t == nil { + _, err := w.Write(cbg.CborNull) + return err + } + if _, err := w.Write([]byte{130}); err != nil { + return err + } + + // t.RelOffset (uint64) (uint64) + if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, uint64(t.RelOffset))); err != nil { + return err + } + + // t.BlockSize (uint64) (uint64) + if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, uint64(t.BlockSize))); err != nil { + return err + } + return nil +} + +func (t *BlockLocation) UnmarshalCBOR(r io.Reader) error { + br := cbg.GetPeeker(r) + + maj, extra, err := cbg.CborReadHeader(br) + if err != nil { + return err + } + if maj != cbg.MajArray { + return fmt.Errorf("cbor input should be of type array") + } + + if extra != 2 { + return fmt.Errorf("cbor input had wrong number of fields") + } + + // t.RelOffset (uint64) (uint64) + + maj, extra, err = cbg.CborReadHeader(br) + if err != nil { + return err + } + if maj != cbg.MajUnsignedInt { + return fmt.Errorf("wrong type for uint64 field") + } + t.RelOffset = uint64(extra) + // t.BlockSize (uint64) (uint64) + + maj, extra, err = cbg.CborReadHeader(br) + if err != nil { + return err + } + if maj != cbg.MajUnsignedInt { + return fmt.Errorf("wrong type for uint64 field") + } + t.BlockSize = uint64(extra) + return nil +} + +func (t *PieceBlockLocation) MarshalCBOR(w io.Writer) error { if t == nil { _, err := w.Write(cbg.CborNull) return err } - if _, err := w.Write([]byte{131}); err != nil { + if _, err := w.Write([]byte{130}); err != nil { + return err + } + + // t.BlockLocation (piecestore.BlockLocation) (struct) + if err := t.BlockLocation.MarshalCBOR(w); err != nil { + return err + } + + // t.PieceCID ([]uint8) (slice) + if len(t.PieceCID) > cbg.ByteArrayMaxLen { + return xerrors.Errorf("Byte array in field t.PieceCID was too long") + } + + if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajByteString, uint64(len(t.PieceCID)))); err != nil { + return err + } + if _, err := w.Write(t.PieceCID); err != nil { + return err + } + return nil +} + +func (t *PieceBlockLocation) UnmarshalCBOR(r io.Reader) error { + br := cbg.GetPeeker(r) + + maj, extra, err := cbg.CborReadHeader(br) + if err != nil { + return err + } + if maj != cbg.MajArray { + return fmt.Errorf("cbor input should be of type array") + } + + if extra != 2 { + return fmt.Errorf("cbor input had wrong number of fields") + } + + // t.BlockLocation (piecestore.BlockLocation) (struct) + + { + + if err := t.BlockLocation.UnmarshalCBOR(br); err != nil { + return err + } + + } + // t.PieceCID ([]uint8) (slice) + + maj, extra, err = cbg.CborReadHeader(br) + if err != nil { + return err + } + + if extra > cbg.ByteArrayMaxLen { + return fmt.Errorf("t.PieceCID: byte array too large (%d)", extra) + } + if maj != cbg.MajByteString { + return fmt.Errorf("expected byte array") + } + t.PieceCID = make([]byte, extra) + if _, err := io.ReadFull(br, t.PieceCID); err != nil { + return err + } + return nil +} + +func (t *CIDInfo) MarshalCBOR(w io.Writer) error { + if t == nil { + _, err := w.Write(cbg.CborNull) + return err + } + if _, err := w.Write([]byte{130}); err != nil { return err } @@ -256,19 +346,23 @@ func (t *BlockInfo) MarshalCBOR(w io.Writer) error { return xerrors.Errorf("failed to write cid field t.CID: %w", err) } - // t.RelOffset (uint64) (uint64) - if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, uint64(t.RelOffset))); err != nil { - return err + // t.PieceBlockLocations ([]piecestore.PieceBlockLocation) (slice) + if len(t.PieceBlockLocations) > cbg.MaxLength { + return xerrors.Errorf("Slice value in field t.PieceBlockLocations was too long") } - // t.BlockSize (uint64) (uint64) - if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, uint64(t.BlockSize))); err != nil { + if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajArray, uint64(len(t.PieceBlockLocations)))); err != nil { return err } + for _, v := range t.PieceBlockLocations { + if err := v.MarshalCBOR(w); err != nil { + return err + } + } return nil } -func (t *BlockInfo) UnmarshalCBOR(r io.Reader) error { +func (t *CIDInfo) UnmarshalCBOR(r io.Reader) error { br := cbg.GetPeeker(r) maj, extra, err := cbg.CborReadHeader(br) @@ -279,7 +373,7 @@ func (t *BlockInfo) UnmarshalCBOR(r io.Reader) error { return fmt.Errorf("cbor input should be of type array") } - if extra != 3 { + if extra != 2 { return fmt.Errorf("cbor input had wrong number of fields") } @@ -295,25 +389,32 @@ func (t *BlockInfo) UnmarshalCBOR(r io.Reader) error { t.CID = c } - // t.RelOffset (uint64) (uint64) + // t.PieceBlockLocations ([]piecestore.PieceBlockLocation) (slice) maj, extra, err = cbg.CborReadHeader(br) if err != nil { return err } - if maj != cbg.MajUnsignedInt { - return fmt.Errorf("wrong type for uint64 field") + + if extra > cbg.MaxLength { + return fmt.Errorf("t.PieceBlockLocations: array too large (%d)", extra) } - t.RelOffset = uint64(extra) - // t.BlockSize (uint64) (uint64) - maj, extra, err = cbg.CborReadHeader(br) - if err != nil { - return err + if maj != cbg.MajArray { + return fmt.Errorf("expected cbor array") } - if maj != cbg.MajUnsignedInt { - return fmt.Errorf("wrong type for uint64 field") + if extra > 0 { + t.PieceBlockLocations = make([]PieceBlockLocation, extra) } - t.BlockSize = uint64(extra) + for i := 0; i < int(extra); i++ { + + var v PieceBlockLocation + if err := v.UnmarshalCBOR(br); err != nil { + return err + } + + t.PieceBlockLocations[i] = v + } + return nil } diff --git a/retrievalmarket/discovery/local.go b/retrievalmarket/discovery/local.go index f74902be..521ea929 100644 --- a/retrievalmarket/discovery/local.go +++ b/retrievalmarket/discovery/local.go @@ -36,8 +36,7 @@ func (l *Local) AddPeer(cid cid.Cid, peer retrievalmarket.RetrievalPeer) error { } func (l *Local) GetPeers(payloadCID cid.Cid) ([]retrievalmarket.RetrievalPeer, error) { - key := payloadCID.String() - entry, err := l.ds.Get(datastore.NewKey(key)) + entry, err := l.ds.Get(dshelp.CidToDsKey(payloadCID)) if err == datastore.ErrNotFound { return []retrievalmarket.RetrievalPeer{}, nil } diff --git a/retrievalmarket/impl/blockunsealing/blockunsealing.go b/retrievalmarket/impl/blockunsealing/blockunsealing.go index 4530b8d1..10026b85 100644 --- a/retrievalmarket/impl/blockunsealing/blockunsealing.go +++ b/retrievalmarket/impl/blockunsealing/blockunsealing.go @@ -5,6 +5,7 @@ import ( "context" "io" + "github.com/ipfs/go-cid" blockstore "github.com/ipfs/go-ipfs-blockstore" "github.com/ipld/go-ipld-prime" cidlink "github.com/ipld/go-ipld-prime/linking/cid" @@ -22,7 +23,7 @@ type LoaderWithUnsealing interface { type loaderWithUnsealing struct { ctx context.Context bs blockstore.Blockstore - pieceInfo piecestore.PieceInfo + pieceStore piecestore.PieceStore carIO pieceio.CarIO unsealer UnsealingFunc alreadyUnsealed bool @@ -33,8 +34,8 @@ type UnsealingFunc func(ctx context.Context, sectorId uint64, offset uint64, len // NewLoaderWithUnsealing creates a loader that will attempt to read blocks from the blockstore but unseal the piece // as needed using the passed unsealing function -func NewLoaderWithUnsealing(ctx context.Context, bs blockstore.Blockstore, pieceInfo piecestore.PieceInfo, carIO pieceio.CarIO, unsealer UnsealingFunc) LoaderWithUnsealing { - return &loaderWithUnsealing{ctx, bs, pieceInfo, carIO, unsealer, false} +func NewLoaderWithUnsealing(ctx context.Context, bs blockstore.Blockstore, pieceStore piecestore.PieceStore, carIO pieceio.CarIO, unsealer UnsealingFunc) LoaderWithUnsealing { + return &loaderWithUnsealing{ctx, bs, pieceStore, carIO, unsealer, false} } func (lu *loaderWithUnsealing) Load(lnk ipld.Link, lnkCtx ipld.LinkContext) (io.Reader, error) { @@ -50,8 +51,8 @@ func (lu *loaderWithUnsealing) Load(lnk ipld.Link, lnkCtx ipld.LinkContext) (io. } // attempt unseal if block is not in blockstore - if !has && !lu.alreadyUnsealed { - err = lu.attemptUnseal() + if !has { + err = lu.attemptUnseal(c) if err != nil { return nil, err } @@ -65,20 +66,14 @@ func (lu *loaderWithUnsealing) Load(lnk ipld.Link, lnkCtx ipld.LinkContext) (io. return bytes.NewReader(blk.RawData()), nil } -func (lu *loaderWithUnsealing) attemptUnseal() error { +func (lu *loaderWithUnsealing) attemptUnseal(c cid.Cid) error { - lu.alreadyUnsealed = true - - // try to unseal data from piece - var reader io.ReadCloser - var err error - for _, deal := range lu.pieceInfo.Deals { - reader, err = lu.unsealer(lu.ctx, deal.SectorID, deal.Offset, deal.Length) - if err == nil { - break - } + cidInfo, err := lu.pieceStore.GetCIDInfo(c) + if err != nil { + return xerrors.Errorf("error looking up information on CID: %w", err) } + reader, err := lu.firstSuccessfulUnseal(cidInfo) // no successful unseal if err != nil { return xerrors.Errorf("Unable to unseal piece: %w", err) @@ -92,3 +87,22 @@ func (lu *loaderWithUnsealing) attemptUnseal() error { return nil } + +func (lu *loaderWithUnsealing) firstSuccessfulUnseal(cidInfo piecestore.CIDInfo) (io.ReadCloser, error) { + // try to unseal data from all pieces + lastErr := xerrors.New("no sectors found to unseal from") + for _, pieceBlockLocation := range cidInfo.PieceBlockLocations { + pieceInfo, err := lu.pieceStore.GetPieceInfo(pieceBlockLocation.PieceCID) + if err != nil { + continue + } + for _, deal := range pieceInfo.Deals { + reader, err := lu.unsealer(lu.ctx, deal.SectorID, deal.Offset, deal.Length) + if err == nil { + return reader, nil + } + lastErr = err + } + } + return nil, lastErr +} diff --git a/retrievalmarket/impl/blockunsealing/blockunsealing_test.go b/retrievalmarket/impl/blockunsealing/blockunsealing_test.go index 86815459..9ed2452d 100644 --- a/retrievalmarket/impl/blockunsealing/blockunsealing_test.go +++ b/retrievalmarket/impl/blockunsealing/blockunsealing_test.go @@ -54,12 +54,37 @@ func TestNewLoaderWithUnsealing(t *testing.T) { Offset: rand.Uint64(), Length: rand.Uint64(), } + pieceCID := []byte("applesauce") piece := piecestore.PieceInfo{ + PieceCID: pieceCID, Deals: []piecestore.DealInfo{ deal1, deal2, }, } + deal3 := piecestore.DealInfo{ + DealID: rand.Uint64(), + SectorID: rand.Uint64(), + Offset: rand.Uint64(), + Length: rand.Uint64(), + } + pieceCID2 := []byte("cheesewhiz") + piece2 := piecestore.PieceInfo{ + PieceCID: pieceCID2, + Deals: []piecestore.DealInfo{ + deal3, + }, + } + cidInfo := piecestore.CIDInfo{ + PieceBlockLocations: []piecestore.PieceBlockLocation{ + { + PieceCID: pieceCID, + }, + { + PieceCID: pieceCID2, + }, + }, + } checkSuccessLoad := func(t *testing.T, loaderWithUnsealing blockunsealing.LoaderWithUnsealing, lnk ipld.Link) { read, err := loaderWithUnsealing.Load(lnk, ipld.LinkContext{}) @@ -74,7 +99,8 @@ func TestNewLoaderWithUnsealing(t *testing.T) { t.Run("when intermediate blockstore has block", func(t *testing.T) { bs := setupBlockStore(t) unsealer := testnodes.NewTestRetrievalProviderNode() - loaderWithUnsealing := blockunsealing.NewLoaderWithUnsealing(ctx, bs, piece, cio, unsealer.UnsealSector) + pieceStore := tut.NewTestPieceStore() + loaderWithUnsealing := blockunsealing.NewLoaderWithUnsealing(ctx, bs, pieceStore, cio, unsealer.UnsealSector) checkSuccessLoad(t, loaderWithUnsealing, testdata.RootNodeLnk) unsealer.VerifyExpectations(t) }) @@ -84,9 +110,13 @@ func TestNewLoaderWithUnsealing(t *testing.T) { bs := setupBlockStore(t) unsealer := testnodes.NewTestRetrievalProviderNode() unsealer.ExpectUnseal(deal1.SectorID, deal1.Offset, deal1.Length, carData) - loaderWithUnsealing := blockunsealing.NewLoaderWithUnsealing(ctx, bs, piece, cio, unsealer.UnsealSector) + pieceStore := tut.NewTestPieceStore() + pieceStore.ExpectCID(testdata.MiddleMapBlock.Cid(), cidInfo) + pieceStore.ExpectPiece(pieceCID, piece) + loaderWithUnsealing := blockunsealing.NewLoaderWithUnsealing(ctx, bs, pieceStore, cio, unsealer.UnsealSector) checkSuccessLoad(t, loaderWithUnsealing, testdata.MiddleMapNodeLnk) unsealer.VerifyExpectations(t) + pieceStore.VerifyExpectations(t) }) t.Run("unsealing success on later ref", func(t *testing.T) { @@ -94,9 +124,43 @@ func TestNewLoaderWithUnsealing(t *testing.T) { unsealer := testnodes.NewTestRetrievalProviderNode() unsealer.ExpectFailedUnseal(deal1.SectorID, deal1.Offset, deal1.Length) unsealer.ExpectUnseal(deal2.SectorID, deal2.Offset, deal2.Length, carData) - loaderWithUnsealing := blockunsealing.NewLoaderWithUnsealing(ctx, bs, piece, cio, unsealer.UnsealSector) + pieceStore := tut.NewTestPieceStore() + pieceStore.ExpectCID(testdata.MiddleMapBlock.Cid(), cidInfo) + pieceStore.ExpectPiece(pieceCID, piece) + loaderWithUnsealing := blockunsealing.NewLoaderWithUnsealing(ctx, bs, pieceStore, cio, unsealer.UnsealSector) + checkSuccessLoad(t, loaderWithUnsealing, testdata.MiddleMapNodeLnk) + unsealer.VerifyExpectations(t) + pieceStore.VerifyExpectations(t) + }) + + t.Run("unsealing success on second piece", func(t *testing.T) { + bs := setupBlockStore(t) + unsealer := testnodes.NewTestRetrievalProviderNode() + unsealer.ExpectFailedUnseal(deal1.SectorID, deal1.Offset, deal1.Length) + unsealer.ExpectFailedUnseal(deal2.SectorID, deal2.Offset, deal2.Length) + unsealer.ExpectUnseal(deal3.SectorID, deal3.Offset, deal3.Length, carData) + pieceStore := tut.NewTestPieceStore() + pieceStore.ExpectCID(testdata.MiddleMapBlock.Cid(), cidInfo) + pieceStore.ExpectPiece(pieceCID, piece) + pieceStore.ExpectPiece(pieceCID2, piece2) + loaderWithUnsealing := blockunsealing.NewLoaderWithUnsealing(ctx, bs, pieceStore, cio, unsealer.UnsealSector) + checkSuccessLoad(t, loaderWithUnsealing, testdata.MiddleMapNodeLnk) + unsealer.VerifyExpectations(t) + pieceStore.VerifyExpectations(t) + }) + + t.Run("piece lookup success on second piece", func(t *testing.T) { + bs := setupBlockStore(t) + unsealer := testnodes.NewTestRetrievalProviderNode() + unsealer.ExpectUnseal(deal3.SectorID, deal3.Offset, deal3.Length, carData) + pieceStore := tut.NewTestPieceStore() + pieceStore.ExpectCID(testdata.MiddleMapBlock.Cid(), cidInfo) + pieceStore.ExpectMissingPiece(pieceCID) + pieceStore.ExpectPiece(pieceCID2, piece2) + loaderWithUnsealing := blockunsealing.NewLoaderWithUnsealing(ctx, bs, pieceStore, cio, unsealer.UnsealSector) checkSuccessLoad(t, loaderWithUnsealing, testdata.MiddleMapNodeLnk) unsealer.VerifyExpectations(t) + pieceStore.VerifyExpectations(t) }) t.Run("fails all unsealing", func(t *testing.T) { @@ -104,40 +168,59 @@ func TestNewLoaderWithUnsealing(t *testing.T) { unsealer := testnodes.NewTestRetrievalProviderNode() unsealer.ExpectFailedUnseal(deal1.SectorID, deal1.Offset, deal1.Length) unsealer.ExpectFailedUnseal(deal2.SectorID, deal2.Offset, deal2.Length) - loaderWithUnsealing := blockunsealing.NewLoaderWithUnsealing(ctx, bs, piece, cio, unsealer.UnsealSector) + unsealer.ExpectFailedUnseal(deal3.SectorID, deal3.Offset, deal3.Length) + pieceStore := tut.NewTestPieceStore() + pieceStore.ExpectCID(testdata.MiddleMapBlock.Cid(), cidInfo) + pieceStore.ExpectPiece(pieceCID, piece) + pieceStore.ExpectPiece(pieceCID2, piece2) + loaderWithUnsealing := blockunsealing.NewLoaderWithUnsealing(ctx, bs, pieceStore, cio, unsealer.UnsealSector) _, err := loaderWithUnsealing.Load(testdata.MiddleMapNodeLnk, ipld.LinkContext{}) require.Error(t, err) unsealer.VerifyExpectations(t) + pieceStore.VerifyExpectations(t) }) - t.Run("car io failure", func(t *testing.T) { + t.Run("fails looking up cid info", func(t *testing.T) { bs := setupBlockStore(t) unsealer := testnodes.NewTestRetrievalProviderNode() - randBytes := make([]byte, 100) - _, err := rand.Read(randBytes) - require.NoError(t, err) - unsealer.ExpectUnseal(deal1.SectorID, deal1.Offset, deal1.Length, randBytes) - loaderWithUnsealing := blockunsealing.NewLoaderWithUnsealing(ctx, bs, piece, cio, unsealer.UnsealSector) - _, err = loaderWithUnsealing.Load(testdata.MiddleMapNodeLnk, ipld.LinkContext{}) + pieceStore := tut.NewTestPieceStore() + pieceStore.ExpectMissingCID(testdata.MiddleMapBlock.Cid()) + loaderWithUnsealing := blockunsealing.NewLoaderWithUnsealing(ctx, bs, pieceStore, cio, unsealer.UnsealSector) + _, err := loaderWithUnsealing.Load(testdata.MiddleMapNodeLnk, ipld.LinkContext{}) require.Error(t, err) unsealer.VerifyExpectations(t) + pieceStore.VerifyExpectations(t) }) - t.Run("when piece was already unsealed", func(t *testing.T) { + t.Run("fails looking up all pieces", func(t *testing.T) { bs := setupBlockStore(t) unsealer := testnodes.NewTestRetrievalProviderNode() - unsealer.ExpectUnseal(deal1.SectorID, deal1.Offset, deal1.Length, carData) - loaderWithUnsealing := blockunsealing.NewLoaderWithUnsealing(ctx, bs, piece, cio, unsealer.UnsealSector) - checkSuccessLoad(t, loaderWithUnsealing, testdata.MiddleMapNodeLnk) - // clear out block store - err := bs.DeleteBlock(testdata.MiddleMapBlock.Cid()) - require.NoError(t, err) + pieceStore := tut.NewTestPieceStore() + pieceStore.ExpectCID(testdata.MiddleMapBlock.Cid(), cidInfo) + pieceStore.ExpectMissingPiece(pieceCID) + pieceStore.ExpectMissingPiece(pieceCID2) + loaderWithUnsealing := blockunsealing.NewLoaderWithUnsealing(ctx, bs, pieceStore, cio, unsealer.UnsealSector) + _, err := loaderWithUnsealing.Load(testdata.MiddleMapNodeLnk, ipld.LinkContext{}) + require.Error(t, err) + unsealer.VerifyExpectations(t) + pieceStore.VerifyExpectations(t) + }) - // attemp to load again, will not unseal, will fail + t.Run("car io failure", func(t *testing.T) { + bs := setupBlockStore(t) + unsealer := testnodes.NewTestRetrievalProviderNode() + randBytes := make([]byte, 100) + _, err := rand.Read(randBytes) + require.NoError(t, err) + unsealer.ExpectUnseal(deal1.SectorID, deal1.Offset, deal1.Length, randBytes) + pieceStore := tut.NewTestPieceStore() + pieceStore.ExpectCID(testdata.MiddleMapBlock.Cid(), cidInfo) + pieceStore.ExpectPiece(pieceCID, piece) + loaderWithUnsealing := blockunsealing.NewLoaderWithUnsealing(ctx, bs, pieceStore, cio, unsealer.UnsealSector) _, err = loaderWithUnsealing.Load(testdata.MiddleMapNodeLnk, ipld.LinkContext{}) require.Error(t, err) - unsealer.VerifyExpectations(t) }) + }) } diff --git a/retrievalmarket/impl/integration_test.go b/retrievalmarket/impl/integration_test.go index dd37cc36..3103a71b 100644 --- a/retrievalmarket/impl/integration_test.go +++ b/retrievalmarket/impl/integration_test.go @@ -50,7 +50,7 @@ func TestClientCanMakeQueryToProvider(t *testing.T) { t.Run("when there is some other error, returns error", func(t *testing.T) { unknownPiece := tut.GenerateCids(1)[0] expectedQR.Status = retrievalmarket.QueryResponseError - expectedQR.Message = "GetPieceSize failed" + expectedQR.Message = "GetCIDInfo failed" actualQR, err := client.Query(bgCtx, retrievalPeer, unknownPiece, retrievalmarket.QueryParams{}) assert.NoError(t, err) assert.Equal(t, expectedQR, actualQR) @@ -82,12 +82,22 @@ func requireSetupTestClientAndProvider(bgCtx context.Context, t *testing.T, payC providerNode := testnodes.NewTestRetrievalProviderNode() pieceStore := tut.NewTestPieceStore() expectedCIDs := tut.GenerateCids(3) - missingPiece := tut.GenerateCids(1)[0] + expectedPieces := [][]byte{[]byte("applesuace"), []byte("jam"), []byte("apricot")} + missingCID := tut.GenerateCids(1)[0] expectedQR := tut.MakeTestQueryResponse() - pieceStore.ExpectMissingPiece(missingPiece.Bytes()) - for i, piece := range expectedCIDs { - pieceStore.ExpectPiece(piece.Bytes(), piecestore.PieceInfo{ + pieceStore.ExpectMissingCID(missingCID) + for i, c := range expectedCIDs { + pieceStore.ExpectCID(c, piecestore.CIDInfo{ + PieceBlockLocations: []piecestore.PieceBlockLocation{ + { + PieceCID: expectedPieces[i], + }, + }, + }) + } + for i, piece := range expectedPieces { + pieceStore.ExpectPiece(piece, piecestore.PieceInfo{ Deals: []piecestore.DealInfo{ { Length: expectedQR.Size * uint64(i+1), @@ -107,7 +117,7 @@ func requireSetupTestClientAndProvider(bgCtx context.Context, t *testing.T, payC Address: paymentAddress, ID: testData.Host2.ID(), } - return client, expectedCIDs, missingPiece, expectedQR, retrievalPeer, provider + return client, expectedCIDs, missingCID, expectedQR, retrievalPeer, provider } func TestClientCanMakeDealWithProvider(t *testing.T) { @@ -356,7 +366,18 @@ func setupClient( func setupProvider(t *testing.T, testData *tut.Libp2pTestData, payloadCID cid.Cid, pieceInfo piecestore.PieceInfo, expectedQR retrievalmarket.QueryResponse, providerPaymentAddr address.Address, providerNode retrievalmarket.RetrievalProviderNode) retrievalmarket.RetrievalProvider { nw2 := rmnet.NewFromLibp2pHost(testData.Host2) pieceStore := tut.NewTestPieceStore() - pieceStore.ExpectPiece(payloadCID.Bytes(), pieceInfo) + expectedPiece := make([]byte, 32) + _, err := rand.Read(expectedPiece) + require.NoError(t, err) + cidInfo := piecestore.CIDInfo{ + PieceBlockLocations: []piecestore.PieceBlockLocation{ + { + PieceCID: expectedPiece, + }, + }, + } + pieceStore.ExpectCID(payloadCID, cidInfo) + pieceStore.ExpectPiece(expectedPiece, pieceInfo) provider := retrievalimpl.NewProvider(providerPaymentAddr, providerNode, nw2, pieceStore, testData.Bs2) provider.SetPaymentInterval(expectedQR.MaxPaymentInterval, expectedQR.MaxPaymentIntervalIncrease) provider.SetPricePerByte(expectedQR.MinPricePerByte) diff --git a/retrievalmarket/impl/provider.go b/retrievalmarket/impl/provider.go index a99b8ed0..15e4de63 100644 --- a/retrievalmarket/impl/provider.go +++ b/retrievalmarket/impl/provider.go @@ -6,6 +6,7 @@ import ( "reflect" "sync" + "github.com/ipfs/go-cid" blockstore "github.com/ipfs/go-ipfs-blockstore" cidlink "github.com/ipld/go-ipld-prime/linking/cid" @@ -17,6 +18,7 @@ import ( "github.com/filecoin-project/go-fil-markets/retrievalmarket/impl/blockunsealing" "github.com/filecoin-project/go-fil-markets/retrievalmarket/impl/providerstates" rmnet "github.com/filecoin-project/go-fil-markets/retrievalmarket/network" + "github.com/filecoin-project/go-fil-markets/shared/params" "github.com/filecoin-project/go-fil-markets/shared/tokenamount" ) @@ -35,15 +37,29 @@ type provider struct { var _ retrievalmarket.RetrievalProvider = &provider{} +// DefaultPricePerByte is the charge per byte retrieved if the miner does +// not specifically set it +var DefaultPricePerByte = tokenamount.FromInt(2) + +// DefaultPaymentInterval is the baseline interval, set to the unixfs chunk size +// if the miner does not explicitly set it otherwise +var DefaultPaymentInterval = uint64(params.UnixfsChunkSize) + +// DefaultPaymentIntervalIncrease is the amount interval increases on each payment, set to the unixfs chunk size +// if the miner does not explicitly set it otherwise +var DefaultPaymentIntervalIncrease = uint64(params.UnixfsChunkSize) + // NewProvider returns a new retrieval provider func NewProvider(paymentAddress address.Address, node retrievalmarket.RetrievalProviderNode, network rmnet.RetrievalMarketNetwork, pieceStore piecestore.PieceStore, bs blockstore.Blockstore) retrievalmarket.RetrievalProvider { return &provider{ - bs: bs, - node: node, - network: network, - paymentAddress: paymentAddress, - pieceStore: pieceStore, - pricePerByte: tokenamount.FromInt(2), // TODO: allow setting + bs: bs, + node: node, + network: network, + paymentAddress: paymentAddress, + pieceStore: pieceStore, + pricePerByte: DefaultPricePerByte, // TODO: allow setting + paymentInterval: DefaultPaymentInterval, + paymentIntervalIncrease: DefaultPaymentIntervalIncrease, } } @@ -129,7 +145,7 @@ func (p *provider) HandleQueryStream(stream rmnet.RetrievalQueryStream) { MaxPaymentIntervalIncrease: p.paymentIntervalIncrease, } - pieceInfo, err := p.pieceStore.GetPieceInfo(query.PayloadCID.Bytes()) + pieceInfo, err := getPieceInfoFromCid(p.pieceStore, query.PayloadCID) if err == nil && len(pieceInfo.Deals) > 0 { answer.Status = retrievalmarket.QueryResponseAvailable @@ -166,7 +182,7 @@ func (p *provider) HandleDealStream(stream rmnet.RetrievalDealStream) { } p.notifySubscribers(retrievalmarket.ProviderEventOpen, dealState) - environment := providerDealEnvironment{p.pieceStore, piecestore.PieceInfoUndefined, p.node, nil, p.pricePerByte, p.paymentInterval, p.paymentIntervalIncrease, stream} + environment := providerDealEnvironment{p.pieceStore, p.node, nil, p.pricePerByte, p.paymentInterval, p.paymentIntervalIncrease, stream} for { var handler providerstates.ProviderHandlerFunc @@ -188,7 +204,7 @@ func (p *provider) HandleDealStream(stream rmnet.RetrievalDealStream) { break } if environment.br == nil { - loaderWithUnsealing := blockunsealing.NewLoaderWithUnsealing(ctx, p.bs, environment.pieceInfo, cario.NewCarIO(), p.node.UnsealSector) + loaderWithUnsealing := blockunsealing.NewLoaderWithUnsealing(ctx, p.bs, environment.pieceStore, cario.NewCarIO(), p.node.UnsealSector) environment.br = blockio.NewSelectorBlockReader(cidlink.Link{Cid: dealState.PayloadCID}, loaderWithUnsealing.Load) } @@ -203,7 +219,6 @@ func (p *provider) HandleDealStream(stream rmnet.RetrievalDealStream) { type providerDealEnvironment struct { pieceStore piecestore.PieceStore - pieceInfo piecestore.PieceInfo node retrievalmarket.RetrievalProviderNode br blockio.BlockReader minPricePerByte tokenamount.TokenAmount @@ -240,14 +255,29 @@ func (pde *providerDealEnvironment) NextBlock(ctx context.Context) (retrievalmar return pde.br.ReadBlock(ctx) } -func (pde *providerDealEnvironment) GetPieceSize(pieceCID []byte) (uint64, error) { - var err error - pde.pieceInfo, err = pde.pieceStore.GetPieceInfo(pieceCID) +func (pde *providerDealEnvironment) GetPieceSize(c cid.Cid) (uint64, error) { + pieceInfo, err := getPieceInfoFromCid(pde.pieceStore, c) if err != nil { return 0, err } - if len(pde.pieceInfo.Deals) == 0 { + if len(pieceInfo.Deals) == 0 { return 0, errors.New("Not enough piece info") } - return pde.pieceInfo.Deals[0].Length, nil + return pieceInfo.Deals[0].Length, nil +} + +func getPieceInfoFromCid(pieceStore piecestore.PieceStore, c cid.Cid) (piecestore.PieceInfo, error) { + cidInfo, err := pieceStore.GetCIDInfo(c) + if err != nil { + return piecestore.PieceInfoUndefined, err + } + var lastErr error + for _, pieceBlockLocation := range cidInfo.PieceBlockLocations { + pieceInfo, err := pieceStore.GetPieceInfo(pieceBlockLocation.PieceCID) + if err == nil { + return pieceInfo, nil + } + lastErr = err + } + return piecestore.PieceInfoUndefined, lastErr } diff --git a/retrievalmarket/impl/provider_test.go b/retrievalmarket/impl/provider_test.go index 5aed94cc..eb505177 100644 --- a/retrievalmarket/impl/provider_test.go +++ b/retrievalmarket/impl/provider_test.go @@ -24,6 +24,14 @@ func TestHandleQueryStream(t *testing.T) { pcid := tut.GenerateCids(1)[0] expectedPeer := peer.ID("somepeer") expectedSize := uint64(1234) + expectedPieceCID := []byte("applesauce") + expectedCIDInfo := piecestore.CIDInfo{ + PieceBlockLocations: []piecestore.PieceBlockLocation{ + { + PieceCID: expectedPieceCID, + }, + }, + } expectedPiece := piecestore.PieceInfo{ Deals: []piecestore.DealInfo{ piecestore.DealInfo{ @@ -68,7 +76,8 @@ func TestHandleQueryStream(t *testing.T) { require.NoError(t, err) pieceStore := tut.NewTestPieceStore() - pieceStore.ExpectPiece(pcid.Bytes(), expectedPiece) + pieceStore.ExpectCID(pcid, expectedCIDInfo) + pieceStore.ExpectPiece(expectedPieceCID, expectedPiece) receiveStreamOnProvider(qs, pieceStore) @@ -90,7 +99,8 @@ func TestHandleQueryStream(t *testing.T) { }) require.NoError(t, err) pieceStore := tut.NewTestPieceStore() - pieceStore.ExpectMissingPiece(pcid.Bytes()) + pieceStore.ExpectCID(pcid, expectedCIDInfo) + pieceStore.ExpectMissingPiece(expectedPieceCID) receiveStreamOnProvider(qs, pieceStore) @@ -104,19 +114,39 @@ func TestHandleQueryStream(t *testing.T) { require.Equal(t, response.MaxPaymentIntervalIncrease, expectedPaymentIntervalIncrease) }) - t.Run("error reading piece", func(t *testing.T) { + t.Run("cid info not found", func(t *testing.T) { qs := readWriteQueryStream() err := qs.WriteQuery(retrievalmarket.Query{ PayloadCID: pcid, }) require.NoError(t, err) pieceStore := tut.NewTestPieceStore() + pieceStore.ExpectMissingCID(pcid) receiveStreamOnProvider(qs, pieceStore) response, err := qs.ReadQueryResponse() pieceStore.VerifyExpectations(t) require.NoError(t, err) + require.Equal(t, response.Status, retrievalmarket.QueryResponseUnavailable) + require.Equal(t, response.PaymentAddress, expectedAddress) + require.Equal(t, response.MinPricePerByte, expectedPricePerByte) + require.Equal(t, response.MaxPaymentInterval, expectedPaymentInterval) + require.Equal(t, response.MaxPaymentIntervalIncrease, expectedPaymentIntervalIncrease) + }) + + t.Run("error reading piece", func(t *testing.T) { + qs := readWriteQueryStream() + err := qs.WriteQuery(retrievalmarket.Query{ + PayloadCID: pcid, + }) + require.NoError(t, err) + pieceStore := tut.NewTestPieceStore() + + receiveStreamOnProvider(qs, pieceStore) + + response, err := qs.ReadQueryResponse() + require.NoError(t, err) require.Equal(t, response.Status, retrievalmarket.QueryResponseError) require.NotEmpty(t, response.Message) }) @@ -128,7 +158,6 @@ func TestHandleQueryStream(t *testing.T) { receiveStreamOnProvider(qs, pieceStore) response, err := qs.ReadQueryResponse() - pieceStore.VerifyExpectations(t) require.NotNil(t, err) require.Equal(t, response, retrievalmarket.QueryResponseUndefined) }) @@ -146,7 +175,8 @@ func TestHandleQueryStream(t *testing.T) { }) require.NoError(t, err) pieceStore := tut.NewTestPieceStore() - pieceStore.ExpectPiece(pcid.Bytes(), expectedPiece) + pieceStore.ExpectCID(pcid, expectedCIDInfo) + pieceStore.ExpectPiece(expectedPieceCID, expectedPiece) receiveStreamOnProvider(qs, pieceStore) diff --git a/retrievalmarket/impl/providerstates/provider_states.go b/retrievalmarket/impl/providerstates/provider_states.go index bb76044d..2f903cfb 100644 --- a/retrievalmarket/impl/providerstates/provider_states.go +++ b/retrievalmarket/impl/providerstates/provider_states.go @@ -3,6 +3,7 @@ package providerstates import ( "context" + "github.com/ipfs/go-cid" "golang.org/x/xerrors" rm "github.com/filecoin-project/go-fil-markets/retrievalmarket" @@ -13,7 +14,7 @@ import ( // ProviderDealEnvironment is a bridge to the environment a provider deal is executing in type ProviderDealEnvironment interface { Node() rm.RetrievalProviderNode - GetPieceSize(pieceCID []byte) (uint64, error) + GetPieceSize(c cid.Cid) (uint64, error) DealStream() rmnet.RetrievalDealStream NextBlock(context.Context) (rm.Block, bool, error) CheckDealParams(pricePerByte tokenamount.TokenAmount, paymentInterval uint64, paymentIntervalIncrease uint64) error @@ -54,7 +55,7 @@ func ReceiveDeal(ctx context.Context, environment ProviderDealEnvironment, deal } // verify we have the piece - _, err = environment.GetPieceSize(dealProposal.PayloadCID.Bytes()) + _, err = environment.GetPieceSize(dealProposal.PayloadCID) if err != nil { if err == rm.ErrNotFound { return responseFailure(environment.DealStream(), rm.DealStatusDealNotFound, rm.ErrNotFound.Error(), dealProposal.ID) diff --git a/retrievalmarket/impl/providerstates/provider_states_test.go b/retrievalmarket/impl/providerstates/provider_states_test.go index bc1c8f46..e0315c28 100644 --- a/retrievalmarket/impl/providerstates/provider_states_test.go +++ b/retrievalmarket/impl/providerstates/provider_states_test.go @@ -58,7 +58,7 @@ func TestReceiveDeal(t *testing.T) { ProposalReader: testnet.StubbedDealProposalReader(proposal), ResponseWriter: testnet.ExpectDealResponseWriter(t, expectedDealResponse), }) - fe.ExpectPiece(expectedPiece.Bytes(), 10000) + fe.ExpectPiece(expectedPiece, 10000) fe.ExpectParams(defaultPricePerByte, defaultCurrentInterval, defaultIntervalIncrease, nil) f := providerstates.ReceiveDeal(ctx, fe, *dealState) fe.VerifyExpectations(t) @@ -82,7 +82,7 @@ func TestReceiveDeal(t *testing.T) { ProposalReader: testnet.StubbedDealProposalReader(proposal), ResponseWriter: testnet.ExpectDealResponseWriter(t, expectedDealResponse), }) - fe.ExpectMissingPiece(expectedPiece.Bytes()) + fe.ExpectMissingPiece(expectedPiece) f := providerstates.ReceiveDeal(ctx, fe, *dealState) node.VerifyExpectations(t) fe.VerifyExpectations(t) @@ -104,7 +104,7 @@ func TestReceiveDeal(t *testing.T) { ProposalReader: testnet.StubbedDealProposalReader(proposal), ResponseWriter: testnet.ExpectDealResponseWriter(t, expectedDealResponse), }) - fe.ExpectPiece(expectedPiece.Bytes(), 10000) + fe.ExpectPiece(expectedPiece, 10000) fe.ExpectParams(defaultPricePerByte, defaultCurrentInterval, defaultIntervalIncrease, errors.New(message)) f := providerstates.ReceiveDeal(ctx, fe, *dealState) fe.VerifyExpectations(t) @@ -133,7 +133,7 @@ func TestReceiveDeal(t *testing.T) { ProposalReader: testnet.StubbedDealProposalReader(proposal), ResponseWriter: testnet.FailDealResponseWriter, }) - fe.ExpectPiece(expectedPiece.Bytes(), 10000) + fe.ExpectPiece(expectedPiece, 10000) fe.ExpectParams(defaultPricePerByte, defaultCurrentInterval, defaultIntervalIncrease, nil) f := providerstates.ReceiveDeal(ctx, fe, *dealState) fe.VerifyExpectations(t) @@ -360,42 +360,42 @@ type dealParamsKey struct { } type testProviderDealEnvironment struct { - node retrievalmarket.RetrievalProviderNode - ds rmnet.RetrievalDealStream - nextResponse int - responses []readBlockResponse - expectedParams map[dealParamsKey]error - receivedParams map[dealParamsKey]struct{} - expectedPieces map[string]uint64 - expectedMissingPieces map[string]struct{} - receivedPieces map[string]struct{} - receivedMissingPieces map[string]struct{} + node retrievalmarket.RetrievalProviderNode + ds rmnet.RetrievalDealStream + nextResponse int + responses []readBlockResponse + expectedParams map[dealParamsKey]error + receivedParams map[dealParamsKey]struct{} + expectedCIDs map[cid.Cid]uint64 + expectedMissingCIDs map[cid.Cid]struct{} + receivedCIDs map[cid.Cid]struct{} + receivedMissingCIDs map[cid.Cid]struct{} } func NewTestProviderDealEnvironment(node retrievalmarket.RetrievalProviderNode, ds rmnet.RetrievalDealStream, responses []readBlockResponse) *testProviderDealEnvironment { return &testProviderDealEnvironment{ - node: node, - ds: ds, - nextResponse: 0, - responses: responses, - expectedParams: make(map[dealParamsKey]error), - receivedParams: make(map[dealParamsKey]struct{}), - expectedPieces: make(map[string]uint64), - expectedMissingPieces: make(map[string]struct{}), - receivedPieces: make(map[string]struct{}), - receivedMissingPieces: make(map[string]struct{})} + node: node, + ds: ds, + nextResponse: 0, + responses: responses, + expectedParams: make(map[dealParamsKey]error), + receivedParams: make(map[dealParamsKey]struct{}), + expectedCIDs: make(map[cid.Cid]uint64), + expectedMissingCIDs: make(map[cid.Cid]struct{}), + receivedCIDs: make(map[cid.Cid]struct{}), + receivedMissingCIDs: make(map[cid.Cid]struct{})} } // ExpectPiece records a piece being expected to be queried and return the given piece info -func (te *testProviderDealEnvironment) ExpectPiece(pieceCid []byte, size uint64) { - te.expectedPieces[string(pieceCid)] = size +func (te *testProviderDealEnvironment) ExpectPiece(c cid.Cid, size uint64) { + te.expectedCIDs[c] = size } // ExpectMissingPiece records a piece being expected to be queried and should fail -func (te *testProviderDealEnvironment) ExpectMissingPiece(pieceCid []byte) { - te.expectedMissingPieces[string(pieceCid)] = struct{}{} +func (te *testProviderDealEnvironment) ExpectMissingPiece(c cid.Cid) { + te.expectedMissingCIDs[c] = struct{}{} } func (te *testProviderDealEnvironment) ExpectParams(pricePerByte tokenamount.TokenAmount, @@ -407,8 +407,8 @@ func (te *testProviderDealEnvironment) ExpectParams(pricePerByte tokenamount.Tok func (te *testProviderDealEnvironment) VerifyExpectations(t *testing.T) { require.Equal(t, len(te.expectedParams), len(te.receivedParams)) - require.Equal(t, len(te.expectedPieces), len(te.receivedPieces)) - require.Equal(t, len(te.expectedMissingPieces), len(te.receivedMissingPieces)) + require.Equal(t, len(te.expectedCIDs), len(te.receivedCIDs)) + require.Equal(t, len(te.expectedMissingCIDs), len(te.receivedMissingCIDs)) } func (te *testProviderDealEnvironment) Node() rm.RetrievalProviderNode { @@ -419,15 +419,15 @@ func (te *testProviderDealEnvironment) DealStream() rmnet.RetrievalDealStream { return te.ds } -func (te *testProviderDealEnvironment) GetPieceSize(pieceCID []byte) (uint64, error) { - pio, ok := te.expectedPieces[string(pieceCID)] +func (te *testProviderDealEnvironment) GetPieceSize(c cid.Cid) (uint64, error) { + pio, ok := te.expectedCIDs[c] if ok { - te.receivedPieces[string(pieceCID)] = struct{}{} + te.receivedCIDs[c] = struct{}{} return pio, nil } - _, ok = te.expectedMissingPieces[string(pieceCID)] + _, ok = te.expectedMissingCIDs[c] if ok { - te.receivedMissingPieces[string(pieceCID)] = struct{}{} + te.receivedMissingCIDs[c] = struct{}{} return 0, retrievalmarket.ErrNotFound } return 0, errors.New("GetPieceSize failed") diff --git a/shared_testutil/test_piecestore.go b/shared_testutil/test_piecestore.go index 3094121e..5ca54849 100644 --- a/shared_testutil/test_piecestore.go +++ b/shared_testutil/test_piecestore.go @@ -6,15 +6,18 @@ import ( "github.com/filecoin-project/go-fil-markets/piecestore" "github.com/filecoin-project/go-fil-markets/retrievalmarket" + "github.com/ipfs/go-cid" "github.com/stretchr/testify/require" ) // TestPieceStore is piecestore who's query results are mocked type TestPieceStore struct { - expectedPieces map[string]piecestore.PieceInfo - expectedMissingPieces map[string]struct{} - receivedPieces map[string]struct{} - receivedMissingPieces map[string]struct{} + piecesStubbed map[string]piecestore.PieceInfo + piecesExpected map[string]struct{} + piecesReceived map[string]struct{} + cidInfosStubbed map[cid.Cid]piecestore.CIDInfo + cidInfosExpected map[cid.Cid]struct{} + cidInfosReceived map[cid.Cid]struct{} } var _ piecestore.PieceStore = &TestPieceStore{} @@ -22,55 +25,87 @@ var _ piecestore.PieceStore = &TestPieceStore{} // NewTestPieceStore creates a TestPieceStore func NewTestPieceStore() *TestPieceStore { return &TestPieceStore{ - expectedPieces: make(map[string]piecestore.PieceInfo), - expectedMissingPieces: make(map[string]struct{}), - receivedPieces: make(map[string]struct{}), - receivedMissingPieces: make(map[string]struct{}), + piecesStubbed: make(map[string]piecestore.PieceInfo), + piecesExpected: make(map[string]struct{}), + piecesReceived: make(map[string]struct{}), + cidInfosStubbed: make(map[cid.Cid]piecestore.CIDInfo), + cidInfosExpected: make(map[cid.Cid]struct{}), + cidInfosReceived: make(map[cid.Cid]struct{}), } } +// StubPiece creates a return value for the given piece cid without expecting it +// to be called +func (tps *TestPieceStore) StubPiece(pieceCid []byte, pieceInfo piecestore.PieceInfo) { + tps.piecesStubbed[string(pieceCid)] = pieceInfo +} + // ExpectPiece records a piece being expected to be queried and return the given piece info func (tps *TestPieceStore) ExpectPiece(pieceCid []byte, pieceInfo piecestore.PieceInfo) { - tps.expectedPieces[string(pieceCid)] = pieceInfo + tps.piecesExpected[string(pieceCid)] = struct{}{} + tps.StubPiece(pieceCid, pieceInfo) } // ExpectMissingPiece records a piece being expected to be queried and should fail func (tps *TestPieceStore) ExpectMissingPiece(pieceCid []byte) { - tps.expectedMissingPieces[string(pieceCid)] = struct{}{} + tps.piecesExpected[string(pieceCid)] = struct{}{} } -// VerifyExpectations verifies that the piecestore was queried in the expected ways -func (tps *TestPieceStore) VerifyExpectations(t *testing.T) { - require.Equal(t, len(tps.expectedPieces), len(tps.receivedPieces)) - require.Equal(t, len(tps.expectedMissingPieces), len(tps.receivedMissingPieces)) +// StubCID creates a return value for the given CID without expecting it +// to be called +func (tps *TestPieceStore) StubCID(c cid.Cid, cidInfo piecestore.CIDInfo) { + tps.cidInfosStubbed[c] = cidInfo } -func (tps *TestPieceStore) AddDealForPiece(pieceCID []byte, dealInfo piecestore.DealInfo) error { - panic("not implemented") +// ExpectCID records a CID being expected to be queried and return the given CID info +func (tps *TestPieceStore) ExpectCID(c cid.Cid, cidInfo piecestore.CIDInfo) { + tps.cidInfosExpected[c] = struct{}{} + tps.StubCID(c, cidInfo) } -func (tps *TestPieceStore) AddBlockInfosToPiece(pieceCID []byte, blockInfos []piecestore.BlockInfo) error { - panic("not implemented") +// ExpectMissingCID records a CID being expected to be queried and should fail +func (tps *TestPieceStore) ExpectMissingCID(c cid.Cid) { + tps.cidInfosExpected[c] = struct{}{} +} + +// VerifyExpectations verifies that the piecestore was queried in the expected ways +func (tps *TestPieceStore) VerifyExpectations(t *testing.T) { + require.Equal(t, tps.piecesExpected, tps.piecesReceived) + require.Equal(t, tps.cidInfosExpected, tps.cidInfosReceived) } -func (tps *TestPieceStore) HasBlockInfo(pieceCID []byte) (bool, error) { +func (tps *TestPieceStore) AddDealForPiece(pieceCID []byte, dealInfo piecestore.DealInfo) error { panic("not implemented") } -func (tps *TestPieceStore) HasDealInfo(pieceCID []byte) (bool, error) { +func (tps *TestPieceStore) AddPieceBlockLocations(pieceCID []byte, blockLocations map[cid.Cid]piecestore.BlockLocation) error { panic("not implemented") } func (tps *TestPieceStore) GetPieceInfo(pieceCID []byte) (piecestore.PieceInfo, error) { - pio, ok := tps.expectedPieces[string(pieceCID)] + tps.piecesReceived[string(pieceCID)] = struct{}{} + + pio, ok := tps.piecesStubbed[string(pieceCID)] if ok { - tps.receivedPieces[string(pieceCID)] = struct{}{} return pio, nil } - _, ok = tps.expectedMissingPieces[string(pieceCID)] + _, ok = tps.piecesExpected[string(pieceCID)] if ok { - tps.receivedMissingPieces[string(pieceCID)] = struct{}{} return piecestore.PieceInfoUndefined, retrievalmarket.ErrNotFound } - return piecestore.PieceInfoUndefined, errors.New("GetPieceSize failed") + return piecestore.PieceInfoUndefined, errors.New("GetPieceInfo failed") +} + +func (tps *TestPieceStore) GetCIDInfo(c cid.Cid) (piecestore.CIDInfo, error) { + tps.cidInfosReceived[c] = struct{}{} + + cio, ok := tps.cidInfosStubbed[c] + if ok { + return cio, nil + } + _, ok = tps.cidInfosExpected[c] + if ok { + return piecestore.CIDInfoUndefined, retrievalmarket.ErrNotFound + } + return piecestore.CIDInfoUndefined, errors.New("GetCIDInfo failed") } diff --git a/storagemarket/impl/provider_states.go b/storagemarket/impl/provider_states.go index 64b6ac8f..971bbe93 100644 --- a/storagemarket/impl/provider_states.go +++ b/storagemarket/impl/provider_states.go @@ -4,6 +4,7 @@ import ( "bytes" "context" + "github.com/ipfs/go-cid" ipldfree "github.com/ipld/go-ipld-prime/impl/free" "github.com/ipld/go-ipld-prime/traversal/selector" "github.com/ipld/go-ipld-prime/traversal/selector/builder" @@ -221,15 +222,21 @@ func (p *Provider) sealing(ctx context.Context, deal MinerDeal) (func(*MinerDeal } func (p *Provider) complete(ctx context.Context, deal MinerDeal) (func(*MinerDeal), error) { - // TODO: observe sector lifecycle, status, expiration.. - err := p.fs.Delete(deal.PiecePath) - if err != nil { - return nil, err - } + err := p.fs.Delete(deal.PiecePath) + if err != nil { + return nil, err + } sectorID, offset, length, err := p.spn.LocatePieceForDealWithinSector(ctx, deal.DealID) if err != nil { return nil, err } + // TODO: Record actual block locations for all CIDs in piece by improving car writing + err = p.pieceStore.AddPieceBlockLocations(deal.Proposal.PieceRef, map[cid.Cid]piecestore.BlockLocation{ + deal.Ref: {}, + }) + if err != nil { + return nil, err + } return nil, p.pieceStore.AddDealForPiece(deal.Proposal.PieceRef, piecestore.DealInfo{ DealID: deal.DealID, SectorID: sectorID,