Skip to content

Commit

Permalink
feat: unpin images from unknown oci bundles
Browse files Browse the repository at this point in the history
after importing all bundles from the oci directory we go through all
containerd images and unpin all images that were imported from bundles
that no longer exist on the system.

we start now to add the following label when importing:

- k0sproject.ocibundle.paths

on this label we keep a list of all oci bundles that contain such image
and we only unpin it if all the oci bundles were removed from the disk.

as now we are unpinning images based on their presence on disk we start
to operate also on rename and remove fs events.

this commit also address an issue where we could leave a containerd
connection opened.
  • Loading branch information
ricardomaraschini committed May 27, 2024
1 parent c924c1f commit 01c203e
Show file tree
Hide file tree
Showing 2 changed files with 338 additions and 20 deletions.
205 changes: 185 additions & 20 deletions pkg/component/worker/ocibundle.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package worker

import (
"context"
"encoding/json"
"fmt"
"os"
"path/filepath"
Expand All @@ -26,6 +27,7 @@ import (

"github.com/avast/retry-go"
"github.com/containerd/containerd"
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/platforms"
"github.com/fsnotify/fsnotify"
"github.com/sirupsen/logrus"
Expand All @@ -38,6 +40,12 @@ import (
"github.com/k0sproject/k0s/pkg/debounce"
)

const (
// Follows a list of labels we use to control imported images.
ImagePinnedLabel = "io.cri-containerd.pinned"
ImageSourcePathsLabel = "k0sproject.ocibundle.paths"
)

// OCIBundleReconciler tries to import OCI bundle into the running containerd instance
type OCIBundleReconciler struct {
k0sVars *config.CfgVars
Expand Down Expand Up @@ -66,8 +74,8 @@ func (a *OCIBundleReconciler) Init(_ context.Context) error {
return dir.Init(a.k0sVars.OCIBundleDir, constant.ManifestsDirMode)
}

// loadOne connects to containerd and imports the provided OCI bundle.
func (a *OCIBundleReconciler) loadOne(ctx context.Context, fpath string) error {
// containerdClient returns a connected containerd client.
func (a *OCIBundleReconciler) containerdClient(ctx context.Context) (*containerd.Client, error) {
var client *containerd.Client
sock := filepath.Join(a.k0sVars.RunDir, "containerd.sock")
if err := retry.Do(func() (err error) {
Expand All @@ -86,10 +94,23 @@ func (a *OCIBundleReconciler) loadOne(ctx context.Context, fpath string) error {
}
return nil
}, retry.Context(ctx), retry.Delay(time.Second*5)); err != nil {
return err
if client == nil {
return nil, err
}
_ = client.Close()
return nil, err
}
return client, nil
}

// loadOne connects to containerd and imports the provided OCI bundle.
func (a *OCIBundleReconciler) loadOne(ctx context.Context, fpath string, modtime time.Time) error {
client, err := a.containerdClient(ctx)
if err != nil {
return fmt.Errorf("failed to create containerd client: %w", err)
}
defer client.Close()
if err := a.unpackBundle(ctx, client, fpath); err != nil {
if err := a.unpackBundle(ctx, client, fpath, modtime); err != nil {
return fmt.Errorf("failed to process OCI bundle: %w", err)
}
return nil
Expand Down Expand Up @@ -127,17 +148,83 @@ func (a *OCIBundleReconciler) loadAll(ctx context.Context) {
}

a.log.Infof("Loading OCI bundle %s", fpath)
if err := a.loadOne(ctx, fpath); err != nil {
if err := a.loadOne(ctx, fpath, modtime); err != nil {
a.log.WithError(err).Errorf("Failed to load OCI bundle %s", fpath)
continue
}

a.alreadyImported[fpath] = modtime
a.log.Infof("OCI bundle %s loaded", fpath)
}

if err := a.unpinAll(ctx); err != nil {
a.log.WithError(err).Errorf("Failed to unpin images")
}

a.Emit("finished importing OCI bundles")
}

// unpin unpins containerd images from the image store. we unpin an image if
// the file from where it was imported no longer exists or the file content
// has been changed.
func (a *OCIBundleReconciler) unpinAll(ctx context.Context) error {
client, err := a.containerdClient(ctx)
if err != nil {
return fmt.Errorf("failed to create containerd client: %w", err)
}
defer client.Close()

isvc := client.ImageService()
images, err := isvc.List(ctx)
if err != nil {
return fmt.Errorf("failed to list images")
}

for _, image := range images {
if err := a.unpinOne(ctx, image, isvc); err != nil {
return fmt.Errorf("failed to unpin %s: %w", image.Name, err)
}
}
return nil
}

// unpinOne checks if we can unpin the provided image and if so unpins it.
func (a *OCIBundleReconciler) unpinOne(ctx context.Context, image images.Image, isvc images.Store) error {
// if this image isn't pinned, return immediately.
if v, pin := image.Labels[ImagePinnedLabel]; !pin || v != "pinned" {
return nil
}

// extract the bundle paths from the image labels. if none has been found
// then we don't own this image. return.
sources, err := GetImageSources(image)
if err != nil {
return fmt.Errorf("failed to extract image source: %w", err)
} else if len(sources) == 0 {
return nil
}

// if any of the registered sources is still present, we can't unpin the image.
// we just update the image label to remove references to the bundles that no
// longer exist.
if sources.Exist() {
sources.Refresh()
if err := SetImageSources(&image, sources); err != nil {
return fmt.Errorf("failed to reset image sources: %w", err)
}
_, err := isvc.Update(ctx, image, fmt.Sprintf("labels.%s", ImageSourcePathsLabel))
return err
}

// all bundles referred by this image are no more, we can unpin it.
a.log.Infof("Unpinning image %s", image.Name)
a.EmitWithPayload("unpinning image", image.Name)
delete(image.Labels, ImagePinnedLabel)
delete(image.Labels, ImageSourcePathsLabel)
_, err = isvc.Update(ctx, image)
return err
}

// installWatcher creates a fs watcher on the oci bundle directory. This function calls
// loadAll every time a new file is created or updated on the oci directory. Events are
// debounced with a timeout of 10 seconds. Watcher is started with a buffer so we don't
Expand All @@ -155,13 +242,6 @@ func (a *OCIBundleReconciler) installWatcher(ctx context.Context) error {
debouncer := debounce.Debouncer[fsnotify.Event]{
Input: watcher.Events,
Timeout: 10 * time.Second,
Filter: func(item fsnotify.Event) bool {
switch item.Op {
case fsnotify.Remove, fsnotify.Rename:
return false
}
return true
},
Callback: func(ev fsnotify.Event) {
a.loadAll(ctx)
},
Expand Down Expand Up @@ -202,28 +282,42 @@ func (a *OCIBundleReconciler) Start(ctx context.Context) error {
return nil
}

func (a *OCIBundleReconciler) unpackBundle(ctx context.Context, client *containerd.Client, bundlePath string) error {
// unpackBundle imports the bundle into the containerd storage. imported images are
// pinned and labeled so we can control them later.
func (a *OCIBundleReconciler) unpackBundle(ctx context.Context, client *containerd.Client, bundlePath string, modtime time.Time) error {
r, err := os.Open(bundlePath)
if err != nil {
return fmt.Errorf("can't open bundle file %s: %w", bundlePath, err)
}
defer r.Close()

images, err := client.Import(ctx, r)
if err != nil {
return fmt.Errorf("can't import bundle: %w", err)
}
is := client.ImageService()

fieldpaths := []string{
fmt.Sprintf("labels.%s", ImagePinnedLabel),
fmt.Sprintf("labels.%s", ImageSourcePathsLabel),
}

isvc := client.ImageService()
for _, i := range images {
// here we add a label to pin the image in the containerd storage and another
// to indicate from which oci buncle (file path) the image was imported from.
a.log.Infof("Imported image %s", i.Name)
// Update labels for each image to include io.cri-containerd.pinned=pinned
fieldpaths := []string{"labels.io.cri-containerd.pinned"}

if i.Labels == nil {
i.Labels = make(map[string]string)
}
i.Labels["io.cri-containerd.pinned"] = "pinned"
_, err := is.Update(ctx, i, fieldpaths...)
if err != nil {
return fmt.Errorf("failed to add io.cri-containerd.pinned label for image %s: %w", i.Name, err)

i.Labels[ImagePinnedLabel] = "pinned"
if err := AddToImageSources(&i, bundlePath, modtime); err != nil {
return fmt.Errorf("failed to add image source: %w", err)
}

if _, err := isvc.Update(ctx, i, fieldpaths...); err != nil {
return fmt.Errorf("failed to add labels for image %s: %w", i.Name, err)
}
}
return nil
Expand All @@ -236,3 +330,74 @@ func (a *OCIBundleReconciler) Stop() error {
a.log.Info("OCI bundle loader stopped")
return nil
}

// ImageSources holds a map of bundle paths with their respective modification times.
// this is used to track from which bundles a given image was imported.
type ImageSources map[string]time.Time

// Refresh removes from the list of source paths all the paths that no longer exists
// or have been modified.
func (i *ImageSources) Refresh() {
newmap := map[string]time.Time{}
for path, modtime := range *i {
finfo, err := os.Stat(path)
if err == nil && finfo.ModTime().Equal(modtime) {
newmap[path] = modtime
}
}
*i = newmap
}

// Exist returns true if a given bundle source file still exists in the node fs.
func (i *ImageSources) Exist() bool {
for path, modtime := range *i {
finfo, err := os.Stat(path)
if err == nil && finfo.ModTime().Equal(modtime) {
return true
}
}
return false
}

// GetImageSources parses the image source label and returns the ImageSources. if
// no label has been set in the image this returns an empty but initiated map.
func GetImageSources(image images.Image) (ImageSources, error) {
paths := map[string]time.Time{}
value, found := image.Labels[ImageSourcePathsLabel]
if !found {
return paths, nil
}
if err := json.Unmarshal([]byte(value), &paths); err != nil {
return nil, fmt.Errorf("failed to unmarshal label: %w", err)
}
return paths, nil
}

// SetImageSources sets the image source label in the image. this function will
// trim out of the sources the ones that no longer exists in the node fs.
func SetImageSources(image *images.Image, sources ImageSources) error {
if len(sources) == 0 {
return nil
}
sources.Refresh()
data, err := json.Marshal(sources)
if err != nil {
return fmt.Errorf("failed to marshal image source: %w", err)
}
if image.Labels == nil {
image.Labels = map[string]string{}
}
image.Labels[ImageSourcePathsLabel] = string(data)
return nil
}

// AddToImageSources adds a new source path to the image sources. this function
// will trim out of the sources the ones that no longer exists in the node fs.
func AddToImageSources(image *images.Image, path string, modtime time.Time) error {
paths, err := GetImageSources(*image)
if err != nil {
return fmt.Errorf("failed to get image sources: %w", err)
}
paths[path] = modtime
return SetImageSources(image, paths)
}
Loading

0 comments on commit 01c203e

Please sign in to comment.