Skip to content

Commit

Permalink
feat(datatransfer): implement and extract
Browse files Browse the repository at this point in the history
feat(datatransfer): setup implementation path

Sets up a path to implementation, offering both the dagservice implementation and a future graphsync
implement, establishes message interfaces and network layer, and isolates the datatransfer module
from the app

WIP using CBOR encoding for dataxfermsg

* Bring cbor-gen stuff into datatransfer package
* make transferRequest private struct
* add transferResponse + funcs
* Rename VoucherID to VoucherType
* more tests passing

WIP trying out some stuff
* Embed request/response in message so all the interfaces work AND the CBOR unmarshaling works: this is more like the spec anyway
* get rid of pb stuff

all message tests passing, some others in datatransfer

Some cleanup for PR

Cleanup for PR, clarifying and additional comments

mod tidy

Respond to PR comments:
* Make DataTransferRequest/Response be returned in from Net
* Regenerate cbor_gen and fix the generator caller so it works better
* Please the linters

Fix tests

Initiate push and pull requests (#536)

* add issue link for data TransferID generation
* comment out failing but not relevant tests
* finish voucher rename from Identifier --> Type

tests passing

cleanup for PR

remove unused fmt import in graphsync_test

a better reflection

send data transfer response

other tests passing

feat(datatransfer): milestone 2 infrastructure

Setup test path for all tickets for milestone 2

responses alert subscribers when request is not accepted (#607)

Graphsync response is scheduled when a valid push request is received (#625)

fix(datatransfer): fix tests

fix an error with read buffers in tests

fix(deps): fix go.sum

Feat/dt graphsync pullreqs (#627)

* graphsync responses to pull requests

Feat/dt initiator cleanup (#645)

* ChannelID.To --> ChannelID.Initiator
* We now store our peer ID (from host.ID()) so it can be used when creating ChannelIDs.
* InProgressChannels returns all of impl.channels, currently just for testing
* Implements go-data-transfer issue
* Some assertions were changed based on the above.
* Renamed some variables and added some assertions based on the new understanding
* Updated SHA for graphsync module
* Updated fakeGraphSync test structs to use new interfaces from new SHA above

Techdebt/dt split graphsync impl receiver (#651)

* Split up graphsyncImpl and graphsyncReceiver
* rename graphsync to utils

DTM sends data over graphsync for validated push requests (#665)

* create channels when a request is received. register push request hook with graphsync. fix tests.
* better NewReaders
* use mutex lock around impl.channels access
* fix(datatransfer): fix test uncertainty
* fix a data race and also don't use random bytes in basic block which can fail
* privatize 3 funcs

with @hannahhoward

Feat/dt gs pullrequests (#693)

* Implements DTM Sends Data Over Graphsync For Validated Pull Requests
* rename a field in a test struct
* refactor a couple of private functions (one was refactored out of existence)

Feat/dt subscribe, file Xfer round trip (#720)

Implements the rest of Subscriber Is Notified When Request Completed #24:
* send a graphsync message within a go func and consume responses until error or transfer is complete.
* notify subscribers of results.
* Rename datatransfer.Event to EventCode.
* datatransfer.Event is now a struct that includes a message and a timestamp as well as the Event.Code int, formerly Event, update all uses
* Add extension data to graphsync request hook, gsReq
* rename sendRequest to sendDtRequest, to distinguish it from sendGsRequest, where Dt = datatransfer, Gs = graphsync
* use a mutex lock for last transfer ID
* obey the linter

Don't respond with error in gsReqRcdHook when we can't find the datatransfer extension. (#754)

* update to correct graphsync version, update tests & code to call the new graphsync hooks
* getExtensionData returns an empty struct + nil if we can't find our extension
* Don't respond with error when we can't find the extension.
* Test for same
* mod tidy

minor fix to go.sum

feat(datatransfer): switch to graphsync implementation

Move over to real graphsync implementation of data transfer, add constructors for graphsync
instances on client and miner side

fix(datatransfer): Fix validators

Validators were checking payload cid against commP -- which are not the same any more. Added a
payloadCid to client deal to maintain the record, fixed validator logic

Feat/dt extraction use go-fil-components/datatransfer (#770)

* Initial commit after changing to go-fil-components/datatransfer
* blow away the datatransfer dir
* use go-fil-components master after its PR #1 was merged
* go mod tidy

use a package

updates after rebase with master
  • Loading branch information
hannahhoward committed Dec 17, 2019
1 parent 4ace1fe commit 798d411
Show file tree
Hide file tree
Showing 15 changed files with 144 additions and 324 deletions.
22 changes: 20 additions & 2 deletions chain/deals/cbor_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 4 additions & 2 deletions chain/deals/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/filecoin-project/lotus/lib/statestore"
"github.com/filecoin-project/lotus/node/impl/full"
"github.com/filecoin-project/lotus/node/modules/dtypes"

"github.com/filecoin-project/lotus/retrieval/discovery"
)

Expand All @@ -35,6 +36,7 @@ type ClientDeal struct {
Miner peer.ID
MinerWorker address.Address
DealID uint64
PayloadCid cid.Cid

PublishMessage *types.SignedMessage

Expand Down Expand Up @@ -244,8 +246,8 @@ func (c *Client) Start(ctx context.Context, p ClientDealProposal) (cid.Cid, erro
State: api.DealUnknown,
Miner: p.MinerID,
MinerWorker: p.MinerWorker,

s: s,
PayloadCid: p.Data,
s: s,
}

c.incoming <- deal
Expand Down
7 changes: 3 additions & 4 deletions chain/deals/client_utils.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package deals

import (
"bytes"
"context"
"runtime"

Expand All @@ -12,7 +11,7 @@ import (
"github.com/libp2p/go-libp2p-core/peer"
"golang.org/x/xerrors"

"github.com/filecoin-project/lotus/datatransfer"
"github.com/filecoin-project/go-fil-components/datatransfer"
"github.com/filecoin-project/lotus/lib/cborutil"
"github.com/filecoin-project/lotus/lib/padreader"
"github.com/filecoin-project/lotus/lib/sectorbuilder"
Expand Down Expand Up @@ -147,7 +146,7 @@ func (c *ClientRequestValidator) ValidatePull(
Selector ipld.Node) error {
dealVoucher, ok := voucher.(*StorageDataTransferVoucher)
if !ok {
return xerrors.Errorf("voucher type %s: %w", voucher.Identifier(), ErrWrongVoucherType)
return xerrors.Errorf("voucher type %s: %w", voucher.Type(), ErrWrongVoucherType)
}

var deal ClientDeal
Expand All @@ -158,7 +157,7 @@ func (c *ClientRequestValidator) ValidatePull(
if deal.Miner != receiver {
return xerrors.Errorf("Deal Peer %s, Data Transfer Peer %s: %w", deal.Miner.String(), receiver.String(), ErrWrongPeer)
}
if !bytes.Equal(deal.Proposal.PieceRef, baseCid.Bytes()) {
if !deal.PayloadCid.Equals(baseCid) {
return xerrors.Errorf("Deal Payload CID %s, Data Transfer CID %s: %w", string(deal.Proposal.PieceRef), baseCid.String(), ErrWrongPiece)
}
for _, state := range DataTransferStates {
Expand Down
4 changes: 2 additions & 2 deletions chain/deals/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@ import (
"github.com/libp2p/go-libp2p-core/peer"
"golang.org/x/xerrors"

"github.com/filecoin-project/go-fil-components/datatransfer"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/actors"
"github.com/filecoin-project/lotus/chain/address"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/datatransfer"
"github.com/filecoin-project/lotus/lib/cborutil"
"github.com/filecoin-project/lotus/lib/statestore"
"github.com/filecoin-project/lotus/node/modules/dtypes"
Expand Down Expand Up @@ -225,7 +225,7 @@ func (p *Provider) onDataTransferEvent(event datatransfer.Event, channelState da
var next api.DealState
var err error
var mut func(*MinerDeal)
switch event {
switch event.Code {
case datatransfer.Complete:
next = api.DealStaged
mut = func(deal *MinerDeal) {
Expand Down
7 changes: 3 additions & 4 deletions chain/deals/provider_utils.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
package deals

import (
"bytes"
"context"
"runtime"

"github.com/filecoin-project/go-fil-components/datatransfer"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/datatransfer"
"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/ipld/go-ipld-prime"

Expand Down Expand Up @@ -163,7 +162,7 @@ func (m *ProviderRequestValidator) ValidatePush(
Selector ipld.Node) error {
dealVoucher, ok := voucher.(*StorageDataTransferVoucher)
if !ok {
return xerrors.Errorf("voucher type %s: %w", voucher.Identifier(), ErrWrongVoucherType)
return xerrors.Errorf("voucher type %s: %w", voucher.Type(), ErrWrongVoucherType)
}

var deal MinerDeal
Expand All @@ -175,7 +174,7 @@ func (m *ProviderRequestValidator) ValidatePush(
return xerrors.Errorf("Deal Peer %s, Data Transfer Peer %s: %w", deal.Client.String(), sender.String(), ErrWrongPeer)
}

if !bytes.Equal(deal.Proposal.PieceRef, baseCid.Bytes()) {
if !deal.Ref.Equals(baseCid) {
return xerrors.Errorf("Deal Payload CID %s, Data Transfer CID %s: %w", string(deal.Proposal.PieceRef), baseCid.String(), ErrWrongPiece)
}
for _, state := range DataTransferStates {
Expand Down
50 changes: 15 additions & 35 deletions chain/deals/request_validation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func (wrongDTType) FromBytes([]byte) error {
return fmt.Errorf("not implemented")
}

func (wrongDTType) Identifier() string {
func (wrongDTType) Type() string {
return "WrongDTTYPE"
}

Expand Down Expand Up @@ -76,6 +76,7 @@ func newClientDeal(minerID peer.ID, state api.DealState) (deals.ClientDeal, erro
return deals.ClientDeal{
Proposal: newProposal,
ProposalCid: proposalNd.Cid(),
PayloadCid: blockGenerator.Next().Cid(),
Miner: minerID,
MinerWorker: minerAddr,
State: state,
Expand All @@ -91,10 +92,7 @@ func newMinerDeal(clientID peer.ID, state api.DealState) (deals.MinerDeal, error
if err != nil {
return deals.MinerDeal{}, err
}
ref, err := cid.Cast(newProposal.PieceRef)
if err != nil {
return deals.MinerDeal{}, err
}
ref := blockGenerator.Next().Cid()

return deals.MinerDeal{
Proposal: newProposal,
Expand Down Expand Up @@ -143,11 +141,8 @@ func TestClientRequestValidation(t *testing.T) {
if err := state.Begin(clientDeal.ProposalCid, &clientDeal); err != nil {
t.Fatal("deal tracking failed")
}
pieceRef, err := cid.Cast(clientDeal.Proposal.PieceRef)
if err != nil {
t.Fatal("unable to construct piece cid")
}
if !xerrors.Is(crv.ValidatePull(minerID, &deals.StorageDataTransferVoucher{clientDeal.ProposalCid, 1}, pieceRef, nil), deals.ErrWrongPeer) {
payloadCid := clientDeal.PayloadCid
if !xerrors.Is(crv.ValidatePull(minerID, &deals.StorageDataTransferVoucher{clientDeal.ProposalCid, 1}, payloadCid, nil), deals.ErrWrongPeer) {
t.Fatal("Pull should fail if miner address is incorrect")
}
})
Expand All @@ -171,11 +166,8 @@ func TestClientRequestValidation(t *testing.T) {
if err := state.Begin(clientDeal.ProposalCid, &clientDeal); err != nil {
t.Fatal("deal tracking failed")
}
pieceRef, err := cid.Cast(clientDeal.Proposal.PieceRef)
if err != nil {
t.Fatal("unable to construct piece cid")
}
if !xerrors.Is(crv.ValidatePull(minerID, &deals.StorageDataTransferVoucher{clientDeal.ProposalCid, 1}, pieceRef, nil), deals.ErrInacceptableDealState) {
payloadCid := clientDeal.PayloadCid
if !xerrors.Is(crv.ValidatePull(minerID, &deals.StorageDataTransferVoucher{clientDeal.ProposalCid, 1}, payloadCid, nil), deals.ErrInacceptableDealState) {
t.Fatal("Pull should fail if deal is in a state that cannot be data transferred")
}
})
Expand All @@ -187,11 +179,8 @@ func TestClientRequestValidation(t *testing.T) {
if err := state.Begin(clientDeal.ProposalCid, &clientDeal); err != nil {
t.Fatal("deal tracking failed")
}
pieceRef, err := cid.Cast(clientDeal.Proposal.PieceRef)
if err != nil {
t.Fatal("unable to construct piece cid")
}
if crv.ValidatePull(minerID, &deals.StorageDataTransferVoucher{clientDeal.ProposalCid, 1}, pieceRef, nil) != nil {
payloadCid := clientDeal.PayloadCid
if crv.ValidatePull(minerID, &deals.StorageDataTransferVoucher{clientDeal.ProposalCid, 1}, payloadCid, nil) != nil {
t.Fatal("Pull should should succeed when all parameters are correct")
}
})
Expand Down Expand Up @@ -236,11 +225,8 @@ func TestProviderRequestValidation(t *testing.T) {
if err := state.Begin(minerDeal.ProposalCid, &minerDeal); err != nil {
t.Fatal("deal tracking failed")
}
pieceRef, err := cid.Cast(minerDeal.Proposal.PieceRef)
if err != nil {
t.Fatal("unable to construct piece cid")
}
if !xerrors.Is(mrv.ValidatePush(clientID, &deals.StorageDataTransferVoucher{minerDeal.ProposalCid, 1}, pieceRef, nil), deals.ErrWrongPeer) {
ref := minerDeal.Ref
if !xerrors.Is(mrv.ValidatePush(clientID, &deals.StorageDataTransferVoucher{minerDeal.ProposalCid, 1}, ref, nil), deals.ErrWrongPeer) {
t.Fatal("Push should fail if miner address is incorrect")
}
})
Expand All @@ -264,11 +250,8 @@ func TestProviderRequestValidation(t *testing.T) {
if err := state.Begin(minerDeal.ProposalCid, &minerDeal); err != nil {
t.Fatal("deal tracking failed")
}
pieceRef, err := cid.Cast(minerDeal.Proposal.PieceRef)
if err != nil {
t.Fatal("unable to construct piece cid")
}
if !xerrors.Is(mrv.ValidatePush(clientID, &deals.StorageDataTransferVoucher{minerDeal.ProposalCid, 1}, pieceRef, nil), deals.ErrInacceptableDealState) {
ref := minerDeal.Ref
if !xerrors.Is(mrv.ValidatePush(clientID, &deals.StorageDataTransferVoucher{minerDeal.ProposalCid, 1}, ref, nil), deals.ErrInacceptableDealState) {
t.Fatal("Push should fail if deal is in a state that cannot be data transferred")
}
})
Expand All @@ -280,11 +263,8 @@ func TestProviderRequestValidation(t *testing.T) {
if err := state.Begin(minerDeal.ProposalCid, &minerDeal); err != nil {
t.Fatal("deal tracking failed")
}
pieceRef, err := cid.Cast(minerDeal.Proposal.PieceRef)
if err != nil {
t.Fatal("unable to construct piece cid")
}
if mrv.ValidatePush(clientID, &deals.StorageDataTransferVoucher{minerDeal.ProposalCid, 1}, pieceRef, nil) != nil {
ref := minerDeal.Ref
if mrv.ValidatePush(clientID, &deals.StorageDataTransferVoucher{minerDeal.ProposalCid, 1}, ref, nil) != nil {
t.Fatal("Push should should succeed when all parameters are correct")
}
})
Expand Down
4 changes: 2 additions & 2 deletions chain/deals/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func (dv *StorageDataTransferVoucher) FromBytes(raw []byte) error {
return dv.UnmarshalCBOR(r)
}

// Identifier is the unique string identifier for a StorageDataTransferVoucher
func (dv *StorageDataTransferVoucher) Identifier() string {
// Type is the unique string identifier for a StorageDataTransferVoucher
func (dv *StorageDataTransferVoucher) Type() string {
return "StorageDataTransferVoucher"
}
78 changes: 0 additions & 78 deletions datatransfer/dagservice_impl.go

This file was deleted.

Loading

0 comments on commit 798d411

Please sign in to comment.