From 0c00cc6b99075c8d4673a9ee3ff1c4af21877170 Mon Sep 17 00:00:00 2001 From: yann Date: Thu, 26 Oct 2023 13:01:10 +0800 Subject: [PATCH] feat:add get quic --- cmd/gateway/server.go | 11 ++++++--- dag/pool/ipfs/client.go | 16 ++++++-------- internal/store/store.go | 49 +++++++++++++++-------------------------- 3 files changed, 33 insertions(+), 43 deletions(-) diff --git a/cmd/gateway/server.go b/cmd/gateway/server.go index 2d55adb..4b3b879 100644 --- a/cmd/gateway/server.go +++ b/cmd/gateway/server.go @@ -8,6 +8,8 @@ import ( "github.com/gorilla/mux" logging "github.com/ipfs/go-log/v2" "github.com/ipfs/go-merkledag" + "github.com/ipfs/kubo/client/rpc" + ma "github.com/multiformats/go-multiaddr" "github.com/urfave/cli/v2" dagpool "github.com/yann-y/fds/dag/pool/ipfs" "github.com/yann-y/fds/internal/iam" @@ -60,14 +62,17 @@ func startServer(cctx *cli.Context) { } defer db.Close() router := mux.NewRouter() - //poolClient, err := dagpoolcli.NewPoolClient(poolAddr, poolUser, poolPassword, true) - poolClient, err := dagpool.NewPoolClient(poolAddr, true) + kuboApi, err := rpc.NewApi(ma.StringCast(poolAddr)) + if err != nil { + log.Fatal(err) + } + poolClient, err := dagpool.NewPoolClient(kuboApi, true) if err != nil { log.Fatalf("connect dagpool server err: %v", err) } defer poolClient.Close(context.TODO()) dagServ := merkledag.NewDAGService(dagpoolcli.NewBlockService(poolClient)) - storageSys := store.NewStorageSys(cctx.Context, dagServ, db) + storageSys := store.NewStorageSys(cctx.Context, dagServ, kuboApi, db) authSys := iam.NewAuthSys(db, cred) bmSys := store.NewBucketMetadataSys(db) storageSys.SetNewBucketNSLock(bmSys.NewNSLock) diff --git a/dag/pool/ipfs/client.go b/dag/pool/ipfs/client.go index fd3785d..318ca87 100644 --- a/dag/pool/ipfs/client.go +++ b/dag/pool/ipfs/client.go @@ -12,7 +12,6 @@ import ( logging "github.com/ipfs/go-log/v2" "github.com/ipfs/go-merkledag" "github.com/ipfs/kubo/client/rpc" - ma "github.com/multiformats/go-multiaddr" "github.com/multiformats/go-multicodec" "golang.org/x/xerrors" "strings" @@ -22,19 +21,18 @@ var log = logging.Logger("ipfs-client") var _ dagpoolcli.PoolClient = (*PoolClient)(nil) type PoolClient struct { - sh *rpc.HttpApi + api *rpc.HttpApi addr string enablePin bool } // NewPoolClient new a dagPoolClient -func NewPoolClient(addr string, enablePin bool) (*PoolClient, error) { - sh, err := rpc.NewApi(ma.StringCast(addr)) +func NewPoolClient(api *rpc.HttpApi, enablePin bool) (*PoolClient, error) { return &PoolClient{ - sh: sh, + api: api, addr: "", enablePin: enablePin, - }, err + }, nil } func (i *PoolClient) Close(ctx context.Context) { return @@ -57,7 +55,7 @@ func (i *PoolClient) Has(ctx context.Context, cid cid.Cid) (bool, error) { func (i *PoolClient) Get(ctx context.Context, cid cid.Cid) (blocks.Block, error) { log.Debugf(cid.String()) - node, err := i.sh.Dag().Get(ctx, cid) + node, err := i.api.Dag().Get(ctx, cid) if err != nil { if strings.Contains(err.Error(), "not found") { return nil, format.ErrNotFound{Cid: cid} @@ -69,14 +67,14 @@ func (i *PoolClient) Get(ctx context.Context, cid cid.Cid) (blocks.Block, error) func (i *PoolClient) GetSize(ctx context.Context, cid cid.Cid) (int, error) { log.Debugf(cid.String()) - stat, err := i.sh.Block().Stat(ctx, path.IpfsPath(cid)) + stat, err := i.api.Block().Stat(ctx, path.IpfsPath(cid)) return stat.Size(), err } func (i *PoolClient) Put(ctx context.Context, block blocks.Block) error { cidBuilder, _ := merkledag.PrefixForCidVersion(0) cidCodec := multicodec.Code(cidBuilder.Codec).String() - _, err := i.sh.Block().Put(ctx, bytes.NewReader(block.RawData()), + _, err := i.api.Block().Put(ctx, bytes.NewReader(block.RawData()), options.Block.Hash(cidBuilder.MhType, cidBuilder.MhLength), options.Block.CidCodec(cidCodec), options.Block.Format("v0")) diff --git a/internal/store/store.go b/internal/store/store.go index 870a1a3..83b9084 100644 --- a/internal/store/store.go +++ b/internal/store/store.go @@ -8,17 +8,17 @@ import ( "github.com/dustin/go-humanize" dagpoolcli "github.com/filedag-project/filedag-storage/dag/pool/client" "github.com/google/uuid" + iface "github.com/ipfs/boxo/coreiface" + "github.com/ipfs/boxo/coreiface/path" + "github.com/ipfs/boxo/files" "github.com/ipfs/go-cid" ipld "github.com/ipfs/go-ipld-format" logging "github.com/ipfs/go-log/v2" "github.com/ipfs/go-merkledag" - ufsio "github.com/ipfs/go-unixfs/io" - "github.com/klauspost/readahead" - pool "github.com/libp2p/go-buffer-pool" + "github.com/ipfs/kubo/client/rpc" "github.com/shirou/gopsutil/cpu" "github.com/shirou/gopsutil/mem" "github.com/syndtr/goleveldb/leveldb" - "github.com/yann-y/fds/dag/pool/ipfs" "github.com/yann-y/fds/internal/consts" "github.com/yann-y/fds/internal/datatypes" "github.com/yann-y/fds/internal/lock" @@ -66,6 +66,7 @@ var ErrBucketNotEmpty = errors.New("bucket not empty") // StorageSys store sys type StorageSys struct { + Api *rpc.HttpApi Db *uleveldb.ULevelDB DagPool ipld.DAGService CidBuilder cid.Builder @@ -78,9 +79,10 @@ type StorageSys struct { } // NewStorageSys new a storage sys -func NewStorageSys(ctx context.Context, dagService ipld.DAGService, db *uleveldb.ULevelDB) *StorageSys { +func NewStorageSys(ctx context.Context, dagService ipld.DAGService, api *rpc.HttpApi, db *uleveldb.ULevelDB) *StorageSys { cidBuilder, _ := merkledag.PrefixForCidVersion(0) s := &StorageSys{ + Api: api, Db: db, DagPool: dagService, CidBuilder: cidBuilder, @@ -120,30 +122,10 @@ func (s *StorageSys) SetHasBucket(hasBucket func(ctx context.Context, bucket str } func (s *StorageSys) store(ctx context.Context, reader io.ReadCloser, size int64) (cid.Cid, error) { - data := io.Reader(reader) - if size > bigFileThreshold { - // We use 2 buffers, so we always have a full buffer of input. - bufA := pool.Get(chunkSize) - bufB := pool.Get(chunkSize) - defer pool.Put(bufA) - defer pool.Put(bufB) - ra, err := readahead.NewReaderBuffer(data, [][]byte{bufA[:chunkSize], bufB[:chunkSize]}) - if err == nil { - data = ra - defer ra.Close() - } else { - log.Infof("readahead.NewReaderBuffer failed, error: %v", err) - } - } - node, err := ipfs.BalanceNode(data, s.DagPool, s.CidBuilder) + node, err := s.Api.Unixfs().Add(ctx, files.NewReaderFile(reader)) if err != nil { return cid.Undef, err } - select { - case <-ctx.Done(): - return cid.Undef, ctx.Err() - default: - } return node.Cid(), nil } @@ -304,15 +286,20 @@ func (s *StorageSys) GetObject(ctx context.Context, bucket, object string) (Obje if err != nil { return ObjectInfo{}, nil, err } - dagNode, err := s.DagPool.Get(ctx, meatCid) + f, err := s.Api.Unixfs().Get(ctx, path.IpfsPath(meatCid)) if err != nil { return ObjectInfo{}, nil, err } - reader, err := ufsio.NewDagReader(ctx, dagNode, s.DagPool) - if err != nil { - return ObjectInfo{}, nil, err + var file files.File + switch f := f.(type) { + case files.File: + file = f + case files.Directory: + return meta, nil, iface.ErrIsDir + default: + return meta, nil, iface.ErrNotSupported } - return meta, reader, nil + return meta, file, nil } func (s *StorageSys) getObjectInfo(ctx context.Context, bucket, object string) (meta ObjectInfo, err error) {