Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for exporting nydus compression type #2581

Merged
merged 6 commits into from
Nov 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 50 additions & 0 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,56 @@ jobs:
name: coverage
path: ./coverage

test-nydus:
runs-on: ubuntu-20.04
needs: [base]
strategy:
fail-fast: false
matrix:
pkg:
- ./client
worker:
- containerd
- oci
typ:
- integration
exclude:
- pkg: ./client ./cmd/buildctl ./worker/containerd ./solver ./frontend
typ: dockerfile
include:
- pkg: ./...
skip-integration-tests: 1
typ: integration
steps:
-
name: Checkout
uses: actions/checkout@v3
-
name: Expose GitHub Runtime
uses: crazy-max/ghaction-github-runtime@v2
-
name: Set up QEMU
uses: docker/setup-qemu-action@v2
-
name: Set up Docker Buildx
uses: docker/setup-buildx-action@v2
with:
version: ${{ env.BUILDX_VERSION }}
driver-opts: image=${{ env.REPO_SLUG_ORIGIN }}
buildkitd-flags: --debug
-
name: Test pkg=${{ matrix.pkg }} ; typ=${{ matrix.typ }} ; skipit=${{ matrix.skip-integration-tests }} ; worker=${{ matrix.worker }}
run: |
if [ -n "${{ matrix.worker }}" ]; then
export TESTFLAGS="${TESTFLAGS} --tags=nydus --run=//worker=${{ matrix.worker }}$"
fi
./hack/test ${{ matrix.typ }}
env:
BUILDKITD_TAGS: nydus
TESTPKGS: ${{ matrix.pkg }}
SKIP_INTEGRATION_TESTS: ${{ matrix.skip-integration-tests }}
CACHE_FROM: type=gha,scope=${{ env.CACHE_GHA_SCOPE_IT }} type=gha,scope=${{ env.CACHE_GHA_SCOPE_BINARIES }}

test-s3:
runs-on: ubuntu-20.04
needs:
Expand Down
10 changes: 10 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ ARG CNI_VERSION=v1.1.0
ARG STARGZ_SNAPSHOTTER_VERSION=v0.12.0
ARG NERDCTL_VERSION=v0.17.1
ARG DNSNAME_VERSION=v1.3.1
ARG NYDUS_VERSION=v2.1.0

# ALPINE_VERSION sets version for the base layers
ARG ALPINE_VERSION=3.15
Expand Down Expand Up @@ -192,6 +193,14 @@ RUN --mount=target=/root/.cache,type=cache \
xx-verify --static /out/containerd-stargz-grpc && \
xx-verify --static /out/ctr-remote

FROM gobuild-base AS nydus
ARG NYDUS_VERSION
ARG TARGETOS
ARG TARGETARCH
SHELL ["/bin/bash", "-c"]
RUN wget https://github.com/dragonflyoss/image-service/releases/download/$NYDUS_VERSION/nydus-static-$NYDUS_VERSION-$TARGETOS-$TARGETARCH.tgz
RUN mkdir -p /out/nydus-static && tar xzvf nydus-static-$NYDUS_VERSION-$TARGETOS-$TARGETARCH.tgz -C /out

FROM buildkit-export AS buildkit-linux
COPY --link --from=binaries / /usr/bin/
ENTRYPOINT ["buildkitd"]
Expand Down Expand Up @@ -235,6 +244,7 @@ RUN apk add --no-cache shadow shadow-uidmap sudo vim iptables ip6tables dnsmasq
ENV BUILDKIT_INTEGRATION_CONTAINERD_EXTRA="containerd-1.4=/opt/containerd-alt-14/bin,containerd-1.5=/opt/containerd-alt-15/bin"
ENV BUILDKIT_INTEGRATION_SNAPSHOTTER=stargz
ENV CGO_ENABLED=0
COPY --link --from=nydus /out/nydus-static/* /usr/bin/
COPY --link --from=stargz-snapshotter /out/* /usr/bin/
COPY --link --from=rootlesskit /rootlesskit /usr/bin/
COPY --link --from=containerd-alt-14 /out/containerd* /opt/containerd-alt-14/bin/
Expand Down
2 changes: 1 addition & 1 deletion cache/blobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func computeBlobChain(ctx context.Context, sr *immutableRef, createIfNeeded bool
return nil, errors.WithStack(ErrNoBlobs)
}

compressorFunc, finalize := comp.Type.Compress(comp)
compressorFunc, finalize := comp.Type.Compress(ctx, comp)
mediaType := comp.Type.MediaType()

var lowerRef *immutableRef
Expand Down
9 changes: 6 additions & 3 deletions cache/blobs_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,14 @@ func (sr *immutableRef) tryComputeOverlayBlob(ctx context.Context, lower, upper
if err != nil {
return emptyDesc, false, errors.Wrap(err, "failed to get compressed stream")
}
err = overlay.WriteUpperdir(ctx, io.MultiWriter(compressed, dgstr.Hash()), upperdir, lower)
compressed.Close()
if err != nil {
// Close ensure compressorFunc does some finalization works.
defer compressed.Close()
if err := overlay.WriteUpperdir(ctx, io.MultiWriter(compressed, dgstr.Hash()), upperdir, lower); err != nil {
return emptyDesc, false, errors.Wrap(err, "failed to write compressed diff")
}
if err := compressed.Close(); err != nil {
return emptyDesc, false, errors.Wrap(err, "failed to close compressed diff writer")
}
if labels == nil {
labels = map[string]string{}
}
Expand Down
16 changes: 16 additions & 0 deletions cache/compression.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
//go:build !nydus
// +build !nydus

package cache

import (
"context"

"github.com/containerd/containerd/content"
"github.com/moby/buildkit/cache/config"
ocispecs "github.com/opencontainers/image-spec/specs-go/v1"
)

func needsForceCompression(ctx context.Context, cs content.Store, source ocispecs.Descriptor, refCfg config.RefConfig) bool {
return refCfg.Compression.Force
}
148 changes: 148 additions & 0 deletions cache/compression_nydus.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
//go:build nydus
// +build nydus

package cache

import (
"compress/gzip"
"context"
"encoding/json"
"fmt"
"io"

"github.com/containerd/containerd/content"
"github.com/containerd/containerd/errdefs"
"github.com/moby/buildkit/cache/config"
"github.com/moby/buildkit/session"
"github.com/moby/buildkit/util/compression"
digest "github.com/opencontainers/go-digest"
ocispecs "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"

nydusify "github.com/containerd/nydus-snapshotter/pkg/converter"
)

func init() {
additionalAnnotations = append(
additionalAnnotations,
nydusify.LayerAnnotationNydusBlob, nydusify.LayerAnnotationNydusBootstrap, nydusify.LayerAnnotationNydusBlobIDs,
)
}

// Nydus compression type can't be mixed with other compression types in the same image,
// so if `source` is this kind of layer, but the target is other compression type, we
// should do the forced compression.
func needsForceCompression(ctx context.Context, cs content.Store, source ocispecs.Descriptor, refCfg config.RefConfig) bool {
if refCfg.Compression.Force {
return true
}
isNydusBlob, _ := compression.Nydus.Is(ctx, cs, source)
if refCfg.Compression.Type == compression.Nydus {
return !isNydusBlob
}
return isNydusBlob
}

// MergeNydus does two steps:
// 1. Extracts nydus bootstrap from nydus format (nydus blob + nydus bootstrap) for each layer.
// 2. Merge all nydus bootstraps into a final bootstrap (will as an extra layer).
// The nydus bootstrap size is very small, so the merge operation is fast.
func MergeNydus(ctx context.Context, ref ImmutableRef, comp compression.Config, s session.Group) (*ocispecs.Descriptor, error) {
iref, ok := ref.(*immutableRef)
if !ok {
return nil, fmt.Errorf("unsupported ref")
}
refs := iref.layerChain()
if len(refs) == 0 {
return nil, fmt.Errorf("refs can't be empty")
}

// Extracts nydus bootstrap from nydus format for each layer.
var cm *cacheManager
layers := []nydusify.Layer{}
blobIDs := []string{}
for _, ref := range refs {
blobDesc, err := getBlobWithCompressionWithRetry(ctx, ref, comp, s)
if err != nil {
return nil, errors.Wrapf(err, "get compression blob %q", comp.Type)
}
ra, err := ref.cm.ContentStore.ReaderAt(ctx, blobDesc)
if err != nil {
return nil, errors.Wrapf(err, "get reader for compression blob %q", comp.Type)
}
defer ra.Close()
if cm == nil {
cm = ref.cm
}
blobIDs = append(blobIDs, blobDesc.Digest.Hex())
layers = append(layers, nydusify.Layer{
Digest: blobDesc.Digest,
ReaderAt: ra,
})
}

// Merge all nydus bootstraps into a final nydus bootstrap.
pr, pw := io.Pipe()
go func() {
defer pw.Close()
if _, err := nydusify.Merge(ctx, layers, pw, nydusify.MergeOption{
WithTar: true,
}); err != nil {
pw.CloseWithError(errors.Wrapf(err, "merge nydus bootstrap"))
}
}()

// Compress final nydus bootstrap to tar.gz and write into content store.
cw, err := content.OpenWriter(ctx, cm.ContentStore, content.WithRef("nydus-merge-"+iref.getChainID().String()))
if err != nil {
return nil, errors.Wrap(err, "open content store writer")
}
defer cw.Close()

gw := gzip.NewWriter(cw)
uncompressedDgst := digest.SHA256.Digester()
compressed := io.MultiWriter(gw, uncompressedDgst.Hash())
if _, err := io.Copy(compressed, pr); err != nil {
return nil, errors.Wrapf(err, "copy bootstrap targz into content store")
}
if err := gw.Close(); err != nil {
return nil, errors.Wrap(err, "close gzip writer")
}

compressedDgst := cw.Digest()
if err := cw.Commit(ctx, 0, compressedDgst, content.WithLabels(map[string]string{
containerdUncompressed: uncompressedDgst.Digest().String(),
})); err != nil {
if !errdefs.IsAlreadyExists(err) {
return nil, errors.Wrap(err, "commit to content store")
}
}
if err := cw.Close(); err != nil {
return nil, errors.Wrap(err, "close content store writer")
}

info, err := cm.ContentStore.Info(ctx, compressedDgst)
if err != nil {
return nil, errors.Wrap(err, "get info from content store")
}

blobIDsBytes, err := json.Marshal(blobIDs)
if err != nil {
return nil, errors.Wrap(err, "marshal blob ids")
}

desc := ocispecs.Descriptor{
Digest: compressedDgst,
Size: info.Size,
MediaType: ocispecs.MediaTypeImageLayerGzip,
Annotations: map[string]string{
containerdUncompressed: uncompressedDgst.Digest().String(),
// Use this annotation to identify nydus bootstrap layer.
nydusify.LayerAnnotationNydusBootstrap: "true",
// Track all blob digests for nydus snapshotter.
nydusify.LayerAnnotationNydusBlobIDs: string(blobIDsBytes),
},
}

return &desc, nil
}
2 changes: 1 addition & 1 deletion cache/converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func getConverter(ctx context.Context, cs content.Store, desc ocispecs.Descripto
}

c := conversion{target: comp}
c.compress, c.finalize = comp.Type.Compress(comp)
c.compress, c.finalize = comp.Type.Compress(ctx, comp)
c.decompress = from.Decompress

return (&c).convert, nil
Expand Down
4 changes: 3 additions & 1 deletion cache/refs.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ import (
"golang.org/x/sync/errgroup"
)

var additionalAnnotations = append(compression.EStargzAnnotations, containerdUncompressed)

// Ref is a reference to cacheable objects.
type Ref interface {
Mountable
Expand Down Expand Up @@ -878,7 +880,7 @@ func filterAnnotationsForSave(a map[string]string) (b map[string]string) {
if a == nil {
return nil
}
for _, k := range append(compression.EStargzAnnotations, containerdUncompressed) {
for _, k := range additionalAnnotations {
v, ok := a[k]
if !ok {
continue
Expand Down
2 changes: 1 addition & 1 deletion cache/remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ func (sr *immutableRef) getRemote(ctx context.Context, createIfNeeded bool, refC
}
}

if refCfg.Compression.Force {
if needsForceCompression(ctx, sr.cm.ContentStore, desc, refCfg) {
if needs, err := refCfg.Compression.Type.NeedsConversion(ctx, sr.cm.ContentStore, desc); err != nil {
return nil, err
} else if needs {
Expand Down
Loading