Skip to content

Commit

Permalink
feat(retrievalmarket): extract skeleton interfaces
Browse files Browse the repository at this point in the history
Define all types to spec, modify interfaces, wrap old code

fix(builder): use client blockstore for retrieval

feat(retrieval): add node implementations

add node adapters for client & provider so that retrieval can be extracted
  • Loading branch information
hannahhoward committed Jan 7, 2020
1 parent 463692e commit ec44c96
Show file tree
Hide file tree
Showing 20 changed files with 1,106 additions and 413 deletions.
4 changes: 2 additions & 2 deletions chain/deals/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
"github.com/filecoin-project/lotus/chain/wallet"
"github.com/filecoin-project/lotus/node/impl/full"
"github.com/filecoin-project/lotus/node/modules/dtypes"

retrievalmarket "github.com/filecoin-project/lotus/retrieval"
"github.com/filecoin-project/lotus/retrieval/discovery"
)

Expand Down Expand Up @@ -252,7 +252,7 @@ func (c *Client) Start(ctx context.Context, p ClientDealProposal) (cid.Cid, erro

c.incoming <- deal

return deal.ProposalCid, c.discovery.AddPeer(p.Data, discovery.RetrievalPeer{
return deal.ProposalCid, c.discovery.AddPeer(p.Data, retrievalmarket.RetrievalPeer{
Address: dealProposal.Provider,
ID: deal.Miner,
})
Expand Down
16 changes: 0 additions & 16 deletions gen/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"github.com/filecoin-project/lotus/chain/deals"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/paych"
"github.com/filecoin-project/lotus/retrieval"
"github.com/filecoin-project/lotus/storage"
)

Expand Down Expand Up @@ -59,21 +58,6 @@ func main() {
os.Exit(1)
}

err = gen.WriteTupleEncodersToFile("./retrieval/cbor_gen.go", "retrieval",
retrieval.RetParams{},

retrieval.Query{},
retrieval.QueryResponse{},
retrieval.Unixfs0Offer{},
retrieval.DealProposal{},
retrieval.DealResponse{},
retrieval.Block{},
)
if err != nil {
fmt.Println(err)
os.Exit(1)
}

err = gen.WriteTupleEncodersToFile("./chain/blocksync/cbor_gen.go", "blocksync",
blocksync.BlockSyncRequest{},
blocksync.BlockSyncResponse{},
Expand Down
8 changes: 4 additions & 4 deletions node/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ import (
"github.com/filecoin-project/lotus/node/repo"
"github.com/filecoin-project/lotus/paych"
"github.com/filecoin-project/lotus/peermgr"
"github.com/filecoin-project/lotus/retrieval"
retrievalmarket "github.com/filecoin-project/lotus/retrieval"
"github.com/filecoin-project/lotus/retrieval/discovery"
"github.com/filecoin-project/lotus/storage"
"github.com/filecoin-project/lotus/storage/sectorblocks"
Expand Down Expand Up @@ -221,9 +221,9 @@ func Online() Option {
Override(HandleIncomingBlocksKey, modules.HandleIncomingBlocks),

Override(new(*discovery.Local), discovery.NewLocal),
Override(new(discovery.PeerResolver), modules.RetrievalResolver),
Override(new(retrievalmarket.PeerResolver), modules.RetrievalResolver),

Override(new(*retrieval.Client), retrieval.NewClient),
Override(new(retrievalmarket.RetrievalClient), modules.RetrievalClient),
Override(new(dtypes.ClientDealStore), modules.NewClientDealStore),
Override(new(dtypes.ClientDataTransfer), modules.NewClientDAGServiceDataTransfer),
Override(new(*deals.ClientRequestValidator), deals.NewClientRequestValidator),
Expand All @@ -246,7 +246,7 @@ func Online() Option {
Override(new(dtypes.StagingBlockstore), modules.StagingBlockstore),
Override(new(dtypes.StagingDAG), modules.StagingDAG),
Override(new(dtypes.StagingGraphsync), modules.StagingGraphsync),
Override(new(*retrieval.Miner), retrieval.NewMiner),
Override(new(retrievalmarket.RetrievalProvider), modules.RetrievalProvider),
Override(new(dtypes.ProviderDealStore), modules.NewProviderDealStore),
Override(new(dtypes.ProviderDataTransfer), modules.NewProviderDAGServiceDataTransfer),
Override(new(*deals.ProviderRequestValidator), deals.NewProviderRequestValidator),
Expand Down
63 changes: 50 additions & 13 deletions node/impl/client/client.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package client

import (
"bytes"
"context"
"errors"
"io"
Expand All @@ -17,6 +18,7 @@ import (
files "github.com/ipfs/go-ipfs-files"
ipld "github.com/ipfs/go-ipld-format"
"github.com/ipfs/go-merkledag"
unixfile "github.com/ipfs/go-unixfs/file"
"github.com/ipfs/go-unixfs/importer/balanced"
ihelper "github.com/ipfs/go-unixfs/importer/helpers"
"github.com/libp2p/go-libp2p-core/peer"
Expand All @@ -31,8 +33,7 @@ import (
"github.com/filecoin-project/lotus/node/impl/full"
"github.com/filecoin-project/lotus/node/impl/paych"
"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/retrieval"
"github.com/filecoin-project/lotus/retrieval/discovery"
retrievalmarket "github.com/filecoin-project/lotus/retrieval"
)

type API struct {
Expand All @@ -44,8 +45,8 @@ type API struct {
paych.PaychAPI

DealClient *deals.Client
RetDiscovery discovery.PeerResolver
Retrieval *retrieval.Client
RetDiscovery retrievalmarket.PeerResolver
Retrieval retrievalmarket.RetrievalClient
Chain *store.ChainStore

LocalDAG dtypes.ClientDAG
Expand Down Expand Up @@ -153,7 +154,18 @@ func (a *API) ClientFindData(ctx context.Context, root cid.Cid) ([]api.QueryOffe

out := make([]api.QueryOffer, len(peers))
for k, p := range peers {
out[k] = a.Retrieval.Query(ctx, p, root)
queryResponse, err := a.Retrieval.Query(ctx, p, root.Bytes(), retrievalmarket.QueryParams{})
if err != nil {
out[k] = api.QueryOffer{Err: err.Error(), Miner: p.Address, MinerPeerID: p.ID}
} else {
out[k] = api.QueryOffer{
Root: root,
Size: queryResponse.Size,
MinPrice: queryResponse.PieceRetrievalPrice(),
Miner: p.Address, // TODO: check
MinerPeerID: p.ID,
}
}
}

return out, nil
Expand Down Expand Up @@ -263,18 +275,43 @@ func (a *API) ClientRetrieve(ctx context.Context, order api.RetrievalOrder, path
order.MinerPeerID = pid
}

outFile, err := os.OpenFile(path, os.O_TRUNC|os.O_CREATE|os.O_WRONLY, 0777)
if err != nil {
return err
retrievalResult := make(chan error, 1)

unsubscribe := a.Retrieval.SubscribeToEvents(func(event retrievalmarket.ClientEvent, state retrievalmarket.ClientDealState) {
if bytes.Equal(state.PieceCID, order.Root.Bytes()) {
switch event {
case retrievalmarket.ClientEventError:
retrievalResult <- xerrors.New("Retrieval Error")
case retrievalmarket.ClientEventComplete:
retrievalResult <- nil
}
}
})

a.Retrieval.Retrieve(
ctx, order.Root.Bytes(), retrievalmarket.Params{
PricePerByte: types.BigDiv(order.Total, types.NewInt(order.Size)),
}, order.Total, order.MinerPeerID, order.Client, order.Miner)
select {
case <-ctx.Done():
return xerrors.New("Retrieval Timed Out")
case err := <-retrievalResult:
if err != nil {
return xerrors.Errorf("RetrieveUnixfs: %w", err)
}
}

err = a.Retrieval.RetrieveUnixfs(ctx, order.Root, order.Size, order.Total, order.MinerPeerID, order.Client, order.Miner, outFile)
unsubscribe()

nd, err := a.LocalDAG.Get(ctx, order.Root)
if err != nil {
_ = outFile.Close()
return xerrors.Errorf("RetrieveUnixfs: %w", err)
return xerrors.Errorf("ClientRetrieve: %w", err)
}

return outFile.Close()
file, err := unixfile.NewUnixfsFile(ctx, a.LocalDAG, nd)
if err != nil {
return xerrors.Errorf("ClientRetrieve: %w", err)
}
return files.WriteTo(file, path)
}

func (a *API) ClientQueryAsk(ctx context.Context, p peer.ID, miner address.Address) (*types.SignedStorageAsk, error) {
Expand Down
11 changes: 11 additions & 0 deletions node/modules/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,15 @@ package modules

import (
"context"
"github.com/filecoin-project/lotus/retrievaladapter"
"path/filepath"
"reflect"

"github.com/filecoin-project/go-statestore"
"github.com/filecoin-project/lotus/node/modules/helpers"
"github.com/filecoin-project/lotus/paych"
retrievalmarket "github.com/filecoin-project/lotus/retrieval"
retrievalimpl "github.com/filecoin-project/lotus/retrieval/impl"
"github.com/ipfs/go-bitswap"
"github.com/ipfs/go-bitswap/network"
graphsync "github.com/ipfs/go-graphsync/impl"
Expand All @@ -26,6 +30,7 @@ import (

"github.com/filecoin-project/go-data-transfer/impl/graphsync"
"github.com/filecoin-project/lotus/chain/deals"
payapi "github.com/filecoin-project/lotus/node/impl/paych"
"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/node/repo"
)
Expand Down Expand Up @@ -97,3 +102,9 @@ func ClientGraphsync(mctx helpers.MetricsCtx, lc fx.Lifecycle, ibs dtypes.Client

return gs
}

// RetrievalClient creates a new retrieval client attached to the client blockstore
func RetrievalClient(h host.Host, bs dtypes.ClientBlockstore, pmgr *paych.Manager, payapi payapi.PaychAPI) retrievalmarket.RetrievalClient {
adapter := retrievaladapter.NewRetrievalClientNode(pmgr, payapi)
return retrievalimpl.NewClient(h, bs, adapter)
}
3 changes: 2 additions & 1 deletion node/modules/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/filecoin-project/lotus/node/hello"
"github.com/filecoin-project/lotus/node/modules/helpers"
"github.com/filecoin-project/lotus/peermgr"
retrievalmarket "github.com/filecoin-project/lotus/retrieval"
"github.com/filecoin-project/lotus/retrieval/discovery"
)

Expand Down Expand Up @@ -80,6 +81,6 @@ func RunDealClient(mctx helpers.MetricsCtx, lc fx.Lifecycle, c *deals.Client) {
})
}

func RetrievalResolver(l *discovery.Local) discovery.PeerResolver {
func RetrievalResolver(l *discovery.Local) retrievalmarket.PeerResolver {
return discovery.Multi(l)
}
16 changes: 12 additions & 4 deletions node/modules/storageminer.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,11 @@ import (
"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/node/modules/helpers"
"github.com/filecoin-project/lotus/node/repo"
"github.com/filecoin-project/lotus/retrieval"
retrievalmarket "github.com/filecoin-project/lotus/retrieval"
retrievalimpl "github.com/filecoin-project/lotus/retrieval/impl"
"github.com/filecoin-project/lotus/retrievaladapter"
"github.com/filecoin-project/lotus/storage"
"github.com/filecoin-project/lotus/storage/sectorblocks"
)

func minerAddrFromDS(ds dtypes.MetadataDS) (address.Address, error) {
Expand Down Expand Up @@ -115,11 +118,10 @@ func StorageMiner(mctx helpers.MetricsCtx, lc fx.Lifecycle, api api.FullNode, h
return sm, nil
}

func HandleRetrieval(host host.Host, lc fx.Lifecycle, m *retrieval.Miner) {
func HandleRetrieval(host host.Host, lc fx.Lifecycle, m retrievalmarket.RetrievalProvider) {
lc.Append(fx.Hook{
OnStart: func(context.Context) error {
host.SetStreamHandler(retrieval.QueryProtocolID, m.HandleQueryStream)
host.SetStreamHandler(retrieval.ProtocolID, m.HandleDealStream)
m.Start(host)
return nil
},
})
Expand Down Expand Up @@ -261,3 +263,9 @@ func SealTicketGen(api api.FullNode) storage.TicketFn {
}, nil
}
}

// RetrievalProvider creates a new retrieval provider attached to the provider blockstore
func RetrievalProvider(sblks *sectorblocks.SectorBlocks, full api.FullNode) retrievalmarket.RetrievalProvider {
adapter := retrievaladapter.NewRetrievalProviderNode(full)
return retrievalimpl.NewProvider(sblks, adapter)
}
20 changes: 20 additions & 0 deletions retrieval/cbor-gen/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package main

import (
"fmt"
"os"

retrievalimpl "github.com/filecoin-project/lotus/retrieval/impl"
)

// main func has ONE JOB
func main() {
fmt.Print("Generating Cbor Marshal/Unmarshal...")

if err := retrievalimpl.RunCborGen(); err != nil {
fmt.Println("Failed: ")
fmt.Println(err)
os.Exit(1)
}
fmt.Println("Done.")
}
Loading

0 comments on commit ec44c96

Please sign in to comment.