Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

initial range request support #1388

Merged
merged 6 commits into from
Jan 7, 2021
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions changelog/unreleased/range-requests.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
Enhancement: Support range header in GET requests

To allow resuming a download, we now support GET requests with a Range header.

https://github.com/cs3org/reva/pull/1388
https://github.com/owncloud/ocis-reva/issues/12
33 changes: 22 additions & 11 deletions internal/http/services/datagateway/datagateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"net/http"
"net/url"
"path"
"strconv"
"time"

"github.com/cs3org/reva/pkg/appctx"
Expand Down Expand Up @@ -170,7 +171,7 @@ func (s *svc) doHead(w http.ResponseWriter, r *http.Request) {
claims, err := s.verify(ctx, r)
if err != nil {
err = errors.Wrap(err, "datagateway: error validating transfer token")
log.Err(err).Str("token", r.Header.Get(TokenTransportHeader)).Msg("invalid transfer token")
log.Error().Err(err).Str("token", r.Header.Get(TokenTransportHeader)).Msg("invalid transfer token")
w.WriteHeader(http.StatusForbidden)
return
}
Expand All @@ -180,15 +181,15 @@ func (s *svc) doHead(w http.ResponseWriter, r *http.Request) {
httpClient := s.client
httpReq, err := rhttp.NewRequest(ctx, "HEAD", claims.Target, nil)
if err != nil {
log.Err(err).Msg("wrong request")
log.Error().Err(err).Msg("wrong request")
w.WriteHeader(http.StatusInternalServerError)
return
}
httpReq.Header = r.Header

httpRes, err := httpClient.Do(httpReq)
if err != nil {
log.Err(err).Msg("error doing HEAD request to data service")
log.Error().Err(err).Msg("error doing HEAD request to data service")
w.WriteHeader(http.StatusInternalServerError)
return
}
Expand All @@ -211,7 +212,7 @@ func (s *svc) doGet(w http.ResponseWriter, r *http.Request) {
claims, err := s.verify(ctx, r)
if err != nil {
err = errors.Wrap(err, "datagateway: error validating transfer token")
log.Err(err).Str("token", r.Header.Get(TokenTransportHeader)).Msg("invalid transfer token")
log.Error().Err(err).Str("token", r.Header.Get(TokenTransportHeader)).Msg("invalid transfer token")
w.WriteHeader(http.StatusForbidden)
return
}
Expand All @@ -221,30 +222,40 @@ func (s *svc) doGet(w http.ResponseWriter, r *http.Request) {
httpClient := s.client
httpReq, err := rhttp.NewRequest(ctx, "GET", claims.Target, nil)
if err != nil {
log.Err(err).Msg("wrong request")
log.Error().Err(err).Msg("wrong request")
w.WriteHeader(http.StatusInternalServerError)
return
}
httpReq.Header = r.Header

httpRes, err := httpClient.Do(httpReq)
if err != nil {
log.Err(err).Msg("error doing GET request to data service")
log.Error().Err(err).Msg("error doing GET request to data service")
w.WriteHeader(http.StatusInternalServerError)
return
}
defer httpRes.Body.Close()

copyHeader(w.Header(), httpRes.Header)
if httpRes.StatusCode != http.StatusOK {
w.WriteHeader(httpRes.StatusCode)
// TODO why do we swallow the body?
w.WriteHeader(httpRes.StatusCode)
if httpRes.StatusCode != http.StatusOK && httpRes.StatusCode != http.StatusPartialContent {
return
}

w.WriteHeader(http.StatusOK)
_, err = io.Copy(w, httpRes.Body)
var c int64
c, err = io.Copy(w, httpRes.Body)
if err != nil {
log.Err(err).Msg("error writing body after headers were sent")
log.Error().Err(err).Msg("error writing body after headers were sent")
ishank011 marked this conversation as resolved.
Show resolved Hide resolved
}
if httpRes.Header.Get("Content-Length") != "" {
i, err := strconv.ParseInt(httpRes.Header.Get("Content-Length"), 10, 64)
if err != nil {
log.Error().Err(err).Str("content-length", httpRes.Header.Get("Content-Length")).Msg("invalid content length in dataprovider response")
}
if i != c {
log.Error().Int64("content-length", i).Int64("transferred-bytes", c).Msg("content length vs transferred bytes mismatch")
}
}
}

Expand Down
35 changes: 29 additions & 6 deletions internal/http/services/owncloud/ocdav/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,14 @@ import (

func (s *svc) handleGet(w http.ResponseWriter, r *http.Request, ns string) {
ctx := r.Context()
ctx, span := trace.StartSpan(ctx, "head")
ctx, span := trace.StartSpan(ctx, "get")
defer span.End()

ns = applyLayout(ctx, ns)

fn := path.Join(ns, r.URL.Path)

sublog := appctx.GetLogger(ctx).With().Str("path", fn).Logger()
sublog := appctx.GetLogger(ctx).With().Str("path", fn).Str("svc", "ocdav").Str("handler", "get").Logger()

client, err := s.getClient()
if err != nil {
Expand Down Expand Up @@ -109,6 +109,11 @@ func (s *svc) handleGet(w http.ResponseWriter, r *http.Request, ns string) {
return
}
httpReq.Header.Set(datagateway.TokenTransportHeader, token)

if r.Header.Get("Range") != "" {
ishank011 marked this conversation as resolved.
Show resolved Hide resolved
httpReq.Header.Set("Range", r.Header.Get("Range"))
}

httpClient := s.client

httpRes, err := httpClient.Do(httpReq)
Expand All @@ -119,8 +124,8 @@ func (s *svc) handleGet(w http.ResponseWriter, r *http.Request, ns string) {
}
defer httpRes.Body.Close()

if httpRes.StatusCode != http.StatusOK {
w.WriteHeader(http.StatusInternalServerError)
if httpRes.StatusCode != http.StatusOK && httpRes.StatusCode != http.StatusPartialContent {
w.WriteHeader(httpRes.StatusCode)
return
}

Expand All @@ -133,13 +138,31 @@ func (s *svc) handleGet(w http.ResponseWriter, r *http.Request, ns string) {
t := utils.TSToTime(info.Mtime).UTC()
lastModifiedString := t.Format(time.RFC1123Z)
w.Header().Set("Last-Modified", lastModifiedString)
w.Header().Set("Content-Length", strconv.FormatUint(info.Size, 10))

if httpRes.StatusCode == http.StatusPartialContent {
w.Header().Set("Content-Range", httpRes.Header.Get("Content-Range"))
w.Header().Set("Content-Length", httpRes.Header.Get("Content-Length"))
w.WriteHeader(http.StatusPartialContent)
} else {
w.Header().Set("Content-Length", strconv.FormatUint(info.Size, 10))
}
/*
if md.Checksum != "" {
w.Header().Set("OC-Checksum", md.Checksum)
}
*/
if _, err := io.Copy(w, httpRes.Body); err != nil {
var c int64
if c, err = io.Copy(w, httpRes.Body); err != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should return an error status in this case, right? Currently it would just return OK or partial content if the GET request above didn't return any error. Same for the errors below.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually we can't because the header has already been sent and the Copy is streaming the bytes.

All we can do is log that something went wrong.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but if the copy fails, the user would still receive a 200 with corrupt file contents.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct. Even if we buffered everything into RAM, we cannot change the HTTP headers once the body is being sent. The best we can do is log an error.

sublog.Error().Err(err).Msg("error finishing copying data to response")
}
if httpRes.Header.Get("Content-Length") != "" {
i, err := strconv.ParseInt(httpRes.Header.Get("Content-Length"), 10, 64)
if err != nil {
sublog.Error().Err(err).Str("content-length", httpRes.Header.Get("Content-Length")).Msg("invalid content length in datagateway response")
}
if i != c {
sublog.Error().Int64("content-length", i).Int64("transferred-bytes", c).Msg("content length vs transferred bytes mismatch")
}
}
// TODO we need to send the If-Match etag in the GET to the datagateway to prevent race conditions between stating and reading the file
}
5 changes: 4 additions & 1 deletion internal/http/services/owncloud/ocdav/head.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ func (s *svc) handleHead(w http.ResponseWriter, r *http.Request, ns string) {
t := utils.TSToTime(info.Mtime).UTC()
lastModifiedString := t.Format(time.RFC1123Z)
w.Header().Set("Last-Modified", lastModifiedString)
w.WriteHeader(http.StatusOK)
w.Header().Set("Content-Length", strconv.FormatUint(info.Size, 10))
if info.Type != provider.ResourceType_RESOURCE_TYPE_CONTAINER {
w.Header().Set("Accept-Ranges", "bytes")
}
w.WriteHeader(http.StatusOK)
}
183 changes: 183 additions & 0 deletions pkg/rhttp/datatx/download/download.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
// Copyright 2018-2020 CERN
butonic marked this conversation as resolved.
Show resolved Hide resolved
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// In applying this license, CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.

// Package download provides a library to handle file download requests.
package download

import (
"fmt"
"io"
"mime/multipart"
"net/http"

provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
"github.com/cs3org/reva/pkg/appctx"
"github.com/cs3org/reva/pkg/errtypes"
"github.com/cs3org/reva/pkg/storage"
"github.com/rs/zerolog"
)

// GetOrHeadFile returns the requested file content
func GetOrHeadFile(w http.ResponseWriter, r *http.Request, fs storage.FS) {
ctx := r.Context()
sublog := appctx.GetLogger(ctx).With().Str("svc", "datatx").Str("handler", "download").Logger()

var fn string
files, ok := r.URL.Query()["filename"]
if !ok || len(files[0]) < 1 {
fn = r.URL.Path
} else {
fn = files[0]
}

ref := &provider.Reference{Spec: &provider.Reference_Path{Path: fn}}

// TODO check preconditions like If-Range, If-Match ...

var md *provider.ResourceInfo
var err error

// do a stat to set a Content-Length header

if md, err = fs.GetMD(ctx, ref, nil); err != nil {
handleError(w, &sublog, err, "stat")
return
}

var ranges []HTTPRange

if r.Header.Get("Range") != "" {
ranges, err = ParseRange(r.Header.Get("Range"), int64(md.Size))
if err != nil {
if err == ErrNoOverlap {
w.Header().Set("Content-Range", fmt.Sprintf("bytes */%d", md.Size))
}
w.WriteHeader(http.StatusRequestedRangeNotSatisfiable)
fmt.Fprintln(w, err)
return
}
if SumRangesSize(ranges) > int64(md.Size) {
// The total number of bytes in all the ranges
// is larger than the size of the file by
// itself, so this is probably an attack, or a
// dumb client. Ignore the range request.
ranges = nil
}
}

content, err := fs.Download(ctx, ref)
if err != nil {
handleError(w, &sublog, err, "download")
return
}
defer content.Close()

code := http.StatusOK

sendSize := int64(md.Size)
var sendContent io.Reader = content
var s io.Seeker
if len(ranges) > 0 {
sublog.Debug().Int64("start", ranges[0].Start).Int64("length", ranges[0].Length).Msg("range request")
if s, ok = content.(io.Seeker); !ok {
sublog.Error().Int64("start", ranges[0].Start).Int64("length", ranges[0].Length).Msg("ReadCloser is not seekable")
w.WriteHeader(http.StatusRequestedRangeNotSatisfiable)
return
}
switch {
case len(ranges) == 1:
// RFC 7233, Section 4.1:
// "If a single part is being transferred, the server
// generating the 206 response MUST generate a
// Content-Range header field, describing what range
// of the selected representation is enclosed, and a
// payload consisting of the range.
// ...
// A server MUST NOT generate a multipart response to
// a request for a single range, since a client that
// does not request multiple parts might not support
// multipart responses."
ra := ranges[0]
if _, err := s.Seek(ra.Start, io.SeekStart); err != nil {
sublog.Error().Err(err).Int64("start", ra.Start).Int64("length", ra.Length).Msg("content is not seekable")
w.WriteHeader(http.StatusRequestedRangeNotSatisfiable)
return
}
sendSize = ra.Length
code = http.StatusPartialContent
w.Header().Set("Content-Range", ra.ContentRange(int64(md.Size)))
butonic marked this conversation as resolved.
Show resolved Hide resolved
case len(ranges) > 1:
sendSize = RangesMIMESize(ranges, md.MimeType, int64(md.Size))
code = http.StatusPartialContent

pr, pw := io.Pipe()
mw := multipart.NewWriter(pw)
w.Header().Set("Content-Type", "multipart/byteranges; boundary="+mw.Boundary())
sendContent = pr
defer pr.Close() // cause writing goroutine to fail and exit if CopyN doesn't finish.
go func() {
for _, ra := range ranges {
part, err := mw.CreatePart(ra.MimeHeader(md.MimeType, int64(md.Size)))
if err != nil {
_ = pw.CloseWithError(err) // CloseWithError always returns nil
return
}
if _, err := s.Seek(ra.Start, io.SeekStart); err != nil {
_ = pw.CloseWithError(err) // CloseWithError always returns nil
return
}
if _, err := io.CopyN(part, content, ra.Length); err != nil {
_ = pw.CloseWithError(err) // CloseWithError always returns nil
return
}
}
mw.Close()
pw.Close()
}()

}
}
w.WriteHeader(code)

if r.Method != "HEAD" {
var c int64
c, err = io.CopyN(w, sendContent, sendSize)
if err != nil {
sublog.Error().Err(err).Msg("error copying data to response")
return
}
if c != sendSize {
sublog.Error().Int64("copied", c).Int64("size", sendSize).Msg("copied vs size mismatch")
}
}

}

// handleError translates an error returned by the storage layer into an HTTP
// status code and logs it together with the action that failed.
//
// Not-found errors yield 404 and permission errors yield 403, both logged at
// debug level; anything else yields 500 and is logged at error level.
func handleError(w http.ResponseWriter, log *zerolog.Logger, err error, action string) {
	// The checks are ordered: not-found takes precedence over
	// permission-denied when an error satisfies both.
	if _, notFound := err.(errtypes.IsNotFound); notFound {
		log.Debug().Err(err).Str("action", action).Msg("file not found")
		w.WriteHeader(http.StatusNotFound)
		return
	}

	if _, denied := err.(errtypes.IsPermissionDenied); denied {
		log.Debug().Err(err).Str("action", action).Msg("permission denied")
		w.WriteHeader(http.StatusForbidden)
		return
	}

	log.Error().Err(err).Str("action", action).Msg("unexpected error")
	w.WriteHeader(http.StatusInternalServerError)
}
Loading