Merge pull request #1359 from giuseppe/partial-support-200-ret-code
docker: convert a full body response to partial requests
mtrmac authored Aug 25, 2021
2 parents d6f64a2 + ea19c43 commit 17bb73d
Showing 3 changed files with 196 additions and 52 deletions.
160 changes: 108 additions & 52 deletions docker/docker_image_src.go
@@ -278,7 +278,78 @@ func (s *dockerImageSource) HasThreadSafeGetBlob() bool {
return true
}

// splitHTTP200ResponseToPartial splits a 200 response into multiple streams as specified by the chunks.
func splitHTTP200ResponseToPartial(streams chan io.ReadCloser, errs chan error, body io.ReadCloser, chunks []internalTypes.ImageSourceChunk) {
defer close(streams)
defer close(errs)
currentOffset := uint64(0)

body = makeBufferedNetworkReader(body, 64, 16384)
defer body.Close()
for _, c := range chunks {
if c.Offset != currentOffset {
if c.Offset < currentOffset {
errs <- fmt.Errorf("invalid chunk offset specified %v (expected >= %v)", c.Offset, currentOffset)
break
}
toSkip := c.Offset - currentOffset
if _, err := io.Copy(ioutil.Discard, io.LimitReader(body, int64(toSkip))); err != nil {
errs <- err
break
}
currentOffset += toSkip
}
s := signalCloseReader{
closed: make(chan interface{}),
stream: ioutil.NopCloser(io.LimitReader(body, int64(c.Length))),
consumeStream: true,
}
streams <- s

// Wait until the stream is closed before going to the next chunk
<-s.closed
currentOffset += c.Length
}
}

// handle206Response reads a 206 response and sends each part as a separate ReadCloser on the streams channel.
func handle206Response(streams chan io.ReadCloser, errs chan error, body io.ReadCloser, chunks []internalTypes.ImageSourceChunk, mediaType string, params map[string]string) {
defer close(streams)
defer close(errs)
if !strings.HasPrefix(mediaType, "multipart/") {
streams <- body
return
}
boundary, found := params["boundary"]
if !found {
errs <- errors.Errorf("could not find boundary")
body.Close()
return
}
buffered := makeBufferedNetworkReader(body, 64, 16384)
defer buffered.Close()
mr := multipart.NewReader(buffered, boundary)
for {
p, err := mr.NextPart()
if err != nil {
if err != io.EOF {
errs <- err
}
return
}
s := signalCloseReader{
closed: make(chan interface{}),
stream: p,
}
streams <- s
// NextPart() cannot be called while the current part
// is being read, so wait until it is closed
<-s.closed
}
}

// GetBlobAt returns a stream for the specified blob.
// The specified chunks must not overlap and must be sorted by their offset.
func (s *dockerImageSource) GetBlobAt(ctx context.Context, info types.BlobInfo, chunks []internalTypes.ImageSourceChunk) (chan io.ReadCloser, chan error, error) {
headers := make(map[string][]string)

@@ -305,53 +376,30 @@ func (s *dockerImageSource) GetBlobAt(ctx context.Context, info types.BlobInfo,
}
return nil, nil, err
}
if res.StatusCode != http.StatusPartialContent {
res.Body.Close()
return nil, nil, errors.Errorf("invalid status code returned when fetching blob %d (%s)", res.StatusCode, http.StatusText(res.StatusCode))
}

mediaType, params, err := mime.ParseMediaType(res.Header.Get("Content-Type"))
if err != nil {
return nil, nil, err
}
switch res.StatusCode {
case http.StatusOK:
// if the server replied with a 200 status code, convert the full body response to a series of
// streams as it would have been done with 206.
streams := make(chan io.ReadCloser)
errs := make(chan error)
go splitHTTP200ResponseToPartial(streams, errs, res.Body, chunks)
return streams, errs, nil
case http.StatusPartialContent:
mediaType, params, err := mime.ParseMediaType(res.Header.Get("Content-Type"))
if err != nil {
return nil, nil, err
}

streams := make(chan io.ReadCloser)
errs := make(chan error)
streams := make(chan io.ReadCloser)
errs := make(chan error)

go func() {
defer close(streams)
defer close(errs)
if !strings.HasPrefix(mediaType, "multipart/") {
streams <- res.Body
return
}
boundary, found := params["boundary"]
if !found {
errs <- errors.Errorf("could not find boundary")
return
}
buffered := makeBufferedNetworkReader(res.Body, 64, 16384)
defer buffered.Close()
mr := multipart.NewReader(buffered, boundary)
for {
p, err := mr.NextPart()
if err != nil {
if err != io.EOF {
errs <- err
}
return
}
s := signalCloseReader{
Closed: make(chan interface{}),
Stream: p,
}
streams <- s
// NextPart() cannot be called while the current part
// is being read, so wait until it is closed
<-s.Closed
}
}()
return streams, errs, nil
go handle206Response(streams, errs, res.Body, chunks, mediaType, params)
return streams, errs, nil
default:
res.Body.Close()
return nil, nil, errors.Errorf("invalid status code returned when fetching blob %d (%s)", res.StatusCode, http.StatusText(res.StatusCode))
}
}

// GetBlob returns a stream for the specified blob, and the blob’s size (or -1 if unknown).
@@ -585,7 +633,7 @@ type bufferedNetworkReaderBuffer struct {
}

type bufferedNetworkReader struct {
stream io.Reader
stream io.ReadCloser
emptyBuffer chan *bufferedNetworkReaderBuffer
readyBuffer chan *bufferedNetworkReaderBuffer
terminate chan bool
@@ -611,9 +659,10 @@ func handleBufferedNetworkReader(br *bufferedNetworkReader) {
}
}

func (n *bufferedNetworkReader) Close() {
func (n *bufferedNetworkReader) Close() error {
close(n.terminate)
close(n.emptyBuffer)
return n.stream.Close()
}

func (n *bufferedNetworkReader) read(p []byte) (int, error) {
@@ -657,7 +706,7 @@ func (n *bufferedNetworkReader) Read(p []byte) (int, error) {
return n.read(p)
}

func makeBufferedNetworkReader(stream io.Reader, nBuffers, bufferSize uint) *bufferedNetworkReader {
func makeBufferedNetworkReader(stream io.ReadCloser, nBuffers, bufferSize uint) *bufferedNetworkReader {
br := bufferedNetworkReader{
stream: stream,
emptyBuffer: make(chan *bufferedNetworkReaderBuffer, nBuffers),
@@ -680,15 +729,22 @@ func makeBufferedNetworkReader(stream io.Reader, nBuffers, bufferSize uint) *buf
}

type signalCloseReader struct {
Closed chan interface{}
Stream io.ReadCloser
closed chan interface{}
stream io.ReadCloser
consumeStream bool
}

func (s signalCloseReader) Read(p []byte) (int, error) {
return s.Stream.Read(p)
return s.stream.Read(p)
}

func (s signalCloseReader) Close() error {
defer close(s.Closed)
return s.Stream.Close()
defer close(s.closed)
if s.consumeStream {
if _, err := io.Copy(ioutil.Discard, s.stream); err != nil {
s.stream.Close()
return err
}
}
return s.stream.Close()
}
87 changes: 87 additions & 0 deletions docker/docker_image_src_test.go
@@ -1,7 +1,9 @@
package docker

import (
"bytes"
"context"
"io"
"io/ioutil"
"net/http"
"net/http/httptest"
@@ -11,6 +13,7 @@ import (
"strings"
"testing"

internalTypes "github.com/containers/image/v5/internal/types"
"github.com/containers/image/v5/types"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@@ -95,3 +98,87 @@ func TestSimplifyContentType(t *testing.T) {
assert.Equal(t, c.expected, out, c.input)
}
}

func readNextStream(streams chan io.ReadCloser, errs chan error) ([]byte, error) {
select {
case r := <-streams:
if r == nil {
return nil, nil
}
defer r.Close()
return ioutil.ReadAll(r)
case err := <-errs:
return nil, err
}
}

type verifyGetBlobAtData struct {
expectedData []byte
expectedError error
}

func verifyGetBlobAtOutput(t *testing.T, streams chan io.ReadCloser, errs chan error, expected []verifyGetBlobAtData) {
for _, c := range expected {
data, err := readNextStream(streams, errs)
assert.Equal(t, c.expectedData, data)
assert.Equal(t, c.expectedError, err)
}
}

func TestSplitHTTP200ResponseToPartial(t *testing.T) {
body := ioutil.NopCloser(bytes.NewReader([]byte("123456789")))
defer body.Close()
streams := make(chan io.ReadCloser)
errs := make(chan error)
chunks := []internalTypes.ImageSourceChunk{
{Offset: 1, Length: 2},
{Offset: 4, Length: 1},
}
go splitHTTP200ResponseToPartial(streams, errs, body, chunks)

expected := []verifyGetBlobAtData{
{[]byte("23"), nil},
{[]byte("5"), nil},
{[]byte(nil), nil},
}

verifyGetBlobAtOutput(t, streams, errs, expected)
}

func TestHandle206Response(t *testing.T) {
body := ioutil.NopCloser(bytes.NewReader([]byte("--AAA\r\n\r\n23\r\n--AAA\r\n\r\n5\r\n--AAA--")))
defer body.Close()
streams := make(chan io.ReadCloser)
errs := make(chan error)
chunks := []internalTypes.ImageSourceChunk{
{Offset: 1, Length: 2},
{Offset: 4, Length: 1},
}
mediaType := "multipart/form-data"
params := map[string]string{
"boundary": "AAA",
}
go handle206Response(streams, errs, body, chunks, mediaType, params)

expected := []verifyGetBlobAtData{
{[]byte("23"), nil},
{[]byte("5"), nil},
{[]byte(nil), nil},
}
verifyGetBlobAtOutput(t, streams, errs, expected)

body = ioutil.NopCloser(bytes.NewReader([]byte("HELLO")))
defer body.Close()
streams = make(chan io.ReadCloser)
errs = make(chan error)
chunks = []internalTypes.ImageSourceChunk{{Offset: 100, Length: 5}}
mediaType = "text/plain"
params = map[string]string{}
go handle206Response(streams, errs, body, chunks, mediaType, params)

expected = []verifyGetBlobAtData{
{[]byte("HELLO"), nil},
{[]byte(nil), nil},
}
verifyGetBlobAtOutput(t, streams, errs, expected)
}
1 change: 1 addition & 0 deletions internal/types/types.go
@@ -70,6 +70,7 @@ type ImageSourceChunk struct {
// This API is experimental and can be changed without bumping the major version number.
type ImageSourceSeekable interface {
// GetBlobAt returns a stream for the specified blob.
// The specified chunks must not overlap and must be sorted by their offset.
GetBlobAt(context.Context, publicTypes.BlobInfo, []ImageSourceChunk) (chan io.ReadCloser, chan error, error)
}
