Skip to content

Commit

Permalink
Provide layers referenced by an image stream as layers subresource
Browse files Browse the repository at this point in the history
Adds a new GET endpoint to an image stream as a subresource `layers`
that returns an array of every layer referenced by the image stream and
the tags and images included by the image.

The subresource is fed by a store driven informer that caches and
indexes only the layers. Clients get a 500 retry error if the cache has
not initialized yet (the client will silently retry).

Turns the registry access check for a given layer into an O(1) check
instead of O(N) where N is the number of images in the image stream.
  • Loading branch information
smarterclayton committed Jun 12, 2018
1 parent cbdaeb2 commit fbe0541
Show file tree
Hide file tree
Showing 4 changed files with 293 additions and 4 deletions.
2 changes: 2 additions & 0 deletions pkg/cmd/server/origin/legacy.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,8 @@ func LegacyStorage(storage map[schema.GroupVersion]map[string]rest.Storage) map[

case *imagestreametcd.REST:
legacyStorage[resource] = &imagestreametcd.LegacyREST{REST: storage}
case *imagestreametcd.LayersREST:
delete(legacyStorage, resource)

case *routeetcd.REST:
store := *storage.Store
Expand Down
16 changes: 15 additions & 1 deletion pkg/image/apiserver/apiserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ type ExtraConfig struct {
makeV1Storage sync.Once
v1Storage map[string]rest.Storage
v1StorageErr error
startFns []func(<-chan struct{})
}

type ImageAPIServerConfig struct {
Expand Down Expand Up @@ -107,6 +108,15 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget)
return nil, err
}

if err := s.GenericAPIServer.AddPostStartHook("image.openshift.io-apiserver-caches", func(context genericapiserver.PostStartHookContext) error {
for _, fn := range c.ExtraConfig.startFns {
go fn(context.StopCh)
}
return nil
}); err != nil {
return nil, err
}

return s, nil
}

Expand Down Expand Up @@ -168,10 +178,13 @@ func (c *completedConfig) newV1RESTStorage() (map[string]rest.Storage, error) {
whitelister = whitelist.WhitelistAllRegistries()
}

imageLayerIndex := imagestreametcd.NewImageLayerIndex(imageStorage)
c.ExtraConfig.startFns = append(c.ExtraConfig.startFns, imageLayerIndex.Run)

imageRegistry := image.NewRegistry(imageStorage)
imageSignatureStorage := imagesignature.NewREST(imageClient.Image())
imageStreamSecretsStorage := imagesecret.NewREST(coreClient)
imageStreamStorage, imageStreamStatusStorage, internalImageStreamStorage, err := imagestreametcd.NewREST(c.GenericConfig.RESTOptionsGetter, c.ExtraConfig.RegistryHostnameRetriever, authorizationClient.SubjectAccessReviews(), c.ExtraConfig.LimitVerifier, whitelister)
imageStreamStorage, imageStreamLayersStorage, imageStreamStatusStorage, internalImageStreamStorage, err := imagestreametcd.NewREST(c.GenericConfig.RESTOptionsGetter, c.ExtraConfig.RegistryHostnameRetriever, authorizationClient.SubjectAccessReviews(), c.ExtraConfig.LimitVerifier, whitelister, imageLayerIndex)
if err != nil {
return nil, fmt.Errorf("error building REST storage: %v", err)
}
Expand Down Expand Up @@ -206,6 +219,7 @@ func (c *completedConfig) newV1RESTStorage() (map[string]rest.Storage, error) {
v1Storage["imagesignatures"] = imageSignatureStorage
v1Storage["imageStreams/secrets"] = imageStreamSecretsStorage
v1Storage["imageStreams"] = imageStreamStorage
v1Storage["imageStreams/layers"] = imageStreamLayersStorage
v1Storage["imageStreams/status"] = imageStreamStatusStorage
v1Storage["imageStreamImports"] = imageStreamImportStorage
v1Storage["imageStreamImages"] = imageStreamImageStorage
Expand Down
120 changes: 117 additions & 3 deletions pkg/image/registry/imagestream/etcd/etcd.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package etcd

import (
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
apirequest "k8s.io/apiserver/pkg/endpoints/request"
Expand Down Expand Up @@ -46,7 +47,8 @@ func NewREST(
subjectAccessReviewRegistry authorizationclient.SubjectAccessReviewInterface,
limitVerifier imageadmission.LimitVerifier,
registryWhitelister whitelist.RegistryWhitelister,
) (*REST, *StatusREST, *InternalREST, error) {
imageLayerIndex ImageLayerIndex,
) (*REST, *LayersREST, *StatusREST, *InternalREST, error) {
store := registry.Store{
NewFunc: func() runtime.Object { return &imageapi.ImageStream{} },
NewListFunc: func() runtime.Object { return &imageapi.ImageStreamList{} },
Expand All @@ -71,9 +73,11 @@ func NewREST(
AttrFunc: storage.AttrFunc(storage.DefaultNamespaceScopedAttr).WithFieldMutation(imageapi.ImageStreamSelector),
}
if err := store.CompleteWithOptions(options); err != nil {
return nil, nil, nil, err
return nil, nil, nil, nil, err
}

layersREST := &LayersREST{index: imageLayerIndex, store: &store}

statusStrategy := imagestream.NewStatusStrategy(strategy)
statusStore := store
statusStore.Decorator = nil
Expand All @@ -88,7 +92,7 @@ func NewREST(
internalStore.UpdateStrategy = internalStrategy

internalREST := &InternalREST{store: &internalStore}
return rest, statusREST, internalREST, nil
return rest, layersREST, statusREST, internalREST, nil
}

// StatusREST implements the REST endpoint for changing the status of an image stream.
Expand Down Expand Up @@ -138,6 +142,116 @@ func (r *InternalREST) Update(ctx apirequest.Context, name string, objInfo rest.
return r.store.Update(ctx, name, objInfo, createValidation, updateValidation)
}

// LayersREST implements the REST endpoint for changing both the spec and status of an image stream.
type LayersREST struct {
store *registry.Store
index ImageLayerIndex
}

var _ rest.Getter = &LayersREST{}

func (r *LayersREST) New() runtime.Object {
return &imageapi.ImageStreamLayers{}
}

// Get returns the layers for an image stream.
func (r *LayersREST) Get(ctx apirequest.Context, name string, options *metav1.GetOptions) (runtime.Object, error) {
if !r.index.HasSynced() {
return nil, errors.NewServerTimeout(r.store.DefaultQualifiedResource, "get", 2)
}
obj, err := r.store.Get(ctx, name, options)
if err != nil {
return nil, err
}
is := obj.(*imageapi.ImageStream)
isl := &imageapi.ImageStreamLayers{
ObjectMeta: is.ObjectMeta,
}

existing, existingBlobs := make(map[string]int), make(map[string]int)
existingImage, existingImageBlobs := make(map[string][]int), make(map[string][]int)
for tag, status := range is.Status.Tags {
var seen map[string]struct{}
for _, item := range status.Items {
if len(item.Image) == 0 {
continue
}
if _, ok := seen[item.Image]; ok {
continue
}
if len(status.Items) > 1 {
if seen == nil {
seen = make(map[string]struct{})
}
seen[item.Image] = struct{}{}
}
if indices, ok := existingImage[item.Image]; ok {
for _, index := range indices {
ref := &isl.Layers[index]
ref.Tags = append(ref.Tags, tag)
}
if indices, ok := existingImageBlobs[item.Image]; ok {
for _, index := range indices {
ref := &isl.Blobs[index]
ref.Tags = append(ref.Tags, tag)
}
}
continue
}

obj, _, _ := r.index.GetIndexer().GetByKey(item.Image)
entry, ok := obj.(*ImageLayers)
if !ok {
continue
}
indices := make([]int, 0, len(entry.Layers))
for _, layer := range entry.Layers {
if index, ok := existing[layer.Name]; ok {
ref := &isl.Layers[index]
ref.Tags = append(ref.Tags, tag)
ref.ImageIDs = append(ref.ImageIDs, item.Image)
indices = append(indices, index)
continue
}
index := len(isl.Layers)
existing[layer.Name] = index
indices = append(indices, index)
isl.Layers = append(isl.Layers, imageapi.ImageLayerReference{
Name: layer.Name,
LayerSize: layer.LayerSize,
MediaType: layer.MediaType,
Tags: []string{tag},
ImageIDs: []string{item.Image},
})
}
existingImage[item.Image] = indices

blobIndices := make([]int, 0, 1)
if layer := entry.Manifest; layer != nil {
if index, ok := existingBlobs[layer.Name]; ok {
ref := &isl.Blobs[index]
ref.Tags = append(ref.Tags, tag)
ref.ImageIDs = append(ref.ImageIDs, item.Image)
blobIndices = append(blobIndices, index)
continue
}
index := len(isl.Blobs)
existingBlobs[layer.Name] = index
blobIndices = append(blobIndices, index)
isl.Blobs = append(isl.Blobs, imageapi.ImageLayerReference{
Name: layer.Name,
LayerSize: layer.LayerSize,
MediaType: layer.MediaType,
Tags: []string{tag},
ImageIDs: []string{item.Image},
})
}
existingImageBlobs[item.Image] = blobIndices
}
}
return isl, nil
}

// LegacyREST allows us to wrap and alter some behavior
type LegacyREST struct {
*REST
Expand Down
159 changes: 159 additions & 0 deletions pkg/image/registry/imagestream/etcd/image.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
package etcd

import (
"fmt"

metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/watch"
apirequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/registry/rest"
"k8s.io/client-go/tools/cache"

imageapi "github.com/openshift/origin/pkg/image/apis/image"
)

type ImageLayerIndex interface {
cache.SharedIndexInformer
}

type ImageStore interface {
rest.Watcher
rest.Lister
}

func NewImageLayerIndex(store ImageStore) ImageLayerIndex {
ctx := apirequest.NewContext()
informer := cache.NewSharedIndexInformer(&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
obj, err := store.List(ctx, &metainternalversion.ListOptions{
ResourceVersion: options.ResourceVersion,
})
if err != nil {
return nil, err
}
list, ok := obj.(*imageapi.ImageList)
if !ok {
return nil, fmt.Errorf("unexpected store type %T for layer index", obj)
}
out := &metainternalversion.List{
Items: make([]runtime.Object, len(list.Items)),
}
for i, image := range list.Items {
out.Items[i] = &ImageLayers{
Name: image.Name,
Layers: image.DockerImageLayers,
Manifest: manifestFromImage(&image),
}
}
return out, nil
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
w, err := store.Watch(ctx, &metainternalversion.ListOptions{
ResourceVersion: options.ResourceVersion,
})
if err != nil {
return nil, err
}
return watch.Filter(w, func(in watch.Event) (out watch.Event, keep bool) {
if in.Object == nil {
return in, true
}
image, ok := in.Object.(*imageapi.Image)
if !ok {
return in, true
}
in.Object = &ImageLayers{
Name: image.Name,
Layers: image.DockerImageLayers,
Manifest: manifestFromImage(image),
}
return in, true
}), nil
},
}, &ImageLayers{}, 0, cache.Indexers{
"layers": func(obj interface{}) ([]string, error) {
entry, ok := obj.(*ImageLayers)
if !ok {
return nil, fmt.Errorf("unexpected cache object %T", obj)
}
keys := make([]string, 0, len(entry.Layers))
for _, layer := range entry.Layers {
keys = append(keys, layer.Name)
}
return keys, nil
},
})
return informer
}

// manifestFromImage attempts to find a manifest blob description from
// an image. Images older than schema2 in Docker do not have a manifest blob.
func manifestFromImage(image *imageapi.Image) *imageapi.ImageLayer {
if image.DockerImageManifestMediaType != "application/vnd.docker.distribution.manifest.v2+json" {
return nil
}
return &imageapi.ImageLayer{
Name: image.DockerImageMetadata.ID,
MediaType: image.DockerImageManifestMediaType,
}
}

type ImageLayers struct {
Name string
Manifest *imageapi.ImageLayer
Layers []imageapi.ImageLayer
}

var _ metav1.Object = &ImageLayers{}

func (l *ImageLayers) GetObjectKind() schema.ObjectKind { return &metav1.TypeMeta{} }
func (l *ImageLayers) DeepCopyObject() runtime.Object {
var layers []imageapi.ImageLayer
if l.Layers != nil {
layers = make([]imageapi.ImageLayer, len(l.Layers))
copy(layers, l.Layers)
}
return &ImageLayers{
Name: l.Name,
Layers: layers,
}
}

// TODO: fix client-go/cache to not need this

func (l *ImageLayers) GetNamespace() string { return "" }
func (l *ImageLayers) SetNamespace(namespace string) {}
func (l *ImageLayers) GetName() string { return l.Name }
func (l *ImageLayers) SetName(name string) {}
func (l *ImageLayers) GetGenerateName() string { return "" }
func (l *ImageLayers) SetGenerateName(name string) {}
func (l *ImageLayers) GetUID() types.UID { return "" }
func (l *ImageLayers) SetUID(uid types.UID) {}
func (l *ImageLayers) GetResourceVersion() string { return "" }
func (l *ImageLayers) SetResourceVersion(version string) {}
func (l *ImageLayers) GetGeneration() int64 { return 0 }
func (l *ImageLayers) SetGeneration(generation int64) {}
func (l *ImageLayers) GetSelfLink() string { return "" }
func (l *ImageLayers) SetSelfLink(selfLink string) {}
func (l *ImageLayers) GetCreationTimestamp() metav1.Time { return metav1.Time{} }
func (l *ImageLayers) SetCreationTimestamp(timestamp metav1.Time) {}
func (l *ImageLayers) GetDeletionTimestamp() *metav1.Time { return nil }
func (l *ImageLayers) SetDeletionTimestamp(timestamp *metav1.Time) {}
func (l *ImageLayers) GetDeletionGracePeriodSeconds() *int64 { return nil }
func (l *ImageLayers) SetDeletionGracePeriodSeconds(*int64) {}
func (l *ImageLayers) GetLabels() map[string]string { return nil }
func (l *ImageLayers) SetLabels(labels map[string]string) {}
func (l *ImageLayers) GetAnnotations() map[string]string { return nil }
func (l *ImageLayers) SetAnnotations(annotations map[string]string) {}
func (l *ImageLayers) GetInitializers() *metav1.Initializers { return nil }
func (l *ImageLayers) SetInitializers(initializers *metav1.Initializers) {}
func (l *ImageLayers) GetFinalizers() []string { return nil }
func (l *ImageLayers) SetFinalizers(finalizers []string) {}
func (l *ImageLayers) GetOwnerReferences() []metav1.OwnerReference { return nil }
func (l *ImageLayers) SetOwnerReferences([]metav1.OwnerReference) {}
func (l *ImageLayers) GetClusterName() string { return "" }
func (l *ImageLayers) SetClusterName(clusterName string) {}

0 comments on commit fbe0541

Please sign in to comment.