Skip to content

Commit

Permalink
feat:add get quic
Browse files Browse the repository at this point in the history
  • Loading branch information
yann committed Oct 26, 2023
1 parent 26da21b commit 0c00cc6
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 43 deletions.
11 changes: 8 additions & 3 deletions cmd/gateway/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"github.com/gorilla/mux"
logging "github.com/ipfs/go-log/v2"
"github.com/ipfs/go-merkledag"
"github.com/ipfs/kubo/client/rpc"
ma "github.com/multiformats/go-multiaddr"
"github.com/urfave/cli/v2"
dagpool "github.com/yann-y/fds/dag/pool/ipfs"
"github.com/yann-y/fds/internal/iam"
Expand Down Expand Up @@ -60,14 +62,17 @@ func startServer(cctx *cli.Context) {
}
defer db.Close()
router := mux.NewRouter()
//poolClient, err := dagpoolcli.NewPoolClient(poolAddr, poolUser, poolPassword, true)
poolClient, err := dagpool.NewPoolClient(poolAddr, true)
kuboApi, err := rpc.NewApi(ma.StringCast(poolAddr))
if err != nil {
log.Fatal(err)
}
poolClient, err := dagpool.NewPoolClient(kuboApi, true)
if err != nil {
log.Fatalf("connect dagpool server err: %v", err)
}
defer poolClient.Close(context.TODO())
dagServ := merkledag.NewDAGService(dagpoolcli.NewBlockService(poolClient))
storageSys := store.NewStorageSys(cctx.Context, dagServ, db)
storageSys := store.NewStorageSys(cctx.Context, dagServ, kuboApi, db)
authSys := iam.NewAuthSys(db, cred)
bmSys := store.NewBucketMetadataSys(db)
storageSys.SetNewBucketNSLock(bmSys.NewNSLock)
Expand Down
16 changes: 7 additions & 9 deletions dag/pool/ipfs/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
logging "github.com/ipfs/go-log/v2"
"github.com/ipfs/go-merkledag"
"github.com/ipfs/kubo/client/rpc"
ma "github.com/multiformats/go-multiaddr"
"github.com/multiformats/go-multicodec"
"golang.org/x/xerrors"
"strings"
Expand All @@ -22,19 +21,18 @@ var log = logging.Logger("ipfs-client")
var _ dagpoolcli.PoolClient = (*PoolClient)(nil)

type PoolClient struct {
sh *rpc.HttpApi
api *rpc.HttpApi
addr string
enablePin bool
}

// NewPoolClient new a dagPoolClient
func NewPoolClient(addr string, enablePin bool) (*PoolClient, error) {
sh, err := rpc.NewApi(ma.StringCast(addr))
func NewPoolClient(api *rpc.HttpApi, enablePin bool) (*PoolClient, error) {
return &PoolClient{
sh: sh,
api: api,
addr: "",
enablePin: enablePin,
}, err
}, nil
}
func (i *PoolClient) Close(ctx context.Context) {
return
Expand All @@ -57,7 +55,7 @@ func (i *PoolClient) Has(ctx context.Context, cid cid.Cid) (bool, error) {

func (i *PoolClient) Get(ctx context.Context, cid cid.Cid) (blocks.Block, error) {
log.Debugf(cid.String())
node, err := i.sh.Dag().Get(ctx, cid)
node, err := i.api.Dag().Get(ctx, cid)
if err != nil {
if strings.Contains(err.Error(), "not found") {
return nil, format.ErrNotFound{Cid: cid}
Expand All @@ -69,14 +67,14 @@ func (i *PoolClient) Get(ctx context.Context, cid cid.Cid) (blocks.Block, error)

func (i *PoolClient) GetSize(ctx context.Context, cid cid.Cid) (int, error) {
log.Debugf(cid.String())
stat, err := i.sh.Block().Stat(ctx, path.IpfsPath(cid))
stat, err := i.api.Block().Stat(ctx, path.IpfsPath(cid))
return stat.Size(), err
}

func (i *PoolClient) Put(ctx context.Context, block blocks.Block) error {
cidBuilder, _ := merkledag.PrefixForCidVersion(0)
cidCodec := multicodec.Code(cidBuilder.Codec).String()
_, err := i.sh.Block().Put(ctx, bytes.NewReader(block.RawData()),
_, err := i.api.Block().Put(ctx, bytes.NewReader(block.RawData()),
options.Block.Hash(cidBuilder.MhType, cidBuilder.MhLength),
options.Block.CidCodec(cidCodec),
options.Block.Format("v0"))
Expand Down
49 changes: 18 additions & 31 deletions internal/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,17 @@ import (
"github.com/dustin/go-humanize"
dagpoolcli "github.com/filedag-project/filedag-storage/dag/pool/client"
"github.com/google/uuid"
iface "github.com/ipfs/boxo/coreiface"
"github.com/ipfs/boxo/coreiface/path"
"github.com/ipfs/boxo/files"
"github.com/ipfs/go-cid"
ipld "github.com/ipfs/go-ipld-format"
logging "github.com/ipfs/go-log/v2"
"github.com/ipfs/go-merkledag"
ufsio "github.com/ipfs/go-unixfs/io"
"github.com/klauspost/readahead"
pool "github.com/libp2p/go-buffer-pool"
"github.com/ipfs/kubo/client/rpc"
"github.com/shirou/gopsutil/cpu"
"github.com/shirou/gopsutil/mem"
"github.com/syndtr/goleveldb/leveldb"
"github.com/yann-y/fds/dag/pool/ipfs"
"github.com/yann-y/fds/internal/consts"
"github.com/yann-y/fds/internal/datatypes"
"github.com/yann-y/fds/internal/lock"
Expand Down Expand Up @@ -66,6 +66,7 @@ var ErrBucketNotEmpty = errors.New("bucket not empty")

// StorageSys store sys
type StorageSys struct {
Api *rpc.HttpApi
Db *uleveldb.ULevelDB
DagPool ipld.DAGService
CidBuilder cid.Builder
Expand All @@ -78,9 +79,10 @@ type StorageSys struct {
}

// NewStorageSys new a storage sys
func NewStorageSys(ctx context.Context, dagService ipld.DAGService, db *uleveldb.ULevelDB) *StorageSys {
func NewStorageSys(ctx context.Context, dagService ipld.DAGService, api *rpc.HttpApi, db *uleveldb.ULevelDB) *StorageSys {
cidBuilder, _ := merkledag.PrefixForCidVersion(0)
s := &StorageSys{
Api: api,
Db: db,
DagPool: dagService,
CidBuilder: cidBuilder,
Expand Down Expand Up @@ -120,30 +122,10 @@ func (s *StorageSys) SetHasBucket(hasBucket func(ctx context.Context, bucket str
}

func (s *StorageSys) store(ctx context.Context, reader io.ReadCloser, size int64) (cid.Cid, error) {
data := io.Reader(reader)
if size > bigFileThreshold {
// We use 2 buffers, so we always have a full buffer of input.
bufA := pool.Get(chunkSize)
bufB := pool.Get(chunkSize)
defer pool.Put(bufA)
defer pool.Put(bufB)
ra, err := readahead.NewReaderBuffer(data, [][]byte{bufA[:chunkSize], bufB[:chunkSize]})
if err == nil {
data = ra
defer ra.Close()
} else {
log.Infof("readahead.NewReaderBuffer failed, error: %v", err)
}
}
node, err := ipfs.BalanceNode(data, s.DagPool, s.CidBuilder)
node, err := s.Api.Unixfs().Add(ctx, files.NewReaderFile(reader))
if err != nil {
return cid.Undef, err
}
select {
case <-ctx.Done():
return cid.Undef, ctx.Err()
default:
}
return node.Cid(), nil
}

Expand Down Expand Up @@ -304,15 +286,20 @@ func (s *StorageSys) GetObject(ctx context.Context, bucket, object string) (Obje
if err != nil {
return ObjectInfo{}, nil, err
}
dagNode, err := s.DagPool.Get(ctx, meatCid)
f, err := s.Api.Unixfs().Get(ctx, path.IpfsPath(meatCid))
if err != nil {
return ObjectInfo{}, nil, err
}
reader, err := ufsio.NewDagReader(ctx, dagNode, s.DagPool)
if err != nil {
return ObjectInfo{}, nil, err
var file files.File
switch f := f.(type) {
case files.File:
file = f
case files.Directory:
return meta, nil, iface.ErrIsDir
default:
return meta, nil, iface.ErrNotSupported
}
return meta, reader, nil
return meta, file, nil
}

func (s *StorageSys) getObjectInfo(ctx context.Context, bucket, object string) (meta ObjectInfo, err error) {
Expand Down

0 comments on commit 0c00cc6

Please sign in to comment.