Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implementation of file operations in public folder shares #877

Merged
merged 10 commits into from
Jun 25, 2020
1 change: 0 additions & 1 deletion internal/grpc/services/gateway/publicshareprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ func (s *svc) GetPublicShareByToken(ctx context.Context, req *link.GetPublicShar
return nil, err
}

// TODO the double call is not here
res, err := driver.GetPublicShareByToken(ctx, req)
if err != nil {
return nil, err
Expand Down
1 change: 0 additions & 1 deletion internal/grpc/services/gateway/storageprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -828,7 +828,6 @@ func (s *svc) UnsetArbitraryMetadata(ctx context.Context, req *provider.UnsetArb
}

func (s *svc) stat(ctx context.Context, req *provider.StatRequest) (*provider.StatResponse, error) {
// TODO(refs) do we need to append home to every stat request?
c, err := s.find(ctx, req.Ref)
if err != nil {
if _, ok := err.(errtypes.IsNotFound); ok {
Expand Down
262 changes: 232 additions & 30 deletions internal/grpc/services/publicstorageprovider/publicstorageprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,15 @@ import (
"strings"

gateway "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1"
rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1"
link "github.com/cs3org/go-cs3apis/cs3/sharing/link/v1beta1"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
"github.com/cs3org/reva/pkg/appctx"
"github.com/cs3org/reva/pkg/rgrpc"
"github.com/cs3org/reva/pkg/rgrpc/status"
"github.com/cs3org/reva/pkg/rgrpc/todo/pool"
"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
"github.com/rs/zerolog/log"
"go.opencensus.io/trace"
"google.golang.org/grpc"
codes "google.golang.org/grpc/codes"
Expand All @@ -46,7 +48,6 @@ type config struct {
MountPath string `mapstructure:"mount_path"`
MountID string `mapstructure:"mount_id"`
GatewayAddr string `mapstructure:"gateway_addr"`
DriverAddr string `mapstructure:"driver_addr"`
}

type service struct {
Expand All @@ -60,7 +61,6 @@ func (s *service) Close() error {
}

func (s *service) UnprotectedEndpoints() []string {
// return []string{"/cs3.sharing.link.v1beta1.LinkAPI/GetPublicShareByToken"}
return []string{}
}

Expand Down Expand Up @@ -111,11 +111,123 @@ func (s *service) UnsetArbitraryMetadata(ctx context.Context, req *provider.Unse
}

func (s *service) InitiateFileDownload(ctx context.Context, req *provider.InitiateFileDownloadRequest) (*provider.InitiateFileDownloadResponse, error) {
return nil, gstatus.Errorf(codes.Unimplemented, "method not implemented")
statReq := &provider.StatRequest{Ref: req.Ref}
statRes, err := s.Stat(ctx, statReq)
if err != nil {
return &provider.InitiateFileDownloadResponse{
Status: status.NewInternal(ctx, err, "gateway: error stating ref:"+req.Ref.String()),
}, nil
}
if statRes.Status.Code != rpc.Code_CODE_OK {
if statRes.Status.Code == rpc.Code_CODE_NOT_FOUND {
return &provider.InitiateFileDownloadResponse{
Status: status.NewNotFound(ctx, "gateway: file not found"),
}, nil
}
err := status.NewErrorFromCode(statRes.Status.Code, "gateway")
return &provider.InitiateFileDownloadResponse{
Status: status.NewInternal(ctx, err, "gateway: error stating ref"),
}, nil
}

req.Opaque = statRes.Info.Opaque
return s.initiateFileDownload(ctx, req)
}

func (s *service) translatePublicRefToCS3Ref(ctx context.Context, ref *provider.Reference) (*provider.Reference, string, error) {
log := appctx.GetLogger(ctx)
tkn, relativePath, err := s.unwrap(ctx, ref)
if err != nil {
return nil, "", err
}

originalPath, err := s.pathFromToken(ctx, tkn)
if err != nil {
return nil, "", err
}

cs3Ref := &provider.Reference{
Spec: &provider.Reference_Path{Path: path.Join("/", originalPath, relativePath)},
}
log.Debug().
Interface("sourceRef", ref).
Interface("cs3Ref", cs3Ref).
Str("tkn", tkn).
Str("originalPath", originalPath).
Str("relativePath", relativePath).
Msg("translatePublicRefToCS3Ref")
return cs3Ref, tkn, nil
}

// Both, t.dir and tokenPath paths need to be merged:
// tokenPath = /oc/einstein/public-links
// t.dir = /public/ausGxuUePCOi/foldera/folderb/
// res = /public-links/foldera/folderb/
// this `res` will get then expanded taking into account the authenticated user and the storage:
// end = /einstein/files/public-links/foldera/folderb/

func (s *service) initiateFileDownload(ctx context.Context, req *provider.InitiateFileDownloadRequest) (*provider.InitiateFileDownloadResponse, error) {
cs3Ref, _, err := s.translatePublicRefToCS3Ref(ctx, req.Ref)
if err != nil {
return nil, err
}
dReq := &provider.InitiateFileDownloadRequest{
Ref: cs3Ref,
}

dRes, err := s.gateway.InitiateFileDownload(ctx, dReq)
if err != nil {
return &provider.InitiateFileDownloadResponse{
Status: status.NewInternal(ctx, err, "gateway: error calling InitiateFileDownload"),
}, nil
}

if dRes.Status.Code != rpc.Code_CODE_OK {
return &provider.InitiateFileDownloadResponse{
Status: dRes.Status,
}, nil
}

return &provider.InitiateFileDownloadResponse{
Opaque: req.Opaque,
Status: dRes.Status,
DownloadEndpoint: dRes.DownloadEndpoint,
Expose: true, // TODO set to false, leave data provider lookup to the datagateway
}, nil
}

func (s *service) InitiateFileUpload(ctx context.Context, req *provider.InitiateFileUploadRequest) (*provider.InitiateFileUploadResponse, error) {
return nil, gstatus.Errorf(codes.Unimplemented, "method not implemented")
cs3Ref, _, err := s.translatePublicRefToCS3Ref(ctx, req.Ref)
if err != nil {
return nil, err
}
uReq := &provider.InitiateFileUploadRequest{
Ref: cs3Ref,
Opaque: req.Opaque,
}

uRes, err := s.gateway.InitiateFileUpload(ctx, uReq)
if err != nil {
return &provider.InitiateFileUploadResponse{
Status: status.NewInternal(ctx, err, "gateway: error calling InitiateFileUpload"),
}, nil
}

if uRes.Status.Code != rpc.Code_CODE_OK {
return &provider.InitiateFileUploadResponse{
Status: uRes.Status,
}, nil
}

res := &provider.InitiateFileUploadResponse{
UploadEndpoint: uRes.UploadEndpoint,
Status: uRes.Status,
AvailableChecksums: uRes.AvailableChecksums,
Opaque: uRes.Opaque,
Expose: true, // TODO set to false, leave data provider lookup to the datagateway
}

return res, nil
}

func (s *service) GetPath(ctx context.Context, req *provider.GetPathRequest) (*provider.GetPathResponse, error) {
Expand All @@ -131,15 +243,106 @@ func (s *service) CreateHome(ctx context.Context, req *provider.CreateHomeReques
}

func (s *service) CreateContainer(ctx context.Context, req *provider.CreateContainerRequest) (*provider.CreateContainerResponse, error) {
return nil, gstatus.Errorf(codes.Unimplemented, "method not implemented")
ctx, span := trace.StartSpan(ctx, "CreateContainer")
defer span.End()

span.AddAttributes(
trace.StringAttribute("ref", req.Ref.String()),
)

cs3Ref, _, err := s.translatePublicRefToCS3Ref(ctx, req.Ref)
if err != nil {
return nil, err
}

var res *provider.CreateContainerResponse
// the call has to be made to the gateway instead of the storage.
res, err = s.gateway.CreateContainer(ctx, &provider.CreateContainerRequest{
Ref: cs3Ref,
})
if err != nil {
return &provider.CreateContainerResponse{
Status: status.NewInternal(ctx, err, "gateway: error calling CreateContainer for ref:"+req.Ref.String()),
}, nil
}
if res.Status.Code == rpc.Code_CODE_INTERNAL {
return res, nil
}

return res, nil
}

func (s *service) Delete(ctx context.Context, req *provider.DeleteRequest) (*provider.DeleteResponse, error) {
return nil, gstatus.Errorf(codes.Unimplemented, "method not implemented")
ctx, span := trace.StartSpan(ctx, "Delete")
defer span.End()

span.AddAttributes(
trace.StringAttribute("ref", req.Ref.String()),
)

cs3Ref, _, err := s.translatePublicRefToCS3Ref(ctx, req.Ref)
if err != nil {
return nil, err
}

var res *provider.DeleteResponse
// the call has to be made to the gateway instead of the storage.
res, err = s.gateway.Delete(ctx, &provider.DeleteRequest{
Ref: cs3Ref,
})
if err != nil {
return &provider.DeleteResponse{
Status: status.NewInternal(ctx, err, "gateway: error calling Delete for ref:"+req.Ref.String()),
}, nil
}
if res.Status.Code == rpc.Code_CODE_INTERNAL {
return res, nil
}

return res, nil
}

func (s *service) Move(ctx context.Context, req *provider.MoveRequest) (*provider.MoveResponse, error) {
return nil, gstatus.Errorf(codes.Unimplemented, "method not implemented")
ctx, span := trace.StartSpan(ctx, "Move")
defer span.End()

span.AddAttributes(
trace.StringAttribute("source", req.Source.String()),
trace.StringAttribute("destination", req.Destination.String()),
)

cs3RefSource, tknSource, err := s.translatePublicRefToCS3Ref(ctx, req.Source)
if err != nil {
return nil, err
}
// FIXME: maybe there's a shortcut possible here using the source path
cs3RefDestination, tknDest, err := s.translatePublicRefToCS3Ref(ctx, req.Destination)
if err != nil {
return nil, err
}

if tknSource != tknDest {
return &provider.MoveResponse{
Status: status.NewInvalidArg(ctx, "Source and destination token must be the same"),
}, nil
}

var res *provider.MoveResponse
// the call has to be made to the gateway instead of the storage.
res, err = s.gateway.Move(ctx, &provider.MoveRequest{
Source: cs3RefSource,
Destination: cs3RefDestination,
})
if err != nil {
return &provider.MoveResponse{
Status: status.NewInternal(ctx, err, "gateway: error calling Move for source ref "+req.Source.String()+" to destination ref "+req.Destination.String()),
}, nil
}
if res.Status.Code == rpc.Code_CODE_INTERNAL {
return res, nil
}

return res, nil
}

func (s *service) Stat(ctx context.Context, req *provider.StatRequest) (*provider.StatResponse, error) {
Expand All @@ -155,33 +358,30 @@ func (s *service) Stat(ctx context.Context, req *provider.StatRequest) (*provide
return nil, err
}

pathFromToken, err := s.pathFromToken(ctx, tkn)
originalPath, err := s.pathFromToken(ctx, tkn)
if err != nil {
return nil, err
}

statResponse, err := s.gateway.Stat(
ctx,
&provider.StatRequest{
Ref: &provider.Reference{
Spec: &provider.Reference_Path{
Path: path.Join("/", pathFromToken, relativePath),
},
var statResponse *provider.StatResponse
// the call has to be made to the gateway instead of the storage.
statResponse, err = s.gateway.Stat(ctx, &provider.StatRequest{
Ref: &provider.Reference{
Spec: &provider.Reference_Path{
Path: path.Join("/", originalPath, relativePath),
},
})
},
})
if err != nil {
log.Error().Err(err).Msg("error during stat call")
return nil, err
return &provider.StatResponse{
Status: status.NewInternal(ctx, err, "gateway: error calling Stat for ref:"+req.Ref.String()),
}, nil
}

// we don't want to leak the path
statResponse.Info.Path = path.Join("/", tkn, relativePath)

// if statResponse.Status.Code != rpc.Code_CODE_OK {
// if statResponse.Status.Code == rpc.Code_CODE_NOT_FOUND {
// // log.Warn().Str("path", refFromToken.GetPath()).Msgf("resource: `%v` not found", refFromToken.GetId())
// }
// }
// prevent leaking internal paths
if statResponse.Info != nil {
statResponse.Info.Path = path.Join("/", tkn, relativePath)
}

return statResponse, nil
}
Expand Down Expand Up @@ -212,7 +412,9 @@ func (s *service) ListContainer(ctx context.Context, req *provider.ListContainer
},
)
if err != nil {
return nil, err
return &provider.ListContainerResponse{
Status: status.NewInternal(ctx, err, "gateway: error calling ListContainer for ref:"+req.Ref.String()),
}, nil
}

for i := range listContainerR.Infos {
Expand Down Expand Up @@ -304,9 +506,9 @@ func (s *service) trimMountPrefix(fn string) (string, error) {
return "", errors.New(fmt.Sprintf("path=%q does not belong to this storage provider mount path=%q"+fn, s.mountPath))
}

// pathFromToken returns a reference from a public share token.
// pathFromToken returns the path for the publicly shared resource.
func (s *service) pathFromToken(ctx context.Context, token string) (string, error) {
driver, err := pool.GetPublicShareProviderClient(s.conf.DriverAddr)
driver, err := pool.GetGatewayServiceClient(s.conf.GatewayAddr)
if err != nil {
return "", err
}
Expand Down