Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

initial range request support #1388

Merged
merged 6 commits into from
Jan 7, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions changelog/unreleased/range-requests.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
Enhancement: Support range header in GET requests

To allow resuming a download we now support GET requests with a range header.

https://github.com/cs3org/reva/pull/1388
https://github.com/owncloud/ocis-reva/issues/12
33 changes: 22 additions & 11 deletions internal/http/services/datagateway/datagateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"net/http"
"net/url"
"path"
"strconv"
"time"

"github.com/cs3org/reva/pkg/appctx"
Expand Down Expand Up @@ -170,7 +171,7 @@ func (s *svc) doHead(w http.ResponseWriter, r *http.Request) {
claims, err := s.verify(ctx, r)
if err != nil {
err = errors.Wrap(err, "datagateway: error validating transfer token")
log.Err(err).Str("token", r.Header.Get(TokenTransportHeader)).Msg("invalid transfer token")
log.Error().Err(err).Str("token", r.Header.Get(TokenTransportHeader)).Msg("invalid transfer token")
w.WriteHeader(http.StatusForbidden)
return
}
Expand All @@ -180,15 +181,15 @@ func (s *svc) doHead(w http.ResponseWriter, r *http.Request) {
httpClient := s.client
httpReq, err := rhttp.NewRequest(ctx, "HEAD", claims.Target, nil)
if err != nil {
log.Err(err).Msg("wrong request")
log.Error().Err(err).Msg("wrong request")
w.WriteHeader(http.StatusInternalServerError)
return
}
httpReq.Header = r.Header

httpRes, err := httpClient.Do(httpReq)
if err != nil {
log.Err(err).Msg("error doing HEAD request to data service")
log.Error().Err(err).Msg("error doing HEAD request to data service")
w.WriteHeader(http.StatusInternalServerError)
return
}
Expand All @@ -211,7 +212,7 @@ func (s *svc) doGet(w http.ResponseWriter, r *http.Request) {
claims, err := s.verify(ctx, r)
if err != nil {
err = errors.Wrap(err, "datagateway: error validating transfer token")
log.Err(err).Str("token", r.Header.Get(TokenTransportHeader)).Msg("invalid transfer token")
log.Error().Err(err).Str("token", r.Header.Get(TokenTransportHeader)).Msg("invalid transfer token")
w.WriteHeader(http.StatusForbidden)
return
}
Expand All @@ -221,30 +222,40 @@ func (s *svc) doGet(w http.ResponseWriter, r *http.Request) {
httpClient := s.client
httpReq, err := rhttp.NewRequest(ctx, "GET", claims.Target, nil)
if err != nil {
log.Err(err).Msg("wrong request")
log.Error().Err(err).Msg("wrong request")
w.WriteHeader(http.StatusInternalServerError)
return
}
httpReq.Header = r.Header

httpRes, err := httpClient.Do(httpReq)
if err != nil {
log.Err(err).Msg("error doing GET request to data service")
log.Error().Err(err).Msg("error doing GET request to data service")
w.WriteHeader(http.StatusInternalServerError)
return
}
defer httpRes.Body.Close()

copyHeader(w.Header(), httpRes.Header)
if httpRes.StatusCode != http.StatusOK {
w.WriteHeader(httpRes.StatusCode)
// TODO why do we swallow the body?
w.WriteHeader(httpRes.StatusCode)
if httpRes.StatusCode != http.StatusOK && httpRes.StatusCode != http.StatusPartialContent {
return
}

w.WriteHeader(http.StatusOK)
_, err = io.Copy(w, httpRes.Body)
var c int64
c, err = io.Copy(w, httpRes.Body)
if err != nil {
log.Err(err).Msg("error writing body after headers were sent")
log.Error().Err(err).Msg("error writing body after headers were sent")
ishank011 marked this conversation as resolved.
Show resolved Hide resolved
}
if httpRes.Header.Get("Content-Length") != "" {
i, err := strconv.ParseInt(httpRes.Header.Get("Content-Length"), 10, 64)
if err != nil {
log.Error().Err(err).Str("content-length", httpRes.Header.Get("Content-Length")).Msg("invalid content length in dataprovider response")
}
if i != c {
log.Error().Int64("content-length", i).Int64("transferred-bytes", c).Msg("content length vs transferred bytes mismatch")
}
}
}

Expand Down
35 changes: 29 additions & 6 deletions internal/http/services/owncloud/ocdav/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,14 @@ import (

func (s *svc) handleGet(w http.ResponseWriter, r *http.Request, ns string) {
ctx := r.Context()
ctx, span := trace.StartSpan(ctx, "head")
ctx, span := trace.StartSpan(ctx, "get")
defer span.End()

ns = applyLayout(ctx, ns)

fn := path.Join(ns, r.URL.Path)

sublog := appctx.GetLogger(ctx).With().Str("path", fn).Logger()
sublog := appctx.GetLogger(ctx).With().Str("path", fn).Str("svc", "ocdav").Str("handler", "get").Logger()

client, err := s.getClient()
if err != nil {
Expand Down Expand Up @@ -109,6 +109,11 @@ func (s *svc) handleGet(w http.ResponseWriter, r *http.Request, ns string) {
return
}
httpReq.Header.Set(datagateway.TokenTransportHeader, token)

if r.Header.Get("Range") != "" {
ishank011 marked this conversation as resolved.
Show resolved Hide resolved
httpReq.Header.Set("Range", r.Header.Get("Range"))
}

httpClient := s.client

httpRes, err := httpClient.Do(httpReq)
Expand All @@ -119,8 +124,8 @@ func (s *svc) handleGet(w http.ResponseWriter, r *http.Request, ns string) {
}
defer httpRes.Body.Close()

if httpRes.StatusCode != http.StatusOK {
w.WriteHeader(http.StatusInternalServerError)
if httpRes.StatusCode != http.StatusOK && httpRes.StatusCode != http.StatusPartialContent {
w.WriteHeader(httpRes.StatusCode)
return
}

Expand All @@ -133,13 +138,31 @@ func (s *svc) handleGet(w http.ResponseWriter, r *http.Request, ns string) {
t := utils.TSToTime(info.Mtime).UTC()
lastModifiedString := t.Format(time.RFC1123Z)
w.Header().Set("Last-Modified", lastModifiedString)
w.Header().Set("Content-Length", strconv.FormatUint(info.Size, 10))

if httpRes.StatusCode == http.StatusPartialContent {
w.Header().Set("Content-Range", httpRes.Header.Get("Content-Range"))
w.Header().Set("Content-Length", httpRes.Header.Get("Content-Length"))
w.WriteHeader(http.StatusPartialContent)
} else {
w.Header().Set("Content-Length", strconv.FormatUint(info.Size, 10))
}
/*
if md.Checksum != "" {
w.Header().Set("OC-Checksum", md.Checksum)
}
*/
if _, err := io.Copy(w, httpRes.Body); err != nil {
var c int64
if c, err = io.Copy(w, httpRes.Body); err != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should return an error status in this case, right? Currently it would just return OK or partial content if the GET request above didn't return any error. Same for the errors below.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually we can't because the header has already been sent and the Copy is streaming the bytes.

All we can do is log that something went wrong.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but if the copy fails, user would still receive a 200 with corrupt file contents.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

correct. even if we would buffer everything into ram ... we cannot change the http header when the body is being sent. The best we can do is log an error.

sublog.Error().Err(err).Msg("error finishing copying data to response")
}
if httpRes.Header.Get("Content-Length") != "" {
i, err := strconv.ParseInt(httpRes.Header.Get("Content-Length"), 10, 64)
if err != nil {
sublog.Error().Err(err).Str("content-length", httpRes.Header.Get("Content-Length")).Msg("invalid content length in datagateway response")
}
if i != c {
sublog.Error().Int64("content-length", i).Int64("transferred-bytes", c).Msg("content length vs transferred bytes mismatch")
}
}
// TODO we need to send the If-Match etag in the GET to the datagateway to prevent race conditions between stating and reading the file
}
5 changes: 4 additions & 1 deletion internal/http/services/owncloud/ocdav/head.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ func (s *svc) handleHead(w http.ResponseWriter, r *http.Request, ns string) {
t := utils.TSToTime(info.Mtime).UTC()
lastModifiedString := t.Format(time.RFC1123Z)
w.Header().Set("Last-Modified", lastModifiedString)
w.WriteHeader(http.StatusOK)
w.Header().Set("Content-Length", strconv.FormatUint(info.Size, 10))
if info.Type != provider.ResourceType_RESOURCE_TYPE_CONTAINER {
w.Header().Set("Accept-Ranges", "bytes")
}
w.WriteHeader(http.StatusOK)
}
46 changes: 8 additions & 38 deletions pkg/rhttp/datatx/manager/simple/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,17 @@
package simple

import (
"io"
"net/http"

"github.com/pkg/errors"

provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
"github.com/cs3org/reva/pkg/appctx"
"github.com/cs3org/reva/pkg/errtypes"
"github.com/cs3org/reva/pkg/rhttp/datatx"
"github.com/cs3org/reva/pkg/rhttp/datatx/manager/registry"
"github.com/cs3org/reva/pkg/rhttp/datatx/utils/download"
"github.com/cs3org/reva/pkg/storage"
"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
)

func init() {
Expand Down Expand Up @@ -64,42 +63,13 @@ func New(m map[string]interface{}) (datatx.DataTX, error) {

func (m *manager) Handler(fs storage.FS) (http.Handler, error) {
h := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.Method {
case "GET":
ctx := r.Context()
log := appctx.GetLogger(ctx)
var fn string
files, ok := r.URL.Query()["filename"]
if !ok || len(files[0]) < 1 {
fn = r.URL.Path
} else {
fn = files[0]
}

ref := &provider.Reference{Spec: &provider.Reference_Path{Path: fn}}

rc, err := fs.Download(ctx, ref)
if err != nil {
if _, ok := err.(errtypes.IsNotFound); ok {
log.Debug().Err(err).Msg("datasvc: file not found")
w.WriteHeader(http.StatusNotFound)
} else {
log.Err(err).Msg("datasvc: error downloading file")
w.WriteHeader(http.StatusInternalServerError)
}
return
}
defer rc.Close()

_, err = io.Copy(w, rc)
if err != nil {
log.Error().Err(err).Msg("error copying data to response")
return
}
ctx := r.Context()
sublog := appctx.GetLogger(ctx).With().Str("datatx", "simple").Logger()

switch r.Method {
case "GET", "HEAD":
download.GetOrHeadFile(w, r, fs)
case "PUT":
ctx := r.Context()
log := appctx.GetLogger(ctx)
fn := r.URL.Path
defer r.Body.Close()

Expand All @@ -111,7 +81,7 @@ func (m *manager) Handler(fs storage.FS) (http.Handler, error) {
w.WriteHeader(http.StatusPartialContent)
return
}
log.Error().Err(err).Msg("error uploading file")
sublog.Error().Err(err).Msg("error uploading file")
w.WriteHeader(http.StatusInternalServerError)
return
}
Expand Down
37 changes: 2 additions & 35 deletions pkg/rhttp/datatx/manager/tus/tus.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,14 @@
package tus

import (
"io"
"net/http"

"github.com/pkg/errors"

provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
"github.com/cs3org/reva/pkg/appctx"
"github.com/cs3org/reva/pkg/errtypes"
"github.com/cs3org/reva/pkg/rhttp/datatx"
"github.com/cs3org/reva/pkg/rhttp/datatx/manager/registry"
"github.com/cs3org/reva/pkg/rhttp/datatx/utils/download"
"github.com/cs3org/reva/pkg/storage"
"github.com/mitchellh/mapstructure"
tusd "github.com/tus/tusd/pkg/handler"
Expand Down Expand Up @@ -88,8 +86,6 @@ func (m *manager) Handler(fs storage.FS) (http.Handler, error) {
}

h := handler.Middleware(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
log := appctx.GetLogger(r.Context())
log.Info().Msgf("tusd routing: path=%s", r.URL.Path)

method := r.Method
// https://github.com/tus/tus-resumable-upload-protocol/blob/master/protocol.md#x-http-method-override
Expand All @@ -106,37 +102,8 @@ func (m *manager) Handler(fs storage.FS) (http.Handler, error) {
handler.PatchFile(w, r)
case "DELETE":
handler.DelFile(w, r)
// TODO(pvince81): allow for range-based requests?
case "GET":
ctx := r.Context()
log := appctx.GetLogger(ctx)
var fn string
files, ok := r.URL.Query()["filename"]
if !ok || len(files[0]) < 1 {
fn = r.URL.Path
} else {
fn = files[0]
}

ref := &provider.Reference{Spec: &provider.Reference_Path{Path: fn}}

rc, err := fs.Download(ctx, ref)
if err != nil {
if _, ok := err.(errtypes.IsNotFound); ok {
log.Debug().Err(err).Msg("datasvc: file not found")
w.WriteHeader(http.StatusNotFound)
} else {
log.Err(err).Msg("datasvc: error downloading file")
w.WriteHeader(http.StatusInternalServerError)
}
return
}

_, err = io.Copy(w, rc)
if err != nil {
log.Error().Err(err).Msg("error copying data to response")
return
}
download.GetOrHeadFile(w, r, fs)
default:
w.WriteHeader(http.StatusNotImplemented)
}
Expand Down
Loading