diff --git a/cache/blobs.go b/cache/blobs.go index 8d2beefd0654..377d680acf27 100644 --- a/cache/blobs.go +++ b/cache/blobs.go @@ -1,19 +1,15 @@ package cache import ( - "compress/gzip" "context" "fmt" - "io" "os" "strconv" - "github.com/containerd/containerd/content" "github.com/containerd/containerd/diff" "github.com/containerd/containerd/diff/walking" "github.com/containerd/containerd/leases" "github.com/containerd/containerd/mount" - "github.com/klauspost/compress/zstd" "github.com/moby/buildkit/session" "github.com/moby/buildkit/util/compression" "github.com/moby/buildkit/util/flightcontrol" @@ -57,8 +53,6 @@ func (sr *immutableRef) computeBlobChain(ctx context.Context, createIfNeeded boo return computeBlobChain(ctx, sr, createIfNeeded, comp, s, filter) } -type compressor func(dest io.Writer, requiredMediaType string) (io.WriteCloser, error) - func computeBlobChain(ctx context.Context, sr *immutableRef, createIfNeeded bool, comp compression.Config, s session.Group, filter map[string]struct{}) error { eg, ctx := errgroup.WithContext(ctx) switch sr.kind() { @@ -92,28 +86,8 @@ func computeBlobChain(ctx context.Context, sr *immutableRef, createIfNeeded bool return nil, errors.WithStack(ErrNoBlobs) } - var mediaType string - var compressorFunc compressor - var finalize func(context.Context, content.Store) (map[string]string, error) - switch comp.Type { - case compression.Uncompressed: - mediaType = ocispecs.MediaTypeImageLayer - case compression.Gzip: - compressorFunc = func(dest io.Writer, _ string) (io.WriteCloser, error) { - return gzipWriter(comp)(dest) - } - mediaType = ocispecs.MediaTypeImageLayerGzip - case compression.EStargz: - compressorFunc, finalize = compressEStargz(comp) - mediaType = ocispecs.MediaTypeImageLayerGzip - case compression.Zstd: - compressorFunc = func(dest io.Writer, _ string) (io.WriteCloser, error) { - return zstdWriter(comp)(dest) - } - mediaType = ocispecs.MediaTypeImageLayer + "+zstd" - default: - return nil, errors.Errorf("unknown layer compression type: %q", comp.Type) - } + compressorFunc, finalize := comp.Type.Compress(comp) + mediaType := comp.Type.MediaType() var lowerRef *immutableRef switch sr.kind() { @@ -206,7 +180,7 @@ func computeBlobChain(ctx context.Context, sr *immutableRef, createIfNeeded bool } } - if desc.Digest == "" && !isTypeWindows(sr) && (comp.Type == compression.Zstd || comp.Type == compression.EStargz) { + if desc.Digest == "" && !isTypeWindows(sr) && comp.Type.NeedsComputeDiffBySelf() { // These compression types aren't supported by containerd differ. So try to compute diff on buildkit side. // This case can be happen on containerd worker + non-overlayfs snapshotter (e.g. native). // See also: https://github.com/containerd/containerd/issues/4263 @@ -433,7 +407,7 @@ func isTypeWindows(sr *immutableRef) bool { // ensureCompression ensures the specified ref has the blob of the specified compression Type. func ensureCompression(ctx context.Context, ref *immutableRef, comp compression.Config, s session.Group) error { - _, err := g.Do(ctx, fmt.Sprintf("%s-%d", ref.ID(), comp.Type), func(ctx context.Context) (interface{}, error) { + _, err := g.Do(ctx, fmt.Sprintf("%s-%s", ref.ID(), comp.Type), func(ctx context.Context) (interface{}, error) { desc, err := ref.ociDesc(ctx, ref.descHandlers, true) if err != nil { return nil, err @@ -480,38 +454,3 @@ func ensureCompression(ctx context.Context, ref *immutableRef, comp compression. }) return err } - -func gzipWriter(comp compression.Config) func(io.Writer) (io.WriteCloser, error) { - return func(dest io.Writer) (io.WriteCloser, error) { - level := gzip.DefaultCompression - if comp.Level != nil { - level = *comp.Level - } - return gzip.NewWriterLevel(dest, level) - } -} - -func zstdWriter(comp compression.Config) func(io.Writer) (io.WriteCloser, error) { - return func(dest io.Writer) (io.WriteCloser, error) { - level := zstd.SpeedDefault - if comp.Level != nil { - level = toZstdEncoderLevel(*comp.Level) - } - return zstd.NewWriter(dest, zstd.WithEncoderLevel(level)) - } -} - -func toZstdEncoderLevel(level int) zstd.EncoderLevel { - // map zstd compression levels to go-zstd levels - // once we also have c based implementation move this to helper pkg - if level < 0 { - return zstd.SpeedDefault - } else if level < 3 { - return zstd.SpeedFastest - } else if level < 7 { - return zstd.SpeedDefault - } else if level < 9 { - return zstd.SpeedBetterCompression - } - return zstd.SpeedBestCompression -} diff --git a/cache/blobs_linux.go b/cache/blobs_linux.go index fcb8850a02a9..37310ff3bad6 100644 --- a/cache/blobs_linux.go +++ b/cache/blobs_linux.go @@ -12,6 +12,7 @@ import ( "github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/mount" "github.com/moby/buildkit/util/bklog" + "github.com/moby/buildkit/util/compression" "github.com/moby/buildkit/util/overlay" digest "github.com/opencontainers/go-digest" ocispecs "github.com/opencontainers/image-spec/specs-go/v1" @@ -24,7 +25,7 @@ var emptyDesc = ocispecs.Descriptor{} // diff between lower and upper snapshot. If the passed mounts cannot // be computed (e.g. because the mounts aren't overlayfs), it returns // an error. -func (sr *immutableRef) tryComputeOverlayBlob(ctx context.Context, lower, upper []mount.Mount, mediaType string, ref string, compressorFunc compressor) (_ ocispecs.Descriptor, ok bool, err error) { +func (sr *immutableRef) tryComputeOverlayBlob(ctx context.Context, lower, upper []mount.Mount, mediaType string, ref string, compressorFunc compression.Compressor) (_ ocispecs.Descriptor, ok bool, err error) { // Get upperdir location if mounts are overlayfs that can be processed by this differ. upperdir, err := overlay.GetUpperdir(lower, upper) if err != nil { diff --git a/cache/blobs_nolinux.go b/cache/blobs_nolinux.go index 2ccee770e2a8..1567768c1939 100644 --- a/cache/blobs_nolinux.go +++ b/cache/blobs_nolinux.go @@ -6,11 +6,12 @@ package cache import ( "context" + "github.com/moby/buildkit/util/compression" "github.com/containerd/containerd/mount" ocispecs "github.com/opencontainers/image-spec/specs-go/v1" "github.com/pkg/errors" ) -func (sr *immutableRef) tryComputeOverlayBlob(ctx context.Context, lower, upper []mount.Mount, mediaType string, ref string, compressorFunc compressor) (_ ocispecs.Descriptor, ok bool, err error) { +func (sr *immutableRef) tryComputeOverlayBlob(ctx context.Context, lower, upper []mount.Mount, mediaType string, ref string, compressorFunc compression.Compressor) (_ ocispecs.Descriptor, ok bool, err error) { return ocispecs.Descriptor{}, true, errors.Errorf("overlayfs-based diff computing is unsupported") } diff --git a/cache/converter.go b/cache/converter.go index a7e4df193aff..326af241e408 100644 --- a/cache/converter.go +++ b/cache/converter.go @@ -7,10 +7,8 @@ import ( "io" "sync" - cdcompression "github.com/containerd/containerd/archive/compression" "github.com/containerd/containerd/content" "github.com/containerd/containerd/errdefs" - "github.com/containerd/containerd/images" "github.com/containerd/containerd/images/converter" "github.com/containerd/containerd/labels" "github.com/moby/buildkit/identity" @@ -21,106 +19,33 @@ import ( "github.com/pkg/errors" ) -// needsConversion indicates whether a conversion is needed for the specified descriptor to -// be the compressionType. -func needsConversion(ctx context.Context, cs content.Store, desc ocispecs.Descriptor, compressionType compression.Type) (bool, error) { - mediaType := desc.MediaType - switch compressionType { - case compression.Uncompressed: - if !images.IsLayerType(mediaType) || compression.FromMediaType(mediaType) == compression.Uncompressed { - return false, nil - } - case compression.Gzip: - esgz, err := isEStargz(ctx, cs, desc.Digest) - if err != nil { - return false, err - } - if (!images.IsLayerType(mediaType) || compression.FromMediaType(mediaType) == compression.Gzip) && !esgz { - return false, nil - } - case compression.Zstd: - if !images.IsLayerType(mediaType) || compression.FromMediaType(mediaType) == compression.Zstd { - return false, nil - } - case compression.EStargz: - esgz, err := isEStargz(ctx, cs, desc.Digest) - if err != nil { - return false, err - } - if !images.IsLayerType(mediaType) || esgz { - return false, nil - } - default: - return false, fmt.Errorf("unknown compression type during conversion: %q", compressionType) - } - return true, nil -} - // getConverter returns converter function according to the specified compression type. // If no conversion is needed, this returns nil without error. func getConverter(ctx context.Context, cs content.Store, desc ocispecs.Descriptor, comp compression.Config) (converter.ConvertFunc, error) { - if needs, err := needsConversion(ctx, cs, desc, comp.Type); err != nil { + if needs, err := comp.Type.NeedsConversion(ctx, cs, desc); err != nil { return nil, errors.Wrapf(err, "failed to determine conversion needs") } else if !needs { // No conversion. No need to return an error here. return nil, nil } - c := conversion{target: comp} - - from := compression.FromMediaType(desc.MediaType) - switch from { - case compression.Uncompressed: - case compression.Gzip, compression.Zstd: - c.decompress = func(ctx context.Context, desc ocispecs.Descriptor) (r io.ReadCloser, err error) { - ra, err := cs.ReaderAt(ctx, desc) - if err != nil { - return nil, err - } - esgz, err := isEStargz(ctx, cs, desc.Digest) - if err != nil { - return nil, err - } else if esgz { - r, err = decompressEStargz(io.NewSectionReader(ra, 0, ra.Size())) - if err != nil { - return nil, err - } - } else { - r, err = cdcompression.DecompressStream(io.NewSectionReader(ra, 0, ra.Size())) - if err != nil { - return nil, err - } - } - return &readCloser{r, ra.Close}, nil - } - default: - return nil, errors.Errorf("unsupported source compression type %q from mediatype %q", from, desc.MediaType) + from, err := compression.FromMediaType(desc.MediaType) + if err != nil { + return nil, err } - switch comp.Type { - case compression.Uncompressed: - case compression.Gzip: - c.compress = gzipWriter(comp) - case compression.Zstd: - c.compress = zstdWriter(comp) - case compression.EStargz: - compressorFunc, finalize := compressEStargz(comp) - c.compress = func(w io.Writer) (io.WriteCloser, error) { - return compressorFunc(w, ocispecs.MediaTypeImageLayerGzip) - } - c.finalize = finalize - default: - return nil, errors.Errorf("unknown target compression type during conversion: %q", comp.Type) - } + c := conversion{target: comp} + c.compress, c.finalize = comp.Type.Compress(comp) + c.decompress = from.Decompress return (&c).convert, nil } type conversion struct { target compression.Config - decompress func(context.Context, ocispecs.Descriptor) (io.ReadCloser, error) - compress func(w io.Writer) (io.WriteCloser, error) - finalize func(context.Context, content.Store) (map[string]string, error) + decompress compression.Decompressor + compress compression.Compressor + finalize compression.Finalizer } var bufioPool = sync.Pool{ @@ -151,34 +76,20 @@ func (c *conversion) convert(ctx context.Context, cs content.Store, desc ocispec bufW = bufio.NewWriterSize(w, 128*1024) } defer bufioPool.Put(bufW) - var zw io.WriteCloser = &nopWriteCloser{bufW} - if c.compress != nil { - zw, err = c.compress(zw) - if err != nil { - return nil, err - } + zw, err := c.compress(&nopWriteCloser{bufW}, c.target.Type.MediaType()) + if err != nil { + return nil, err } zw = &onceWriteCloser{WriteCloser: zw} defer zw.Close() // convert this layer diffID := digest.Canonical.Digester() - var rdr io.Reader - if c.decompress == nil { - ra, err := cs.ReaderAt(ctx, desc) - if err != nil { - return nil, err - } - defer ra.Close() - rdr = io.NewSectionReader(ra, 0, ra.Size()) - } else { - rc, err := c.decompress(ctx, desc) - if err != nil { - return nil, err - } - defer rc.Close() - rdr = rc + rdr, err := c.decompress(ctx, cs, desc) + if err != nil { + return nil, err } + defer rdr.Close() if _, err := io.Copy(zw, io.TeeReader(rdr, diffID.Hash())); err != nil { return nil, err } @@ -201,7 +112,7 @@ func (c *conversion) convert(ctx context.Context, cs content.Store, desc ocispec } newDesc := desc - newDesc.MediaType = c.target.Type.DefaultMediaType() + newDesc.MediaType = c.target.Type.MediaType() newDesc.Digest = info.Digest newDesc.Size = info.Size newDesc.Annotations = map[string]string{labels.LabelUncompressed: diffID.Digest().String()} @@ -217,20 +128,6 @@ func (c *conversion) convert(ctx context.Context, cs content.Store, desc ocispec return &newDesc, nil } -type readCloser struct { - io.ReadCloser - closeFunc func() error -} - -func (rc *readCloser) Close() error { - err1 := rc.ReadCloser.Close() - err2 := rc.closeFunc() - if err1 != nil { - return errors.Wrapf(err1, "failed to close: %v", err2) - } - return err2 -} - type nopWriteCloser struct { io.Writer } diff --git a/cache/manager_test.go b/cache/manager_test.go index e68819282033..f052ec5c5472 100644 --- a/cache/manager_test.go +++ b/cache/manager_test.go @@ -1345,7 +1345,7 @@ func testSharingCompressionVariant(ctx context.Context, t *testing.T, co *cmOut, if err != nil { return nil, "", err } - return cw, testCase.a.DefaultMediaType(), nil + return cw, testCase.a.MediaType(), nil }) require.NoError(t, err) contentBuffer := contentutil.NewBuffer() @@ -1386,9 +1386,9 @@ func testSharingCompressionVariant(ctx context.Context, t *testing.T, co *cmOut, // check if all compression variables are available on the both refs checkCompression := func(desc ocispecs.Descriptor, compressionType compression.Type) { - require.Equal(t, compressionType.DefaultMediaType(), desc.MediaType, "compression: %v", compressionType) + require.Equal(t, compressionType.MediaType(), desc.MediaType, "compression: %v", compressionType) if compressionType == compression.EStargz { - ok, err := isEStargz(ctx, co.cs, desc.Digest) + ok, err := compression.EStargz.Is(ctx, co.cs, desc.Digest) require.NoError(t, err, "compression: %v", compressionType) require.True(t, ok, "compression: %v", compressionType) } @@ -1481,7 +1481,7 @@ func getCompressor(w io.Writer, compressionType compression.Type, customized boo } pr.Close() }() - return &writeCloser{pw, func() error { <-done; return nil }}, nil + return &compression.WriteCloser{WriteCloser: pw, CloseFunc: func() error { <-done; return nil }}, nil case compression.Zstd: if customized { skippableFrameMagic := []byte{0x50, 0x2a, 0x4d, 0x18} @@ -1767,7 +1767,7 @@ func TestGetRemotes(t *testing.T) { r := refChain[i] isLazy, err := r.isLazy(egctx) require.NoError(t, err) - needs, err := needsConversion(ctx, co.cs, desc, compressionType) + needs, err := compressionType.NeedsConversion(ctx, co.cs, desc) require.NoError(t, err) if needs { require.False(t, isLazy, "layer %q requires conversion so it must be unlazied", desc.Digest) @@ -1998,7 +1998,7 @@ func checkDescriptor(ctx context.Context, t *testing.T, cs content.Store, desc o } // Check annotation values are valid - c := new(counter) + c := new(compression.Counter) ra, err := cs.ReaderAt(ctx, desc) if err != nil && errdefs.IsNotFound(err) { return // lazy layer @@ -2013,7 +2013,7 @@ func checkDescriptor(ctx context.Context, t *testing.T, cs content.Store, desc o require.NoError(t, err) require.Equal(t, diffID.Digest().String(), uncompressedDgst) if compressionType == compression.EStargz { - require.Equal(t, c.size(), uncompressedSize) + require.Equal(t, c.Size(), uncompressedSize) } } diff --git a/cache/refs.go b/cache/refs.go index bbc38c481245..37b3bf6d873e 100644 --- a/cache/refs.go +++ b/cache/refs.go @@ -767,12 +767,9 @@ func (sr *immutableRef) getBlobWithCompression(ctx context.Context, compressionT } func getBlobWithCompression(ctx context.Context, cs content.Store, desc ocispecs.Descriptor, compressionType compression.Type) (ocispecs.Descriptor, error) { - if compressionType == compression.UnknownCompression { - return ocispecs.Descriptor{}, fmt.Errorf("cannot get unknown compression type") - } var target *ocispecs.Descriptor if err := walkBlob(ctx, cs, desc, func(desc ocispecs.Descriptor) bool { - if needs, err := needsConversion(ctx, cs, desc, compressionType); err == nil && !needs { + if needs, err := compressionType.NeedsConversion(ctx, cs, desc); err == nil && !needs { target = &desc return false } @@ -881,7 +878,7 @@ func filterAnnotationsForSave(a map[string]string) (b map[string]string) { if a == nil { return nil } - for _, k := range append(eStargzAnnotations, containerdUncompressed) { + for _, k := range append(compression.EStargzAnnotations, containerdUncompressed) { v, ok := a[k] if !ok { continue diff --git a/cache/remote.go b/cache/remote.go index d0ac594b6ac8..d4eb37f0bca6 100644 --- a/cache/remote.go +++ b/cache/remote.go @@ -213,7 +213,7 @@ func (sr *immutableRef) getRemote(ctx context.Context, createIfNeeded bool, refC } if refCfg.Compression.Force { - if needs, err := needsConversion(ctx, sr.cm.ContentStore, desc, refCfg.Compression.Type); err != nil { + if needs, err := refCfg.Compression.Type.NeedsConversion(ctx, sr.cm.ContentStore, desc); err != nil { return nil, err } else if needs { // ensure the compression type. diff --git a/cache/remotecache/local/local.go b/cache/remotecache/local/local.go index 18c73364c03b..c84f6741110b 100644 --- a/cache/remotecache/local/local.go +++ b/cache/remotecache/local/local.go @@ -102,11 +102,15 @@ func getContentStore(ctx context.Context, sm *session.Manager, g session.Group, } func attrsToCompression(attrs map[string]string) (*compression.Config, error) { - compressionType := compression.Default + var compressionType compression.Type if v, ok := attrs[attrLayerCompression]; ok { - if c := compression.Parse(v); c != compression.UnknownCompression { - compressionType = c + c, err := compression.Parse(v) + if err != nil { + return nil, err } + compressionType = c + } else { + compressionType = compression.Default } compressionConfig := compression.New(compressionType) if v, ok := attrs[attrForceCompression]; ok { diff --git a/cache/remotecache/registry/registry.go b/cache/remotecache/registry/registry.go index cfe54e52aa6e..e3b32eb29657 100644 --- a/cache/remotecache/registry/registry.go +++ b/cache/remotecache/registry/registry.go @@ -131,11 +131,15 @@ func (dsl *withDistributionSourceLabel) SnapshotLabels(descs []ocispecs.Descript } func attrsToCompression(attrs map[string]string) (*compression.Config, error) { - compressionType := compression.Default + var compressionType compression.Type if v, ok := attrs[attrLayerCompression]; ok { - if c := compression.Parse(v); c != compression.UnknownCompression { - compressionType = c + c, err := compression.Parse(v) + if err != nil { + return nil, err } + compressionType = c + } else { + compressionType = compression.Default } compressionConfig := compression.New(compressionType) if v, ok := attrs[attrForceCompression]; ok { diff --git a/cache/remotecache/v1/cachestorage.go b/cache/remotecache/v1/cachestorage.go index 7ba7eb0f6059..a4f7f6ad055f 100644 --- a/cache/remotecache/v1/cachestorage.go +++ b/cache/remotecache/v1/cachestorage.go @@ -276,7 +276,7 @@ func (cs *cacheResultStorage) LoadRemotes(ctx context.Context, res solver.CacheR // Any of blobs in the remote must meet the specified compression option. match := false for _, desc := range r.result.Descriptors { - m := compressionopts.Type.IsMediaType(desc.MediaType) + m := compression.IsMediaType(compressionopts.Type, desc.MediaType) match = match || m if compressionopts.Force && !m { match = false diff --git a/exporter/containerimage/opts.go b/exporter/containerimage/opts.go index 67f64513a27a..1a0b2d87fb59 100644 --- a/exporter/containerimage/opts.go +++ b/exporter/containerimage/opts.go @@ -39,8 +39,6 @@ type ImageCommitOpts struct { func (c *ImageCommitOpts) Load(opt map[string]string) (map[string]string, error) { rest := make(map[string]string) - esgz := false - as, optb, err := ParseAnnotations(toBytesMap(opt)) if err != nil { return nil, err @@ -58,19 +56,7 @@ func (c *ImageCommitOpts) Load(opt map[string]string) (map[string]string, error) case keyImageName: c.ImageName = v case keyLayerCompression: - switch v { - case "gzip": - c.RefCfg.Compression.Type = compression.Gzip - case "estargz": - c.RefCfg.Compression.Type = compression.EStargz - esgz = true - case "zstd": - c.RefCfg.Compression.Type = compression.Zstd - case "uncompressed": - c.RefCfg.Compression.Type = compression.Uncompressed - default: - err = errors.Errorf("unsupported layer compression type: %v", v) - } + c.RefCfg.Compression.Type, err = compression.Parse(v) case keyCompressionLevel: ii, err2 := strconv.ParseInt(v, 10, 64) if err != nil { @@ -98,8 +84,8 @@ func (c *ImageCommitOpts) Load(opt map[string]string) (map[string]string, error) } } - if esgz { - c.EnableOCITypes("estargz") + if c.RefCfg.Compression.Type.OnlySupportOCITypes() { + c.EnableOCITypes(c.RefCfg.Compression.Type.String()) } c.AddAnnotations(as) diff --git a/util/compression/compression.go b/util/compression/compression.go index ba44a9270b4e..8f040fb31f72 100644 --- a/util/compression/compression.go +++ b/util/compression/compression.go @@ -3,8 +3,11 @@ package compression import ( "bytes" "context" + "fmt" "io" + "sync" + cdcompression "github.com/containerd/containerd/archive/compression" "github.com/containerd/containerd/content" "github.com/containerd/containerd/images" "github.com/containerd/stargz-snapshotter/estargz" @@ -14,24 +17,41 @@ import ( "github.com/sirupsen/logrus" ) -// Type represents compression type for blob data. -type Type int +type Compressor func(dest io.Writer, mediaType string) (io.WriteCloser, error) +type Decompressor func(ctx context.Context, cs content.Store, desc ocispecs.Descriptor) (io.ReadCloser, error) +type Finalizer func(context.Context, content.Store) (map[string]string, error) + +// Type represents compression type for blob data, which needs +// to be implemented for each compression type. +type Type interface { + Compress(comp Config) (compressorFunc Compressor, finalize Finalizer) + Decompress(ctx context.Context, cs content.Store, desc ocispecs.Descriptor) (io.ReadCloser, error) + NeedsConversion(ctx context.Context, cs content.Store, desc ocispecs.Descriptor) (bool, error) + NeedsComputeDiffBySelf() bool + OnlySupportOCITypes() bool + MediaType() string + String() string +} -const ( +type ( + uncompressedType struct{} + gzipType struct{} + estargzType struct{} + zstdType struct{} +) + +var ( // Uncompressed indicates no compression. - Uncompressed Type = iota + Uncompressed = uncompressedType{} // Gzip is used for blob data. - Gzip + Gzip = gzipType{} // EStargz is used for estargz data. - EStargz + EStargz = estargzType{} // Zstd is used for Zstandard data. - Zstd - - // UnknownCompression means not supported yet. - UnknownCompression Type = -1 + Zstd = zstdType{} ) type Config struct { @@ -61,69 +81,41 @@ const ( mediaTypeImageLayerZstd = ocispecs.MediaTypeImageLayer + "+zstd" // unreleased image-spec#790 ) -var Default = Gzip +var Default gzipType = Gzip -func Parse(t string) Type { +func Parse(t string) (Type, error) { switch t { - case "uncompressed": - return Uncompressed - case "gzip": - return Gzip - case "estargz": - return EStargz - case "zstd": - return Zstd - default: - return UnknownCompression - } -} - -func (ct Type) String() string { - switch ct { - case Uncompressed: - return "uncompressed" - case Gzip: - return "gzip" - case EStargz: - return "estargz" - case Zstd: - return "zstd" - default: - return "unknown" - } -} - -func (ct Type) DefaultMediaType() string { - switch ct { - case Uncompressed: - return ocispecs.MediaTypeImageLayer - case Gzip, EStargz: - return ocispecs.MediaTypeImageLayerGzip - case Zstd: - return mediaTypeImageLayerZstd + case Uncompressed.String(): + return Uncompressed, nil + case Gzip.String(): + return Gzip, nil + case EStargz.String(): + return EStargz, nil + case Zstd.String(): + return Zstd, nil default: - return ocispecs.MediaTypeImageLayer + "+unknown" + return nil, fmt.Errorf("unsupported compression type %s", t) } } -func (ct Type) IsMediaType(mt string) bool { +func IsMediaType(ct Type, mt string) bool { mt, ok := toOCILayerType[mt] if !ok { return false } - return mt == ct.DefaultMediaType() + return mt == ct.MediaType() } -func FromMediaType(mediaType string) Type { +func FromMediaType(mediaType string) (Type, error) { switch toOCILayerType[mediaType] { case ocispecs.MediaTypeImageLayer, ocispecs.MediaTypeImageLayerNonDistributable: - return Uncompressed + return Uncompressed, nil case ocispecs.MediaTypeImageLayerGzip, ocispecs.MediaTypeImageLayerNonDistributableGzip: - return Gzip + return Gzip, nil case mediaTypeImageLayerZstd, ocispecs.MediaTypeImageLayerNonDistributableZstd: - return Zstd + return Zstd, nil default: - return UnknownCompression + return nil, fmt.Errorf("unsupported media type %s", mediaType) } } @@ -170,7 +162,7 @@ func detectCompressionType(cr *io.SectionReader) (Type, error) { // means just create an empty layer. // // See issue docker/docker#18170 - return UnknownCompression, err + return nil, err } if _, _, err := estargz.OpenFooter(cr); err == nil { @@ -241,3 +233,72 @@ func ConvertAllLayerMediaTypes(oci bool, descs ...ocispecs.Descriptor) []ocispec } return converted } + +func decompress(ctx context.Context, cs content.Store, desc ocispecs.Descriptor) (r io.ReadCloser, err error) { + ra, err := cs.ReaderAt(ctx, desc) + if err != nil { + return nil, err + } + esgz, err := EStargz.Is(ctx, cs, desc.Digest) + if err != nil { + return nil, err + } else if esgz { + r, err = decompressEStargz(io.NewSectionReader(ra, 0, ra.Size())) + if err != nil { + return nil, err + } + } else { + r, err = cdcompression.DecompressStream(io.NewSectionReader(ra, 0, ra.Size())) + if err != nil { + return nil, err + } + } + return &readCloser{r, ra.Close}, nil +} + +type readCloser struct { + io.ReadCloser + closeFunc func() error +} + +func (rc *readCloser) Close() error { + err1 := rc.ReadCloser.Close() + err2 := rc.closeFunc() + if err1 != nil { + return errors.Wrapf(err1, "failed to close: %v", err2) + } + return err2 +} + +type WriteCloser struct { + io.WriteCloser + CloseFunc func() error +} + +func (wc *WriteCloser) Close() error { + err1 := wc.WriteCloser.Close() + err2 := wc.CloseFunc() + if err1 != nil { + return errors.Wrapf(err1, "failed to close: %v", err2) + } + return err2 +} + +type Counter struct { + n int64 + mu sync.Mutex +} + +func (c *Counter) Write(p []byte) (n int, err error) { + c.mu.Lock() + c.n += int64(len(p)) + c.mu.Unlock() + return len(p), nil +} + +func (c *Counter) Size() (n int64) { + c.mu.Lock() + n = c.n + c.mu.Unlock() + return +} diff --git a/cache/estargz.go b/util/compression/estargz.go similarity index 80% rename from cache/estargz.go rename to util/compression/estargz.go index f67d14925d49..aa9bb107e84d 100644 --- a/cache/estargz.go +++ b/util/compression/estargz.go @@ -1,4 +1,4 @@ -package cache +package compression import ( "archive/tar" @@ -11,23 +11,28 @@ import ( cdcompression "github.com/containerd/containerd/archive/compression" "github.com/containerd/containerd/content" + "github.com/containerd/containerd/images" "github.com/containerd/stargz-snapshotter/estargz" - "github.com/moby/buildkit/util/compression" digest "github.com/opencontainers/go-digest" ocispecs "github.com/opencontainers/image-spec/specs-go/v1" "github.com/pkg/errors" ) -var eStargzAnnotations = []string{estargz.TOCJSONDigestAnnotation, estargz.StoreUncompressedSizeAnnotation} +var EStargzAnnotations = []string{estargz.TOCJSONDigestAnnotation, estargz.StoreUncompressedSizeAnnotation} -// compressEStargz writes the passed blobs stream as an eStargz-compressed blob. -// finalize function finalizes the written blob metadata and returns all eStargz annotations. -func compressEStargz(comp compression.Config) (compressorFunc compressor, finalize func(context.Context, content.Store) (map[string]string, error)) { +const containerdUncompressed = "containerd.io/uncompressed" +const estargzLabel = "buildkit.io/compression/estargz" + +func (c estargzType) Compress(comp Config) (compressorFunc Compressor, finalize Finalizer) { var cInfo *compressionInfo var writeErr error var mu sync.Mutex return func(dest io.Writer, requiredMediaType string) (io.WriteCloser, error) { - if compression.FromMediaType(requiredMediaType) != compression.Gzip { + ct, err := FromMediaType(requiredMediaType) + if err != nil { + return nil, err + } + if ct != Gzip { return nil, fmt.Errorf("unsupported media type for estargz compressor %q", requiredMediaType) } done := make(chan struct{}) @@ -76,7 +81,7 @@ func compressEStargz(comp compression.Config) (compressorFunc compressor, finali pr.Close() return nil }() - return &writeCloser{pw, func() error { + return &WriteCloser{pw, func() error { <-done // wait until the write completes return nil }}, nil @@ -113,11 +118,40 @@ func compressEStargz(comp compression.Config) (compressorFunc compressor, finali } } -const estargzLabel = "buildkit.io/compression/estargz" +func (c estargzType) Decompress(ctx context.Context, cs content.Store, desc ocispecs.Descriptor) (io.ReadCloser, error) { + return decompress(ctx, cs, desc) +} + +func (c estargzType) NeedsConversion(ctx context.Context, cs content.Store, desc ocispecs.Descriptor) (bool, error) { + esgz, err := c.Is(ctx, cs, desc.Digest) + if err != nil { + return false, err + } + if !images.IsLayerType(desc.MediaType) || esgz { + return false, nil + } + return true, nil +} + +func (c estargzType) NeedsComputeDiffBySelf() bool { + return true +} + +func (c estargzType) OnlySupportOCITypes() bool { + return true +} + +func (c estargzType) MediaType() string { + return ocispecs.MediaTypeImageLayerGzip +} + +func (c estargzType) String() string { + return "estargz" +} // isEStargz returns true when the specified digest of content exists in // the content store and it's eStargz. -func isEStargz(ctx context.Context, cs content.Store, dgst digest.Digest) (bool, error) { +func (c estargzType) Is(ctx context.Context, cs content.Store, dgst digest.Digest) (bool, error) { info, err := cs.Info(ctx, dgst) if err != nil { return false, nil @@ -178,39 +212,6 @@ func decompressEStargz(r *io.SectionReader) (io.ReadCloser, error) { return estargz.Unpack(r, new(estargz.GzipDecompressor)) } -type writeCloser struct { - io.WriteCloser - closeFunc func() error -} - -func (wc *writeCloser) Close() error { - err1 := wc.WriteCloser.Close() - err2 := wc.closeFunc() - if err1 != nil { - return errors.Wrapf(err1, "failed to close: %v", err2) - } - return err2 -} - -type counter struct { - n int64 - mu sync.Mutex -} - -func (c *counter) Write(p []byte) (n int, err error) { - c.mu.Lock() - c.n += int64(len(p)) - c.mu.Unlock() - return len(p), nil -} - -func (c *counter) size() (n int64) { - c.mu.Lock() - n = c.n - c.mu.Unlock() - return -} - type compressionInfo struct { blobInfo tocDigest digest.Digest @@ -227,7 +228,7 @@ func calculateBlobInfo() (io.WriteCloser, chan blobInfo) { pr, pw := io.Pipe() go func() { defer pr.Close() - c := new(counter) + c := new(Counter) dgstr := digest.Canonical.Digester() diffID := digest.Canonical.Digester() decompressR, err := cdcompression.DecompressStream(io.TeeReader(pr, dgstr.Hash())) @@ -244,7 +245,7 @@ func calculateBlobInfo() (io.WriteCloser, chan blobInfo) { pr.CloseWithError(err) return } - res <- blobInfo{dgstr.Digest(), diffID.Digest(), c.size()} + res <- blobInfo{dgstr.Digest(), diffID.Digest(), c.Size()} }() return pw, res } diff --git a/util/compression/gzip.go b/util/compression/gzip.go new file mode 100644 index 000000000000..081abef956c1 --- /dev/null +++ b/util/compression/gzip.go @@ -0,0 +1,65 @@ +package compression + +import ( + "compress/gzip" + "context" + "io" + + "github.com/containerd/containerd/content" + "github.com/containerd/containerd/images" + ocispecs "github.com/opencontainers/image-spec/specs-go/v1" +) + +func (c gzipType) Compress(comp Config) (compressorFunc Compressor, finalize Finalizer) { + return func(dest io.Writer, _ string) (io.WriteCloser, error) { + return gzipWriter(comp)(dest) + }, nil +} + +func (c gzipType) Decompress(ctx context.Context, cs content.Store, desc ocispecs.Descriptor) (io.ReadCloser, error) { + return decompress(ctx, cs, desc) +} + +func (c gzipType) NeedsConversion(ctx context.Context, cs content.Store, desc ocispecs.Descriptor) (bool, error) { + esgz, err := EStargz.Is(ctx, cs, desc.Digest) + if err != nil { + return false, err + } + if !images.IsLayerType(desc.MediaType) { + return false, nil + } + ct, err := FromMediaType(desc.MediaType) + if err != nil { + return false, err + } + if ct == Gzip && !esgz { + return false, nil + } + return true, nil +} + +func (c gzipType) NeedsComputeDiffBySelf() bool { + return false +} + +func (c gzipType) OnlySupportOCITypes() bool { + return false +} + +func (c gzipType) MediaType() string { + return ocispecs.MediaTypeImageLayerGzip +} + +func (c gzipType) String() string { + return "gzip" +} + +func gzipWriter(comp Config) func(io.Writer) (io.WriteCloser, error) { + return func(dest io.Writer) (io.WriteCloser, error) { + level := gzip.DefaultCompression + if comp.Level != nil { + level = *comp.Level + } + return gzip.NewWriterLevel(dest, level) + } +} diff --git a/util/compression/uncompressed.go b/util/compression/uncompressed.go new file mode 100644 index 000000000000..476582628bd5 --- /dev/null +++ b/util/compression/uncompressed.go @@ -0,0 +1,64 @@ +package compression + +import ( + "context" + "io" + + "github.com/containerd/containerd/content" + "github.com/containerd/containerd/images" + "github.com/docker/docker/pkg/ioutils" + ocispecs "github.com/opencontainers/image-spec/specs-go/v1" +) + +type nopWriteCloser struct { + io.Writer +} + +func (w *nopWriteCloser) Close() error { + return nil +} + +func (c uncompressedType) Compress(comp Config) (compressorFunc Compressor, finalize Finalizer) { + return func(dest io.Writer, mediaType string) (io.WriteCloser, error) { + return &nopWriteCloser{dest}, nil + }, nil +} + +func (c uncompressedType) Decompress(ctx context.Context, cs content.Store, desc ocispecs.Descriptor) (io.ReadCloser, error) { + ra, err := cs.ReaderAt(ctx, desc) + if err != nil { + return nil, err + } + rdr := io.NewSectionReader(ra, 0, ra.Size()) + return ioutils.NewReadCloserWrapper(rdr, ra.Close), nil +} + +func (c uncompressedType) NeedsConversion(ctx context.Context, cs content.Store, desc ocispecs.Descriptor) (bool, error) { + if !images.IsLayerType(desc.MediaType) { + return false, nil + } + ct, err := FromMediaType(desc.MediaType) + if err != nil { + return false, err + } + if ct == Uncompressed { + return false, nil + } + return true, nil +} + +func (c uncompressedType) NeedsComputeDiffBySelf() bool { + return false +} + +func (c uncompressedType) OnlySupportOCITypes() bool { + return false +} + +func (c uncompressedType) MediaType() string { + return ocispecs.MediaTypeImageLayer +} + +func (c uncompressedType) String() string { + return "uncompressed" +} diff --git a/util/compression/zstd.go b/util/compression/zstd.go new file mode 100644 index 000000000000..61313e3f51fe --- /dev/null +++ b/util/compression/zstd.go @@ -0,0 +1,76 @@ +package compression + +import ( + "context" + "io" + + "github.com/containerd/containerd/content" + "github.com/containerd/containerd/images" + "github.com/klauspost/compress/zstd" + ocispecs "github.com/opencontainers/image-spec/specs-go/v1" +) + +func (c zstdType) Compress(comp Config) (compressorFunc Compressor, finalize Finalizer) { + return func(dest io.Writer, _ string) (io.WriteCloser, error) { + return zstdWriter(comp)(dest) + }, nil +} + +func (c zstdType) Decompress(ctx context.Context, cs content.Store, desc ocispecs.Descriptor) (io.ReadCloser, error) { + return decompress(ctx, cs, desc) +} + +func (c zstdType) NeedsConversion(ctx context.Context, cs content.Store, desc ocispecs.Descriptor) (bool, error) { + if !images.IsLayerType(desc.MediaType) { + return false, nil + } + ct, err := FromMediaType(desc.MediaType) + if err != nil { + return false, err + } + if ct == Zstd { + return false, nil + } + return true, nil +} + +func (c zstdType) NeedsComputeDiffBySelf() bool { + return true +} + +func (c zstdType) OnlySupportOCITypes() bool { + return false +} + +func (c zstdType) MediaType() string { + return mediaTypeImageLayerZstd +} + +func (c zstdType) String() string { + return "zstd" +} + +func zstdWriter(comp Config) func(io.Writer) (io.WriteCloser, error) { + return func(dest io.Writer) (io.WriteCloser, error) { + level := zstd.SpeedDefault + if comp.Level != nil { + level = toZstdEncoderLevel(*comp.Level) + } + return zstd.NewWriter(dest, zstd.WithEncoderLevel(level)) + } +} + +func toZstdEncoderLevel(level int) zstd.EncoderLevel { + // map zstd compression levels to go-zstd levels + // once we also have c based implementation move this to helper pkg + if level < 0 { + return zstd.SpeedDefault + } else if level < 3 { + return zstd.SpeedFastest + } else if level < 7 { + return zstd.SpeedDefault + } else if level < 9 { + return zstd.SpeedBetterCompression + } + return zstd.SpeedBestCompression +}