Skip to content

Commit

Permalink
Implement RESTMapper by dynamic shared informer factory
Browse files Browse the repository at this point in the history
  • Loading branch information
astefanutti committed Nov 10, 2022
1 parent 5c123e3 commit 4149c73
Show file tree
Hide file tree
Showing 3 changed files with 221 additions and 142 deletions.
186 changes: 91 additions & 95 deletions pkg/informer/informer.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"sync/atomic"
"time"

openapi_v2 "github.com/google/gnostic/openapiv2"
kcpapiextensionsv1informers "github.com/kcp-dev/apiextensions-apiserver/pkg/client/informers/externalversions/apiextensions/v1"
kcpcache "github.com/kcp-dev/apimachinery/pkg/cache"
kcpdynamic "github.com/kcp-dev/client-go/dynamic"
Expand All @@ -36,15 +35,15 @@ import (
"k8s.io/apiextensions-apiserver/pkg/apihelpers"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/version"
"k8s.io/client-go/discovery"
"k8s.io/client-go/informers"
"k8s.io/client-go/openapi"
"k8s.io/client-go/rest"
"k8s.io/client-go/restmapper"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"

Expand Down Expand Up @@ -80,7 +79,8 @@ type DynamicDiscoverySharedInformerFactory struct {
informers map[schema.GroupVersionResource]kcpkubernetesinformers.GenericClusterInformer
startedInformers map[schema.GroupVersionResource]bool
informerStops map[schema.GroupVersionResource]chan struct{}
discoveryData []*metav1.APIResourceList
discoveryData discoveryData
restMapper restMapper

// Support subscribers (e.g. quota) that want to know when informers/discovery have changed.
subscribersLock sync.Mutex
Expand Down Expand Up @@ -120,6 +120,10 @@ func NewDynamicDiscoverySharedInformerFactory(
subscribers: make(map[string]chan<- struct{}),
}

f.restMapper = newRESTMapper(func() (meta.RESTMapper, error) {
return restmapper.NewDiscoveryRESTMapper(f.discoveryData.apiGroupResources), nil
})

f.handlers.Store([]GVREventHandler{})

// Add an index function that indexes a CRD by its group/version/resource.
Expand Down Expand Up @@ -548,6 +552,9 @@ func (d *DynamicDiscoverySharedInformerFactory) updateInformers() {
}

d.discoveryData = gvrsToDiscoveryData(latest)
d.restMapper = newRESTMapper(func() (meta.RESTMapper, error) {
return restmapper.NewDiscoveryRESTMapper(d.discoveryData.apiGroupResources), nil
})

d.subscribersLock.Lock()
defer d.subscribersLock.Unlock()
Expand All @@ -563,11 +570,15 @@ func (d *DynamicDiscoverySharedInformerFactory) updateInformers() {
}
}

// gvrsToDiscoveryData returns "fake"/simulated discovery data for all the resources covered by the factory. It only
// includes enough data in each APIResource to support what kcp currently needs (scheduling, placement, quota).
func gvrsToDiscoveryData(gvrs map[schema.GroupVersionResource]gvrPartialMetadata) []*metav1.APIResourceList {
var discoveryData []*metav1.APIResourceList
gvResources := make(map[schema.GroupVersion][]metav1.APIResource)
// gvrsToDiscoveryData returns discovery data for all the resources covered by the factory. It only
// includes enough data in each APIResource to support what kcp currently needs (scheduling, placement, quota, GC).
func gvrsToDiscoveryData(gvrs map[schema.GroupVersionResource]gvrPartialMetadata) discoveryData {
ret := discoveryData{
apiGroupResources: make([]*restmapper.APIGroupResources, 0),
apiResourceList: make([]*metav1.APIResourceList, 0),
}

gvResources := make(map[string]map[string][]metav1.APIResource)

for gvr, metadata := range gvrs {
apiResource := metav1.APIResource{
Expand All @@ -580,26 +591,56 @@ func gvrsToDiscoveryData(gvrs map[schema.GroupVersionResource]gvrPartialMetadata
// Everything we're informing on supports these
Verbs: []string{"create", "list", "watch", "delete"},
}
gv := gvr.GroupVersion()
gvResources[gv] = append(gvResources[gv], apiResource)
if gvResources[gvr.Group] == nil {
gvResources[gvr.Group] = make(map[string][]metav1.APIResource)
}
gvResources[gvr.Group][gvr.Version] = append(gvResources[gvr.Group][gvr.Version], apiResource)
}

for gv, resources := range gvResources {
sort.Slice(resources, func(i, j int) bool {
return resources[i].Name < resources[j].Name
})
for group, resources := range gvResources {
var versions []metav1.GroupVersionForDiscovery
versionedResources := make(map[string][]metav1.APIResource)

for version, apiResource := range resources {
versions = append(versions, metav1.GroupVersionForDiscovery{GroupVersion: group, Version: version})

sort.Slice(apiResource, func(i, j int) bool {
return apiResource[i].Name < apiResource[j].Name
})

versionedResources[version] = apiResource

apiResourceList := &metav1.APIResourceList{
GroupVersion: metav1.GroupVersion{Group: group, Version: version}.String(),
APIResources: apiResource,
}

ret.apiResourceList = append(ret.apiResourceList, apiResourceList)
}
apiGroup := metav1.APIGroup{
Name: group,
Versions: versions,
// We may want to fill the PreferredVersion based on the storage version,
// though it's not currently required by the kcp controllers that rely on
// the discovery data provided by the dynamic shared informer factory, e.g.,
// the quota and garbage collector controllers.
}

discoveryData = append(discoveryData, &metav1.APIResourceList{
GroupVersion: gv.String(),
APIResources: resources,
ret.apiGroupResources = append(ret.apiGroupResources, &restmapper.APIGroupResources{
Group: apiGroup,
VersionedResources: versionedResources,
})
}

sort.Slice(discoveryData, func(i, j int) bool {
return discoveryData[i].GroupVersion < discoveryData[j].GroupVersion
sort.Slice(ret.apiGroupResources, func(i, j int) bool {
return ret.apiGroupResources[i].Group.Name < ret.apiGroupResources[j].Group.Name
})

sort.Slice(ret.apiResourceList, func(i, j int) bool {
return ret.apiResourceList[i].GroupVersion < ret.apiResourceList[j].GroupVersion
})

return discoveryData
return ret
}

// Start starts any informers that have been created but not yet started. The passed in stop channel is ignored;
Expand Down Expand Up @@ -664,48 +705,18 @@ func (d *DynamicDiscoverySharedInformerFactory) Unsubscribe(id string) {
delete(d.subscribers, id)
}

func (d *DynamicDiscoverySharedInformerFactory) ServerGroups() (*metav1.APIGroupList, error) {
d.informersLock.RLock()
defer d.informersLock.RUnlock()

groups := make([]metav1.APIGroup, len(d.discoveryData))
failedGroups := make(map[schema.GroupVersion]error)

for i, apiResourceList := range d.discoveryData {
gv, err := schema.ParseGroupVersion(apiResourceList.GroupVersion)
if err != nil {
failedGroups[schema.GroupVersion{Group: apiResourceList.GroupVersion}] = err
continue
}
groups[i] = metav1.APIGroup{
Name: gv.Group,
Versions: []metav1.GroupVersionForDiscovery{
{
GroupVersion: gv.String(),
Version: gv.Version,
},
},
PreferredVersion: metav1.GroupVersionForDiscovery{
GroupVersion: gv.String(),
Version: gv.Version,
},
}
}

if len(failedGroups) > 0 {
return &metav1.APIGroupList{Groups: groups}, &discovery.ErrGroupDiscoveryFailed{Groups: failedGroups}
}

return &metav1.APIGroupList{Groups: groups}, nil
type discoveryData struct {
apiGroupResources []*restmapper.APIGroupResources
apiResourceList []*metav1.APIResourceList
}

var _ discovery.DiscoveryInterface = &DynamicDiscoverySharedInformerFactory{}
var _ discovery.ServerResourcesInterface = &DynamicDiscoverySharedInformerFactory{}

func (d *DynamicDiscoverySharedInformerFactory) ServerResourcesForGroupVersion(groupVersion string) (*metav1.APIResourceList, error) {
d.informersLock.RLock()
defer d.informersLock.RUnlock()

for _, apiResourceList := range d.discoveryData {
for _, apiResourceList := range d.discoveryData.apiResourceList {
if apiResourceList.GroupVersion == groupVersion {
return apiResourceList.DeepCopy(), nil
}
Expand All @@ -723,35 +734,14 @@ func (d *DynamicDiscoverySharedInformerFactory) ServerGroupsAndResources() ([]*m
d.informersLock.RLock()
defer d.informersLock.RUnlock()

retGroups := make([]*metav1.APIGroup, len(d.discoveryData))
failedGroups := make(map[schema.GroupVersion]error)
retResourceList := make([]*metav1.APIResourceList, len(d.discoveryData))

for i, apiResourceList := range d.discoveryData {
gv, err := schema.ParseGroupVersion(apiResourceList.GroupVersion)
if err != nil {
failedGroups[schema.GroupVersion{Group: apiResourceList.GroupVersion}] = err
continue
}
retGroups[i] = &metav1.APIGroup{
Name: gv.Group,
Versions: []metav1.GroupVersionForDiscovery{
{
GroupVersion: gv.String(),
Version: gv.Version,
},
},
PreferredVersion: metav1.GroupVersionForDiscovery{
GroupVersion: gv.String(),
Version: gv.Version,
},
}

retResourceList[i] = apiResourceList.DeepCopy()
retGroups := make([]*metav1.APIGroup, len(d.discoveryData.apiGroupResources))
for i, apiGroupResources := range d.discoveryData.apiGroupResources {
retGroups[i] = apiGroupResources.Group.DeepCopy()
}

if len(failedGroups) > 0 {
return retGroups, retResourceList, &discovery.ErrGroupDiscoveryFailed{Groups: failedGroups}
retResourceList := make([]*metav1.APIResourceList, len(d.discoveryData.apiResourceList))
for i, apiResourceList := range d.discoveryData.apiResourceList {
retResourceList[i] = apiResourceList.DeepCopy()
}

return retGroups, retResourceList, nil
Expand All @@ -761,8 +751,8 @@ func (d *DynamicDiscoverySharedInformerFactory) ServerPreferredResources() ([]*m
d.informersLock.RLock()
defer d.informersLock.RUnlock()

ret := make([]*metav1.APIResourceList, len(d.discoveryData))
for i, apiResourceList := range d.discoveryData {
ret := make([]*metav1.APIResourceList, len(d.discoveryData.apiResourceList))
for i, apiResourceList := range d.discoveryData.apiResourceList {
ret[i] = apiResourceList.DeepCopy()
}

Expand All @@ -773,8 +763,8 @@ func (d *DynamicDiscoverySharedInformerFactory) ServerPreferredNamespacedResourc
d.informersLock.RLock()
defer d.informersLock.RUnlock()

ret := make([]*metav1.APIResourceList, len(d.discoveryData))
for i, apiResourceList := range d.discoveryData {
ret := make([]*metav1.APIResourceList, len(d.discoveryData.apiResourceList))
for i, apiResourceList := range d.discoveryData.apiResourceList {
namespacedResources := &metav1.APIResourceList{GroupVersion: apiResourceList.GroupVersion}
for _, resource := range apiResourceList.APIResources {
if resource.Namespaced {
Expand All @@ -787,18 +777,24 @@ func (d *DynamicDiscoverySharedInformerFactory) ServerPreferredNamespacedResourc
return ret, nil
}

func (d *DynamicDiscoverySharedInformerFactory) RESTClient() rest.Interface {
panic("unsupported operation")
func (d *DynamicDiscoverySharedInformerFactory) RESTMapper() meta.ResettableRESTMapper {
return &d.restMapper
}

func (d *DynamicDiscoverySharedInformerFactory) ServerVersion() (*version.Info, error) {
panic("unsupported operation")
func newRESTMapper(fn func() (meta.RESTMapper, error)) restMapper {
return restMapper{
meta.NewLazyRESTMapperLoader(fn),
}
}

func (d *DynamicDiscoverySharedInformerFactory) OpenAPISchema() (*openapi_v2.Document, error) {
panic("unsupported operation")
type restMapper struct {
meta.RESTMapper
}

func (d *DynamicDiscoverySharedInformerFactory) OpenAPIV3() openapi.Client {
panic("unsupported operation")
func (r *restMapper) Reset() {
// NOOP: this is called by the Kubernetes garbage collector controller, that assumes discovery
// is refreshed periodically. As this shared informer factory pushes events whenever discovery
// changes, there is no need to reset the REST mapper during the periodic re-sync of the GC monitors.
}

var _ meta.ResettableRESTMapper = &restMapper{}
Loading

0 comments on commit 4149c73

Please sign in to comment.