Skip to content

Commit

Permalink
feat(handler, s3store): implement ServerDataStore for direct content …
Browse files Browse the repository at this point in the history
…serving, closes tus#1064

- Add ServerDataStore interface and S3ServableUpload implementation
- Update handlers to use ServerDataStore when available
- Implement range request handling for S3ServableUpload
- Add tests for new ServerDataStore functionality
- Update Go version to 1.22.1
  • Loading branch information
pcfreak30 committed Oct 20, 2024
1 parent 9779a84 commit c956ccb
Show file tree
Hide file tree
Showing 6 changed files with 489 additions and 1 deletion.
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ module github.com/tus/tusd/v2
// Specify the Go version needed for the Heroku deployment
// See https://github.com/heroku/heroku-buildpack-go#go-module-specifics
// +heroku goVersion go1.22
go 1.21.0
go 1.22.1

toolchain go1.22.7

require (
Expand Down
6 changes: 6 additions & 0 deletions pkg/handler/composer.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ type StoreComposer struct {
Concater ConcaterDataStore
UsesLengthDeferrer bool
LengthDeferrer LengthDeferrerDataStore
Server ServerDataStore
UsesServer bool
}

// NewStoreComposer creates a new and empty store composer.
Expand Down Expand Up @@ -85,3 +87,7 @@ func (store *StoreComposer) UseLengthDeferrer(ext LengthDeferrerDataStore) {
store.UsesLengthDeferrer = ext != nil
store.LengthDeferrer = ext
}
func (store *StoreComposer) UseServer(ext ServerDataStore) {
store.UsesServer = ext != nil
store.Server = ext
}
11 changes: 11 additions & 0 deletions pkg/handler/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package handler
import (
"context"
"io"
"net/http"
)

type MetaData map[string]string
Expand Down Expand Up @@ -121,6 +122,16 @@ type DataStore interface {
GetUpload(ctx context.Context, id string) (upload Upload, err error)
}

// ServableUpload defines the method for serving content directly
type ServableUpload interface {
ServeContent(ctx context.Context, w http.ResponseWriter, r *http.Request) error
}

// ServerDataStore is the interface for data stores that can serve content directly
type ServerDataStore interface {
AsServableUpload(upload Upload) ServableUpload
}

type TerminatableUpload interface {
// Terminate an upload so any further requests to the upload resource will
// return the ErrNotFound error.
Expand Down
11 changes: 11 additions & 0 deletions pkg/handler/unrouted_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -1013,6 +1013,17 @@ func (handler *UnroutedHandler) GetFile(w http.ResponseWriter, r *http.Request)
return
}

// If the data store implements ServerDataStore, use the ServableUpload interface
if handler.composer.UsesServer {
servableUpload := handler.composer.Server.AsServableUpload(upload)
err = servableUpload.ServeContent(c, w, r)
if err != nil {
handler.sendError(c, err)
}
return
}

// Fall back to the existing GetReader implementation if ServerDataStore is not implemented
contentType, contentDisposition := filterContentType(info)
resp := HTTPResponse{
StatusCode: http.StatusOK,
Expand Down
178 changes: 178 additions & 0 deletions pkg/s3store/s3store.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ import (
"net/http"
"os"
"regexp"
"strconv"
"strings"
"sync"
"time"
Expand All @@ -100,6 +101,91 @@ import (
// considered valid into a header value according to RFC2616.
var nonPrintableRegexp = regexp.MustCompile(`[^\x09\x20-\x7E]`)

var _ handler.ServerDataStore = &S3Store{}

func (store S3Store) AsServableUpload(upload handler.Upload) handler.ServableUpload {
return &S3ServableUpload{
store: &store,
upload: upload.(*s3Upload),
}
}

type S3ServableUpload struct {
store *S3Store
upload *s3Upload
}

func (su *S3ServableUpload) ServeContent(ctx context.Context, w http.ResponseWriter, r *http.Request) error {
// Get file info
info, err := su.upload.GetInfo(ctx)
if err != nil {
return err
}

// Prepare GetObject input
input := &s3.GetObjectInput{
Bucket: aws.String(su.store.Bucket),
Key: su.store.keyWithPrefix(su.upload.objectId),
}

// Handle range requests
rangeHeader := r.Header.Get("Range")
if rangeHeader != "" {
if err := su.handleRangeRequest(ctx, w, r, info, input, rangeHeader); err != nil {
return err
}
return nil
}

// For non-range requests, serve the entire file
result, err := su.store.Service.GetObject(ctx, input)
if err != nil {
return err
}
defer result.Body.Close()

// Set headers
w.Header().Set("Content-Length", strconv.FormatInt(info.Size, 10))
w.Header().Set("Content-Type", info.MetaData["filetype"])
w.Header().Set("ETag", *result.ETag)

// Stream the content
_, err = su.store.copyToResponseWriter(w, result.Body)
return err
}

func (su *S3ServableUpload) handleRangeRequest(ctx context.Context, w http.ResponseWriter, r *http.Request, info handler.FileInfo, input *s3.GetObjectInput, rangeHeader string) error {
ranges, err := parseRange(rangeHeader, info.Size)
if err != nil {
http.Error(w, err.Error(), http.StatusRequestedRangeNotSatisfiable)
return err
}

if len(ranges) > 1 {
return fmt.Errorf("multiple ranges are not supported")
}

// Set the range in the GetObject input
input.Range = aws.String(fmt.Sprintf("bytes=%d-%d", ranges[0].start, ranges[0].end))

result, err := su.store.Service.GetObject(ctx, input)
if err != nil {
return err
}
defer result.Body.Close()

// Set headers for partial content
w.Header().Set("Content-Range", fmt.Sprintf("bytes %d-%d/%d", ranges[0].start, ranges[0].end, info.Size))
w.Header().Set("Content-Length", strconv.FormatInt(ranges[0].end-ranges[0].start+1, 10))
w.Header().Set("Content-Type", info.MetaData["filetype"])
w.Header().Set("ETag", *result.ETag)
w.WriteHeader(http.StatusPartialContent)

// Stream the content
_, err = su.store.copyToResponseWriter(w, result.Body)
return err
}

// See the handler.DataStore interface for documentation about the different
// methods.
type S3Store struct {
Expand Down Expand Up @@ -1249,3 +1335,95 @@ func (store S3Store) releaseUploadSemaphore() {
store.uploadSemaphore.Release()
store.uploadSemaphoreDemandMetric.Dec()
}

func (store S3Store) copyToResponseWriter(w http.ResponseWriter, r io.Reader) (int64, error) {
// If possible, use http.ResponseWriter's WriteString or Write methods directly
if wt, ok := w.(interface {
WriteString(string) (int, error)
}); ok {
// Use a buffer to read from the io.Reader
buf := make([]byte, 32*1024)
var written int64
for {
n, err := r.Read(buf)
if n > 0 {
wn, werr := wt.WriteString(string(buf[:n]))
written += int64(wn)
if werr != nil {
return written, werr
}
}
if err != nil {
if err == io.EOF {
return written, nil
}
return written, err
}
}
}

// Fallback to io.Copy if WriteString is not available
return io.Copy(w, r)
}

// Helper function to parse range header
func parseRange(rangeHeader string, size int64) ([]struct{ start, end int64 }, error) {
if rangeHeader == "" {
return nil, fmt.Errorf("empty range header")
}

const b = "bytes="
if !strings.HasPrefix(rangeHeader, b) {
return nil, fmt.Errorf("invalid range header format")
}

var ranges []struct{ start, end int64 }
for _, ra := range strings.Split(rangeHeader[len(b):], ",") {
ra = strings.TrimSpace(ra)
if ra == "" {
continue
}
i := strings.Index(ra, "-")
if i < 0 {
return nil, fmt.Errorf("invalid range format")
}
start, end := strings.TrimSpace(ra[:i]), strings.TrimSpace(ra[i+1:])
var r struct{ start, end int64 }
if start == "" {
// suffix-byte-range-spec, like "-100"
n, err := strconv.ParseInt(end, 10, 64)
if err != nil {
return nil, fmt.Errorf("invalid range format")
}
if n > size {
n = size
}
r.start = size - n
r.end = size - 1
} else {
i, err := strconv.ParseInt(start, 10, 64)
if err != nil {
return nil, fmt.Errorf("invalid range format")
}
if i >= size {
return nil, fmt.Errorf("range out of bounds")
}
r.start = i
if end == "" {
// byte-range-spec, like "100-"
r.end = size - 1
} else {
i, err := strconv.ParseInt(end, 10, 64)
if err != nil || i >= size || i < r.start {
return nil, fmt.Errorf("invalid range format")
}
r.end = i
}
}
ranges = append(ranges, r)
}
if len(ranges) == 0 {
return nil, fmt.Errorf("no valid ranges")
}
return ranges, nil
}
Loading

0 comments on commit c956ccb

Please sign in to comment.