Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] propper packages for data transfers #828

Closed
wants to merge 9 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/revad/runtime/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
_ "github.com/cs3org/reva/pkg/ocm/provider/authorizer/loader"
_ "github.com/cs3org/reva/pkg/ocm/share/manager/loader"
_ "github.com/cs3org/reva/pkg/publicshare/manager/loader"
_ "github.com/cs3org/reva/pkg/rhttp/datatx/manager/loader"
_ "github.com/cs3org/reva/pkg/share/manager/loader"
_ "github.com/cs3org/reva/pkg/storage/fs/loader"
_ "github.com/cs3org/reva/pkg/storage/registry/loader"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,19 @@ The configuration for the storage driver [[Ref]](https://github.com/cs3org/reva/
{{< /highlight >}}
{{% /dir %}}

{{% dir name="disable_tus" type="bool" default=false %}}
Whether to disable TUS uploads. [[Ref]](https://github.com/cs3org/reva/tree/master/internal/http/services/dataprovider/dataprovider.go#L42)
{{% dir name="datatx" type="string" default="simple" %}}
The data transfer protocol to use [[Ref]](https://github.com/cs3org/reva/tree/master/internal/http/services/dataprovider/dataprovider.go#L42)
{{< highlight toml >}}
[http.services.dataprovider]
disable_tus = false
datatx = "simple"
{{< /highlight >}}
{{% /dir %}}

{{% dir name="datatxs" type="map[string]map[string]interface{}" default="docs/config/packages/rhttp/datatx" %}}
The data transfer protocol to use [[Ref]](https://github.com/cs3org/reva/tree/master/internal/http/services/dataprovider/dataprovider.go#L43)
{{< highlight toml >}}
[http.services.dataprovider.datatxs]
"[docs/config/packages/rhttp/datatx]({{< ref "docs/config/packages/rhttp/datatx" >}})"
{{< /highlight >}}
{{% /dir %}}

11 changes: 6 additions & 5 deletions internal/grpc/services/storageprovider/storageprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ package storageprovider

import (
"context"
// "encoding/json"
"fmt"
"net/url"
"os"
Expand All @@ -29,7 +28,6 @@ import (
"strings"

rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1"
// link "github.com/cs3org/go-cs3apis/cs3/sharing/link/v1beta1"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
"github.com/cs3org/reva/pkg/appctx"
"github.com/cs3org/reva/pkg/errtypes"
Expand All @@ -52,12 +50,13 @@ type config struct {
MountPath string `mapstructure:"mount_path" docs:"/;The path where the file system would be mounted."`
MountID string `mapstructure:"mount_id" docs:"-;The ID of the mounted file system."`
Driver string `mapstructure:"driver" docs:"localhome;The storage driver to be used."`
Drivers map[string]map[string]interface{} `mapstructure:"drivers" docs:"url:docs/config/packages/storage/fs"`
TmpFolder string `mapstructure:"tmp_folder" docs:"/var/tmp;Path to temporary folder."`
DataServerURL string `mapstructure:"data_server_url" docs:"http://localhost/data;The URL for the data server."`
DataServerURL string `mapstructure:"data_server_url" docs:"simple://localhost/data;The URL for the data server."`
ExposeDataServer bool `mapstructure:"expose_data_server" docs:"false;Whether to expose data server."` // if true the client will be able to upload/download directly to it
DisableTus bool `mapstructure:"disable_tus" docs:"false;Whether to disable TUS uploads."`
EnableUploadTx bool `mapstructure:"enable_upload_tx" docs:"false;Enables upload transactions"`
AvailableXS map[string]uint32 `mapstructure:"available_checksums" docs:"nil;List of available checksums."`
Drivers map[string]map[string]interface{} `mapstructure:"drivers" docs:"url:docs/config/packages/storage/fs"`
}

func (c *config) init() {
Expand Down Expand Up @@ -268,7 +267,9 @@ func (s *service) InitiateFileUpload(ctx context.Context, req *provider.Initiate
}, nil
}
url := *s.dataServerURL
if s.conf.DisableTus {

// No protocol specific code here.
if s.conf.EnableUploadTx {
url.Path = path.Join("/", url.Path, newRef.GetPath())
} else {
metadata := map[string]string{}
Expand Down
136 changes: 34 additions & 102 deletions internal/http/services/dataprovider/dataprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,24 +22,25 @@ import (
"fmt"
"net/http"

"github.com/cs3org/reva/pkg/appctx"
"github.com/cs3org/reva/pkg/rhttp/datatx"
datatxregistry "github.com/cs3org/reva/pkg/rhttp/datatx/manager/registry"
"github.com/cs3org/reva/pkg/rhttp/global"
"github.com/cs3org/reva/pkg/storage"
"github.com/cs3org/reva/pkg/storage/fs/registry"
"github.com/mitchellh/mapstructure"
"github.com/rs/zerolog"
tusd "github.com/tus/tusd/pkg/handler"
)

func init() {
global.Register("dataprovider", New)
}

type config struct {
Prefix string `mapstructure:"prefix" docs:"data;The prefix to be used for this HTTP service"`
Driver string `mapstructure:"driver" docs:"localhome;The storage driver to be used."`
Drivers map[string]map[string]interface{} `mapstructure:"drivers" docs:"url:docs/config/packages/storage/fs;The configuration for the storage driver"`
DisableTus bool `mapstructure:"disable_tus" docs:"false;Whether to disable TUS uploads."`
Prefix string `mapstructure:"prefix" docs:"data;The prefix to be used for this HTTP service"`
Driver string `mapstructure:"driver" docs:"local;The storage driver to be used."`
Drivers map[string]map[string]interface{} `mapstructure:"drivers" docs:"url:docs/config/packages/storage/fs;The configuration for the storage driver"`
DataTX string `mapstructure:"datatx" docs:"simple;The data transfer protocol to use"`
DataTXs map[string]map[string]interface{} `mapstructure:"datatxs" docs:"url:docs/config/packages/rhttp/datatx;The data transfer protocol to use"`
}

func (c *config) init() {
Expand All @@ -51,12 +52,17 @@ func (c *config) init() {
c.Driver = "localhome"
}

if c.DataTX == "" {
c.DataTX = "simple"
}

}

type svc struct {
conf *config
handler http.Handler
storage storage.FS
datatx datatx.DataTX
}

// New returns a new datasvc
Expand All @@ -73,12 +79,21 @@ func New(m map[string]interface{}, log *zerolog.Logger) (global.Service, error)
return nil, err
}

datatx, err := getDataTX(conf)
if err != nil {
return nil, err
}

s := &svc{
storage: fs,
datatx: datatx,
conf: conf,
}

err = s.setHandler()
if err := s.setHandler(); err != nil {
return nil, err
}

return s, err
}

Expand All @@ -91,12 +106,6 @@ func (s *svc) Unprotected() []string {
return []string{}
}

// Create a new DataStore instance which is responsible for
// storing the uploaded file on disk in the specified directory.
// This path _must_ exist before tusd will store uploads in it.
// If you want to save them on a different medium, for example
// a remote FTP server, you can implement your own storage backend
// by implementing the tusd.DataStore interface.
func getFS(c *config) (storage.FS, error) {
if f, ok := registry.NewFuncs[c.Driver]; ok {
return f(c.Drivers[c.Driver])
Expand All @@ -112,96 +121,19 @@ func (s *svc) Handler() http.Handler {
return s.handler
}

// Composable is the interface that a struct needs to implement to be composable by this composer
type Composable interface {
UseIn(composer *tusd.StoreComposer)
func (s *svc) setHandler() error {
h, err := s.datatx.Handler(s.conf.Prefix, s.storage)
if err != nil {
return err
}

s.handler = h
return nil
}

func (s *svc) setHandler() (err error) {
composable, ok := s.storage.(Composable)
if ok && !s.conf.DisableTus {
// A storage backend for tusd may consist of multiple different parts which
// handle upload creation, locking, termination and so on. The composer is a
// place where all those separated pieces are joined together. In this example
// we only use the file store but you may plug in multiple.
composer := tusd.NewStoreComposer()

// let the composable storage tell tus which extensions it supports
composable.UseIn(composer)

config := tusd.Config{
BasePath: s.conf.Prefix,
StoreComposer: composer,
//Logger: logger, // TODO use logger
}

handler, err := tusd.NewUnroutedHandler(config)
if err != nil {
return err
}

s.handler = handler.Middleware(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {

log := appctx.GetLogger(r.Context())
log.Info().Msgf("tusd routing: path=%s", r.URL.Path)

method := r.Method
// https://github.com/tus/tus-resumable-upload-protocol/blob/master/protocol.md#x-http-method-override
if r.Header.Get("X-HTTP-Method-Override") != "" {
method = r.Header.Get("X-HTTP-Method-Override")
}

switch method {
// old fashioned download.

// GET is not part of the tus.io protocol
// currently there is no way to GET an upload that is in progress
// TODO allow range based get requests? that end before the current offset
case "GET":
s.doGet(w, r)

// tus.io based upload

// uploads are initiated using the CS3 APIs Initiate Download call
case "POST":
handler.PostFile(w, r)
case "HEAD":
handler.HeadFile(w, r)
case "PATCH":
handler.PatchFile(w, r)
// PUT provides a wrapper around the POST call, to save the caller from
// the trouble of configuring the tus client.
case "PUT":
s.doTusPut(w, r)
// TODO Only attach the DELETE handler if the Terminate() method is provided
case "DELETE":
handler.DelFile(w, r)
}
}))
} else {
s.handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
method := r.Method
// https://github.com/tus/tus-resumable-upload-protocol/blob/master/protocol.md#x-http-method-override
if r.Header.Get("X-HTTP-Method-Override") != "" {
method = r.Header.Get("X-HTTP-Method-Override")
}

switch method {
case "HEAD":
w.WriteHeader(http.StatusOK)
return
case "GET":
s.doGet(w, r)
return
case "PUT":
s.doPut(w, r)
return
default:
w.WriteHeader(http.StatusNotImplemented)
return
}
})
func getDataTX(c *config) (datatx.DataTX, error) {
if f, ok := datatxregistry.NewFuncs[c.DataTX]; ok {
return f(c.DataTXs[c.DataTX])
}

return err
return nil, fmt.Errorf("driver not found: %s", c.DataTX)
}
112 changes: 0 additions & 112 deletions internal/http/services/dataprovider/put.go

This file was deleted.

Loading