Skip to content

Commit

Permalink
implement multi range response
Browse files Browse the repository at this point in the history
Signed-off-by: Jörn Friedrich Dreyer <jfd@butonic.de>
  • Loading branch information
butonic committed Jan 6, 2021
1 parent ed436a3 commit 9b4676f
Show file tree
Hide file tree
Showing 7 changed files with 232 additions and 220 deletions.
1 change: 1 addition & 0 deletions changelog/unreleased/range-requests.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@ Enhancement: Support range header in GET requests
To allow resuming a download we now support GET requests with a range header.

https://github.com/cs3org/reva/pull/1388
https://github.com/owncloud/ocis-reva/issues/12
179 changes: 179 additions & 0 deletions pkg/rhttp/datatx/download/download.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
// Copyright 2018-2020 CERN
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// In applying this license, CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.

// Package download provides a library to handle file download requests.
package download

import (
"fmt"
"io"
"mime/multipart"
"net/http"

provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
"github.com/cs3org/reva/pkg/appctx"
"github.com/cs3org/reva/pkg/errtypes"
"github.com/cs3org/reva/pkg/storage"
"github.com/rs/zerolog"
)

// GetOrHeadFile answers a GET or HEAD request with the content of a file
// stored in fs.
//
// The file is selected by the "filename" query parameter or, when that
// parameter is missing or empty, by the URL path.
//
// If the request carries a Range header, a 206 Partial Content response is
// produced: a plain body with a Content-Range header for a single range, or a
// multipart/byteranges body for several ranges. A Range header that cannot be
// parsed or satisfied is answered with 416. A set of ranges whose summed
// length exceeds the file size is ignored and the whole file is sent instead.
//
// Unless a Content-Encoding has already been set on the response, a
// Content-Length header describing the body is sent, so that HEAD requests
// report the size and clients can track or resume a download. For HEAD
// requests no body is written.
func GetOrHeadFile(w http.ResponseWriter, r *http.Request, fs storage.FS) {
	ctx := r.Context()
	sublog := appctx.GetLogger(ctx).With().Str("datatx", "tus").Logger()

	// the "filename" query parameter wins over the URL path
	fn := r.URL.Path
	if files := r.URL.Query()["filename"]; len(files) > 0 && files[0] != "" {
		fn = files[0]
	}

	ref := &provider.Reference{Spec: &provider.Reference_Path{Path: fn}}

	// TODO check preconditions like If-Range, If-Match ...

	// do a stat: the size is needed to resolve the requested ranges and to
	// set the Content-Length header
	md, err := fs.GetMD(ctx, ref, nil)
	if err != nil {
		handleError(w, &sublog, err)
		return
	}
	size := int64(md.Size)

	var ranges []HTTPRange
	if rangeHeader := r.Header.Get("Range"); rangeHeader != "" {
		ranges, err = ParseRange(rangeHeader, size)
		if err != nil {
			if err == ErrNoOverlap {
				w.Header().Set("Content-Range", fmt.Sprintf("bytes */%d", md.Size))
			}
			w.WriteHeader(http.StatusRequestedRangeNotSatisfiable)
			fmt.Fprintln(w, err)
			return
		}
		if SumRangesSize(ranges) > size {
			// The total number of bytes in all the ranges
			// is larger than the size of the file by
			// itself, so this is probably an attack, or a
			// dumb client. Ignore the range request.
			ranges = nil
		}
	}

	content, err := fs.Download(ctx, ref)
	if err != nil {
		handleError(w, &sublog, err)
		return
	}
	defer content.Close()

	code := http.StatusOK
	sendSize := size
	var sendContent io.Reader = content

	if len(ranges) > 0 {
		sublog.Debug().Int64("start", ranges[0].Start).Int64("length", ranges[0].Length).Msg("datasvc: range request")
		// serving a range requires positioning the reader at the range start
		s, ok := content.(io.Seeker)
		if !ok {
			sublog.Error().Int64("start", ranges[0].Start).Int64("length", ranges[0].Length).Msg("datasvc: ReadCloser is not seekable")
			w.WriteHeader(http.StatusRequestedRangeNotSatisfiable)
			return
		}
		code = http.StatusPartialContent

		if len(ranges) == 1 {
			// RFC 7233, Section 4.1:
			// "If a single part is being transferred, the server
			// generating the 206 response MUST generate a
			// Content-Range header field, describing what range
			// of the selected representation is enclosed, and a
			// payload consisting of the range.
			// ...
			// A server MUST NOT generate a multipart response to
			// a request for a single range, since a client that
			// does not request multiple parts might not support
			// multipart responses."
			ra := ranges[0]
			if _, err := s.Seek(ra.Start, io.SeekStart); err != nil {
				sublog.Error().Err(err).Int64("start", ra.Start).Int64("length", ra.Length).Msg("datasvc: content is not seekable")
				w.WriteHeader(http.StatusRequestedRangeNotSatisfiable)
				return
			}
			sendSize = ra.Length
			w.Header().Set("Content-Range", ra.ContentRange(size))
		} else {
			// The encoded size is computed up front; it equals what the
			// goroutine below produces because generated boundaries always
			// have the same length.
			sendSize = RangesMIMESize(ranges, md.MimeType, size)

			pr, pw := io.Pipe()
			mw := multipart.NewWriter(pw)
			w.Header().Set("Content-Type", "multipart/byteranges; boundary="+mw.Boundary())
			sendContent = pr
			defer pr.Close() // cause writing goroutine to fail and exit if CopyN doesn't finish.
			go func() {
				for _, ra := range ranges {
					part, err := mw.CreatePart(ra.MimeHeader(md.MimeType, size))
					if err != nil {
						_ = pw.CloseWithError(err) // CloseWithError always returns nil
						return
					}
					if _, err := s.Seek(ra.Start, io.SeekStart); err != nil {
						_ = pw.CloseWithError(err) // CloseWithError always returns nil
						return
					}
					if _, err := io.CopyN(part, content, ra.Length); err != nil {
						_ = pw.CloseWithError(err) // CloseWithError always returns nil
						return
					}
				}
				// hand a failure to write the closing boundary to the reader;
				// with a nil error this behaves like pw.Close()
				_ = pw.CloseWithError(mw.Close()) // CloseWithError always returns nil
			}()
		}
	}

	// Announce the body size. With a Content-Encoding in place the number of
	// bytes on the wire may differ, so the header is left out in that case.
	if w.Header().Get("Content-Encoding") == "" {
		w.Header().Set("Content-Length", fmt.Sprintf("%d", sendSize))
	}
	w.WriteHeader(code)

	if r.Method == http.MethodHead {
		return
	}
	if _, err := io.CopyN(w, sendContent, sendSize); err != nil {
		sublog.Error().Err(err).Msg("error copying data to response")
	}
}

// handleError logs err and writes the matching HTTP status to w:
// 404 for errtypes.IsNotFound, 403 for errtypes.IsPermissionDenied and
// 500 for everything else. No response body is written.
func handleError(w http.ResponseWriter, log *zerolog.Logger, err error) {
	if _, notFound := err.(errtypes.IsNotFound); notFound {
		log.Debug().Err(err).Msg("datasvc: file not found")
		w.WriteHeader(http.StatusNotFound)
		return
	}
	if _, denied := err.(errtypes.IsPermissionDenied); denied {
		log.Debug().Err(err).Msg("datasvc: permission denied")
		w.WriteHeader(http.StatusForbidden)
		return
	}
	// anything that is not explicitly classified is treated as a server error
	log.Error().Err(err).Msg("datasvc: error downloading file")
	w.WriteHeader(http.StatusInternalServerError)
}
55 changes: 47 additions & 8 deletions pkg/rhttp/datatx/range.go → pkg/rhttp/datatx/download/range.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,18 @@
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.

// Package datatx provides a library to abstract the complexity
// of using various data transfer protocols.
package datatx
package download

import (
"errors"
"fmt"
"mime/multipart"
"net/textproto"
"strconv"
"strings"
)

// from https://golang.org/src/net/http/fs.go
// taken from https://golang.org/src/net/http/fs.go

// ErrSeeker is returned by ServeContent's sizeFunc when the content
// doesn't seek properly. The underlying Seeker's error text isn't
Expand All @@ -45,9 +44,17 @@ type HTTPRange struct {
Start, Length int64
}

// FormatRange formats a Range header string as per RFC 7233.
func FormatRange(r HTTPRange, s uint64) string {
return fmt.Sprintf("bytes %d-%d/%d", r.Start, r.Start+r.Length-1, s)
// ContentRange renders r as the value of a Content-Range header field as per
// RFC 7233, e.g. "bytes 0-499/1234". size is the complete length of the
// content the range refers to.
func (r HTTPRange) ContentRange(size int64) string {
	first := r.Start
	last := first + r.Length - 1 // the last byte position is inclusive
	return fmt.Sprintf("bytes %d-%d/%d", first, last, size)
}

// MimeHeader builds the MIME header of one part of a multipart/byteranges
// response: a Content-Range describing r within content of the given size
// and a Content-Type holding contentType.
func (r HTTPRange) MimeHeader(contentType string, size int64) textproto.MIMEHeader {
	hdr := make(textproto.MIMEHeader, 2)
	hdr["Content-Range"] = []string{r.ContentRange(size)}
	hdr["Content-Type"] = []string{contentType}
	return hdr
}

// ParseRange parses a Range header string as per RFC 7233.
Expand All @@ -60,7 +67,7 @@ func ParseRange(s string, size int64) ([]HTTPRange, error) {
if !strings.HasPrefix(s, b) {
return nil, errors.New("invalid range")
}
var ranges []HTTPRange
ranges := []HTTPRange{}
noOverlap := false
for _, ra := range strings.Split(s[len(b):], ",") {
ra = textproto.TrimString(ra)
Expand Down Expand Up @@ -119,3 +126,35 @@ func ParseRange(s string, size int64) ([]HTTPRange, error) {
}
return ranges, nil
}

// countingWriter is a writer that throws away the data it is given and only
// keeps a running total of the number of bytes written to it.
type countingWriter int64

// Write adds len(p) to the running total. It always reports the whole of p
// as written and never returns an error.
func (cw *countingWriter) Write(p []byte) (n int, err error) {
	n = len(p)
	*cw = *cw + countingWriter(n)
	return n, nil
}

// RangesMIMESize reports how many bytes a multipart response carrying the
// given ranges occupies: the summed length of all ranges plus the boundaries
// and part headers that frame them.
func RangesMIMESize(ranges []HTTPRange, contentType string, contentSize int64) (encSize int64) {
	var framing countingWriter
	mw := multipart.NewWriter(&framing)

	var payload int64
	for i := range ranges {
		// Only boundary and header are emitted, the part itself stays empty.
		// The error CreatePart could return stems from writing to the
		// underlying writer; it is ignored here on the assumption that this
		// always succeeds.
		_, _ = mw.CreatePart(ranges[i].MimeHeader(contentType, contentSize))
		payload += ranges[i].Length
	}
	mw.Close() // emits the closing boundary so that it is counted as well

	return payload + int64(framing)
}

// SumRangesSize returns the total number of bytes covered by ranges, i.e.
// the sum of their Length fields. Overlapping ranges are counted once per
// range. The result is 0 for an empty slice.
func SumRangesSize(ranges []HTTPRange) (size int64) {
	for i := 0; i < len(ranges); i++ {
		size += ranges[i].Length
	}
	return size
}
98 changes: 3 additions & 95 deletions pkg/rhttp/datatx/manager/simple/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,13 @@
package simple

import (
"fmt"
"io"
"net/http"
"strconv"

provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
"github.com/cs3org/reva/pkg/appctx"
"github.com/cs3org/reva/pkg/errtypes"
"github.com/cs3org/reva/pkg/rhttp/datatx"
"github.com/cs3org/reva/pkg/rhttp/datatx/download"
"github.com/cs3org/reva/pkg/rhttp/datatx/manager/registry"
"github.com/cs3org/reva/pkg/storage"
"github.com/mitchellh/mapstructure"
Expand Down Expand Up @@ -69,98 +67,8 @@ func (m *manager) Handler(fs storage.FS) (http.Handler, error) {
sublog := appctx.GetLogger(ctx).With().Str("datatx", "simple").Logger()

switch r.Method {
case "GET":
var fn string
files, ok := r.URL.Query()["filename"]
if !ok || len(files[0]) < 1 {
fn = r.URL.Path
} else {
fn = files[0]
}

ref := &provider.Reference{Spec: &provider.Reference_Path{Path: fn}}

// TODO check If-Range condition

var ranges []datatx.HTTPRange
var md *provider.ResourceInfo
var err error
if r.Header.Get("Range") != "" {
md, err = fs.GetMD(ctx, ref, nil)
switch err.(type) {
case nil:
ranges, err = datatx.ParseRange(r.Header.Get("Range"), int64(md.Size))
if err != nil || len(ranges) > 1 { // we currently only support one range
if err == datatx.ErrNoOverlap {
w.Header().Set("Content-Range", fmt.Sprintf("bytes */%d", md.Size))
}
w.WriteHeader(http.StatusRequestedRangeNotSatisfiable)
fmt.Fprintln(w, err)
return
}
w.Header().Set("Content-Range", datatx.FormatRange(ranges[0], md.Size))
case errtypes.IsNotFound:
sublog.Debug().Err(err).Msg("datasvc: file not found")
w.WriteHeader(http.StatusNotFound)
return
case errtypes.IsPermissionDenied:
sublog.Debug().Err(err).Msg("datasvc: file not found")
w.WriteHeader(http.StatusForbidden)
return
default:
sublog.Error().Err(err).Msg("datasvc: error downloading file")
w.WriteHeader(http.StatusInternalServerError)
return
}
}

// TODO always do a stat to set a Content-Length header

rc, err := fs.Download(ctx, ref)
if err != nil {
if _, ok := err.(errtypes.IsNotFound); ok {
sublog.Debug().Err(err).Msg("datasvc: file not found")
w.WriteHeader(http.StatusNotFound)
} else {
sublog.Error().Err(err).Msg("datasvc: error downloading file")
w.WriteHeader(http.StatusInternalServerError)
}
return
}
defer rc.Close()

var c int64

if len(ranges) > 0 {
sublog.Debug().Int64("start", ranges[0].Start).Int64("length", ranges[0].Length).Msg("datasvc: range request")
var s io.Seeker
if s, ok = rc.(io.Seeker); !ok {
sublog.Error().Int64("start", ranges[0].Start).Int64("length", ranges[0].Length).Msg("datasvc: ReadCloser is not seekable")
w.WriteHeader(http.StatusRequestedRangeNotSatisfiable)
return
}
if _, err = s.Seek(ranges[0].Start, io.SeekStart); err != nil {
sublog.Error().Err(err).Int64("start", ranges[0].Start).Int64("length", ranges[0].Length).Msg("datasvc: could not seek for range request")
w.WriteHeader(http.StatusRequestedRangeNotSatisfiable)
return
}
w.Header().Set("Content-Range", datatx.FormatRange(ranges[0], md.Size)) // md cannot be null because we did a stat for the range request
w.Header().Set("Content-Length", strconv.FormatInt(ranges[0].Length, 10))
w.WriteHeader(http.StatusPartialContent)
c, err = io.CopyN(w, rc, ranges[0].Length)
if ranges[0].Length != c {
sublog.Error().Int64("range-length", ranges[0].Length).Int64("transferred-bytes", c).Msg("range length vs transferred bytes mismatch")
}
} else {
_, err = io.Copy(w, rc)
// TODO check we sent the correct number of bytes. The stat info might be out dated. we need to send the If-Match etag in the GET to the datagateway
}

if err != nil {
sublog.Error().Err(err).Msg("error copying data to response")
return
}

case "GET", "HEAD":
download.GetOrHeadFile(w, r, fs)
case "PUT":
fn := r.URL.Path
defer r.Body.Close()
Expand Down
Loading

0 comments on commit 9b4676f

Please sign in to comment.