Skip to content

Commit

Permalink
copy: do not always fail push if digest mismatches
Browse files Browse the repository at this point in the history
if the computed digest mismatches the expected one for a partial
image, just log a warning and use the computed one.  This is expected
since partial images are stored by their TOC digest, not the diffID
for the layer.

Signed-off-by: Giuseppe Scrivano <gscrivan@redhat.com>
  • Loading branch information
giuseppe committed Jun 6, 2023
1 parent 86e0afd commit 9a87d2e
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 36 deletions.
15 changes: 11 additions & 4 deletions copy/blob.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
// and returns a complete blobInfo of the copied blob.
func (ic *imageCopier) copyBlobFromStream(ctx context.Context, srcReader io.Reader, srcInfo types.BlobInfo,
getOriginalLayerCopyWriter func(decompressor compressiontypes.DecompressorFunc) io.Writer,
isConfig bool, toEncrypt bool, bar *progressBar, layerIndex int, emptyLayer bool) (types.BlobInfo, error) {
isConfig bool, toEncrypt bool, bar *progressBar, layerIndex int, incremental, emptyLayer bool) (types.BlobInfo, error) {
// The copying happens through a pipeline of connected io.Readers;
// that pipeline is built by updating stream.
// === Input: srcReader
Expand All @@ -27,13 +27,16 @@ func (ic *imageCopier) copyBlobFromStream(ctx context.Context, srcReader io.Read
info: srcInfo,
}

// If the layer is a partial layer, we can rewrite the digest
canRewriteDigest := incremental

// === Process input through digestingReader to validate against the expected digest.
// Be paranoid; in case PutBlob somehow managed to ignore an error from digestingReader,
// use a separate validation failure indicator.
// Note that for this check we don't use the stronger "validationSucceeded" indicator, because
// dest.PutBlob may detect that the layer already exists, in which case we don't
// read stream to the end, and validation does not happen.
digestingReader, err := newDigestingReader(stream.reader, srcInfo.Digest)
digestingReader, err := newDigestingReader(stream.reader, srcInfo.Digest, canRewriteDigest)
if err != nil {
return types.BlobInfo{}, fmt.Errorf("preparing to verify blob %s: %w", srcInfo.Digest, err)
}
Expand Down Expand Up @@ -128,8 +131,12 @@ func (ic *imageCopier) copyBlobFromStream(ctx context.Context, srcReader io.Read
}
}

if digestingReader.validationFailed { // Coverage: This should never happen.
return types.BlobInfo{}, fmt.Errorf("Internal error writing blob %s, digest verification failed but was ignored", srcInfo.Digest)
if digestingReader.validationFailed {
if !canRewriteDigest { // Coverage: This should never happen.
return types.BlobInfo{}, fmt.Errorf("Internal error writing blob %s, digest verification failed", srcInfo.Digest)
}
logrus.Warningf("Digest verification failed for blob %s but was ignored", srcInfo.Digest)
return uploadedInfo, nil
}
if stream.info.Digest != "" && uploadedInfo.Digest != stream.info.Digest {
return types.BlobInfo{}, fmt.Errorf("Internal error writing blob %s, blob with digest %s saved with digest %s", srcInfo.Digest, stream.info.Digest, uploadedInfo.Digest)
Expand Down
11 changes: 8 additions & 3 deletions copy/digesting_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,13 @@ type digestingReader struct {
expectedDigest digest.Digest
validationFailed bool
validationSucceeded bool
canRewriteDigest bool
}

// newDigestingReader returns an io.Reader implementation with contents of source, which will eventually return a non-EOF error
// or set validationSucceeded/validationFailed to true if the source stream does/does not match expectedDigest.
// (neither is set if EOF is never reached).
func newDigestingReader(source io.Reader, expectedDigest digest.Digest) (*digestingReader, error) {
func newDigestingReader(source io.Reader, expectedDigest digest.Digest, canRewriteDigest bool) (*digestingReader, error) {
var digester digest.Digester
if err := expectedDigest.Validate(); err != nil {
return nil, fmt.Errorf("Invalid digest specification %s", expectedDigest)
Expand All @@ -37,6 +38,7 @@ func newDigestingReader(source io.Reader, expectedDigest digest.Digest) (*digest
hash: digester.Hash(),
expectedDigest: expectedDigest,
validationFailed: false,
canRewriteDigest: canRewriteDigest,
}, nil
}

Expand All @@ -54,9 +56,12 @@ func (d *digestingReader) Read(p []byte) (int, error) {
actualDigest := d.digester.Digest()
if actualDigest != d.expectedDigest {
d.validationFailed = true
return 0, fmt.Errorf("Digest did not match, expected %s, got %s", d.expectedDigest, actualDigest)
if !d.canRewriteDigest {
return 0, fmt.Errorf("Digest did not match, expected %s, got %s", d.expectedDigest, actualDigest)
}
} else {
d.validationSucceeded = true
}
d.validationSucceeded = true
}
return n, err
}
61 changes: 38 additions & 23 deletions copy/digesting_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func TestNewDigestingReader(t *testing.T) {
"sha256:0", // Invalid hex value
"sha256:01", // Invalid length of hex value
} {
_, err := newDigestingReader(source, input)
_, err := newDigestingReader(source, input, false)
assert.Error(t, err, input.String())
}
}
Expand All @@ -37,41 +37,56 @@ func TestDigestingReaderRead(t *testing.T) {
}
// Valid input
for _, c := range cases {
source := bytes.NewReader(c.input)
reader, err := newDigestingReader(source, c.digest)
require.NoError(t, err, c.digest.String())
dest := bytes.Buffer{}
n, err := io.Copy(&dest, reader)
assert.NoError(t, err, c.digest.String())
assert.Equal(t, int64(len(c.input)), n, c.digest.String())
assert.Equal(t, c.input, dest.Bytes(), c.digest.String())
assert.False(t, reader.validationFailed, c.digest.String())
assert.True(t, reader.validationSucceeded, c.digest.String())
for _, incremental := range []bool{false, true} {
source := bytes.NewReader(c.input)
reader, err := newDigestingReader(source, c.digest, incremental)
require.NoError(t, err, c.digest.String())
dest := bytes.Buffer{}
n, err := io.Copy(&dest, reader)
assert.NoError(t, err, c.digest.String())
assert.Equal(t, int64(len(c.input)), n, c.digest.String())
assert.Equal(t, c.input, dest.Bytes(), c.digest.String())
assert.False(t, reader.validationFailed, c.digest.String())
assert.True(t, reader.validationSucceeded, c.digest.String())
}
}
// Modified input
for _, c := range cases {
source := bytes.NewReader(bytes.Join([][]byte{c.input, []byte("x")}, nil))
reader, err := newDigestingReader(source, c.digest)
reader, err := newDigestingReader(source, c.digest, false)
require.NoError(t, err, c.digest.String())
dest := bytes.Buffer{}
_, err = io.Copy(&dest, reader)
assert.Error(t, err, c.digest.String())
assert.True(t, reader.validationFailed, c.digest.String())
assert.False(t, reader.validationSucceeded, c.digest.String())

// try with an incremental source
source = bytes.NewReader(bytes.Join([][]byte{c.input, []byte("x")}, nil))
reader, err = newDigestingReader(source, c.digest, true)
require.NoError(t, err, c.digest.String())
dest = bytes.Buffer{}
_, err = io.Copy(&dest, reader)
assert.NoError(t, err, c.digest.String())
assert.True(t, reader.validationFailed, c.digest.String())
assert.False(t, reader.validationSucceeded, c.digest.String())
assert.NotEqual(t, c.digest.String(), reader.digester.Digest(), c.digest.String())
}
// Truncated input
for _, c := range cases {
source := bytes.NewReader(c.input)
reader, err := newDigestingReader(source, c.digest)
require.NoError(t, err, c.digest.String())
if len(c.input) != 0 {
dest := bytes.Buffer{}
truncatedLen := int64(len(c.input) - 1)
n, err := io.CopyN(&dest, reader, truncatedLen)
assert.NoError(t, err, c.digest.String())
assert.Equal(t, truncatedLen, n, c.digest.String())
for _, incremental := range []bool{false, true} {
source := bytes.NewReader(c.input)
reader, err := newDigestingReader(source, c.digest, incremental)
require.NoError(t, err, c.digest.String())
if len(c.input) != 0 {
dest := bytes.Buffer{}
truncatedLen := int64(len(c.input) - 1)
n, err := io.CopyN(&dest, reader, truncatedLen)
assert.NoError(t, err, c.digest.String())
assert.Equal(t, truncatedLen, n, c.digest.String())
}
assert.False(t, reader.validationFailed, c.digest.String())
assert.False(t, reader.validationSucceeded, c.digest.String())
}
assert.False(t, reader.validationFailed, c.digest.String())
assert.False(t, reader.validationSucceeded, c.digest.String())
}
}
19 changes: 13 additions & 6 deletions copy/single.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/containers/image/v5/signature"
"github.com/containers/image/v5/transports"
"github.com/containers/image/v5/types"
"github.com/containers/storage/pkg/chunked"
digest "github.com/opencontainers/go-digest"
imgspecv1 "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -405,7 +406,13 @@ func (ic *imageCopier) copyLayers(ctx context.Context) error {
logrus.Debugf("Skipping foreign layer %q copy to %s", cld.destInfo.Digest, ic.c.dest.Reference().Transport().Name())
}
} else {
cld.destInfo, cld.diffID, cld.err = ic.copyLayer(ctx, srcLayer, toEncrypt, pool, index, srcRef, manifestLayerInfos[index].EmptyLayer)
toc, err := chunked.GetTOCDigest(manifestLayerInfos[index].Annotations)
if err != nil {
cld.err = err
} else {
incremental := toc != nil
cld.destInfo, cld.diffID, cld.err = ic.copyLayer(ctx, srcLayer, toEncrypt, pool, index, srcRef, incremental, manifestLayerInfos[index].EmptyLayer)
}
}
data[index] = cld
}
Expand Down Expand Up @@ -550,7 +557,7 @@ func (ic *imageCopier) copyConfig(ctx context.Context, src types.Image) error {
return types.BlobInfo{}, fmt.Errorf("reading config blob %s: %w", srcInfo.Digest, err)
}

destInfo, err := ic.copyBlobFromStream(ctx, bytes.NewReader(configBlob), srcInfo, nil, true, false, bar, -1, false)
destInfo, err := ic.copyBlobFromStream(ctx, bytes.NewReader(configBlob), srcInfo, nil, true, false, bar, -1, false, false)
if err != nil {
return types.BlobInfo{}, err
}
Expand Down Expand Up @@ -578,7 +585,7 @@ type diffIDResult struct {
// copyLayer copies a layer with srcInfo (with known Digest and Annotations and possibly known Size) in src to dest, perhaps (de/re/)compressing it,
// and returns a complete blobInfo of the copied layer, and a value for LayerDiffIDs if diffIDIsNeeded
// srcRef can be used as an additional hint to the destination during checking whether a layer can be reused but srcRef can be nil.
func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, toEncrypt bool, pool *mpb.Progress, layerIndex int, srcRef reference.Named, emptyLayer bool) (types.BlobInfo, digest.Digest, error) {
func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, toEncrypt bool, pool *mpb.Progress, layerIndex int, srcRef reference.Named, incremental, emptyLayer bool) (types.BlobInfo, digest.Digest, error) {
// If the srcInfo doesn't contain compression information, try to compute it from the
// MediaType, which was either read from a manifest by way of LayerInfos() or constructed
// by LayerInfosForCopy(), if it was supplied at all. If we succeed in copying the blob,
Expand Down Expand Up @@ -696,7 +703,7 @@ func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, to
}
defer srcStream.Close()

blobInfo, diffIDChan, err := ic.copyLayerFromStream(ctx, srcStream, types.BlobInfo{Digest: srcInfo.Digest, Size: srcBlobSize, MediaType: srcInfo.MediaType, Annotations: srcInfo.Annotations}, diffIDIsNeeded, toEncrypt, bar, layerIndex, emptyLayer)
blobInfo, diffIDChan, err := ic.copyLayerFromStream(ctx, srcStream, types.BlobInfo{Digest: srcInfo.Digest, Size: srcBlobSize, MediaType: srcInfo.MediaType, Annotations: srcInfo.Annotations}, diffIDIsNeeded, toEncrypt, bar, layerIndex, incremental, emptyLayer)
if err != nil {
return types.BlobInfo{}, "", err
}
Expand Down Expand Up @@ -762,7 +769,7 @@ func updatedBlobInfoFromReuse(inputInfo types.BlobInfo, reusedBlob private.Reuse
// perhaps (de/re/)compressing the stream,
// and returns a complete blobInfo of the copied blob and perhaps a <-chan diffIDResult if diffIDIsNeeded, to be read by the caller.
func (ic *imageCopier) copyLayerFromStream(ctx context.Context, srcStream io.Reader, srcInfo types.BlobInfo,
diffIDIsNeeded bool, toEncrypt bool, bar *progressBar, layerIndex int, emptyLayer bool) (types.BlobInfo, <-chan diffIDResult, error) {
diffIDIsNeeded bool, toEncrypt bool, bar *progressBar, layerIndex int, incremental, emptyLayer bool) (types.BlobInfo, <-chan diffIDResult, error) {
var getDiffIDRecorder func(compressiontypes.DecompressorFunc) io.Writer // = nil
var diffIDChan chan diffIDResult

Expand All @@ -787,7 +794,7 @@ func (ic *imageCopier) copyLayerFromStream(ctx context.Context, srcStream io.Rea
}
}

blobInfo, err := ic.copyBlobFromStream(ctx, srcStream, srcInfo, getDiffIDRecorder, false, toEncrypt, bar, layerIndex, emptyLayer) // Sets err to nil on success
blobInfo, err := ic.copyBlobFromStream(ctx, srcStream, srcInfo, getDiffIDRecorder, false, toEncrypt, bar, layerIndex, incremental, emptyLayer) // Sets err to nil on success
return blobInfo, diffIDChan, err
// We need the defer … pipeWriter.CloseWithError() to happen HERE so that the caller can block on reading from diffIDChan
}
Expand Down

0 comments on commit 9a87d2e

Please sign in to comment.