copy: move c.compression* to imageCopier #1881

Merged: 1 commit, Mar 11, 2023
22 changes: 11 additions & 11 deletions copy/compression.go
@@ -129,13 +129,13 @@ func (ic *imageCopier) bpcCompressUncompressed(stream *sourceStream, detected bp
 	if ic.c.dest.DesiredLayerCompression() == types.Compress && !detected.isCompressed {
 		logrus.Debugf("Compressing blob on the fly")
 		var uploadedAlgorithm *compressiontypes.Algorithm
-		if ic.c.compressionFormat != nil {
-			uploadedAlgorithm = ic.c.compressionFormat
+		if ic.compressionFormat != nil {
+			uploadedAlgorithm = ic.compressionFormat
 		} else {
 			uploadedAlgorithm = defaultCompressionFormat
 		}

-		reader, annotations := ic.c.compressedStream(stream.reader, *uploadedAlgorithm)
+		reader, annotations := ic.compressedStream(stream.reader, *uploadedAlgorithm)
 		// Note: reader must be closed on all return paths.
 		stream.reader = reader
 		stream.info = types.BlobInfo{ // FIXME? Should we preserve more data in src.info?
@@ -157,7 +157,7 @@ func (ic *imageCopier) bpcCompressUncompressed(stream *sourceStream, detected bp
 // bpcRecompressCompressed checks if we should be recompressing a compressed input to another format, and returns a *bpCompressionStepData if so.
 func (ic *imageCopier) bpcRecompressCompressed(stream *sourceStream, detected bpDetectCompressionStepData) (*bpCompressionStepData, error) {
 	if ic.c.dest.DesiredLayerCompression() == types.Compress && detected.isCompressed &&
-		ic.c.compressionFormat != nil && ic.c.compressionFormat.Name() != detected.format.Name() {
+		ic.compressionFormat != nil && ic.compressionFormat.Name() != detected.format.Name() {
 		// When the blob is compressed, but the desired format is different, it first needs to be decompressed and finally
 		// re-compressed using the desired format.
 		logrus.Debugf("Blob will be converted")
@@ -173,7 +173,7 @@ func (ic *imageCopier) bpcRecompressCompressed(stream *sourceStream, detected bp
 		}
 	}()

-	recompressed, annotations := ic.c.compressedStream(decompressed, *ic.c.compressionFormat)
+	recompressed, annotations := ic.compressedStream(decompressed, *ic.compressionFormat)
 	// Note: recompressed must be closed on all return paths.
 	stream.reader = recompressed
 	stream.info = types.BlobInfo{ // FIXME? Should we preserve more data in src.info?
@@ -183,10 +183,10 @@ func (ic *imageCopier) bpcRecompressCompressed(stream *sourceStream, detected bp
 		succeeded = true
 		return &bpCompressionStepData{
 			operation:              types.PreserveOriginal,
-			uploadedAlgorithm:      ic.c.compressionFormat,
+			uploadedAlgorithm:      ic.compressionFormat,
 			uploadedAnnotations:    annotations,
 			srcCompressorName:      detected.srcCompressorName,
-			uploadedCompressorName: ic.c.compressionFormat.Name(),
+			uploadedCompressorName: ic.compressionFormat.Name(),
 			closers:                []io.Closer{decompressed, recompressed},
 		}, nil
 	}
@@ -318,24 +318,24 @@ func doCompression(dest io.Writer, src io.Reader, metadata map[string]string, co
 }

 // compressGoroutine reads all input from src and writes its compressed equivalent to dest.
-func (c *copier) compressGoroutine(dest *io.PipeWriter, src io.Reader, metadata map[string]string, compressionFormat compressiontypes.Algorithm) {
+func (ic *imageCopier) compressGoroutine(dest *io.PipeWriter, src io.Reader, metadata map[string]string, compressionFormat compressiontypes.Algorithm) {
 	err := errors.New("Internal error: unexpected panic in compressGoroutine")
 	defer func() { // Note that this is not the same as {defer dest.CloseWithError(err)}; we need err to be evaluated lazily.
 		_ = dest.CloseWithError(err) // CloseWithError(nil) is equivalent to Close(), always returns nil
 	}()

-	err = doCompression(dest, src, metadata, compressionFormat, c.compressionLevel)
+	err = doCompression(dest, src, metadata, compressionFormat, ic.compressionLevel)
 }

 // compressedStream returns a stream the input reader compressed using format, and a metadata map.
 // The caller must close the returned reader.
 // AFTER the stream is consumed, metadata will be updated with annotations to use on the data.
-func (c *copier) compressedStream(reader io.Reader, algorithm compressiontypes.Algorithm) (io.ReadCloser, map[string]string) {
+func (ic *imageCopier) compressedStream(reader io.Reader, algorithm compressiontypes.Algorithm) (io.ReadCloser, map[string]string) {
 	pipeReader, pipeWriter := io.Pipe()
 	annotations := map[string]string{}
 	// If this fails while writing data, it will do pipeWriter.CloseWithError(); if it fails otherwise,
 	// e.g. because we have exited and due to pipeReader.Close() above further writing to the pipe has failed,
 	// we don’t care.
-	go c.compressGoroutine(pipeWriter, reader, annotations, algorithm) // Closes pipeWriter
+	go ic.compressGoroutine(pipeWriter, reader, annotations, algorithm) // Closes pipeWriter
 	return pipeReader, annotations
 }
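
For readers less familiar with this file: the compressGoroutine/compressedStream pair relies on the standard io.Pipe pattern, where a goroutine compresses into the write end and reports its final status via CloseWithError, while the caller only ever sees an io.ReadCloser. Below is a minimal, self-contained sketch of that pattern using only the standard library, with gzip standing in for the pluggable algorithm; it is an illustration, not the containers/image code.

package main

import (
	"compress/gzip"
	"errors"
	"fmt"
	"io"
	"strings"
)

// compressedStream returns a reader that yields the gzip-compressed form of src.
// The caller must close the returned reader on all return paths.
func compressedStream(src io.Reader) io.ReadCloser {
	pipeReader, pipeWriter := io.Pipe()
	go func() {
		// err is read lazily by the deferred CloseWithError, so a panic before
		// compression finishes still closes the pipe with a non-nil error.
		err := errors.New("internal error: unexpected panic in compress goroutine")
		defer func() {
			_ = pipeWriter.CloseWithError(err) // CloseWithError(nil) behaves like Close()
		}()

		gz := gzip.NewWriter(pipeWriter)
		if _, err = io.Copy(gz, src); err != nil {
			return
		}
		err = gz.Close() // flush the gzip trailer before closing the pipe
	}()
	return pipeReader
}

func main() {
	rc := compressedStream(strings.NewReader("hello, layer data"))
	defer rc.Close()
	compressed, err := io.ReadAll(rc)
	if err != nil {
		panic(err)
	}
	fmt.Printf("compressed to %d bytes\n", len(compressed))
}

The key property, which the real code also depends on, is that any error is delivered through the pipe: the reader side sees whatever error the goroutine passed to CloseWithError.
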
9 changes: 0 additions & 9 deletions copy/copy.go
@@ -17,7 +17,6 @@ import (
 	"github.com/containers/image/v5/internal/private"
 	"github.com/containers/image/v5/manifest"
 	"github.com/containers/image/v5/pkg/blobinfocache"
-	compressiontypes "github.com/containers/image/v5/pkg/compression/types"
 	"github.com/containers/image/v5/signature"
 	"github.com/containers/image/v5/signature/signer"
 	"github.com/containers/image/v5/transports"
@@ -142,8 +141,6 @@ type copier struct {
 	progressInterval              time.Duration
 	progress                      chan types.ProgressProperties
 	blobInfoCache                 internalblobinfocache.BlobInfoCache2
-	compressionFormat             *compressiontypes.Algorithm // Compression algorithm to use, if the user explicitly requested one, or nil.
-	compressionLevel              *int
 	ociDecryptConfig              *encconfig.DecryptConfig
 	ociEncryptConfig              *encconfig.EncryptConfig
 	concurrentBlobCopiesSemaphore *semaphore.Weighted // Limits the amount of concurrently copied blobs
@@ -250,12 +247,6 @@ func Image(ctx context.Context, policyContext *signature.PolicyContext, destRef,
 		}
 	}

-	if options.DestinationCtx != nil {
-		// Note that compressionFormat and compressionLevel can be nil.
-		c.compressionFormat = options.DestinationCtx.CompressionFormat
-		c.compressionLevel = options.DestinationCtx.CompressionLevel
-	}
-
 	if err := c.setupSigners(options); err != nil {
 		return nil, err
 	}
7 changes: 7 additions & 0 deletions copy/single.go
@@ -36,6 +36,8 @@ type imageCopier struct {
 	diffIDsAreNeeded           bool
 	cannotModifyManifestReason string // The reason the manifest cannot be modified, or an empty string if it can
 	canSubstituteBlobs         bool
+	compressionFormat          *compressiontypes.Algorithm // Compression algorithm to use, if the user explicitly requested one, or nil.
+	compressionLevel           *int
 	ociEncryptLayers           *[]int
 }

@@ -124,6 +126,11 @@ func (c *copier) copySingleImage(ctx context.Context, policyContext *signature.P
 		cannotModifyManifestReason: cannotModifyManifestReason,
 		ociEncryptLayers:           options.OciEncryptLayers,
 	}
+	if options.DestinationCtx != nil {
+		// Note that compressionFormat and compressionLevel can be nil.
+		ic.compressionFormat = options.DestinationCtx.CompressionFormat
+		ic.compressionLevel = options.DestinationCtx.CompressionLevel
+	}
 	// Decide whether we can substitute blobs with semantic equivalents:
 	// - Don’t do that if we can’t modify the manifest at all
 	// - Ensure _this_ copy sees exactly the intended data when either processing a signed image or signing it.
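
On the caller side nothing changes: a specific compression format is still requested through the destination SystemContext, and after this PR those values are picked up per single-image copy in copySingleImage rather than once in Image. A hedged sketch of such a caller, assuming the usual containers/image v5 entry points (copy.Image, copy.Options.DestinationCtx, types.SystemContext.CompressionFormat/CompressionLevel, compression.AlgorithmByName); whether layers are actually (re)compressed still depends on the destination's DesiredLayerCompression.

package main

import (
	"context"

	"github.com/containers/image/v5/copy"
	"github.com/containers/image/v5/pkg/compression"
	"github.com/containers/image/v5/signature"
	"github.com/containers/image/v5/transports/alltransports"
	"github.com/containers/image/v5/types"
)

// copyWithZstd copies srcName to destName, asking the destination to use zstd.
func copyWithZstd(ctx context.Context, srcName, destName string) error {
	srcRef, err := alltransports.ParseImageName(srcName)
	if err != nil {
		return err
	}
	destRef, err := alltransports.ParseImageName(destName)
	if err != nil {
		return err
	}

	// Accept-anything policy, for illustration only.
	policyContext, err := signature.NewPolicyContext(&signature.Policy{
		Default: signature.PolicyRequirements{signature.NewPRInsecureAcceptAnything()},
	})
	if err != nil {
		return err
	}
	defer policyContext.Destroy()

	// Request zstd at level 3 for destination layers; after this PR these
	// values end up on the per-image imageCopier rather than the shared copier.
	algo, err := compression.AlgorithmByName("zstd")
	if err != nil {
		return err
	}
	level := 3
	_, err = copy.Image(ctx, policyContext, destRef, srcRef, &copy.Options{
		DestinationCtx: &types.SystemContext{
			CompressionFormat: &algo,
			CompressionLevel:  &level,
		},
	})
	return err
}
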