Skip to content

Commit

Permalink
initial range request support (#1388)
Browse files Browse the repository at this point in the history
  • Loading branch information
butonic authored Jan 7, 2021
1 parent f390af8 commit c650f5b
Show file tree
Hide file tree
Showing 10 changed files with 425 additions and 127 deletions.
6 changes: 6 additions & 0 deletions changelog/unreleased/range-requests.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
Enhancement: Support range header in GET requests

To allow resuming a download we now support GET requests with a range header.

https://github.com/cs3org/reva/pull/1388
https://github.com/owncloud/ocis-reva/issues/12
33 changes: 22 additions & 11 deletions internal/http/services/datagateway/datagateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"net/http"
"net/url"
"path"
"strconv"
"time"

"github.com/cs3org/reva/pkg/appctx"
Expand Down Expand Up @@ -170,7 +171,7 @@ func (s *svc) doHead(w http.ResponseWriter, r *http.Request) {
claims, err := s.verify(ctx, r)
if err != nil {
err = errors.Wrap(err, "datagateway: error validating transfer token")
log.Err(err).Str("token", r.Header.Get(TokenTransportHeader)).Msg("invalid transfer token")
log.Error().Err(err).Str("token", r.Header.Get(TokenTransportHeader)).Msg("invalid transfer token")
w.WriteHeader(http.StatusForbidden)
return
}
Expand All @@ -180,15 +181,15 @@ func (s *svc) doHead(w http.ResponseWriter, r *http.Request) {
httpClient := s.client
httpReq, err := rhttp.NewRequest(ctx, "HEAD", claims.Target, nil)
if err != nil {
log.Err(err).Msg("wrong request")
log.Error().Err(err).Msg("wrong request")
w.WriteHeader(http.StatusInternalServerError)
return
}
httpReq.Header = r.Header

httpRes, err := httpClient.Do(httpReq)
if err != nil {
log.Err(err).Msg("error doing HEAD request to data service")
log.Error().Err(err).Msg("error doing HEAD request to data service")
w.WriteHeader(http.StatusInternalServerError)
return
}
Expand All @@ -211,7 +212,7 @@ func (s *svc) doGet(w http.ResponseWriter, r *http.Request) {
claims, err := s.verify(ctx, r)
if err != nil {
err = errors.Wrap(err, "datagateway: error validating transfer token")
log.Err(err).Str("token", r.Header.Get(TokenTransportHeader)).Msg("invalid transfer token")
log.Error().Err(err).Str("token", r.Header.Get(TokenTransportHeader)).Msg("invalid transfer token")
w.WriteHeader(http.StatusForbidden)
return
}
Expand All @@ -221,30 +222,40 @@ func (s *svc) doGet(w http.ResponseWriter, r *http.Request) {
httpClient := s.client
httpReq, err := rhttp.NewRequest(ctx, "GET", claims.Target, nil)
if err != nil {
log.Err(err).Msg("wrong request")
log.Error().Err(err).Msg("wrong request")
w.WriteHeader(http.StatusInternalServerError)
return
}
httpReq.Header = r.Header

httpRes, err := httpClient.Do(httpReq)
if err != nil {
log.Err(err).Msg("error doing GET request to data service")
log.Error().Err(err).Msg("error doing GET request to data service")
w.WriteHeader(http.StatusInternalServerError)
return
}
defer httpRes.Body.Close()

copyHeader(w.Header(), httpRes.Header)
if httpRes.StatusCode != http.StatusOK {
w.WriteHeader(httpRes.StatusCode)
// TODO why do we swallow the body?
w.WriteHeader(httpRes.StatusCode)
if httpRes.StatusCode != http.StatusOK && httpRes.StatusCode != http.StatusPartialContent {
return
}

w.WriteHeader(http.StatusOK)
_, err = io.Copy(w, httpRes.Body)
var c int64
c, err = io.Copy(w, httpRes.Body)
if err != nil {
log.Err(err).Msg("error writing body after headers were sent")
log.Error().Err(err).Msg("error writing body after headers were sent")
}
if httpRes.Header.Get("Content-Length") != "" {
i, err := strconv.ParseInt(httpRes.Header.Get("Content-Length"), 10, 64)
if err != nil {
log.Error().Err(err).Str("content-length", httpRes.Header.Get("Content-Length")).Msg("invalid content length in dataprovider response")
}
if i != c {
log.Error().Int64("content-length", i).Int64("transferred-bytes", c).Msg("content length vs transferred bytes mismatch")
}
}
}

Expand Down
35 changes: 29 additions & 6 deletions internal/http/services/owncloud/ocdav/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,14 @@ import (

func (s *svc) handleGet(w http.ResponseWriter, r *http.Request, ns string) {
ctx := r.Context()
ctx, span := trace.StartSpan(ctx, "head")
ctx, span := trace.StartSpan(ctx, "get")
defer span.End()

ns = applyLayout(ctx, ns)

fn := path.Join(ns, r.URL.Path)

sublog := appctx.GetLogger(ctx).With().Str("path", fn).Logger()
sublog := appctx.GetLogger(ctx).With().Str("path", fn).Str("svc", "ocdav").Str("handler", "get").Logger()

client, err := s.getClient()
if err != nil {
Expand Down Expand Up @@ -109,6 +109,11 @@ func (s *svc) handleGet(w http.ResponseWriter, r *http.Request, ns string) {
return
}
httpReq.Header.Set(datagateway.TokenTransportHeader, token)

if r.Header.Get("Range") != "" {
httpReq.Header.Set("Range", r.Header.Get("Range"))
}

httpClient := s.client

httpRes, err := httpClient.Do(httpReq)
Expand All @@ -119,8 +124,8 @@ func (s *svc) handleGet(w http.ResponseWriter, r *http.Request, ns string) {
}
defer httpRes.Body.Close()

if httpRes.StatusCode != http.StatusOK {
w.WriteHeader(http.StatusInternalServerError)
if httpRes.StatusCode != http.StatusOK && httpRes.StatusCode != http.StatusPartialContent {
w.WriteHeader(httpRes.StatusCode)
return
}

Expand All @@ -133,13 +138,31 @@ func (s *svc) handleGet(w http.ResponseWriter, r *http.Request, ns string) {
t := utils.TSToTime(info.Mtime).UTC()
lastModifiedString := t.Format(time.RFC1123Z)
w.Header().Set("Last-Modified", lastModifiedString)
w.Header().Set("Content-Length", strconv.FormatUint(info.Size, 10))

if httpRes.StatusCode == http.StatusPartialContent {
w.Header().Set("Content-Range", httpRes.Header.Get("Content-Range"))
w.Header().Set("Content-Length", httpRes.Header.Get("Content-Length"))
w.WriteHeader(http.StatusPartialContent)
} else {
w.Header().Set("Content-Length", strconv.FormatUint(info.Size, 10))
}
/*
if md.Checksum != "" {
w.Header().Set("OC-Checksum", md.Checksum)
}
*/
if _, err := io.Copy(w, httpRes.Body); err != nil {
var c int64
if c, err = io.Copy(w, httpRes.Body); err != nil {
sublog.Error().Err(err).Msg("error finishing copying data to response")
}
if httpRes.Header.Get("Content-Length") != "" {
i, err := strconv.ParseInt(httpRes.Header.Get("Content-Length"), 10, 64)
if err != nil {
sublog.Error().Err(err).Str("content-length", httpRes.Header.Get("Content-Length")).Msg("invalid content length in datagateway response")
}
if i != c {
sublog.Error().Int64("content-length", i).Int64("transferred-bytes", c).Msg("content length vs transferred bytes mismatch")
}
}
// TODO we need to send the If-Match etag in the GET to the datagateway to prevent race conditions between stating and reading the file
}
5 changes: 4 additions & 1 deletion internal/http/services/owncloud/ocdav/head.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ func (s *svc) handleHead(w http.ResponseWriter, r *http.Request, ns string) {
t := utils.TSToTime(info.Mtime).UTC()
lastModifiedString := t.Format(time.RFC1123Z)
w.Header().Set("Last-Modified", lastModifiedString)
w.WriteHeader(http.StatusOK)
w.Header().Set("Content-Length", strconv.FormatUint(info.Size, 10))
if info.Type != provider.ResourceType_RESOURCE_TYPE_CONTAINER {
w.Header().Set("Accept-Ranges", "bytes")
}
w.WriteHeader(http.StatusOK)
}
46 changes: 8 additions & 38 deletions pkg/rhttp/datatx/manager/simple/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,17 @@
package simple

import (
"io"
"net/http"

"github.com/pkg/errors"

provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
"github.com/cs3org/reva/pkg/appctx"
"github.com/cs3org/reva/pkg/errtypes"
"github.com/cs3org/reva/pkg/rhttp/datatx"
"github.com/cs3org/reva/pkg/rhttp/datatx/manager/registry"
"github.com/cs3org/reva/pkg/rhttp/datatx/utils/download"
"github.com/cs3org/reva/pkg/storage"
"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
)

func init() {
Expand Down Expand Up @@ -64,42 +63,13 @@ func New(m map[string]interface{}) (datatx.DataTX, error) {

func (m *manager) Handler(fs storage.FS) (http.Handler, error) {
h := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.Method {
case "GET":
ctx := r.Context()
log := appctx.GetLogger(ctx)
var fn string
files, ok := r.URL.Query()["filename"]
if !ok || len(files[0]) < 1 {
fn = r.URL.Path
} else {
fn = files[0]
}

ref := &provider.Reference{Spec: &provider.Reference_Path{Path: fn}}

rc, err := fs.Download(ctx, ref)
if err != nil {
if _, ok := err.(errtypes.IsNotFound); ok {
log.Debug().Err(err).Msg("datasvc: file not found")
w.WriteHeader(http.StatusNotFound)
} else {
log.Err(err).Msg("datasvc: error downloading file")
w.WriteHeader(http.StatusInternalServerError)
}
return
}
defer rc.Close()

_, err = io.Copy(w, rc)
if err != nil {
log.Error().Err(err).Msg("error copying data to response")
return
}
ctx := r.Context()
sublog := appctx.GetLogger(ctx).With().Str("datatx", "simple").Logger()

switch r.Method {
case "GET", "HEAD":
download.GetOrHeadFile(w, r, fs)
case "PUT":
ctx := r.Context()
log := appctx.GetLogger(ctx)
fn := r.URL.Path
defer r.Body.Close()

Expand All @@ -111,7 +81,7 @@ func (m *manager) Handler(fs storage.FS) (http.Handler, error) {
w.WriteHeader(http.StatusPartialContent)
return
}
log.Error().Err(err).Msg("error uploading file")
sublog.Error().Err(err).Msg("error uploading file")
w.WriteHeader(http.StatusInternalServerError)
return
}
Expand Down
37 changes: 2 additions & 35 deletions pkg/rhttp/datatx/manager/tus/tus.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,14 @@
package tus

import (
"io"
"net/http"

"github.com/pkg/errors"

provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
"github.com/cs3org/reva/pkg/appctx"
"github.com/cs3org/reva/pkg/errtypes"
"github.com/cs3org/reva/pkg/rhttp/datatx"
"github.com/cs3org/reva/pkg/rhttp/datatx/manager/registry"
"github.com/cs3org/reva/pkg/rhttp/datatx/utils/download"
"github.com/cs3org/reva/pkg/storage"
"github.com/mitchellh/mapstructure"
tusd "github.com/tus/tusd/pkg/handler"
Expand Down Expand Up @@ -88,8 +86,6 @@ func (m *manager) Handler(fs storage.FS) (http.Handler, error) {
}

h := handler.Middleware(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
log := appctx.GetLogger(r.Context())
log.Info().Msgf("tusd routing: path=%s", r.URL.Path)

method := r.Method
// https://github.com/tus/tus-resumable-upload-protocol/blob/master/protocol.md#x-http-method-override
Expand All @@ -106,37 +102,8 @@ func (m *manager) Handler(fs storage.FS) (http.Handler, error) {
handler.PatchFile(w, r)
case "DELETE":
handler.DelFile(w, r)
// TODO(pvince81): allow for range-based requests?
case "GET":
ctx := r.Context()
log := appctx.GetLogger(ctx)
var fn string
files, ok := r.URL.Query()["filename"]
if !ok || len(files[0]) < 1 {
fn = r.URL.Path
} else {
fn = files[0]
}

ref := &provider.Reference{Spec: &provider.Reference_Path{Path: fn}}

rc, err := fs.Download(ctx, ref)
if err != nil {
if _, ok := err.(errtypes.IsNotFound); ok {
log.Debug().Err(err).Msg("datasvc: file not found")
w.WriteHeader(http.StatusNotFound)
} else {
log.Err(err).Msg("datasvc: error downloading file")
w.WriteHeader(http.StatusInternalServerError)
}
return
}

_, err = io.Copy(w, rc)
if err != nil {
log.Error().Err(err).Msg("error copying data to response")
return
}
download.GetOrHeadFile(w, r, fs)
default:
w.WriteHeader(http.StatusNotImplemented)
}
Expand Down
Loading

0 comments on commit c650f5b

Please sign in to comment.