Skip to content

Commit

Permalink
fix: booster-bitswap client - extract blocks from identity cids
Browse files Browse the repository at this point in the history
  • Loading branch information
dirkmc committed Jul 17, 2023
1 parent 8c9f8dc commit 0731f5b
Showing 1 changed file with 76 additions and 26 deletions.
102 changes: 76 additions & 26 deletions cmd/booster-bitswap/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,18 @@ import (
"context"
"crypto/rand"
"fmt"
"github.com/ipld/go-ipld-prime"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"github.com/ipld/go-ipld-prime/traversal"
mh "github.com/multiformats/go-multihash"
"net/http"
_ "net/http/pprof"
"sort"
"sync/atomic"
"time"

"github.com/filecoin-project/boost/cmd/booster-bitswap/bitswap"
lotus_blockstore "github.com/filecoin-project/lotus/blockstore"
lcli "github.com/filecoin-project/lotus/cli"
"github.com/ipfs/boxo/bitswap/client"
bsnetwork "github.com/ipfs/boxo/bitswap/network"
Expand Down Expand Up @@ -109,7 +114,8 @@ var fetchCmd = &cli.Command{

ctx, cancel := context.WithCancel(ctx)
defer cancel()
brn := &blockReceiver{bs: bs, ctx: ctx, cancel: cancel}
idbs := lotus_blockstore.WrapIDStore(bs)
brn := &blockReceiver{bs: idbs, ctx: ctx, cancel: cancel}
bsClient := client.New(ctx, net, bs, client.WithBlockReceivedNotifier(brn))
defer bsClient.Close()
net.Start(bsClient)
Expand Down Expand Up @@ -163,37 +169,51 @@ var fetchCmd = &cli.Command{
}

func getBlocks(ctx context.Context, bsClient *client.Client, c cid.Cid, throttle chan struct{}) (uint64, uint64, error) {
if throttle != nil {
throttle <- struct{}{}
}
// Get the block
start := time.Now()
blk, err := bsClient.GetBlock(ctx, c)
if throttle != nil {
<-throttle
}
if err != nil {
return 0, 0, err
}
var size uint64
var links []cid.Cid
if c.Prefix().MhType == mh.IDENTITY {
var err error
size, links, err = getIDBlock(c)
if err != nil {
return 0, 0, err
}
} else {
if throttle != nil {
throttle <- struct{}{}
}
// Get the block
start := time.Now()
blk, err := bsClient.GetBlock(ctx, c)
if throttle != nil {
<-throttle
}
if err != nil {
return 0, 0, err
}

var size = uint64(len(blk.RawData()))
log.Debugw("receive", "cid", c, "size", size, "duration", time.Since(start).String())
size = uint64(len(blk.RawData()))
log.Debugw("receive", "cid", c, "size", size, "duration", time.Since(start).String())

// Read the links from the block to child nodes in the DAG
var count = uint64(1)
ipldDecoder := ipldlegacy.NewDecoder()
nd, err := ipldDecoder.DecodeNode(ctx, blk)
if err != nil {
return 0, 0, fmt.Errorf("decoding node %s: %w", c, err)
// Read the links from the block to child nodes in the DAG
ipldDecoder := ipldlegacy.NewDecoder()
nd, err := ipldDecoder.DecodeNode(ctx, blk)
if err != nil {
return 0, 0, fmt.Errorf("decoding node %s: %w", c, err)
}

ndLinks := nd.Links()
for _, l := range ndLinks {
links = append(links, l.Cid)
}
}

var count = uint64(1)
var eg errgroup.Group
lnks := nd.Links()
for _, l := range lnks {
l := l
for _, link := range links {
link := link
// Launch a go routine to fetch the blocks underneath each link
eg.Go(func() error {
cnt, sz, err := getBlocks(ctx, bsClient, l.Cid, throttle)
cnt, sz, err := getBlocks(ctx, bsClient, link, throttle)
if err != nil {
return err
}
Expand All @@ -206,8 +226,38 @@ func getBlocks(ctx context.Context, bsClient *client.Client, c cid.Cid, throttle
return count, size, eg.Wait()
}

func getIDBlock(c cid.Cid) (uint64, []cid.Cid, error) {
dmh, err := mh.Decode(c.Hash())
if err != nil {
return 0, nil, err
}

if dmh.Code != mh.IDENTITY {
return 0, nil, fmt.Errorf("bad cid: multihash type identity but decoded mh is not identity")
}

decoder, err := cidlink.DefaultLinkSystem().DecoderChooser(cidlink.Link{Cid: c})
if err != nil {
return 0, nil, fmt.Errorf("choosing decoder for identity CID %s: %w", c, err)
}
node, err := ipld.Decode(dmh.Digest, decoder)
if err != nil {
return 0, nil, fmt.Errorf("decoding identity CID %s: %w", c, err)
}
links, err := traversal.SelectLinks(node)
if err != nil {
return 0, nil, fmt.Errorf("collecting links from identity CID %s: %w", c, err)
}
// convert from Link to Cid
resultCids := make([]cid.Cid, 0)
for _, link_ := range links {
resultCids = append(resultCids, link_.(cidlink.Link).Cid)
}
return uint64(len(dmh.Digest)), resultCids, nil
}

type blockReceiver struct {
bs *blockstore.ReadWrite
bs lotus_blockstore.Blockstore
ctx context.Context
cancel context.CancelFunc
}
Expand Down

0 comments on commit 0731f5b

Please sign in to comment.