Skip to content

Commit

Permalink
initial range request support
Browse files Browse the repository at this point in the history
Signed-off-by: Jörn Friedrich Dreyer <jfd@butonic.de>
  • Loading branch information
butonic committed Jan 6, 2021
1 parent 242175e commit f2de9a7
Show file tree
Hide file tree
Showing 6 changed files with 318 additions and 36 deletions.
33 changes: 22 additions & 11 deletions internal/http/services/datagateway/datagateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"net/http"
"net/url"
"path"
"strconv"
"time"

"github.com/cs3org/reva/pkg/appctx"
Expand Down Expand Up @@ -170,7 +171,7 @@ func (s *svc) doHead(w http.ResponseWriter, r *http.Request) {
claims, err := s.verify(ctx, r)
if err != nil {
err = errors.Wrap(err, "datagateway: error validating transfer token")
log.Err(err).Str("token", r.Header.Get(TokenTransportHeader)).Msg("invalid transfer token")
log.Error().Err(err).Str("token", r.Header.Get(TokenTransportHeader)).Msg("invalid transfer token")
w.WriteHeader(http.StatusForbidden)
return
}
Expand All @@ -180,15 +181,15 @@ func (s *svc) doHead(w http.ResponseWriter, r *http.Request) {
httpClient := s.client
httpReq, err := rhttp.NewRequest(ctx, "HEAD", claims.Target, nil)
if err != nil {
log.Err(err).Msg("wrong request")
log.Error().Err(err).Msg("wrong request")
w.WriteHeader(http.StatusInternalServerError)
return
}
httpReq.Header = r.Header

httpRes, err := httpClient.Do(httpReq)
if err != nil {
log.Err(err).Msg("error doing HEAD request to data service")
log.Error().Err(err).Msg("error doing HEAD request to data service")
w.WriteHeader(http.StatusInternalServerError)
return
}
Expand All @@ -211,7 +212,7 @@ func (s *svc) doGet(w http.ResponseWriter, r *http.Request) {
claims, err := s.verify(ctx, r)
if err != nil {
err = errors.Wrap(err, "datagateway: error validating transfer token")
log.Err(err).Str("token", r.Header.Get(TokenTransportHeader)).Msg("invalid transfer token")
log.Error().Err(err).Str("token", r.Header.Get(TokenTransportHeader)).Msg("invalid transfer token")
w.WriteHeader(http.StatusForbidden)
return
}
Expand All @@ -221,30 +222,40 @@ func (s *svc) doGet(w http.ResponseWriter, r *http.Request) {
httpClient := s.client
httpReq, err := rhttp.NewRequest(ctx, "GET", claims.Target, nil)
if err != nil {
log.Err(err).Msg("wrong request")
log.Error().Err(err).Msg("wrong request")
w.WriteHeader(http.StatusInternalServerError)
return
}
httpReq.Header = r.Header

httpRes, err := httpClient.Do(httpReq)
if err != nil {
log.Err(err).Msg("error doing GET request to data service")
log.Error().Err(err).Msg("error doing GET request to data service")
w.WriteHeader(http.StatusInternalServerError)
return
}
defer httpRes.Body.Close()

copyHeader(w.Header(), httpRes.Header)
if httpRes.StatusCode != http.StatusOK {
w.WriteHeader(httpRes.StatusCode)
// TODO why do we swallow the body?
w.WriteHeader(httpRes.StatusCode)
if httpRes.StatusCode != http.StatusOK && httpRes.StatusCode != http.StatusPartialContent {
return
}

w.WriteHeader(http.StatusOK)
_, err = io.Copy(w, httpRes.Body)
var c int64
c, err = io.Copy(w, httpRes.Body)
if err != nil {
log.Err(err).Msg("error writing body after headers were sent")
log.Error().Err(err).Msg("error writing body after headers were sent")
}
if httpRes.Header.Get("Content-Length") != "" {
i, err := strconv.ParseInt(httpRes.Header.Get("Content-Length"), 10, 64)
if err != nil {
log.Error().Err(err).Str("content-length", httpRes.Header.Get("Content-Length")).Msg("invalid content length in dataprovider response")
}
if i != c {
log.Error().Int64("content-length", i).Int64("transferred-bytes", c).Msg("content length vs transferred bytes mismatch")
}
}
}

Expand Down
33 changes: 28 additions & 5 deletions internal/http/services/owncloud/ocdav/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,14 @@ import (

func (s *svc) handleGet(w http.ResponseWriter, r *http.Request, ns string) {
ctx := r.Context()
ctx, span := trace.StartSpan(ctx, "head")
ctx, span := trace.StartSpan(ctx, "get")
defer span.End()

ns = applyLayout(ctx, ns)

fn := path.Join(ns, r.URL.Path)

sublog := appctx.GetLogger(ctx).With().Str("path", fn).Logger()
sublog := appctx.GetLogger(ctx).With().Str("path", fn).Str("svc", "ocdav").Str("handler", "get").Logger()

client, err := s.getClient()
if err != nil {
Expand Down Expand Up @@ -109,6 +109,11 @@ func (s *svc) handleGet(w http.ResponseWriter, r *http.Request, ns string) {
return
}
httpReq.Header.Set(datagateway.TokenTransportHeader, token)

if r.Header.Get("Range") != "" {
httpReq.Header.Set("Range", r.Header.Get("Range"))
}

httpClient := s.client

httpRes, err := httpClient.Do(httpReq)
Expand All @@ -119,7 +124,7 @@ func (s *svc) handleGet(w http.ResponseWriter, r *http.Request, ns string) {
}
defer httpRes.Body.Close()

if httpRes.StatusCode != http.StatusOK {
if httpRes.StatusCode != http.StatusOK && httpRes.StatusCode != http.StatusPartialContent {
w.WriteHeader(http.StatusInternalServerError)
return
}
Expand All @@ -133,13 +138,31 @@ func (s *svc) handleGet(w http.ResponseWriter, r *http.Request, ns string) {
t := utils.TSToTime(info.Mtime).UTC()
lastModifiedString := t.Format(time.RFC1123Z)
w.Header().Set("Last-Modified", lastModifiedString)
w.Header().Set("Content-Length", strconv.FormatUint(info.Size, 10))

if httpRes.StatusCode == http.StatusPartialContent {
w.Header().Set("Content-Range", httpRes.Header.Get("Content-Range"))
w.Header().Set("Content-Length", httpRes.Header.Get("Content-Length"))
w.WriteHeader(http.StatusPartialContent)
} else {
w.Header().Set("Content-Length", strconv.FormatUint(info.Size, 10))
}
/*
if md.Checksum != "" {
w.Header().Set("OC-Checksum", md.Checksum)
}
*/
if _, err := io.Copy(w, httpRes.Body); err != nil {
var c int64
if c, err = io.Copy(w, httpRes.Body); err != nil {
sublog.Error().Err(err).Msg("error finishing copying data to response")
}
if httpRes.Header.Get("Content-Length") != "" {
i, err := strconv.ParseInt(httpRes.Header.Get("Content-Length"), 10, 64)
if err != nil {
sublog.Error().Err(err).Str("content-length", httpRes.Header.Get("Content-Length")).Msg("invalid content length in datagateway response")
}
if i != c {
sublog.Error().Int64("content-length", i).Int64("transferred-bytes", c).Msg("content length vs transferred bytes mismatch")
}
}
// TODO we need to send the If-Match etag in the GET to the datagateway to prevent race conditions between stating and reading the file
}
5 changes: 4 additions & 1 deletion internal/http/services/owncloud/ocdav/head.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ func (s *svc) handleHead(w http.ResponseWriter, r *http.Request, ns string) {
t := utils.TSToTime(info.Mtime).UTC()
lastModifiedString := t.Format(time.RFC1123Z)
w.Header().Set("Last-Modified", lastModifiedString)
w.WriteHeader(http.StatusOK)
w.Header().Set("Content-Length", strconv.FormatUint(info.Size, 10))
if info.Type != provider.ResourceType_RESOURCE_TYPE_CONTAINER {
w.Header().Set("Accept-Ranges", "bytes")
}
w.WriteHeader(http.StatusOK)
}
84 changes: 73 additions & 11 deletions pkg/rhttp/datatx/manager/simple/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@
package simple

import (
"fmt"
"io"
"net/http"

"github.com/pkg/errors"
"strconv"

provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
"github.com/cs3org/reva/pkg/appctx"
Expand All @@ -31,6 +31,7 @@ import (
"github.com/cs3org/reva/pkg/rhttp/datatx/manager/registry"
"github.com/cs3org/reva/pkg/storage"
"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
)

func init() {
Expand Down Expand Up @@ -64,10 +65,11 @@ func New(m map[string]interface{}) (datatx.DataTX, error) {

func (m *manager) Handler(fs storage.FS) (http.Handler, error) {
h := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
sublog := appctx.GetLogger(ctx).With().Str("datatx", "simple").Logger()

switch r.Method {
case "GET":
ctx := r.Context()
log := appctx.GetLogger(ctx)
var fn string
files, ok := r.URL.Query()["filename"]
if !ok || len(files[0]) < 1 {
Expand All @@ -78,28 +80,88 @@ func (m *manager) Handler(fs storage.FS) (http.Handler, error) {

ref := &provider.Reference{Spec: &provider.Reference_Path{Path: fn}}

// TODO check If-Range condition

var ranges []datatx.HTTPRange
var md *provider.ResourceInfo
var err error
if r.Header.Get("Range") != "" {
md, err = fs.GetMD(ctx, ref, nil)
switch err.(type) {
case nil:
ranges, err = datatx.ParseRange(r.Header.Get("Range"), int64(md.Size))
if err != nil || len(ranges) > 1 { // we currently only support one range
if err == datatx.ErrNoOverlap {
w.Header().Set("Content-Range", fmt.Sprintf("bytes */%d", md.Size))
}
w.WriteHeader(http.StatusRequestedRangeNotSatisfiable)
fmt.Fprintln(w, err)
return
}
w.Header().Set("Content-Range", datatx.FormatRange(ranges[0], md.Size))
case errtypes.IsNotFound:
sublog.Debug().Err(err).Msg("datasvc: file not found")
w.WriteHeader(http.StatusNotFound)
return
case errtypes.IsPermissionDenied:
sublog.Debug().Err(err).Msg("datasvc: file not found")
w.WriteHeader(http.StatusForbidden)
return
default:
sublog.Error().Err(err).Msg("datasvc: error downloading file")
w.WriteHeader(http.StatusInternalServerError)
return
}
}

// TODO always do a stat to set a Content-Length header

rc, err := fs.Download(ctx, ref)
if err != nil {
if _, ok := err.(errtypes.IsNotFound); ok {
log.Debug().Err(err).Msg("datasvc: file not found")
sublog.Debug().Err(err).Msg("datasvc: file not found")
w.WriteHeader(http.StatusNotFound)
} else {
log.Err(err).Msg("datasvc: error downloading file")
sublog.Error().Err(err).Msg("datasvc: error downloading file")
w.WriteHeader(http.StatusInternalServerError)
}
return
}
defer rc.Close()

_, err = io.Copy(w, rc)
var c int64

if len(ranges) > 0 {
sublog.Debug().Int64("start", ranges[0].Start).Int64("length", ranges[0].Length).Msg("datasvc: range request")
var s io.Seeker
if s, ok = rc.(io.Seeker); !ok {
sublog.Error().Int64("start", ranges[0].Start).Int64("length", ranges[0].Length).Msg("datasvc: ReadCloser is not seekable")
w.WriteHeader(http.StatusRequestedRangeNotSatisfiable)
return
}
if _, err = s.Seek(ranges[0].Start, io.SeekStart); err != nil {
sublog.Error().Err(err).Int64("start", ranges[0].Start).Int64("length", ranges[0].Length).Msg("datasvc: could not seek for range request")
w.WriteHeader(http.StatusRequestedRangeNotSatisfiable)
return
}
w.Header().Set("Content-Range", datatx.FormatRange(ranges[0], md.Size)) // md cannot be null because we did a stat for the range request
w.Header().Set("Content-Length", strconv.FormatInt(ranges[0].Length, 10))
w.WriteHeader(http.StatusPartialContent)
c, err = io.CopyN(w, rc, ranges[0].Length)
if ranges[0].Length != c {
sublog.Error().Int64("range-length", ranges[0].Length).Int64("transferred-bytes", c).Msg("range length vs transferred bytes mismatch")
}
} else {
_, err = io.Copy(w, rc)
// TODO check we sent the correct number of bytes. The stat info might be out dated. we need to send the If-Match etag in the GET to the datagateway
}

if err != nil {
log.Error().Err(err).Msg("error copying data to response")
sublog.Error().Err(err).Msg("error copying data to response")
return
}

case "PUT":
ctx := r.Context()
log := appctx.GetLogger(ctx)
fn := r.URL.Path
defer r.Body.Close()

Expand All @@ -111,7 +173,7 @@ func (m *manager) Handler(fs storage.FS) (http.Handler, error) {
w.WriteHeader(http.StatusPartialContent)
return
}
log.Error().Err(err).Msg("error uploading file")
sublog.Error().Err(err).Msg("error uploading file")
w.WriteHeader(http.StatusInternalServerError)
return
}
Expand Down
Loading

0 comments on commit f2de9a7

Please sign in to comment.