diff --git a/Makefile b/Makefile
index 5975ae93a..499515c52 100644
--- a/Makefile
+++ b/Makefile
@@ -94,7 +94,7 @@ apiserver:
 
 .PHONY: binding-apiserver
 binding-apiserver:
 	CGO_ENABLED=0 GOOS=$(GOOS) GOARCH=$(GOARCH) go build \
-		-ldflags $(LDFLAGS) \
+		-ldflags "$(LDFLAGS)" \
 		-o bin/binding-apiserver \
 		cmd/binding-apiserver/main.go
diff --git a/cmd/apiserver/app/apiserver.go b/cmd/apiserver/app/apiserver.go
index 541b2a363..9dd16a869 100644
--- a/cmd/apiserver/app/apiserver.go
+++ b/cmd/apiserver/app/apiserver.go
@@ -32,7 +32,7 @@ func NewClusterPediaServerCommand(ctx context.Context) *cobra.Command {
 		}
 		cliflag.PrintFlags(cmd.Flags())
 
-		config, err := opts.Config(false)
+		config, err := opts.Config()
 		if err != nil {
 			return err
 		}
diff --git a/cmd/apiserver/app/options/options.go b/cmd/apiserver/app/options/options.go
index 326b1ce84..c133326a7 100644
--- a/cmd/apiserver/app/options/options.go
+++ b/cmd/apiserver/app/options/options.go
@@ -75,7 +75,7 @@ func (o *ClusterPediaServerOptions) Validate() error {
 	return utilerrors.NewAggregate(errors)
 }
 
-func (o *ClusterPediaServerOptions) Config(bindingSyncController bool) (*apiserver.Config, error) {
+func (o *ClusterPediaServerOptions) Config() (*apiserver.Config, error) {
 	if err := o.Validate(); err != nil {
 		return nil, err
 	}
@@ -109,9 +109,8 @@ func (o *ClusterPediaServerOptions) Config(bindingSyncController bool) (*apiserv
 	}
 
 	return &apiserver.Config{
-		GenericConfig:         genericConfig,
-		StorageFactory:        storage,
-		BindingSyncController: bindingSyncController,
+		GenericConfig:  genericConfig,
+		StorageFactory: storage,
 	}, nil
 }
diff --git a/cmd/binding-apiserver/app/binding_apiserver.go b/cmd/binding-apiserver/app/binding_apiserver.go
index 1b8ed1e55..5701a83b5 100644
--- a/cmd/binding-apiserver/app/binding_apiserver.go
+++ b/cmd/binding-apiserver/app/binding_apiserver.go
@@ -2,6 +2,7 @@ package app
 
 import (
 	"context"
+	"fmt"
 
 	"github.com/spf13/cobra"
 	"k8s.io/apimachinery/pkg/util/runtime"
@@ -13,6 +14,8 @@ import (
 	"k8s.io/component-base/term"
 
 	"github.com/clusterpedia-io/clusterpedia/cmd/apiserver/app/options"
+	"github.com/clusterpedia-io/clusterpedia/pkg/generated/clientset/versioned"
+	"github.com/clusterpedia-io/clusterpedia/pkg/synchromanager"
 	clusterpediafeature "github.com/clusterpedia-io/clusterpedia/pkg/utils/feature"
 	"github.com/clusterpedia-io/clusterpedia/pkg/version/verflag"
 )
@@ -32,12 +35,28 @@ func NewClusterPediaServerCommand(ctx context.Context) *cobra.Command {
 		}
 		cliflag.PrintFlags(cmd.Flags())
 
-		config, err := opts.Config(true)
+		config, err := opts.Config()
 		if err != nil {
 			return err
 		}
 
-		server, err := config.Complete().New()
+		completedConfig := config.Complete()
+		if completedConfig.ClientConfig == nil {
+			return fmt.Errorf("binding-apiserver: the completed config has a nil ClientConfig")
+		}
+		if completedConfig.StorageFactory == nil {
+			return fmt.Errorf("binding-apiserver: the completed config has a nil StorageFactory")
+		}
+
+		crdclient, err := versioned.NewForConfig(completedConfig.ClientConfig)
+		if err != nil {
+			return err
+		}
+
+		syncManager := synchromanager.NewManager(crdclient, config.StorageFactory)
+		go syncManager.Run(1, ctx.Done())
+
+		server, err := completedConfig.New()
 		if err != nil {
 			return err
 		}
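Review note: with the BindingSyncController flag gone, the binding-apiserver command owns the synchro-manager wiring that CompletedConfig.New() used to do. A condensed sketch of the startup order this establishes (error handling elided; the trailing server.Run call is assumed from the unchanged tail of RunE, which is not shown in this patch):

	// Sketch only, not verbatim from the patch above.
	config, _ := opts.Config() // no bindingSyncController argument anymore
	completed := config.Complete()

	crdclient, _ := versioned.NewForConfig(completed.ClientConfig)

	// Manager.Run blocks, so it gets its own goroutine and stops when the
	// command context is cancelled.
	mgr := synchromanager.NewManager(crdclient, config.StorageFactory)
	go mgr.Run(1, ctx.Done())

	server, _ := completed.New() // New() no longer starts any sync loop
	return server.Run(ctx)       // assumed final step, unchanged by this diff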
diff --git a/deploy/binding-apiserver/clusterpedia_binding_apiserver_rbac.yaml b/deploy/binding-apiserver/clusterpedia_binding_apiserver_rbac.yaml
new file mode 100644
index 000000000..3ab2ec2bc
--- /dev/null
+++ b/deploy/binding-apiserver/clusterpedia_binding_apiserver_rbac.yaml
@@ -0,0 +1,24 @@
+apiVersion: rbac.authorization.k8s.io/v1
+kind: ClusterRole
+metadata:
+  name: clusterpedia
+rules:
+  - apiGroups: ["*"]
+    resources: ["*"]
+    verbs: ["*"]
+  - nonResourceURLs: ["*"]
+    verbs: ["get"]
+
+---
+apiVersion: rbac.authorization.k8s.io/v1
+kind: ClusterRoleBinding
+metadata:
+  name: clusterpedia
+roleRef:
+  apiGroup: rbac.authorization.k8s.io
+  kind: ClusterRole
+  name: clusterpedia
+subjects:
+  - kind: ServiceAccount
+    name: clusterpedia-binding-apiserver
+    namespace: clusterpedia-system
diff --git a/deploy/clusterpedia_apiserver_rbac.yaml b/deploy/clusterpedia_apiserver_rbac.yaml
index caa6735b8..c5e40237e 100644
--- a/deploy/clusterpedia_apiserver_rbac.yaml
+++ b/deploy/clusterpedia_apiserver_rbac.yaml
@@ -28,6 +28,3 @@ subjects:
   - kind: ServiceAccount
     name: clusterpedia-controller-manager
     namespace: clusterpedia-system
-  - kind: ServiceAccount
-    name: clusterpedia-binding-apiserver
-    namespace: clusterpedia-system
\ No newline at end of file
diff --git a/pkg/apiserver/apiserver.go b/pkg/apiserver/apiserver.go
index 41bc564d9..04ddcd7b1 100644
--- a/pkg/apiserver/apiserver.go
+++ b/pkg/apiserver/apiserver.go
@@ -2,7 +2,6 @@ package apiserver
 
 import (
 	"context"
-	"fmt"
 	"net/http"
 
 	metainternal "k8s.io/apimachinery/pkg/apis/meta/internalversion"
@@ -26,7 +25,6 @@ import (
 	informers "github.com/clusterpedia-io/clusterpedia/pkg/generated/informers/externalversions"
 	"github.com/clusterpedia-io/clusterpedia/pkg/kubeapiserver"
 	"github.com/clusterpedia-io/clusterpedia/pkg/storage"
-	"github.com/clusterpedia-io/clusterpedia/pkg/synchromanager"
 	"github.com/clusterpedia-io/clusterpedia/pkg/utils/filters"
 )
 
@@ -65,8 +63,6 @@ type Config struct {
 	GenericConfig *genericapiserver.RecommendedConfig
 
 	StorageFactory storage.StorageFactory
-	// BindingSyncController means whether apiserver or binding_apiserver should process and sync events as clustersynchro_manager
-	BindingSyncController bool
 }
 
 type ClusterPediaServer struct {
@@ -78,8 +74,6 @@ type completedConfig struct {
 
 	ClientConfig   *clientrest.Config
 	StorageFactory storage.StorageFactory
-	// BindingSyncController means whether apiserver or binding_apiserver should process and sync events as clustersynchro_manager
-	BindingSyncController bool
 }
 
 // CompletedConfig embeds a private pointer that cannot be instantiated outside of this package.
@@ -93,7 +87,6 @@ func (cfg *Config) Complete() CompletedConfig {
 		cfg.GenericConfig.Complete(),
 		cfg.GenericConfig.ClientConfig,
 		cfg.StorageFactory,
-		cfg.BindingSyncController,
 	}
 
 	c.GenericConfig.Version = &version.Info{
@@ -105,13 +98,6 @@ func (cfg *Config) Complete() CompletedConfig {
 }
 
 func (config completedConfig) New() (*ClusterPediaServer, error) {
-	if config.ClientConfig == nil {
-		return nil, fmt.Errorf("CompletedConfig.New() called with config.ClientConfig == nil")
-	}
-	if config.StorageFactory == nil {
-		return nil, fmt.Errorf("CompletedConfig.New() called with config.StorageFactory == nil")
-	}
-
 	discoveryClient, err := discovery.NewDiscoveryClientForConfig(config.ClientConfig)
 	if err != nil {
 		return nil, err
@@ -167,11 +153,6 @@ func (config completedConfig) New() (*ClusterPediaServer, error) {
 		clusterpediaInformerFactory.Start(context.StopCh)
 		clusterpediaInformerFactory.WaitForCacheSync(context.StopCh)
 
-		if config.BindingSyncController {
-			synchromanager := synchromanager.NewManager(crdclient, config.StorageFactory)
-			synchromanager.Run(1, context.StopCh)
-		}
-
 		return nil
 	})
diff --git a/pkg/kubeapiserver/clusterresource_controller.go b/pkg/kubeapiserver/clusterresource_controller.go
index 8f23b1796..b974851db 100644
--- a/pkg/kubeapiserver/clusterresource_controller.go
+++ b/pkg/kubeapiserver/clusterresource_controller.go
@@ -83,7 +83,7 @@ func (c *ClusterResourceController) updateClusterResources(cluster *clusterv1alp
 		return
 	}
 
-	discoveryapis := c.restManager.LoadResources(resources, cluster.Name)
+	discoveryapis := c.restManager.LoadResources(resources)
 	c.discoveryManager.SetClusterGroupResource(cluster.Name, discoveryapis)
 
 	c.clusterresources[cluster.Name] = resources
diff --git a/pkg/kubeapiserver/resourcerest/storage.go b/pkg/kubeapiserver/resourcerest/storage.go
index 220a43570..8420f665d 100644
--- a/pkg/kubeapiserver/resourcerest/storage.go
+++ b/pkg/kubeapiserver/resourcerest/storage.go
@@ -21,10 +21,8 @@ import (
 	"github.com/clusterpedia-io/api/clusterpedia/v1beta1"
 	"github.com/clusterpedia-io/clusterpedia/pkg/kubeapiserver/printers"
 	"github.com/clusterpedia-io/clusterpedia/pkg/storage"
-	cache "github.com/clusterpedia-io/clusterpedia/pkg/storage/memorystorage/watchcache"
 	"github.com/clusterpedia-io/clusterpedia/pkg/utils/negotiation"
 	"github.com/clusterpedia-io/clusterpedia/pkg/utils/request"
-	utilwatch "github.com/clusterpedia-io/clusterpedia/pkg/utils/watch"
 )
 
 type RESTStorage struct {
@@ -127,58 +125,5 @@ func (s *RESTStorage) ConvertToTable(ctx context.Context, object runtime.Object,
 }
 
 func (s *RESTStorage) Watch(ctx context.Context, options *metainternalversion.ListOptions) (watch.Interface, error) {
-	resourceversion := options.ResourceVersion
-	watchRV, err := cache.NewClusterResourceVersionFromString(resourceversion)
-	if err != nil {
-		// To match the uncached watch implementation, once we have passed authn/authz/admission,
-		// and successfully parsed a resource version, other errors must fail with a watch event of type ERROR,
-		// rather than a directly returned error.
-		return newErrWatcher(err), nil
-	}
-
-	watcher := cache.NewCacheWatcher(100)
-	watchCache := s.Storage.GetStorageConfig().WatchCache
-	watchCache.Lock()
-	defer watchCache.Unlock()
-
-	initEvents, err := watchCache.GetAllEventsSinceThreadUnsafe(watchRV)
-	if err != nil {
-		// To match the uncached watch implementation, once we have passed authn/authz/admission,
-		// and successfully parsed a resource version, other errors must fail with a watch event of type ERROR,
-		// rather than a directly returned error.
-		return newErrWatcher(err), nil
-	}
-
-	func() {
-		watchCache.WatchersLock.Lock()
-		defer watchCache.WatchersLock.Unlock()
-
-		watchCache.WatchersBuffer = append(watchCache.WatchersBuffer, watcher)
-	}()
-
-	go watcher.Process(ctx, initEvents)
-	return watcher, nil
-}
-
-type errWatcher struct {
-	result chan watch.Event
-}
-
-func newErrWatcher(err error) *errWatcher {
-	errEvent := utilwatch.NewErrorEvent(err)
-
-	// Create a watcher with room for a single event, populate it, and close the channel
-	watcher := &errWatcher{result: make(chan watch.Event, 1)}
-	watcher.result <- errEvent
-	close(watcher.result)
-
-	return watcher
-}
-
-func (c *errWatcher) ResultChan() <-chan watch.Event {
-	return c.result
-}
-
-func (c *errWatcher) Stop() {
-	// no-op
+	return s.Storage.Watch(ctx, options)
 }
diff --git a/pkg/kubeapiserver/restmanager.go b/pkg/kubeapiserver/restmanager.go
index b259ae738..52649bc20 100644
--- a/pkg/kubeapiserver/restmanager.go
+++ b/pkg/kubeapiserver/restmanager.go
@@ -101,7 +101,7 @@ func (m *RESTManager) GetRESTResourceInfo(gvr schema.GroupVersionResource) RESTR
 	return infos[gvr]
 }
 
-func (m *RESTManager) LoadResources(infos ResourceInfoMap, cluster string) map[schema.GroupResource]discovery.ResourceDiscoveryAPI {
+func (m *RESTManager) LoadResources(infos ResourceInfoMap) map[schema.GroupResource]discovery.ResourceDiscoveryAPI {
 	apigroups := m.groups.Load().(map[string]metav1.APIGroup)
 	apiresources := m.resources.Load().(map[schema.GroupResource]metav1.APIResource)
 	restinfos := m.restResourceInfos.Load().(map[schema.GroupVersionResource]RESTResourceInfo)
@@ -175,7 +175,7 @@ func (m *RESTManager) LoadResources(infos ResourceInfoMap, cluster string) map[s
 		m.addAPIResourcesLocked(addedAPIResources)
 	}
 	if len(addedInfos) != 0 {
-		m.addRESTResourceInfosLocked(addedInfos, cluster)
+		m.addRESTResourceInfosLocked(addedInfos)
 	}
 	return apis
 }
@@ -208,7 +208,7 @@ func (m *RESTManager) addAPIResourcesLocked(addedResources map[schema.GroupResou
 	m.resources.Store(resources)
 }
 
-func (m *RESTManager) addRESTResourceInfosLocked(addedInfos map[schema.GroupVersionResource]RESTResourceInfo, cluster string) {
+func (m *RESTManager) addRESTResourceInfosLocked(addedInfos map[schema.GroupVersionResource]RESTResourceInfo) {
 	restinfos := m.restResourceInfos.Load().(map[schema.GroupVersionResource]RESTResourceInfo)
 
 	infos := make(map[schema.GroupVersionResource]RESTResourceInfo, len(restinfos)+len(addedInfos))
@@ -225,9 +225,9 @@ func (m *RESTManager) addRESTResourceInfosLocked(addedInfos map[schema.GroupVers
 		var err error
 		var storage *resourcerest.RESTStorage
 		if resourcescheme.LegacyResourceScheme.IsGroupRegistered(gvr.Group) {
-			storage, err = m.genLegacyResourceRESTStorage(gvr, info.APIResource.Kind, cluster)
+			storage, err = m.genLegacyResourceRESTStorage(gvr, info.APIResource.Kind)
 		} else {
-			storage, err = m.genCustomResourceRESTStorage(gvr, info.APIResource.Kind, cluster)
+			storage, err = m.genCustomResourceRESTStorage(gvr, info.APIResource.Kind)
 		}
 		if err != nil {
"Failed to gen resource rest storage", "gvr", gvr, "kind", info.APIResource.Kind) @@ -263,12 +263,11 @@ func (m *RESTManager) addRESTResourceInfosLocked(addedInfos map[schema.GroupVers m.restResourceInfos.Store(infos) } -func (m *RESTManager) genLegacyResourceRESTStorage(gvr schema.GroupVersionResource, kind, cluster string) (*resourcerest.RESTStorage, error) { +func (m *RESTManager) genLegacyResourceRESTStorage(gvr schema.GroupVersionResource, kind string) (*resourcerest.RESTStorage, error) { storageConfig, err := m.resourcetSorageConfig.NewLegacyResourceConfig(gvr.GroupResource()) if err != nil { return nil, err } - storageConfig.Cluster = cluster resourceStorage, err := m.storageFactory.NewResourceStorage(storageConfig) if err != nil { @@ -291,12 +290,11 @@ func (m *RESTManager) genLegacyResourceRESTStorage(gvr schema.GroupVersionResour }, nil } -func (m *RESTManager) genCustomResourceRESTStorage(gvr schema.GroupVersionResource, kind, cluster string) (*resourcerest.RESTStorage, error) { +func (m *RESTManager) genCustomResourceRESTStorage(gvr schema.GroupVersionResource, kind string) (*resourcerest.RESTStorage, error) { storageConfig, err := m.resourcetSorageConfig.NewCustomResourceConfig(gvr) if err != nil { return nil, err } - storageConfig.Cluster = cluster resourceStorage, err := m.storageFactory.NewResourceStorage(storageConfig) if err != nil { diff --git a/pkg/storage/internalstorage/register.go b/pkg/storage/internalstorage/register.go index db68e1339..d577750bf 100644 --- a/pkg/storage/internalstorage/register.go +++ b/pkg/storage/internalstorage/register.go @@ -29,6 +29,10 @@ func init() { } func NewStorageFactory(configPath string) (storage.StorageFactory, error) { + if configPath == "" { + return nil, fmt.Errorf("configPath should not be empty") + } + cfg := &Config{} if err := configor.Load(cfg, configPath); err != nil { return nil, err diff --git a/pkg/storage/internalstorage/resource_storage.go b/pkg/storage/internalstorage/resource_storage.go index ce29da11b..663a6e784 100644 --- a/pkg/storage/internalstorage/resource_storage.go +++ b/pkg/storage/internalstorage/resource_storage.go @@ -10,12 +10,14 @@ import ( "gorm.io/gorm" "k8s.io/apimachinery/pkg/api/meta" + metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/conversion" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/watch" genericstorage "k8s.io/apiserver/pkg/storage" internal "github.com/clusterpedia-io/api/clusterpedia" @@ -272,6 +274,10 @@ func (s *ResourceStorage) List(ctx context.Context, listObject runtime.Object, o return nil } +func (s *ResourceStorage) Watch(ctx context.Context, options *metainternalversion.ListOptions) (watch.Interface, error) { + return nil, nil +} + func applyListOptionsToResourceQuery(db *gorm.DB, query *gorm.DB, opts *internal.ListOptions) (int64, *int64, *gorm.DB, error) { applyFn := func(query *gorm.DB, opts *internal.ListOptions) (*gorm.DB, error) { query, err := applyOwnerToResourceQuery(db, query, opts) diff --git a/pkg/storage/internalstorage/storage.go b/pkg/storage/internalstorage/storage.go index dbf7bc5ca..5ac987f66 100644 --- a/pkg/storage/internalstorage/storage.go +++ b/pkg/storage/internalstorage/storage.go @@ -84,3 +84,7 @@ func (s *StorageFactory) GetCollectionResources(ctx context.Context) ([]*interna } return crs, nil } + +func (s 
+func (s *StorageFactory) PrepareCluster(cluster string) error {
+	return nil
+}
diff --git a/pkg/storage/memorystorage/memory_resource_storage.go b/pkg/storage/memorystorage/memory_resource_storage.go
index 2a65ca69e..0680db34f 100644
--- a/pkg/storage/memorystorage/memory_resource_storage.go
+++ b/pkg/storage/memorystorage/memory_resource_storage.go
@@ -8,17 +8,17 @@ import (
 	"sync"
 
 	"k8s.io/apimachinery/pkg/api/meta"
+	metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion"
 	"k8s.io/apimachinery/pkg/conversion"
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/runtime/schema"
 	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
 	"k8s.io/apimachinery/pkg/watch"
-	"k8s.io/klog/v2"
 
 	internal "github.com/clusterpedia-io/api/clusterpedia"
-	"github.com/clusterpedia-io/clusterpedia/pkg/kubeapiserver/resourcescheme"
 	"github.com/clusterpedia-io/clusterpedia/pkg/storage"
 	cache "github.com/clusterpedia-io/clusterpedia/pkg/storage/memorystorage/watchcache"
+	utilwatch "github.com/clusterpedia-io/clusterpedia/pkg/utils/watch"
 )
 
 var (
@@ -46,100 +46,54 @@ type ResourceStorage struct {
 	Codec runtime.Codec
 
 	watchCache *cache.WatchCache
-	stopCh     chan struct{}
-	crvSynchro *cache.ClusterResourceVersionSynchro
+	CrvSynchro *cache.ClusterResourceVersionSynchro
 	incoming   chan ClusterWatchEvent
 
 	storageConfig *storage.ResourceStorageConfig
 }
 
-func (s *ResourceStorage) convertEvent(event *ClusterWatchEvent) error {
-	klog.V(10).Infof("event: %s", event)
-	s.crvSynchro.UpdateClusterResourceVersion(&event.Event, event.ClusterName)
-
-	err := s.encodeEvent(&event.Event)
-	if err != nil {
-		return fmt.Errorf("encode event failed: %v", err)
-	}
-	s.incoming <- *event
-	return nil
-}
-
-func (s *ResourceStorage) dispatchEvents() {
-	for {
-		select {
-		case cwe, ok := <-s.incoming:
-			if !ok {
-				continue
-			}
-
-			switch cwe.Event.Type {
-			case watch.Added:
-				err := s.watchCache.Add(cwe.Event.Object, cwe.ClusterName)
-				if err != nil {
-					utilruntime.HandleError(fmt.Errorf("unable to add watch event object (%#v) to store: %v", cwe.Event.Object, err))
-				}
-			case watch.Modified:
-				err := s.watchCache.Update(cwe.Event.Object, cwe.ClusterName)
-				if err != nil {
-					utilruntime.HandleError(fmt.Errorf("unable to update watch event object (%#v) to store: %v", cwe.Event.Object, err))
-				}
-			case watch.Deleted:
-				// TODO: Will any consumers need access to the "last known
-				// state", which is passed in event.Object? If so, may need
-				// to change this.
-				err := s.watchCache.Delete(cwe.Event.Object, cwe.ClusterName)
-				if err != nil {
-					utilruntime.HandleError(fmt.Errorf("unable to delete watch event object (%#v) from store: %v", cwe.Event.Object, err))
-				}
-			case watch.Error:
-				s.watchCache.ClearWatchCache(cwe.ClusterName)
-			default:
-				utilruntime.HandleError(fmt.Errorf("unable to understand watch event %#v", cwe.Event))
-				continue
-			}
-			s.dispatchEvent(&cwe.Event)
-		case <-s.stopCh:
-			return
-		}
-	}
-}
-
-func (s *ResourceStorage) dispatchEvent(event *watch.Event) {
-	s.watchCache.WatchersLock.RLock()
-	defer s.watchCache.WatchersLock.RUnlock()
-	for _, watcher := range s.watchCache.WatchersBuffer {
-		watcher.NonblockingAdd(event)
-	}
-}
-
 func (s *ResourceStorage) GetStorageConfig() *storage.ResourceStorageConfig {
 	return s.storageConfig
 }
 
 func (s *ResourceStorage) Create(ctx context.Context, cluster string, obj runtime.Object) error {
-	event := s.newClusterWatchEvent(watch.Added, obj, cluster)
-	err := s.convertEvent(event)
+	resourceVersion, err := s.CrvSynchro.UpdateClusterResourceVersion(obj, cluster)
 	if err != nil {
 		return err
 	}
+
+	err = s.watchCache.Add(obj, cluster, resourceVersion, s.storageConfig.Codec, s.storageConfig.MemoryVersion)
+	if err != nil {
+		utilruntime.HandleError(fmt.Errorf("unable to add watch event object (%#v) to store: %v", obj, err))
+	}
+
 	return nil
 }
 
 func (s *ResourceStorage) Update(ctx context.Context, cluster string, obj runtime.Object) error {
-	event := s.newClusterWatchEvent(watch.Modified, obj, cluster)
-	err := s.convertEvent(event)
+	resourceVersion, err := s.CrvSynchro.UpdateClusterResourceVersion(obj, cluster)
 	if err != nil {
 		return err
 	}
+
+	err = s.watchCache.Update(obj, cluster, resourceVersion, s.storageConfig.Codec, s.storageConfig.MemoryVersion)
+	if err != nil {
+		utilruntime.HandleError(fmt.Errorf("unable to update watch event object (%#v) in store: %v", obj, err))
+	}
+
 	return nil
 }
 
 func (s *ResourceStorage) Delete(ctx context.Context, cluster string, obj runtime.Object) error {
-	event := s.newClusterWatchEvent(watch.Deleted, obj, cluster)
-	err := s.convertEvent(event)
+	resourceVersion, err := s.CrvSynchro.UpdateClusterResourceVersion(obj, cluster)
 	if err != nil {
 		return err
 	}
+
+	err = s.watchCache.Delete(obj, cluster, resourceVersion, s.storageConfig.Codec, s.storageConfig.MemoryVersion)
+	if err != nil {
+		utilruntime.HandleError(fmt.Errorf("unable to delete watch event object (%#v) from store: %v", obj, err))
+	}
+
 	return nil
 }
 
@@ -167,6 +121,7 @@ func (s *ResourceStorage) Get(ctx context.Context, cluster, namespace, name stri
 	return nil
 }
 
+//nolint
 func (s *ResourceStorage) newClusterWatchEvent(eventType watch.EventType, obj runtime.Object, cluster string) *ClusterWatchEvent {
 	return &ClusterWatchEvent{
 		ClusterName: cluster,
@@ -240,33 +195,59 @@ func (s *ResourceStorage) List(ctx context.Context, listObject runtime.Object, o
 	return nil
 }
 
-func (s *ResourceStorage) encodeEvent(event *watch.Event) error {
-	var buffer bytes.Buffer
+func (s *ResourceStorage) Watch(ctx context.Context, options *metainternalversion.ListOptions) (watch.Interface, error) {
+	resourceversion := options.ResourceVersion
+	watchRV, err := cache.NewClusterResourceVersionFromString(resourceversion)
+	if err != nil {
+		// To match the uncached watch implementation, once we have passed authn/authz/admission,
+		// and successfully parsed a resource version, other errors must fail with a watch event of type ERROR,
+		// rather than a directly returned error.
+		return newErrWatcher(err), nil
+	}
 
-	gk := event.Object.GetObjectKind().GroupVersionKind().GroupKind()
-	config := s.storageConfig
-	if ok := resourcescheme.LegacyResourceScheme.IsGroupRegistered(gk.Group); ok {
-		dest, err := resourcescheme.LegacyResourceScheme.New(config.MemoryVersion.WithKind(gk.Kind))
-		if err != nil {
-			return err
-		}
+	watcher := cache.NewCacheWatcher(100)
+	watchCache := s.watchCache
+	watchCache.Lock()
+	defer watchCache.Unlock()
 
-		object := event.Object
-		err = config.Codec.Encode(object, &buffer)
-		if err != nil {
-			return err
-		}
+	initEvents, err := watchCache.GetAllEventsSinceThreadUnsafe(watchRV)
+	if err != nil {
+		// To match the uncached watch implementation, once we have passed authn/authz/admission,
+		// and successfully parsed a resource version, other errors must fail with a watch event of type ERROR,
+		// rather than a directly returned error.
+		return newErrWatcher(err), nil
+	}
 
-		obj, _, err := config.Codec.Decode(buffer.Bytes(), nil, dest)
-		if err != nil {
-			return err
-		}
+	func() {
+		watchCache.WatchersLock.Lock()
+		defer watchCache.WatchersLock.Unlock()
 
-		if obj != dest {
-			return err
-		}
-		event.Object = dest
-	}
+		watchCache.WatchersBuffer = append(watchCache.WatchersBuffer, watcher)
+	}()
 
-	return nil
+	go watcher.Process(ctx, initEvents)
+	return watcher, nil
+}
+
+type errWatcher struct {
+	result chan watch.Event
+}
+
+func newErrWatcher(err error) *errWatcher {
+	errEvent := utilwatch.NewErrorEvent(err)
+
+	// Create a watcher with room for a single event, populate it, and close the channel
+	watcher := &errWatcher{result: make(chan watch.Event, 1)}
+	watcher.result <- errEvent
+	close(watcher.result)
+
+	return watcher
+}
+
+func (c *errWatcher) ResultChan() <-chan watch.Event {
+	return c.result
+}
+
+func (c *errWatcher) Stop() {
+	// no-op
 }
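Review note: both failure paths in Watch surface through the returned watcher rather than the error return, so a client always receives a normal watch stream that may immediately yield a single ERROR event. A hedged consumer sketch (the recovery helpers are hypothetical):

	w, err := resourceStorage.Watch(ctx, &metainternalversion.ListOptions{ResourceVersion: rv})
	if err != nil {
		return err // Watch above never returns a non-nil error itself
	}
	defer w.Stop()

	for event := range w.ResultChan() {
		switch event.Type {
		case watch.Error:
			// The errWatcher delivers exactly one ERROR event and closes:
			// expired/unparsable resource version or a cleaned-up cluster.
			return relistAndRewatch(ctx) // hypothetical recovery helper
		case watch.Added, watch.Modified, watch.Deleted:
			handle(event.Object) // hypothetical handler
		}
	}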
diff --git a/pkg/storage/memorystorage/memory_storage.go b/pkg/storage/memorystorage/memory_storage.go
index f7853c731..a753db761 100644
--- a/pkg/storage/memorystorage/memory_storage.go
+++ b/pkg/storage/memorystorage/memory_storage.go
@@ -2,17 +2,16 @@ package memorystorage
 
 import (
 	"context"
-	"fmt"
 
 	"k8s.io/apimachinery/pkg/runtime/schema"
 
 	internal "github.com/clusterpedia-io/api/clusterpedia"
 	"github.com/clusterpedia-io/clusterpedia/pkg/storage"
 	cache "github.com/clusterpedia-io/clusterpedia/pkg/storage/memorystorage/watchcache"
-	utilwatch "github.com/clusterpedia-io/clusterpedia/pkg/utils/watch"
 )
 
 type StorageFactory struct {
+	clusters map[string]bool
 }
 
 func (s *StorageFactory) NewResourceStorage(config *storage.ResourceStorageConfig) (storage.ResourceStorage, error) {
@@ -25,36 +24,48 @@ func (s *StorageFactory) NewResourceStorage(config *storage.ResourceStorageConfi
 		Resource: config.GroupResource.Resource,
 	}
 
-	if resourceStorage, ok := storages.resourceStorages[gvr]; ok {
+	resourceStorage, ok := storages.resourceStorages[gvr]
+	if ok {
 		watchCache := resourceStorage.watchCache
 		if config.Namespaced {
 			watchCache.KeyFunc = cache.GetKeyFunc(gvr, config.Namespaced)
 			watchCache.IsNamespaced = true
 		}
-		if _, ok := watchCache.GetStores()[config.Cluster]; !ok {
-			resourceStorage.watchCache.AddIndexer(config.Cluster, nil)
+	} else {
+		watchCache := cache.NewWatchCache(100, gvr, config.Namespaced)
+		resourceStorage = &ResourceStorage{
+			incoming:      make(chan ClusterWatchEvent, 100),
+			Codec:         config.Codec,
+			watchCache:    watchCache,
+			storageConfig: config,
 		}
-		resourceStorage.crvSynchro.SetClusterResourceVersion(config.Cluster, "0")
-
-		return resourceStorage, nil
+		storages.resourceStorages[gvr] = resourceStorage
 	}
 
-	watchCache := cache.NewWatchCache(100, gvr, config.Namespaced, config.Cluster)
-	config.WatchCache = watchCache
-	resourceStorage := &ResourceStorage{
-		incoming:      make(chan ClusterWatchEvent, 100),
-		Codec:         config.Codec,
-		crvSynchro:    cache.NewClusterResourceVersionSynchro(config.Cluster),
-		watchCache:    watchCache,
-		storageConfig: config,
+	for cluster := range s.clusters {
+		resourceStorage.watchCache.AddIndexer(cluster, nil)
+
+		if resourceStorage.CrvSynchro == nil {
+			resourceStorage.CrvSynchro = cache.NewClusterResourceVersionSynchro(cluster)
+		} else {
+			resourceStorage.CrvSynchro.SetClusterResourceVersion(cluster, "0")
+		}
 	}
 
-	go resourceStorage.dispatchEvents()
+	return resourceStorage, nil
+}
 
-	storages.resourceStorages[gvr] = resourceStorage
+func (s *StorageFactory) PrepareCluster(cluster string) error {
+	storages.Lock()
+	defer storages.Unlock()
 
-	return resourceStorage, nil
+	if _, ok := s.clusters[cluster]; ok {
+		return nil
+	}
+
+	s.clusters[cluster] = true
+	return nil
 }
 
 func (s *StorageFactory) NewCollectionResourceStorage(cr *internal.CollectionResource) (storage.CollectionResourceStorage, error) {
@@ -69,11 +80,11 @@ func (s *StorageFactory) CleanCluster(ctx context.Context, cluster string) error
 	storages.Lock()
 	defer storages.Unlock()
 	for _, rs := range storages.resourceStorages {
-		rs.watchCache.DeleteIndexer(cluster)
-		// If a pediacluster is deleted from clusterpedia,then the informer of client-go should be list and watch again
-		errorEvent := utilwatch.NewErrorEvent(fmt.Errorf("PediaCluster %s is deleted", cluster))
-		rs.dispatchEvent(&errorEvent)
+		rs.CrvSynchro.RemoveCluster(cluster)
+		rs.watchCache.CleanCluster(cluster)
+		delete(s.clusters, cluster)
 	}
+
 	return nil
 }
 
@@ -81,11 +92,11 @@ func (s *StorageFactory) CleanClusterResource(ctx context.Context, cluster strin
 	storages.Lock()
 	defer storages.Unlock()
 	if rs, ok := storages.resourceStorages[gvr]; ok {
-		rs.watchCache.DeleteIndexer(cluster)
-		// If a gvr is deleted from clusterpedia,then the informer of client-go should be list and watch again
-		errorEvent := utilwatch.NewErrorEvent(fmt.Errorf("GVR %v is deleted", gvr))
-		rs.dispatchEvent(&errorEvent)
+		rs.CrvSynchro.RemoveCluster(cluster)
+		rs.watchCache.CleanCluster(cluster)
+		delete(s.clusters, cluster)
 	}
+
 	return nil
 }
diff --git a/pkg/storage/memorystorage/register.go b/pkg/storage/memorystorage/register.go
index e3cc3e206..6c3e20088 100644
--- a/pkg/storage/memorystorage/register.go
+++ b/pkg/storage/memorystorage/register.go
@@ -13,5 +13,8 @@ func init() {
 }
 
 func NewStorageFactory(_ string) (storage.StorageFactory, error) {
-	return &StorageFactory{}, nil
+	storageFactory := &StorageFactory{
+		clusters: make(map[string]bool),
+	}
+	return storageFactory, nil
 }
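Review note: NewResourceStorage now derives its indexers and resource-version bookkeeping from the factory's clusters set instead of a per-config Cluster field, so a cluster must be registered before its resource storages are created. A condensed sketch of the intended order (cluster name and config are illustrative):

	factory, _ := memorystorage.NewStorageFactory("")

	// Register the cluster first; PrepareCluster is idempotent, so
	// reconciling the same PediaCluster repeatedly is harmless.
	if err := factory.PrepareCluster("cluster-1"); err != nil {
		return err
	}

	// NewResourceStorage then walks the registered clusters, adding an
	// indexer and an initial resource version ("0") for each of them.
	rs, err := factory.NewResourceStorage(config) // config: *storage.ResourceStorageConfig prepared by the caller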
diff --git a/pkg/storage/memorystorage/watchcache/cluster_resource_version.go b/pkg/storage/memorystorage/watchcache/cluster_resource_version.go
index abc7ec0be..d62b032c5 100644
--- a/pkg/storage/memorystorage/watchcache/cluster_resource_version.go
+++ b/pkg/storage/memorystorage/watchcache/cluster_resource_version.go
@@ -6,6 +6,7 @@ import (
 	"sync"
 
 	"k8s.io/apimachinery/pkg/api/meta"
+	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/util/json"
 	"k8s.io/apimachinery/pkg/watch"
 	"k8s.io/klog/v2"
@@ -91,31 +92,41 @@ func NewClusterResourceVersionSynchro(cluster string) *ClusterResourceVersionSyn
 }
 
 // UpdateClusterResourceVersion update the resourceVersion in ClusterResourceVersionSynchro to the latest
-func (crvs *ClusterResourceVersionSynchro) UpdateClusterResourceVersion(event *watch.Event, cluster string) {
+func (crvs *ClusterResourceVersionSynchro) UpdateClusterResourceVersion(obj runtime.Object, cluster string) (*ClusterResourceVersion, error) {
 	crvs.Lock()
 	defer crvs.Unlock()
 
 	crv := crvs.crv
 	accessor := meta.NewAccessor()
-	rv, _ := accessor.ResourceVersion(event.Object)
+	rv, _ := accessor.ResourceVersion(obj)
 	crv.rvmap[cluster] = rv
 
 	bytes, err := json.Marshal(crv.rvmap)
 	if err != nil {
-		klog.Errorf("base64 encode failed: %v", err)
-		return
+		return nil, fmt.Errorf("json marshal failed: %v", err)
 	}
 
-	err = accessor.SetResourceVersion(event.Object, base64.RawURLEncoding.EncodeToString(bytes))
+	version := base64.RawURLEncoding.EncodeToString(bytes)
+	err = accessor.SetResourceVersion(obj, version)
 	if err != nil {
-		klog.Warningf("set resourceVersion failed: %v, may be it's a clear watch cache order event", err)
-		return
+		return nil, fmt.Errorf("set resourceVersion failed: %v, maybe it's a clear-watch-cache order event", err)
 	}
+
+	return NewClusterResourceVersionFromString(version)
 }
 
 func (crvs *ClusterResourceVersionSynchro) SetClusterResourceVersion(clusterName string, resourceVersion string) {
 	crvs.Lock()
 	defer crvs.Unlock()
 
-	crvs.crv.rvmap[clusterName] = resourceVersion
+	if _, ok := crvs.crv.rvmap[clusterName]; !ok {
+		crvs.crv.rvmap[clusterName] = resourceVersion
+	}
+}
+
+func (crvs *ClusterResourceVersionSynchro) RemoveCluster(clusterName string) {
+	crvs.Lock()
+	defer crvs.Unlock()
+
+	delete(crvs.crv.rvmap, clusterName)
 }
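Review note: the opaque resourceVersion produced here is simply a base64url-encoded JSON map of per-cluster resource versions, which is what makes the cross-cluster watch position resumable. A standalone sketch of the round trip that UpdateClusterResourceVersion and NewClusterResourceVersionFromString perform (values are made up):

	package main

	import (
		"encoding/base64"
		"encoding/json"
		"fmt"
	)

	func main() {
		// One entry per member cluster, as in crv.rvmap above.
		rvmap := map[string]string{"cluster-1": "12345", "cluster-2": "678"}

		raw, _ := json.Marshal(rvmap)
		version := base64.RawURLEncoding.EncodeToString(raw)
		fmt.Println(version) // opaque value written into the object's resourceVersion

		// Decoding reverses the same two steps.
		decoded, _ := base64.RawURLEncoding.DecodeString(version)
		out := map[string]string{}
		_ = json.Unmarshal(decoded, &out)
		fmt.Println(out) // map[cluster-1:12345 cluster-2:678]
	}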
diff --git a/pkg/storage/memorystorage/watchcache/watch_cache.go b/pkg/storage/memorystorage/watchcache/watch_cache.go
index 8b569e677..348491662 100644
--- a/pkg/storage/memorystorage/watchcache/watch_cache.go
+++ b/pkg/storage/memorystorage/watchcache/watch_cache.go
@@ -1,6 +1,7 @@
 package watchcache
 
 import (
+	"bytes"
 	"context"
 	"fmt"
 	"sync"
@@ -18,6 +19,8 @@ import (
 	"k8s.io/utils/strings/slices"
 
 	internal "github.com/clusterpedia-io/api/clusterpedia"
+	"github.com/clusterpedia-io/clusterpedia/pkg/kubeapiserver/resourcescheme"
+	utilwatch "github.com/clusterpedia-io/clusterpedia/pkg/utils/watch"
 )
 
 // StoreElement keeping the structs of resource in k8s(key, object, labels, fields).
@@ -146,7 +149,7 @@ func storeElementObject(obj interface{}) (runtime.Object, error) {
 	return elem.Object, nil
 }
 
-func NewWatchCache(capacity int, gvr schema.GroupVersionResource, isNamespaced bool, newCluster string) *WatchCache {
+func NewWatchCache(capacity int, gvr schema.GroupVersionResource, isNamespaced bool) *WatchCache {
 	wc := &WatchCache{
 		capacity:     capacity,
 		KeyFunc:      GetKeyFunc(gvr, isNamespaced),
@@ -158,7 +161,6 @@ func NewWatchCache(capacity int, gvr schema.GroupVersionResource, isNamespaced b
 		IsNamespaced: isNamespaced,
 	}
 
-	wc.AddIndexer(newCluster, nil)
 	return wc
 }
 
@@ -200,61 +202,66 @@ func (w *WatchCache) processEvent(event watch.Event, resourceVersion *ClusterRes
 }
 
 // Add takes runtime.Object as an argument.
-func (w *WatchCache) Add(obj interface{}, clusterName string) error {
-	object, resourceVersion, err := w.objectToVersionedRuntimeObject(obj)
-	if err != nil {
-		return err
-	}
-	event := watch.Event{Type: watch.Added, Object: object}
-
+func (w *WatchCache) Add(obj runtime.Object, clusterName string, resourceVersion *ClusterResourceVersion,
+	codec runtime.Codec, memoryVersion schema.GroupVersion) error {
 	f := func(elem *StoreElement) error {
 		return w.stores[clusterName].Add(elem)
 	}
-	return w.processEvent(event, resourceVersion, f)
-}
+	object, err := encodeEvent(obj, codec, memoryVersion)
+	if err != nil {
+		return err
+	}
 
-// Update takes runtime.Object as an argument.
-func (w *WatchCache) Update(obj interface{}, clusterName string) error {
-	object, resourceVersion, err := w.objectToVersionedRuntimeObject(obj)
+	event := watch.Event{Type: watch.Added, Object: object}
+	err = w.processEvent(event, resourceVersion, f)
 	if err != nil {
 		return err
 	}
-	event := watch.Event{Type: watch.Modified, Object: object}
+	w.dispatchEvent(&event)
+	return nil
+}
+
+// Update takes runtime.Object as an argument.
+func (w *WatchCache) Update(obj runtime.Object, clusterName string, resourceVersion *ClusterResourceVersion,
+	codec runtime.Codec, memoryVersion schema.GroupVersion) error {
 	f := func(elem *StoreElement) error {
 		return w.stores[clusterName].Update(elem)
 	}
-	return w.processEvent(event, resourceVersion, f)
-}
+	object, err := encodeEvent(obj, codec, memoryVersion)
+	if err != nil {
+		return err
+	}
 
-// Delete takes runtime.Object as an argument.
-func (w *WatchCache) Delete(obj interface{}, clusterName string) error {
-	object, resourceVersion, err := w.objectToVersionedRuntimeObject(obj)
+	event := watch.Event{Type: watch.Modified, Object: object}
+	err = w.processEvent(event, resourceVersion, f)
 	if err != nil {
 		return err
 	}
 
-	event := watch.Event{Type: watch.Deleted, Object: object}
-	f := func(elem *StoreElement) error {
-		return w.stores[clusterName].Delete(elem)
-	}
-	return w.processEvent(event, resourceVersion, f)
+	w.dispatchEvent(&event)
+	return nil
 }
 
-func (w *WatchCache) objectToVersionedRuntimeObject(obj interface{}) (runtime.Object, *ClusterResourceVersion, error) {
-	object, ok := obj.(runtime.Object)
-	if !ok {
-		return nil, &ClusterResourceVersion{}, fmt.Errorf("obj does not implement runtime.Object interface: %v", obj)
+// Delete takes runtime.Object as an argument.
+func (w *WatchCache) Delete(obj runtime.Object, clusterName string, resourceVersion *ClusterResourceVersion,
+	codec runtime.Codec, memoryVersion schema.GroupVersion) error {
+	f := func(elem *StoreElement) error {
+		return w.stores[clusterName].Delete(elem)
 	}
-
-	resourceVersion, err := meta.NewAccessor().ResourceVersion(object)
+	object, err := encodeEvent(obj, codec, memoryVersion)
 	if err != nil {
-		return nil, &ClusterResourceVersion{}, err
+		return err
 	}
 
-	clusterRvStore, err := NewClusterResourceVersionFromString(resourceVersion)
+	event := watch.Event{Type: watch.Deleted, Object: object}
+	err = w.processEvent(event, resourceVersion, f)
 	if err != nil {
-		return nil, &ClusterResourceVersion{}, err
+		return err
 	}
-	return object, clusterRvStore, nil
+
+	w.dispatchEvent(&event)
+	return nil
 }
 
 // Assumes that lock is already held for write.
@@ -343,7 +350,6 @@ func (w *WatchCache) WaitUntilFreshAndGet(cluster, namespace, name string) (*Sto
 // GetAllEventsSinceThreadUnsafe returns watch event from slice window in watchCache by the resourceVersion
 func (w *WatchCache) GetAllEventsSinceThreadUnsafe(resourceVersion *ClusterResourceVersion) ([]*watch.Event, error) {
 	size := w.endIndex - w.startIndex
-	oldestObj := w.cache[w.startIndex%w.capacity]
 	if resourceVersion.IsEmpty() {
 		// resourceVersion = 0 means that we don't require any specific starting point
 		// and we would like to start watching from ~now.
@@ -385,11 +391,7 @@ func (w *WatchCache) GetAllEventsSinceThreadUnsafe(resourceVersion *ClusterResou
 
 	var result []*watch.Event
 	if !founded {
-		oldest, err := GetClusterResourceVersionFromEvent(oldestObj)
-		if err != nil {
-			return nil, err
-		}
-		return nil, apierrors.NewResourceExpired(fmt.Sprintf("too old resource version: %#v (%#v)", resourceVersion, oldest))
+		return nil, apierrors.NewResourceExpired("resource version not found")
 	} else {
 		i := 0
 		result = make([]*watch.Event, size-index)
@@ -405,15 +407,18 @@ func (w *WatchCache) GetAllEventsSinceThreadUnsafe(resourceVersion *ClusterResou
 func (w *WatchCache) AddIndexer(clusterName string, indexers *cache.Indexers) {
 	w.Lock()
 	defer w.Unlock()
-	w.stores[clusterName] = cache.NewIndexer(storeElementKey, storeElementIndexers(indexers))
+
+	if _, ok := w.stores[clusterName]; !ok {
+		w.stores[clusterName] = cache.NewIndexer(storeElementKey, storeElementIndexers(indexers))
+	}
 }
 
-func (w *WatchCache) DeleteIndexer(clusterName string) {
+func (w *WatchCache) DeleteIndexer(clusterName string) bool {
 	w.Lock()
 	defer w.Unlock()
 
 	if _, ok := w.stores[clusterName]; !ok {
-		return
+		return false
 	}
 
 	delete(w.stores, clusterName)
@@ -421,20 +426,53 @@ func (w *WatchCache) DeleteIndexer(clusterName string) {
 	//clear cache
 	w.startIndex = 0
 	w.endIndex = 0
+
+	return true
 }
 
-func (w *WatchCache) ClearWatchCache(clusterName string) {
-	w.Lock()
-	defer w.Unlock()
+func (w *WatchCache) dispatchEvent(event *watch.Event) {
+	w.WatchersLock.RLock()
+	defer w.WatchersLock.RUnlock()
+	for _, watcher := range w.WatchersBuffer {
+		watcher.NonblockingAdd(event)
+	}
+}
 
-	if _, ok := w.stores[clusterName]; !ok {
+func (w *WatchCache) CleanCluster(cluster string) {
+	if !w.DeleteIndexer(cluster) {
 		return
 	}
 
-	delete(w.stores, clusterName)
-	w.stores[clusterName] = cache.NewIndexer(storeElementKey, storeElementIndexers(nil))
+	errorEvent := utilwatch.NewErrorEvent(fmt.Errorf("cluster %s has been cleaned", cluster))
+	w.dispatchEvent(&errorEvent)
+}
 
-	//clear cache
-	w.startIndex = 0
-	w.endIndex = 0
+func encodeEvent(obj runtime.Object, codec runtime.Codec, memoryVersion schema.GroupVersion) (runtime.Object, error) {
+	var buffer bytes.Buffer
+
+	gk := obj.GetObjectKind().GroupVersionKind().GroupKind()
+	if ok := resourcescheme.LegacyResourceScheme.IsGroupRegistered(gk.Group); !ok {
+		return obj, nil
+	}
+
+	dest, err := resourcescheme.LegacyResourceScheme.New(memoryVersion.WithKind(gk.Kind))
+	if err != nil {
+		return nil, err
+	}
+
+	err = codec.Encode(obj, &buffer)
+	if err != nil {
+		return nil, err
+	}
+
+	object, _, err := codec.Decode(buffer.Bytes(), nil, dest)
+	if err != nil {
+		return nil, err
+	}
+
+	if object != dest {
+		return nil, fmt.Errorf("decoded object %T does not match the expected memory-version object", object)
+	}
+
+	return dest, nil
 }
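Review note: dispatchEvent fans an event out to every registered watcher with NonblockingAdd, whose body is not part of this diff. The sketch below is only an assumption about the behaviour the call site relies on, namely a send that never blocks:

	// Assumed shape of CacheWatcher.NonblockingAdd (not shown in this diff):
	// a select with a default branch, so one slow watcher cannot stall
	// dispatchEvent for the others; its events are dropped once its buffer
	// (100 entries in NewCacheWatcher(100)) fills, and it must relist.
	func (c *CacheWatcher) NonblockingAdd(event *watch.Event) bool {
		select {
		case c.input <- event: // c.input is an assumed buffered channel
			return true
		default:
			return false
		}
	}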
diff --git a/pkg/storage/options/options.go b/pkg/storage/options/options.go
index 388d3e33f..61c77682a 100644
--- a/pkg/storage/options/options.go
+++ b/pkg/storage/options/options.go
@@ -7,7 +7,7 @@ import (
 	"github.com/spf13/pflag"
 
 	_ "github.com/clusterpedia-io/clusterpedia/pkg/storage/internalstorage"
-	"github.com/clusterpedia-io/clusterpedia/pkg/storage/memorystorage"
+	_ "github.com/clusterpedia-io/clusterpedia/pkg/storage/memorystorage"
 )
 
 type StorageOptions struct {
@@ -24,7 +24,7 @@ func (o *StorageOptions) Validate() []error {
 		return nil
 	}
 
-	if o.Name == memorystorage.StorageName {
+	if o.ConfigPath == "" {
 		return nil
 	}
diff --git a/pkg/storage/storage.go b/pkg/storage/storage.go
index b968fcc70..0c7fa1d3d 100644
--- a/pkg/storage/storage.go
+++ b/pkg/storage/storage.go
@@ -3,15 +3,17 @@ package storage
 import (
 	"context"
 
+	metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion"
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/runtime/schema"
+	"k8s.io/apimachinery/pkg/watch"
 
 	internal "github.com/clusterpedia-io/api/clusterpedia"
-	"github.com/clusterpedia-io/clusterpedia/pkg/storage/memorystorage/watchcache"
 )
 
 type StorageFactory interface {
 	GetResourceVersions(ctx context.Context, cluster string) (map[schema.GroupVersionResource]map[string]interface{}, error)
+	PrepareCluster(cluster string) error
 	CleanCluster(ctx context.Context, cluster string) error
 	CleanClusterResource(ctx context.Context, cluster string, gvr schema.GroupVersionResource) error
@@ -26,6 +28,7 @@ type ResourceStorage interface {
 	Get(ctx context.Context, cluster, namespace, name string, obj runtime.Object) error
 	List(ctx context.Context, listObj runtime.Object, opts *internal.ListOptions) error
+	Watch(ctx context.Context, options *metainternalversion.ListOptions) (watch.Interface, error)
 
 	Create(ctx context.Context, cluster string, obj runtime.Object) error
 	Update(ctx context.Context, cluster string, obj runtime.Object) error
@@ -43,8 +46,6 @@ type ResourceStorageConfig struct {
 	Codec          runtime.Codec
 	StorageVersion schema.GroupVersion
 	MemoryVersion  schema.GroupVersion
-	WatchCache     *watchcache.WatchCache
-	Cluster        string
 
 	Namespaced bool
 }
diff --git a/pkg/synchromanager/clustersynchro/cluster_synchro.go b/pkg/synchromanager/clustersynchro/cluster_synchro.go
index 30b09aced..c84bb7546 100644
--- a/pkg/synchromanager/clustersynchro/cluster_synchro.go
+++ b/pkg/synchromanager/clustersynchro/cluster_synchro.go
@@ -275,7 +275,6 @@ func (s *ClusterSynchro) setSyncResources() {
 	if syncResources == nil {
 		return
 	}
-
 	groupResourceStatus, storageResourceSyncConfigs := s.resourceNegotiator.NegotiateSyncResources(syncResources)
 
 	lastGroupResourceStatus := s.groupResourceStatus.Load().(*GroupResourceStatus)
@@ -303,7 +302,6 @@ func (s *ClusterSynchro) setSyncResources() {
 			continue
 		}
 		resourceConfig := config.storageConfig
-		resourceConfig.Cluster = s.name
 
 		resourceStorage, err := s.storage.NewResourceStorage(resourceConfig)
 		if err != nil {
diff --git a/pkg/synchromanager/clustersynchro_manager.go b/pkg/synchromanager/clustersynchro_manager.go
index 21231fdf5..b17cc4920 100644
--- a/pkg/synchromanager/clustersynchro_manager.go
+++ b/pkg/synchromanager/clustersynchro_manager.go
@@ -356,6 +356,13 @@ func (manager *Manager) reconcileCluster(cluster *clusterv1alpha2.PediaCluster)
 		}
 		return controller.NoRequeueResult
 	}
+
+	err = manager.storage.PrepareCluster(cluster.Name)
+	if err != nil {
+		klog.ErrorS(err, "Failed to prepare cluster", "cluster", cluster.Name)
+		return controller.NoRequeueResult
+	}
+
 	manager.synchroWaitGroup.StartWithChannel(manager.stopCh, synchro.Run)
 
 	manager.synchrolock.Lock()