Skip to content

Commit

Permalink
Use a bitswap session for 'Cat'
Browse files Browse the repository at this point in the history
License: MIT
Signed-off-by: Jeromy <jeromyj@gmail.com>
  • Loading branch information
whyrusleeping committed Feb 2, 2018
1 parent 3f2c774 commit 704ec60
Show file tree
Hide file tree
Showing 8 changed files with 67 additions and 17 deletions.
15 changes: 12 additions & 3 deletions core/coreapi/coreapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
uio "github.com/ipfs/go-ipfs/unixfs/io"

cid "gx/ipfs/QmcZfnkapfECQGcLZaf9B79NRg7cRa9EnZh4LSbkCzwNvY/go-cid"
ipld "gx/ipfs/Qme5bWv7wtjUNGsK2BNGVUFPKiuxWrsqrtvYwCLRw8YFES/go-ipld-format"
)

type CoreAPI struct {
Expand Down Expand Up @@ -49,12 +50,16 @@ func (api *CoreAPI) Object() coreiface.ObjectAPI {
// ResolveNode resolves the path `p` using Unixfx resolver, gets and returns the
// resolved Node.
func (api *CoreAPI) ResolveNode(ctx context.Context, p coreiface.Path) (coreiface.Node, error) {
p, err := api.ResolvePath(ctx, p)
return resolveNode(ctx, api.node.DAG, p)
}

func resolveNode(ctx context.Context, ng ipld.NodeGetter, p coreiface.Path) (coreiface.Node, error) {
p, err := resolvePath(ctx, ng, p)
if err != nil {
return nil, err
}

node, err := api.node.DAG.Get(ctx, p.Cid())
node, err := ng.Get(ctx, p.Cid())
if err != nil {
return nil, err
}
Expand All @@ -65,12 +70,16 @@ func (api *CoreAPI) ResolveNode(ctx context.Context, p coreiface.Path) (coreifac
// resolved path.
// TODO: store all of ipfspath.Resolver.ResolvePathComponents() in Path
func (api *CoreAPI) ResolvePath(ctx context.Context, p coreiface.Path) (coreiface.Path, error) {
return resolvePath(ctx, api.node.DAG, p)
}

func resolvePath(ctx context.Context, ng ipld.NodeGetter, p coreiface.Path) (coreiface.Path, error) {
if p.Resolved() {
return p, nil
}

r := &ipfspath.Resolver{
DAG: api.node.DAG,
DAG: ng,
ResolveOnce: uio.ResolveUnixfsOnce,
}

Expand Down
3 changes: 3 additions & 0 deletions core/coreapi/unixfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

coreiface "github.com/ipfs/go-ipfs/core/coreapi/interface"
coreunix "github.com/ipfs/go-ipfs/core/coreunix"
dag "github.com/ipfs/go-ipfs/merkledag"
uio "github.com/ipfs/go-ipfs/unixfs/io"

cid "gx/ipfs/QmcZfnkapfECQGcLZaf9B79NRg7cRa9EnZh4LSbkCzwNvY/go-cid"
Expand All @@ -30,6 +31,8 @@ func (api *UnixfsAPI) Add(ctx context.Context, r io.Reader) (coreiface.Path, err

// Cat returns the data contained by an IPFS or IPNS object(s) at path `p`.
func (api *UnixfsAPI) Cat(ctx context.Context, p coreiface.Path) (coreiface.Reader, error) {
ses := dag.NewSession(ctx, api.node.DAG)

dagnode, err := api.core().ResolveNode(ctx, p)
if err != nil {
return nil, err
Expand Down
9 changes: 9 additions & 0 deletions merkledag/merkledag.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,15 @@ func (sg *sesGetter) GetMany(ctx context.Context, keys []*cid.Cid) <-chan *ipld.
return getNodesFromBG(ctx, sg.bs, keys)
}

func NewSession(ctx context.Context, dag ipld.DAGService) ipld.NodeGetter {
switch impl := dag.(type) {
case *dagService:
return &sesGetter{bserv.NewSession(ctx, impl.Blocks)}
default:
return dag
}
}

// FetchGraph fetches all nodes that are children of the given node
func FetchGraph(ctx context.Context, root *cid.Cid, serv ipld.DAGService) error {
var ng ipld.NodeGetter = serv
Expand Down
6 changes: 3 additions & 3 deletions path/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ func (e ErrNoLink) Error() string {
// TODO: now that this is more modular, try to unify this code with the
// the resolvers in namesys
type Resolver struct {
DAG ipld.DAGService
DAG ipld.NodeGetter

ResolveOnce func(ctx context.Context, ds ipld.DAGService, nd ipld.Node, names []string) (*ipld.Link, []string, error)
ResolveOnce func(ctx context.Context, ds ipld.NodeGetter, nd ipld.Node, names []string) (*ipld.Link, []string, error)
}

// NewBasicResolver constructs a new basic resolver.
Expand Down Expand Up @@ -124,7 +124,7 @@ func (s *Resolver) ResolvePath(ctx context.Context, fpath Path) (ipld.Node, erro

// ResolveSingle simply resolves one hop of a path through a graph with no
// extra context (does not opaquely resolve through sharded nodes)
func ResolveSingle(ctx context.Context, ds ipld.DAGService, nd ipld.Node, names []string) (*ipld.Link, []string, error) {
func ResolveSingle(ctx context.Context, ds ipld.NodeGetter, nd ipld.Node, names []string) (*ipld.Link, []string, error) {
return nd.ResolveLink(names)
}

Expand Down
43 changes: 36 additions & 7 deletions unixfs/hamt/hamt.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ const (
HashMurmur3 uint64 = 0x22
)

var ErrReadOnly = fmt.Errorf("hamt is in readonly mode")

type HamtShard struct {
nd *dag.ProtoNode

Expand All @@ -57,7 +59,8 @@ type HamtShard struct {
prefixPadStr string
maxpadlen int

dserv ipld.DAGService
dserv ipld.DAGService
nodeRead ipld.NodeGetter
}

// child can either be another shard, or a leaf node value
Expand All @@ -68,7 +71,11 @@ type child interface {

// NewHamtShard creates a new, empty HAMT shard with the given size.
func NewHamtShard(dserv ipld.DAGService, size int) (*HamtShard, error) {
ds, err := makeHamtShard(dserv, size)
return newHamtShard(dserv, dserv, size)
}

func newHamtShard(dserv ipld.DAGService, read ipld.NodeGetter, size int) (*HamtShard, error) {
ds, err := makeHamtShard(dserv, read, size)
if err != nil {
return nil, err
}
Expand All @@ -79,7 +86,7 @@ func NewHamtShard(dserv ipld.DAGService, size int) (*HamtShard, error) {
return ds, nil
}

func makeHamtShard(ds ipld.DAGService, size int) (*HamtShard, error) {
func makeHamtShard(ds ipld.DAGService, nr ipld.NodeGetter, size int) (*HamtShard, error) {
lg2s := int(math.Log2(float64(size)))
if 1<<uint(lg2s) != size {
return nil, fmt.Errorf("hamt size should be a power of two")
Expand All @@ -91,11 +98,20 @@ func makeHamtShard(ds ipld.DAGService, size int) (*HamtShard, error) {
maxpadlen: len(maxpadding),
tableSize: size,
dserv: ds,
nodeRead: nr,
}, nil
}

func NewHamtReader(ng ipld.NodeGetter, nd ipld.Node) (*HamtShard, error) {
return newHamtFromDag(nil, ng, nd)
}

// NewHamtFromDag creates new a HAMT shard from the given DAG.
func NewHamtFromDag(dserv ipld.DAGService, nd ipld.Node) (*HamtShard, error) {
return newHamtFromDag(dserv, dserv, nd)
}

func newHamtFromDag(write ipld.DAGService, read ipld.NodeGetter, nd ipld.Node) (*HamtShard, error) {
pbnd, ok := nd.(*dag.ProtoNode)
if !ok {
return nil, dag.ErrLinkNotFound
Expand All @@ -114,7 +130,7 @@ func NewHamtFromDag(dserv ipld.DAGService, nd ipld.Node) (*HamtShard, error) {
return nil, fmt.Errorf("only murmur3 supported as hash function")
}

ds, err := makeHamtShard(dserv, int(pbd.GetFanout()))
ds, err := makeHamtShard(write, read, int(pbd.GetFanout()))
if err != nil {
return nil, err
}
Expand All @@ -140,6 +156,9 @@ func (ds *HamtShard) Prefix() *cid.Prefix {

// Node serializes the HAMT structure into a merkledag node with unixfs formatting
func (ds *HamtShard) Node() (ipld.Node, error) {
if ds.dserv == nil {
return nil, ErrReadOnly
}
out := new(dag.ProtoNode)
out.SetPrefix(ds.prefix)

Expand Down Expand Up @@ -222,6 +241,10 @@ func (ds *HamtShard) Label() string {

// Set sets 'name' = nd in the HAMT
func (ds *HamtShard) Set(ctx context.Context, name string, nd ipld.Node) error {
if ds.dserv == nil {
return ErrReadOnly
}

hv := &hashBits{b: hash([]byte(name))}
err := ds.dserv.Add(ctx, nd)
if err != nil {
Expand Down Expand Up @@ -287,7 +310,7 @@ func (ds *HamtShard) loadChild(ctx context.Context, i int) (child, error) {
return nil, fmt.Errorf("invalid link name '%s'", lnk.Name)
}

nd, err := lnk.GetNode(ctx, ds.dserv)
nd, err := lnk.GetNode(ctx, ds.nodeRead)
if err != nil {
return nil, err
}
Expand All @@ -308,7 +331,7 @@ func (ds *HamtShard) loadChild(ctx context.Context, i int) (child, error) {
return nil, fmt.Errorf("HAMT entries must have non-zero length name")
}

cds, err := NewHamtFromDag(ds.dserv, nd)
cds, err := newHamtFromDag(ds.dserv, ds.nodeRead, nd)
if err != nil {
return nil, err
}
Expand All @@ -332,6 +355,9 @@ func (ds *HamtShard) setChild(i int, c child) {

// Link returns a merklelink to this shard node
func (ds *HamtShard) Link() (*ipld.Link, error) {
if ds.dserv == nil {
return nil, ErrReadOnly
}
nd, err := ds.Node()
if err != nil {
return nil, err
Expand Down Expand Up @@ -453,6 +479,9 @@ func (ds *HamtShard) walkTrie(ctx context.Context, cb func(*shardValue) error) e
}

func (ds *HamtShard) modifyValue(ctx context.Context, hv *hashBits, key string, val *ipld.Link) error {
if ds.dserv == nil {
return ErrReadOnly
}
idx := hv.Next(ds.tableSizeLg2)

if ds.bitfield.Bit(idx) != 1 {
Expand Down Expand Up @@ -510,7 +539,7 @@ func (ds *HamtShard) modifyValue(ctx context.Context, hv *hashBits, key string,
}

// replace value with another shard, one level deeper
ns, err := NewHamtShard(ds.dserv, ds.tableSize)
ns, err := newHamtShard(ds.dserv, ds.nodeRead, ds.tableSize)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion unixfs/io/dagreader.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type ReadSeekCloser interface {

// NewDagReader creates a new reader object that reads the data represented by
// the given node, using the passed in DAGService for data retreival
func NewDagReader(ctx context.Context, n ipld.Node, serv ipld.DAGService) (DagReader, error) {
func NewDagReader(ctx context.Context, n ipld.Node, serv ipld.NodeGetter) (DagReader, error) {
switch n := n.(type) {
case *mdag.RawNode:
return NewBufDagReader(n.RawData()), nil
Expand Down
4 changes: 2 additions & 2 deletions unixfs/io/pbdagreader.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (

// DagReader provides a way to easily read the data contained in a dag.
type pbDagReader struct {
serv ipld.DAGService
serv ipld.NodeGetter

// the node being read
node *mdag.ProtoNode
Expand Down Expand Up @@ -51,7 +51,7 @@ type pbDagReader struct {
var _ DagReader = (*pbDagReader)(nil)

// NewPBFileReader constructs a new PBFileReader.
func NewPBFileReader(ctx context.Context, n *mdag.ProtoNode, pb *ftpb.Data, serv ipld.DAGService) *pbDagReader {
func NewPBFileReader(ctx context.Context, n *mdag.ProtoNode, pb *ftpb.Data, serv ipld.NodeGetter) *pbDagReader {
fctx, cancel := context.WithCancel(ctx)
curLinks := getLinkCids(n)
return &pbDagReader{
Expand Down
2 changes: 1 addition & 1 deletion unixfs/io/resolve.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (

// ResolveUnixfsOnce resolves a single hop of a path through a graph in a
// unixfs context. This includes handling traversing sharded directories.
func ResolveUnixfsOnce(ctx context.Context, ds ipld.DAGService, nd ipld.Node, names []string) (*ipld.Link, []string, error) {
func ResolveUnixfsOnce(ctx context.Context, ds ipld.NodeGetter, nd ipld.Node, names []string) (*ipld.Link, []string, error) {
switch nd := nd.(type) {
case *dag.ProtoNode:
upb, err := ft.FromBytes(nd.Data())
Expand Down

0 comments on commit 704ec60

Please sign in to comment.