Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Data transfers package #1321

Merged
merged 10 commits into from
Nov 20, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions changelog/unreleased/http-datatx.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
Enhancement: Add support for multiple data transfer protocols

Previously, we had to configure which data transfer protocol to use in the
dataprovider service. A previous PR added the functionality to redirect requests
to different handlers based on the request method but that would lead to
conflicts if multiple protocols don't support mutually exclusive sets of
requests. This PR adds the functionality to have multiple such handlers
simultaneously and the client can choose which protocol to use.

https://github.com/cs3org/reva/pull/1321
https://github.com/cs3org/reva/pull/1285/
39 changes: 29 additions & 10 deletions cmd/reva/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ import (
"time"

"github.com/cheggaaa/pb"
gateway "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1"
rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
typespb "github.com/cs3org/go-cs3apis/cs3/types/v1beta1"
"github.com/cs3org/reva/internal/http/services/datagateway"
"github.com/cs3org/reva/pkg/errtypes"
"github.com/cs3org/reva/pkg/rhttp"
Expand Down Expand Up @@ -86,23 +86,28 @@ func downloadCommand() *command {
return formatError(res.Status)
}

p, err := getDownloadProtocolInfo(res.Protocols, "simple")
if err != nil {
return err
}

// TODO(labkode): upload to data server
fmt.Printf("Downloading from: %s\n", res.DownloadEndpoint)
fmt.Printf("Downloading from: %s\n", p.DownloadEndpoint)

content, err := checkDownloadWebdavRef(res.DownloadEndpoint, res.Opaque)
content, err := checkDownloadWebdavRef(res.Protocols)
if err != nil {
if _, ok := err.(errtypes.IsNotSupported); !ok {
return err
}

dataServerURL := res.DownloadEndpoint
dataServerURL := p.DownloadEndpoint
// TODO(labkode): do a protocol switch
httpReq, err := rhttp.NewRequest(ctx, "GET", dataServerURL, nil)
if err != nil {
return err
}

httpReq.Header.Set(datagateway.TokenTransportHeader, res.Token)
httpReq.Header.Set(datagateway.TokenTransportHeader, p.Token)
httpClient := rhttp.GetHTTPClient(
rhttp.Context(ctx),
// TODO make insecure configurable
Expand Down Expand Up @@ -145,13 +150,27 @@ func downloadCommand() *command {
return cmd
}

func checkDownloadWebdavRef(endpoint string, opaque *typespb.Opaque) (io.Reader, error) {
if opaque == nil {
func getDownloadProtocolInfo(protocolInfos []*gateway.FileDownloadProtocol, protocol string) (*gateway.FileDownloadProtocol, error) {
for _, p := range protocolInfos {
if p.Protocol == protocol {
return p, nil
}
}
return nil, errtypes.NotFound(protocol)
}

func checkDownloadWebdavRef(protocols []*gateway.FileDownloadProtocol) (io.Reader, error) {
p, err := getDownloadProtocolInfo(protocols, "simple")
if err != nil {
return nil, err
}

if p.Opaque == nil {
return nil, errtypes.NotSupported("opaque object not defined")
}

var token string
tokenOpaque, ok := opaque.Map["webdav-token"]
tokenOpaque, ok := p.Opaque.Map["webdav-token"]
if !ok {
return nil, errtypes.NotSupported("webdav token not defined")
}
Expand All @@ -163,7 +182,7 @@ func checkDownloadWebdavRef(endpoint string, opaque *typespb.Opaque) (io.Reader,
}

var filePath string
fileOpaque, ok := opaque.Map["webdav-file-path"]
fileOpaque, ok := p.Opaque.Map["webdav-file-path"]
if !ok {
return nil, errtypes.NotSupported("webdav file path not defined")
}
Expand All @@ -174,7 +193,7 @@ func checkDownloadWebdavRef(endpoint string, opaque *typespb.Opaque) (io.Reader,
return nil, errors.New("opaque entry decoder not recognized: " + fileOpaque.Decoder)
}

c := gowebdav.NewClient(endpoint, "", "")
c := gowebdav.NewClient(p.DownloadEndpoint, "", "")
c.SetHeader(tokenpkg.TokenHeader, token)

reader, err := c.ReadStream(filePath)
Expand Down
58 changes: 37 additions & 21 deletions cmd/reva/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/cs3org/reva/internal/http/services/datagateway"
"github.com/pkg/errors"

gateway "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1"
rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
typespb "github.com/cs3org/go-cs3apis/cs3/types/v1beta1"
Expand All @@ -50,11 +51,11 @@ func uploadCommand() *command {
cmd := newCommand("upload")
cmd.Description = func() string { return "upload a local file to the remote server" }
cmd.Usage = func() string { return "Usage: upload [-flags] <file_name> <remote_target>" }
disableTusFlag := cmd.Bool("disable-tus", false, "whether to disable tus protocol")
protocolFlag := cmd.String("protocol", "tus", "the protocol to be used for uploads")
xsFlag := cmd.String("xs", "negotiate", "compute checksum")

cmd.ResetFlags = func() {
*disableTusFlag, *xsFlag = false, "negotiate"
*protocolFlag, *xsFlag = "tus", "negotiate"
}

cmd.Action = func(w ...io.Writer) error {
Expand Down Expand Up @@ -115,19 +116,23 @@ func uploadCommand() *command {
return formatError(res.Status)
}

// TODO(labkode): upload to data server
fmt.Printf("Data server: %s\n", res.UploadEndpoint)
fmt.Printf("Allowed checksums: %+v\n", res.AvailableChecksums)

if err = checkUploadWebdavRef(res.UploadEndpoint, res.Opaque, md, fd); err != nil {
if err = checkUploadWebdavRef(res.Protocols, md, fd); err != nil {
if _, ok := err.(errtypes.IsNotSupported); !ok {
return err
}
} else {
return nil
}

xsType, err := guessXS(*xsFlag, res.AvailableChecksums)
p, err := getUploadProtocolInfo(res.Protocols, *protocolFlag)
if err != nil {
return err
}

fmt.Printf("Data server: %s\n", p.UploadEndpoint)
fmt.Printf("Allowed checksums: %+v\n", p.AvailableChecksums)

xsType, err := guessXS(*xsFlag, p.AvailableChecksums)
if err != nil {
return err
}
Expand All @@ -144,15 +149,15 @@ func uploadCommand() *command {
return err
}

dataServerURL := res.UploadEndpoint
dataServerURL := p.UploadEndpoint

if *disableTusFlag {
if *protocolFlag == "simple" {
httpReq, err := rhttp.NewRequest(ctx, "PUT", dataServerURL, fd)
if err != nil {
return err
}

httpReq.Header.Set(datagateway.TokenTransportHeader, res.Token)
httpReq.Header.Set(datagateway.TokenTransportHeader, p.Token)
q := httpReq.URL.Query()
q.Add("xs", xs)
q.Add("xs_type", storageprovider.GRPC2PKGXS(xsType).String())
Expand All @@ -178,9 +183,7 @@ func uploadCommand() *command {
if token, ok := tokenpkg.ContextGetToken(ctx); ok {
c.Header.Add(tokenpkg.TokenHeader, token)
}
if res.Token != "" {
c.Header.Add(datagateway.TokenTransportHeader, res.Token)
}
c.Header.Add(datagateway.TokenTransportHeader, p.Token)
tusc, err := tus.NewClient(dataServerURL, c)
if err != nil {
return err
Expand Down Expand Up @@ -233,13 +236,27 @@ func uploadCommand() *command {
return cmd
}

func checkUploadWebdavRef(endpoint string, opaque *typespb.Opaque, md os.FileInfo, fd *os.File) error {
if opaque == nil {
func getUploadProtocolInfo(protocolInfos []*gateway.FileUploadProtocol, protocol string) (*gateway.FileUploadProtocol, error) {
for _, p := range protocolInfos {
if p.Protocol == protocol {
return p, nil
}
}
return nil, errtypes.NotFound(protocol)
}

func checkUploadWebdavRef(protocols []*gateway.FileUploadProtocol, md os.FileInfo, fd *os.File) error {
p, err := getUploadProtocolInfo(protocols, "simple")
if err != nil {
return err
}

if p.Opaque == nil {
return errtypes.NotSupported("opaque object not defined")
}

var token string
tokenOpaque, ok := opaque.Map["webdav-token"]
tokenOpaque, ok := p.Opaque.Map["webdav-token"]
if !ok {
return errtypes.NotSupported("webdav token not defined")
}
Expand All @@ -251,7 +268,7 @@ func checkUploadWebdavRef(endpoint string, opaque *typespb.Opaque, md os.FileInf
}

var filePath string
fileOpaque, ok := opaque.Map["webdav-file-path"]
fileOpaque, ok := p.Opaque.Map["webdav-file-path"]
if !ok {
return errtypes.NotSupported("webdav file path not defined")
}
Expand All @@ -262,12 +279,11 @@ func checkUploadWebdavRef(endpoint string, opaque *typespb.Opaque, md os.FileInf
return errors.New("opaque entry decoder not recognized: " + fileOpaque.Decoder)
}

c := gowebdav.NewClient(endpoint, "", "")
c := gowebdav.NewClient(p.UploadEndpoint, "", "")
c.SetHeader(tokenpkg.TokenHeader, token)
c.SetHeader("Upload-Length", strconv.FormatInt(md.Size(), 10))

err := c.WriteStream(filePath, fd, 0700)
if err != nil {
if err = c.WriteStream(filePath, fd, 0700); err != nil {
return err
}

Expand Down
1 change: 1 addition & 0 deletions cmd/revad/runtime/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
_ "github.com/cs3org/reva/pkg/ocm/provider/authorizer/loader"
_ "github.com/cs3org/reva/pkg/ocm/share/manager/loader"
_ "github.com/cs3org/reva/pkg/publicshare/manager/loader"
_ "github.com/cs3org/reva/pkg/rhttp/datatx/manager/loader"
_ "github.com/cs3org/reva/pkg/share/manager/loader"
_ "github.com/cs3org/reva/pkg/storage/fs/loader"
_ "github.com/cs3org/reva/pkg/storage/registry/loader"
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ require (
github.com/cheggaaa/pb v1.0.29
github.com/coreos/go-oidc v2.2.1+incompatible
github.com/cs3org/cato v0.0.0-20200828125504-e418fc54dd5e
github.com/cs3org/go-cs3apis v0.0.0-20201007120910-416ed6cf8b00
github.com/cs3org/go-cs3apis v0.0.0-20201118090759-87929f5bae21
github.com/dgrijalva/jwt-go v3.2.0+incompatible
github.com/eventials/go-tus v0.0.0-20200718001131-45c7ec8f5d59
github.com/go-ldap/ldap/v3 v3.2.4
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ github.com/cs3org/cato v0.0.0-20200828125504-e418fc54dd5e h1:tqSPWQeueWTKnJVMJff
github.com/cs3org/cato v0.0.0-20200828125504-e418fc54dd5e/go.mod h1:XJEZ3/EQuI3BXTp/6DUzFr850vlxq11I6satRtz0YQ4=
github.com/cs3org/go-cs3apis v0.0.0-20201007120910-416ed6cf8b00 h1:LVl25JaflluOchVvaHWtoCynm5OaM+VNai0IYkcCSe0=
github.com/cs3org/go-cs3apis v0.0.0-20201007120910-416ed6cf8b00/go.mod h1:UXha4TguuB52H14EMoSsCqDj7k8a/t7g4gVP+bgY5LY=
github.com/cs3org/go-cs3apis v0.0.0-20201118090759-87929f5bae21 h1:mZpylrgnCgSeaZ5EznvHIPIKuaQHMHZDi2wkJtk4M8Y=
github.com/cs3org/go-cs3apis v0.0.0-20201118090759-87929f5bae21/go.mod h1:UXha4TguuB52H14EMoSsCqDj7k8a/t7g4gVP+bgY5LY=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand Down
Loading