From 5775f703f5655536c34c8bbaf0d68f13845c1a8d Mon Sep 17 00:00:00 2001 From: wuyingjun Date: Thu, 8 Sep 2022 17:02:08 +0800 Subject: [PATCH 1/7] add a new command for memory storage Co-authored-by: duanmeng Co-authored-by: hanweisen Co-authored-by: zhangyongxi Signed-off-by: wuyingjun --- Makefile | 50 +++++++++++- cmd/apiserver/app/apiserver.go | 2 +- cmd/apiserver/app/options/options.go | 16 +++- .../app/binding_apiserver.go | 80 +++++++++++++++++++ cmd/binding-apiserver/main.go | 18 +++++ pkg/apiserver/apiserver.go | 12 +++ 6 files changed, 170 insertions(+), 8 deletions(-) create mode 100644 cmd/binding-apiserver/app/binding_apiserver.go create mode 100644 cmd/binding-apiserver/main.go diff --git a/Makefile b/Makefile index c0044dd3b..5975ae93a 100644 --- a/Makefile +++ b/Makefile @@ -48,7 +48,7 @@ ifeq ($(LATEST_TAG),$(shell git describe --abbrev=0 --tags)) VERSION=$(LATEST_TAG) endif -all: apiserver clustersynchro-manager controller-manager +all: apiserver binding-apiserver clustersynchro-manager controller-manager gen-clusterconfigs: ./hack/gen-clusterconfigs.sh @@ -91,6 +91,13 @@ apiserver: -o bin/apiserver \ cmd/apiserver/main.go +.PHONY: binding-apiserver +binding-apiserver: + CGO_ENABLED=0 GOOS=$(GOOS) GOARCH=$(GOARCH) go build \ + -ldflags $(LDFLAGS) \ + -o bin/binding-apiserver \ + cmd/binding-apiserver/main.go + .PHONY: clustersynchro-manager clustersynchro-manager: CGO_ENABLED=0 GOOS=$(GOOS) GOARCH=$(GOARCH) go build \ @@ -106,7 +113,7 @@ controller-manager: cmd/controller-manager/main.go .PHONY: images -images: image-apiserver image-clustersynchro-manager image-controller-manager +images: image-apiserver image-binding-apiserver image-clustersynchro-manager image-controller-manager image-apiserver: GOOS="linux" $(MAKE) apiserver @@ -116,7 +123,16 @@ image-apiserver: --load \ --build-arg BASEIMAGE=$(BASEIMAGE) \ --build-arg BINNAME=apiserver . - + +image-binding-apiserver: + GOOS="linux" $(MAKE) binding-apiserver + docker buildx build \ + -t ${REGISTRY}/binding-apiserver-$(GOARCH):$(VERSION) \ + --platform=linux/$(GOARCH) \ + --load \ + --build-arg BASEIMAGE=$(BASEIMAGE) \ + --build-arg BINNAME=binding-apiserver . + image-clustersynchro-manager: GOOS="linux" $(MAKE) clustersynchro-manager docker buildx build \ @@ -136,7 +152,7 @@ image-controller-manager: --build-arg BINNAME=controller-manager . 
.PHONY: push-images -push-images: push-apiserver-image push-clustersynchro-manager-image push-controller-manager-image +push-images: push-apiserver-image push-binding-apiserver-image push-clustersynchro-manager-image push-controller-manager-imag # clean manifest https://github.com/docker/cli/issues/954#issuecomment-586722447 push-apiserver-image: clean-apiserver-manifest @@ -160,6 +176,28 @@ push-apiserver-image: clean-apiserver-manifest docker manifest push $(REGISTRY)/apiserver:latest; \ fi; +# clean manifest https://github.com/docker/cli/issues/954#issuecomment-586722447 +push-binding-apiserver-image: clean-binding-apiserver-manifest + set -e; \ + images=""; \ + for arch in $(RELEASE_ARCHS); do \ + GOARCH=$$arch $(MAKE) image-binding-apiserver; \ + image=$(REGISTRY)/binding-apiserver-$$arch:$(VERSION); \ + docker push $$image; \ + images="$$images $$image"; \ + if [ $(VERSION) != latest ]; then \ + latest_image=$(REGISTRY)/binding-apiserver-$$arch:latest; \ + docker tag $$image $$latest_image; \ + docker push $$latest_image; \ + fi; \ + done; \ + docker manifest create $(REGISTRY)/binding-apiserver:$(VERSION) $$images; \ + docker manifest push $(REGISTRY)/binding-apiserver:$(VERSION); \ + if [ $(VERSION) != latest ]; then \ + docker manifest create $(REGISTRY)/binding-apiserver:latest $$images; \ + docker manifest push $(REGISTRY)/binding-apiserver:latest; \ + fi; + # clean manifest https://github.com/docker/cli/issues/954#issuecomment-586722447 push-clustersynchro-manager-image: clean-clustersynchro-manager-manifest set -e; \ @@ -212,6 +250,10 @@ clean-apiserver-manifest: docker manifest rm $(REGISTRY)/apiserver:$(VERSION) 2>/dev/null;\ docker manifest rm $(REGISTRY)/apiserver:latest 2>/dev/null; exit 0 +clean-binding-apiserver-manifest: + docker manifest rm $(REGISTRY)/binding-apiserver:$(VERSION) 2>/dev/null;\ + docker manifest rm $(REGISTRY)/binding-apiserver:latest 2>/dev/null; exit 0 + clean-clustersynchro-manager-manifest: docker manifest rm $(REGISTRY)/clustersynchro-manager:$(VERSION) 2>/dev/null;\ docker manifest rm $(REGISTRY)/clustersynchro-manager:latest 2>/dev/null; exit 0 diff --git a/cmd/apiserver/app/apiserver.go b/cmd/apiserver/app/apiserver.go index 9dd16a869..541b2a363 100644 --- a/cmd/apiserver/app/apiserver.go +++ b/cmd/apiserver/app/apiserver.go @@ -32,7 +32,7 @@ func NewClusterPediaServerCommand(ctx context.Context) *cobra.Command { } cliflag.PrintFlags(cmd.Flags()) - config, err := opts.Config() + config, err := opts.Config(false) if err != nil { return err } diff --git a/cmd/apiserver/app/options/options.go b/cmd/apiserver/app/options/options.go index 94eaa5035..326b1ce84 100644 --- a/cmd/apiserver/app/options/options.go +++ b/cmd/apiserver/app/options/options.go @@ -3,9 +3,12 @@ package options import ( "fmt" "net" + "net/http" + "strings" utilerrors "k8s.io/apimachinery/pkg/util/errors" "k8s.io/apiserver/pkg/admission/plugin/namespace/lifecycle" + genericrequest "k8s.io/apiserver/pkg/endpoints/request" genericapiserver "k8s.io/apiserver/pkg/server" genericoptions "k8s.io/apiserver/pkg/server/options" "k8s.io/apiserver/pkg/util/feature" @@ -72,7 +75,7 @@ func (o *ClusterPediaServerOptions) Validate() error { return utilerrors.NewAggregate(errors) } -func (o *ClusterPediaServerOptions) Config() (*apiserver.Config, error) { +func (o *ClusterPediaServerOptions) Config(bindingSyncController bool) (*apiserver.Config, error) { if err := o.Validate(); err != nil { return nil, err } @@ -95,13 +98,20 @@ func (o *ClusterPediaServerOptions) Config() (*apiserver.Config, 
error) { // genericConfig.OpenAPIConfig.Info.Title = openAPITitle // genericConfig.OpenAPIConfig.Info.Version= openAPIVersion + // todo + // support watch to LongRunningFunc + genericConfig.LongRunningFunc = func(r *http.Request, requestInfo *genericrequest.RequestInfo) bool { + return strings.Contains(r.RequestURI, "watch") + } + if err := o.genericOptionsApplyTo(genericConfig); err != nil { return nil, err } return &apiserver.Config{ - GenericConfig: genericConfig, - StorageFactory: storage, + GenericConfig: genericConfig, + StorageFactory: storage, + BindingSyncController: bindingSyncController, }, nil } diff --git a/cmd/binding-apiserver/app/binding_apiserver.go b/cmd/binding-apiserver/app/binding_apiserver.go new file mode 100644 index 000000000..1b8ed1e55 --- /dev/null +++ b/cmd/binding-apiserver/app/binding_apiserver.go @@ -0,0 +1,80 @@ +package app + +import ( + "context" + + "github.com/spf13/cobra" + "k8s.io/apimachinery/pkg/util/runtime" + genericfeatures "k8s.io/apiserver/pkg/features" + cliflag "k8s.io/component-base/cli/flag" + "k8s.io/component-base/cli/globalflag" + "k8s.io/component-base/featuregate" + "k8s.io/component-base/logs" + "k8s.io/component-base/term" + + "github.com/clusterpedia-io/clusterpedia/cmd/apiserver/app/options" + clusterpediafeature "github.com/clusterpedia-io/clusterpedia/pkg/utils/feature" + "github.com/clusterpedia-io/clusterpedia/pkg/version/verflag" +) + +func NewClusterPediaServerCommand(ctx context.Context) *cobra.Command { + opts := options.NewServerOptions() + + cmd := &cobra.Command{ + Use: "clusterpedia-apiserver", + RunE: func(cmd *cobra.Command, args []string) error { + verflag.PrintAndExitIfRequested() + + // Activate logging as soon as possible, after that + // show flags with the final logging configuration. 
+ if err := opts.Logs.ValidateAndApply(clusterpediafeature.FeatureGate); err != nil { + return err + } + cliflag.PrintFlags(cmd.Flags()) + + config, err := opts.Config(true) + if err != nil { + return err + } + + server, err := config.Complete().New() + if err != nil { + return err + } + + if err := server.Run(ctx); err != nil { + return err + } + return nil + }, + } + + namedFlagSets := opts.Flags() + verflag.AddFlags(namedFlagSets.FlagSet("global")) + globalflag.AddGlobalFlags(namedFlagSets.FlagSet("global"), cmd.Name(), logs.SkipLoggingConfigurationFlags()) + clusterpediafeature.MutableFeatureGate.AddFlag(namedFlagSets.FlagSet("mutable feature gate")) + + fs := cmd.Flags() + for _, f := range namedFlagSets.FlagSets { + fs.AddFlagSet(f) + } + + cols, _, _ := term.TerminalSize(cmd.OutOrStdout()) + cliflag.SetUsageAndHelpFunc(cmd, namedFlagSets, cols) + return cmd +} + +func init() { + runtime.Must(logs.AddFeatureGates(clusterpediafeature.MutableFeatureGate)) + + // The feature gate `RemainingItemCount` should default to false + // https://github.com/clusterpedia-io/clusterpedia/issues/196 + gates := clusterpediafeature.MutableFeatureGate.GetAll() + gate := gates[genericfeatures.RemainingItemCount] + gate.Default = false + gates[genericfeatures.RemainingItemCount] = gate + + clusterpediafeature.MutableFeatureGate = featuregate.NewFeatureGate() + runtime.Must(clusterpediafeature.MutableFeatureGate.Add(gates)) + clusterpediafeature.FeatureGate = clusterpediafeature.MutableFeatureGate +} diff --git a/cmd/binding-apiserver/main.go b/cmd/binding-apiserver/main.go new file mode 100644 index 000000000..1a0ff0093 --- /dev/null +++ b/cmd/binding-apiserver/main.go @@ -0,0 +1,18 @@ +package main + +import ( + "os" + + apiserver "k8s.io/apiserver/pkg/server" + "k8s.io/component-base/cli" + _ "k8s.io/component-base/logs/json/register" // for JSON log format registration + + "github.com/clusterpedia-io/clusterpedia/cmd/binding-apiserver/app" +) + +func main() { + ctx := apiserver.SetupSignalContext() + command := app.NewClusterPediaServerCommand(ctx) + code := cli.Run(command) + os.Exit(code) +} diff --git a/pkg/apiserver/apiserver.go b/pkg/apiserver/apiserver.go index d6f27685f..41bc564d9 100644 --- a/pkg/apiserver/apiserver.go +++ b/pkg/apiserver/apiserver.go @@ -26,6 +26,7 @@ import ( informers "github.com/clusterpedia-io/clusterpedia/pkg/generated/informers/externalversions" "github.com/clusterpedia-io/clusterpedia/pkg/kubeapiserver" "github.com/clusterpedia-io/clusterpedia/pkg/storage" + "github.com/clusterpedia-io/clusterpedia/pkg/synchromanager" "github.com/clusterpedia-io/clusterpedia/pkg/utils/filters" ) @@ -64,6 +65,8 @@ type Config struct { GenericConfig *genericapiserver.RecommendedConfig StorageFactory storage.StorageFactory + // BindingSyncController means whether apiserver or binding_apiserver should process and sync events as clustersynchro_manager + BindingSyncController bool } type ClusterPediaServer struct { @@ -75,6 +78,8 @@ type completedConfig struct { ClientConfig *clientrest.Config StorageFactory storage.StorageFactory + // BindingSyncController means whether apiserver or binding_apiserver should process and sync events as clustersynchro_manager + BindingSyncController bool } // CompletedConfig embeds a private pointer that cannot be instantiated outside of this package. 
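Note on the LongRunningFunc override added in options.go earlier in this patch: matching the literal substring "watch" in the request URI also flags any path or query string that merely contains that word. Below is a sketch of a stricter check that keys off the parsed request verb and keeps the substring match only as a fallback; the helper name withWatchLongRunning is illustrative and not part of this patch, which keeps the strings.Contains form and marks it as a TODO.

package options

import (
	"net/http"
	"strings"

	genericrequest "k8s.io/apiserver/pkg/endpoints/request"
	genericapiserver "k8s.io/apiserver/pkg/server"
)

// withWatchLongRunning is a hypothetical alternative to the TODO above:
// treat a request as long-running only when its resolved verb is "watch",
// falling back to the URI substring match when RequestInfo is unavailable.
func withWatchLongRunning(cfg *genericapiserver.RecommendedConfig) {
	cfg.LongRunningFunc = func(r *http.Request, requestInfo *genericrequest.RequestInfo) bool {
		if requestInfo != nil {
			return requestInfo.Verb == "watch"
		}
		return strings.Contains(r.RequestURI, "watch")
	}
}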
@@ -88,6 +93,7 @@ func (cfg *Config) Complete() CompletedConfig { cfg.GenericConfig.Complete(), cfg.GenericConfig.ClientConfig, cfg.StorageFactory, + cfg.BindingSyncController, } c.GenericConfig.Version = &version.Info{ @@ -160,6 +166,12 @@ func (config completedConfig) New() (*ClusterPediaServer, error) { genericServer.AddPostStartHookOrDie("start-clusterpedia-informers", func(context genericapiserver.PostStartHookContext) error { clusterpediaInformerFactory.Start(context.StopCh) clusterpediaInformerFactory.WaitForCacheSync(context.StopCh) + + if config.BindingSyncController { + synchromanager := synchromanager.NewManager(crdclient, config.StorageFactory) + synchromanager.Run(1, context.StopCh) + } + return nil }) From 5c356d3e8d21db156b11cb74d5f088b6abe202e6 Mon Sep 17 00:00:00 2001 From: duanmeng Date: Thu, 8 Sep 2022 17:07:58 +0800 Subject: [PATCH 2/7] add deploy manifests for binding_apiserver Co-authored-by: hanweisen Co-authored-by: wuyingjun Co-authored-by: zhangyongxi Signed-off-by: duanmeng --- ...terpedia_binding_apiserver_apiservice.yaml | 13 ++++++ ...terpedia_binding_apiserver_deployment.yaml | 45 +++++++++++++++++++ deploy/clusterpedia_apiserver_rbac.yaml | 3 ++ 3 files changed, 61 insertions(+) create mode 100644 deploy/binding-apiserver/clusterpedia_binding_apiserver_apiservice.yaml create mode 100644 deploy/binding-apiserver/clusterpedia_binding_apiserver_deployment.yaml diff --git a/deploy/binding-apiserver/clusterpedia_binding_apiserver_apiservice.yaml b/deploy/binding-apiserver/clusterpedia_binding_apiserver_apiservice.yaml new file mode 100644 index 000000000..3f04bbe41 --- /dev/null +++ b/deploy/binding-apiserver/clusterpedia_binding_apiserver_apiservice.yaml @@ -0,0 +1,13 @@ +apiVersion: apiregistration.k8s.io/v1 +kind: APIService +metadata: + name: v1beta1.clusterpedia.io +spec: + insecureSkipTLSVerify: true + group: clusterpedia.io + groupPriorityMinimum: 1000 + versionPriority: 100 + service: + name: clusterpedia-binding-apiserver + namespace: clusterpedia-system + version: v1beta1 diff --git a/deploy/binding-apiserver/clusterpedia_binding_apiserver_deployment.yaml b/deploy/binding-apiserver/clusterpedia_binding_apiserver_deployment.yaml new file mode 100644 index 000000000..75e47f139 --- /dev/null +++ b/deploy/binding-apiserver/clusterpedia_binding_apiserver_deployment.yaml @@ -0,0 +1,45 @@ +apiVersion: v1 +kind: ServiceAccount +metadata: + name: clusterpedia-binding-apiserver + namespace: clusterpedia-system +--- +apiVersion: v1 +kind: Service +metadata: + name: clusterpedia-binding-apiserver + namespace: clusterpedia-system +spec: + ports: + - port: 443 + protocol: TCP + targetPort: 443 + selector: + app: clusterpedia-binding-apiserver +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: clusterpedia-binding-apiserver + namespace: clusterpedia-system + labels: + app: clusterpedia-binding-apiserver +spec: + replicas: 1 + selector: + matchLabels: + app: clusterpedia-binding-apiserver + template: + metadata: + labels: + app: clusterpedia-binding-apiserver + spec: + containers: + - name: binding-apiserver + image: ghcr.io/clusterpedia-io/clusterpedia/binding-apiserver:v0.4.1 + command: + - /usr/local/bin/binding-apiserver + - --secure-port=443 + - --storage-name=memory + - -v=3 + serviceAccountName: clusterpedia-binding-apiserver diff --git a/deploy/clusterpedia_apiserver_rbac.yaml b/deploy/clusterpedia_apiserver_rbac.yaml index c5e40237e..caa6735b8 100644 --- a/deploy/clusterpedia_apiserver_rbac.yaml +++ b/deploy/clusterpedia_apiserver_rbac.yaml @@ 
-28,3 +28,6 @@ subjects: - kind: ServiceAccount name: clusterpedia-controller-manager namespace: clusterpedia-system + - kind: ServiceAccount + name: clusterpedia-binding-apiserver + namespace: clusterpedia-system \ No newline at end of file From 0dcc3b8500238e5adf6c3c0860af42ee23b4a5af Mon Sep 17 00:00:00 2001 From: zhangyongxi Date: Thu, 8 Sep 2022 18:19:26 +0800 Subject: [PATCH 3/7] add memory storage for save resource in memory Co-authored-by: duanmeng Co-authored-by: hanweisen Co-authored-by: wuyingjun Signed-off-by: zhangyongxi --- .../clusterresource_controller.go | 2 +- pkg/kubeapiserver/restmanager.go | 16 +- .../memorystorage/memory_resource_storage.go | 106 +++++++ pkg/storage/memorystorage/memory_storage.go | 71 +++++ pkg/storage/memorystorage/register.go | 17 + .../memorystorage/watchcache/watch_cache.go | 292 ++++++++++++++++++ pkg/storage/options/options.go | 5 + pkg/storage/storage.go | 4 + .../clustersynchro/cluster_synchro.go | 4 +- .../clustersynchro/resource_negotiator.go | 2 + 10 files changed, 510 insertions(+), 9 deletions(-) create mode 100644 pkg/storage/memorystorage/memory_resource_storage.go create mode 100644 pkg/storage/memorystorage/memory_storage.go create mode 100644 pkg/storage/memorystorage/register.go create mode 100644 pkg/storage/memorystorage/watchcache/watch_cache.go diff --git a/pkg/kubeapiserver/clusterresource_controller.go b/pkg/kubeapiserver/clusterresource_controller.go index b974851db..8f23b1796 100644 --- a/pkg/kubeapiserver/clusterresource_controller.go +++ b/pkg/kubeapiserver/clusterresource_controller.go @@ -83,7 +83,7 @@ func (c *ClusterResourceController) updateClusterResources(cluster *clusterv1alp return } - discoveryapis := c.restManager.LoadResources(resources) + discoveryapis := c.restManager.LoadResources(resources, cluster.Name) c.discoveryManager.SetClusterGroupResource(cluster.Name, discoveryapis) c.clusterresources[cluster.Name] = resources diff --git a/pkg/kubeapiserver/restmanager.go b/pkg/kubeapiserver/restmanager.go index 52649bc20..b259ae738 100644 --- a/pkg/kubeapiserver/restmanager.go +++ b/pkg/kubeapiserver/restmanager.go @@ -101,7 +101,7 @@ func (m *RESTManager) GetRESTResourceInfo(gvr schema.GroupVersionResource) RESTR return infos[gvr] } -func (m *RESTManager) LoadResources(infos ResourceInfoMap) map[schema.GroupResource]discovery.ResourceDiscoveryAPI { +func (m *RESTManager) LoadResources(infos ResourceInfoMap, cluster string) map[schema.GroupResource]discovery.ResourceDiscoveryAPI { apigroups := m.groups.Load().(map[string]metav1.APIGroup) apiresources := m.resources.Load().(map[schema.GroupResource]metav1.APIResource) restinfos := m.restResourceInfos.Load().(map[schema.GroupVersionResource]RESTResourceInfo) @@ -175,7 +175,7 @@ func (m *RESTManager) LoadResources(infos ResourceInfoMap) map[schema.GroupResou m.addAPIResourcesLocked(addedAPIResources) } if len(addedInfos) != 0 { - m.addRESTResourceInfosLocked(addedInfos) + m.addRESTResourceInfosLocked(addedInfos, cluster) } return apis } @@ -208,7 +208,7 @@ func (m *RESTManager) addAPIResourcesLocked(addedResources map[schema.GroupResou m.resources.Store(resources) } -func (m *RESTManager) addRESTResourceInfosLocked(addedInfos map[schema.GroupVersionResource]RESTResourceInfo) { +func (m *RESTManager) addRESTResourceInfosLocked(addedInfos map[schema.GroupVersionResource]RESTResourceInfo, cluster string) { restinfos := m.restResourceInfos.Load().(map[schema.GroupVersionResource]RESTResourceInfo) infos := make(map[schema.GroupVersionResource]RESTResourceInfo, 
len(restinfos)+len(addedInfos)) @@ -225,9 +225,9 @@ func (m *RESTManager) addRESTResourceInfosLocked(addedInfos map[schema.GroupVers var err error var storage *resourcerest.RESTStorage if resourcescheme.LegacyResourceScheme.IsGroupRegistered(gvr.Group) { - storage, err = m.genLegacyResourceRESTStorage(gvr, info.APIResource.Kind) + storage, err = m.genLegacyResourceRESTStorage(gvr, info.APIResource.Kind, cluster) } else { - storage, err = m.genCustomResourceRESTStorage(gvr, info.APIResource.Kind) + storage, err = m.genCustomResourceRESTStorage(gvr, info.APIResource.Kind, cluster) } if err != nil { klog.ErrorS(err, "Failed to gen resource rest storage", "gvr", gvr, "kind", info.APIResource.Kind) @@ -263,11 +263,12 @@ func (m *RESTManager) addRESTResourceInfosLocked(addedInfos map[schema.GroupVers m.restResourceInfos.Store(infos) } -func (m *RESTManager) genLegacyResourceRESTStorage(gvr schema.GroupVersionResource, kind string) (*resourcerest.RESTStorage, error) { +func (m *RESTManager) genLegacyResourceRESTStorage(gvr schema.GroupVersionResource, kind, cluster string) (*resourcerest.RESTStorage, error) { storageConfig, err := m.resourcetSorageConfig.NewLegacyResourceConfig(gvr.GroupResource()) if err != nil { return nil, err } + storageConfig.Cluster = cluster resourceStorage, err := m.storageFactory.NewResourceStorage(storageConfig) if err != nil { @@ -290,11 +291,12 @@ func (m *RESTManager) genLegacyResourceRESTStorage(gvr schema.GroupVersionResour }, nil } -func (m *RESTManager) genCustomResourceRESTStorage(gvr schema.GroupVersionResource, kind string) (*resourcerest.RESTStorage, error) { +func (m *RESTManager) genCustomResourceRESTStorage(gvr schema.GroupVersionResource, kind, cluster string) (*resourcerest.RESTStorage, error) { storageConfig, err := m.resourcetSorageConfig.NewCustomResourceConfig(gvr) if err != nil { return nil, err } + storageConfig.Cluster = cluster resourceStorage, err := m.storageFactory.NewResourceStorage(storageConfig) if err != nil { diff --git a/pkg/storage/memorystorage/memory_resource_storage.go b/pkg/storage/memorystorage/memory_resource_storage.go new file mode 100644 index 000000000..6c544f09f --- /dev/null +++ b/pkg/storage/memorystorage/memory_resource_storage.go @@ -0,0 +1,106 @@ +package memorystorage + +import ( + "context" + "sync" + + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/klog/v2" + + internal "github.com/clusterpedia-io/api/clusterpedia" + "github.com/clusterpedia-io/clusterpedia/pkg/storage" + cache "github.com/clusterpedia-io/clusterpedia/pkg/storage/memorystorage/watchcache" +) + +var ( + storages *resourceStorages +) + +type resourceStorages struct { + sync.RWMutex + resourceStorages map[schema.GroupVersionResource]*ResourceStorage +} + +func init() { + storages = &resourceStorages{ + resourceStorages: make(map[schema.GroupVersionResource]*ResourceStorage), + } +} + +type ClusterWatchEvent struct { + Event watch.Event + ClusterName string +} + +//nolint +type ResourceStorage struct { + sync.RWMutex + + Codec runtime.Codec + watchCache *cache.WatchCache + stopCh chan struct{} + incoming chan ClusterWatchEvent + storageConfig *storage.ResourceStorageConfig +} + +//nolint +func (s *ResourceStorage) convertEvent(event *ClusterWatchEvent) error { + s.Lock() + s.Unlock() + klog.V(10).Infof("event: %s", event) + s.incoming <- *event + return nil +} + +func (s *ResourceStorage) GetStorageConfig() *storage.ResourceStorageConfig { + return s.storageConfig +} + +func (s 
*ResourceStorage) Create(ctx context.Context, cluster string, obj runtime.Object) error { + event := s.newClusterWatchEvent(watch.Added, obj, cluster) + err := s.convertEvent(event) + if err != nil { + return err + } + return nil +} + +func (s *ResourceStorage) Update(ctx context.Context, cluster string, obj runtime.Object) error { + event := s.newClusterWatchEvent(watch.Modified, obj, cluster) + err := s.convertEvent(event) + if err != nil { + return err + } + return nil +} + +func (s *ResourceStorage) Delete(ctx context.Context, cluster string, obj runtime.Object) error { + event := s.newClusterWatchEvent(watch.Deleted, obj, cluster) + err := s.convertEvent(event) + if err != nil { + return err + } + return nil +} + +func (s *ResourceStorage) Get(ctx context.Context, cluster, namespace, name string, into runtime.Object) error { + // todo + return nil +} + +func (s *ResourceStorage) newClusterWatchEvent(eventType watch.EventType, obj runtime.Object, cluster string) *ClusterWatchEvent { + return &ClusterWatchEvent{ + ClusterName: cluster, + Event: watch.Event{ + Type: eventType, + Object: obj, + }, + } +} + +func (s *ResourceStorage) List(ctx context.Context, listObject runtime.Object, opts *internal.ListOptions) error { + // todo + return nil +} diff --git a/pkg/storage/memorystorage/memory_storage.go b/pkg/storage/memorystorage/memory_storage.go new file mode 100644 index 000000000..24ba8d16a --- /dev/null +++ b/pkg/storage/memorystorage/memory_storage.go @@ -0,0 +1,71 @@ +package memorystorage + +import ( + "context" + + "k8s.io/apimachinery/pkg/runtime/schema" + + internal "github.com/clusterpedia-io/api/clusterpedia" + "github.com/clusterpedia-io/clusterpedia/pkg/storage" + cache "github.com/clusterpedia-io/clusterpedia/pkg/storage/memorystorage/watchcache" +) + +type StorageFactory struct { +} + +func (s *StorageFactory) NewResourceStorage(config *storage.ResourceStorageConfig) (storage.ResourceStorage, error) { + storages.Lock() + defer storages.Unlock() + + gvr := schema.GroupVersionResource{ + Group: config.GroupResource.Group, + Version: config.StorageVersion.Version, + Resource: config.GroupResource.Resource, + } + + if resourceStorage, ok := storages.resourceStorages[gvr]; ok { + watchCache := resourceStorage.watchCache + if config.Namespaced { + watchCache.KeyFunc = cache.GetKeyFunc(gvr, config.Namespaced) + watchCache.IsNamespaced = true + } + if _, ok := watchCache.GetStores()[config.Cluster]; !ok { + resourceStorage.watchCache.AddIndexer(config.Cluster, nil) + } + + return resourceStorage, nil + } + + watchCache := cache.NewWatchCache(100, gvr, config.Namespaced, config.Cluster) + config.WatchCache = watchCache + resourceStorage := &ResourceStorage{ + incoming: make(chan ClusterWatchEvent, 100), + Codec: config.Codec, + watchCache: watchCache, + storageConfig: config, + } + + storages.resourceStorages[gvr] = resourceStorage + + return resourceStorage, nil +} + +func (s *StorageFactory) NewCollectionResourceStorage(cr *internal.CollectionResource) (storage.CollectionResourceStorage, error) { + return nil, nil +} + +func (s *StorageFactory) GetResourceVersions(ctx context.Context, cluster string) (map[schema.GroupVersionResource]map[string]interface{}, error) { + return nil, nil +} + +func (s *StorageFactory) CleanCluster(ctx context.Context, cluster string) error { + return nil +} + +func (s *StorageFactory) CleanClusterResource(ctx context.Context, cluster string, gvr schema.GroupVersionResource) error { + return nil +} + +func (s *StorageFactory) GetCollectionResources(ctx 
context.Context) ([]*internal.CollectionResource, error) { + return nil, nil +} diff --git a/pkg/storage/memorystorage/register.go b/pkg/storage/memorystorage/register.go new file mode 100644 index 000000000..e3cc3e206 --- /dev/null +++ b/pkg/storage/memorystorage/register.go @@ -0,0 +1,17 @@ +package memorystorage + +import ( + "github.com/clusterpedia-io/clusterpedia/pkg/storage" +) + +const ( + StorageName = "memory" +) + +func init() { + storage.RegisterStorageFactoryFunc(StorageName, NewStorageFactory) +} + +func NewStorageFactory(_ string) (storage.StorageFactory, error) { + return &StorageFactory{}, nil +} diff --git a/pkg/storage/memorystorage/watchcache/watch_cache.go b/pkg/storage/memorystorage/watchcache/watch_cache.go new file mode 100644 index 000000000..7751e98e2 --- /dev/null +++ b/pkg/storage/memorystorage/watchcache/watch_cache.go @@ -0,0 +1,292 @@ +package watchcache + +import ( + "context" + "fmt" + "sync" + + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/watch" + genericapirequest "k8s.io/apiserver/pkg/endpoints/request" + "k8s.io/apiserver/pkg/registry/generic/registry" + "k8s.io/client-go/tools/cache" + "k8s.io/utils/strings/slices" + + internal "github.com/clusterpedia-io/api/clusterpedia" +) + +// StoreElement keeping the structs of resource in k8s(key, object, labels, fields). +type StoreElement struct { + Key string + Object runtime.Object + Labels labels.Set + Fields fields.Set +} + +// WatchCache implements a Store interface. +// However, it depends on the elements implementing runtime.Object interface. +// +// WatchCache is a "sliding window" (with a limited capacity) of objects +// observed from a watch. +type WatchCache struct { + sync.RWMutex + + // Maximum size of history window. + capacity int + + // KeyFunc is used to get a key in the underlying storage for a given object. + KeyFunc func(runtime.Object) (string, error) + + // getAttrsFunc is used to get labels and fields of an object. + getAttrsFunc func(runtime.Object) (labels.Set, fields.Set, error) + + // cache is used a cyclic buffer - its first element (with the smallest + // resourceVersion) is defined by startIndex, its last element is defined + // by endIndex (if cache is full it will be startIndex + capacity). + // Both startIndex and endIndex can be greater than buffer capacity - + // you should always apply modulo capacity to get an index in cache array. + cache []*watch.Event + startIndex int + endIndex int + + // store will effectively support LIST operation from the "end of cache + // history" i.e. from the moment just after the newest cached watched event. + // It is necessary to effectively allow clients to start watching at now. + // NOTE: We assume that is thread-safe. 
+ stores map[string]cache.Indexer + + //eventHandler func(*watchCacheEvent) + + WatchersLock sync.RWMutex + + IsNamespaced bool +} + +type keyFunc func(runtime.Object) (string, error) + +func GetKeyFunc(gvr schema.GroupVersionResource, isNamespaced bool) keyFunc { + prefix := gvr.Group + "/" + gvr.Resource + + var KeyFunc func(ctx context.Context, name string) (string, error) + if isNamespaced { + KeyFunc = func(ctx context.Context, name string) (string, error) { + return registry.NamespaceKeyFunc(ctx, prefix, name) + } + } else { + KeyFunc = func(ctx context.Context, name string) (string, error) { + return registry.NoNamespaceKeyFunc(ctx, prefix, name) + } + } + + // We adapt the store's keyFunc so that we can use it with the StorageDecorator + // without making any assumptions about where objects are stored in etcd + kc := func(obj runtime.Object) (string, error) { + accessor, err := meta.Accessor(obj) + if err != nil { + return "", err + } + + if isNamespaced { + return KeyFunc(genericapirequest.WithNamespace(genericapirequest.NewContext(), accessor.GetNamespace()), accessor.GetName()) + } + + return KeyFunc(genericapirequest.NewContext(), accessor.GetName()) + } + + return kc +} + +func storeElementKey(obj interface{}) (string, error) { + elem, ok := obj.(*StoreElement) + if !ok { + return "", fmt.Errorf("not a storeElement: %v", obj) + } + return elem.Key, nil +} + +func storeElementIndexers(indexers *cache.Indexers) cache.Indexers { + if indexers == nil { + return cache.Indexers{} + } + ret := cache.Indexers{} + for indexName, indexFunc := range *indexers { + ret[indexName] = storeElementIndexFunc(indexFunc) + } + return ret +} + +func storeElementIndexFunc(objIndexFunc cache.IndexFunc) cache.IndexFunc { + return func(obj interface{}) (strings []string, e error) { + seo, err := storeElementObject(obj) + if err != nil { + return nil, err + } + return objIndexFunc(seo) + } +} + +func storeElementObject(obj interface{}) (runtime.Object, error) { + elem, ok := obj.(*StoreElement) + if !ok { + return nil, fmt.Errorf("not a storeElement: %v", obj) + } + return elem.Object, nil +} + +func NewWatchCache(capacity int, gvr schema.GroupVersionResource, isNamespaced bool, newCluster string) *WatchCache { + wc := &WatchCache{ + capacity: capacity, + KeyFunc: GetKeyFunc(gvr, isNamespaced), + getAttrsFunc: nil, + cache: make([]*watch.Event, capacity), + startIndex: 0, + endIndex: 0, + stores: make(map[string]cache.Indexer), + IsNamespaced: isNamespaced, + } + + wc.AddIndexer(newCluster, nil) + return wc +} + +func (w *WatchCache) GetStores() map[string]cache.Indexer { + return w.stores +} + +func (w *WatchCache) processEvent(event watch.Event, updateFunc func(*StoreElement) error) error { + key, err := w.KeyFunc(event.Object) + if err != nil { + return fmt.Errorf("couldn't compute key: %v", err) + } + elem := &StoreElement{Key: key, Object: event.Object} + if w.getAttrsFunc != nil { + elem.Labels, elem.Fields, err = w.getAttrsFunc(event.Object) + if err != nil { + return err + } + } + + if err := func() error { + // TODO: We should consider moving this lock below after the watchCacheEvent + // is created. In such situation, the only problematic scenario is Replace( + // happening after getting object from store and before acquiring a lock. + // Maybe introduce another lock for this purpose. + w.Lock() + defer w.Unlock() + + return updateFunc(elem) + }(); err != nil { + return err + } + + return nil +} + +// Add takes runtime.Object as an argument. 
+func (w *WatchCache) Add(obj interface{}, clusterName string) error { + event := watch.Event{Type: watch.Added, Object: obj.(runtime.Object)} + + f := func(elem *StoreElement) error { + return w.stores[clusterName].Add(elem) + } + return w.processEvent(event, f) +} + +// Update takes runtime.Object as an argument. +func (w *WatchCache) Update(obj interface{}, clusterName string) error { + event := watch.Event{Type: watch.Modified, Object: obj.(runtime.Object)} + + f := func(elem *StoreElement) error { + return w.stores[clusterName].Update(elem) + } + return w.processEvent(event, f) +} + +// Delete takes runtime.Object as an argument. +func (w *WatchCache) Delete(obj interface{}, clusterName string) error { + event := watch.Event{Type: watch.Deleted, Object: obj.(runtime.Object)} + + f := func(elem *StoreElement) error { return w.stores[clusterName].Delete(elem) } + return w.processEvent(event, f) +} + +// WaitUntilFreshAndList returns list of pointers to objects. +func (w *WatchCache) WaitUntilFreshAndList(opts *internal.ListOptions) []*StoreElement { + w.RLock() + defer w.RUnlock() + + /* // This isn't the place where we do "final filtering" - only some "prefiltering" is happening here. So the only + // requirement here is to NOT miss anything that should be returned. We can return as many non-matching items as we + // want - they will be filtered out later. The fact that we return less things is only further performance improvement. + // TODO: if multiple indexes match, return the one with the fewest items, so as to do as much filtering as possible. + for _, matchValue := range matchValues { + if result, err := w.store.ByIndex(matchValue.IndexName, matchValue.Value); err == nil { + return result, w.resourceVersion, nil + } + }*/ + var result []*StoreElement + accessor := meta.NewAccessor() + for _, store := range w.stores { + for _, obj := range store.List() { + se := obj.(*StoreElement) + ns, _ := accessor.Namespace(se.Object) + + if opts.Namespaces != nil { + if slices.Contains(opts.Namespaces, ns) { + result = append(result, se) + continue + } else { + continue + } + } + result = append(result, se) + } + } + return result +} + +// WaitUntilFreshAndGet returns list of pointers to objects. 
+func (w *WatchCache) WaitUntilFreshAndGet(cluster, namespace, name string) (*StoreElement, error) { + w.RLock() + defer w.RUnlock() + + var result *StoreElement + accessor := meta.NewAccessor() + if _, ok := w.stores[cluster]; !ok { + return nil, fmt.Errorf("cluster %s is not existed", cluster) + } + + for _, obj := range w.stores[cluster].List() { + se := obj.(*StoreElement) + ns, err := accessor.Namespace(se.Object) + if err != nil { + return result, err + } + if namespace != "" && namespace != ns { + continue + } + n, err := accessor.Name(se.Object) + if err != nil { + return result, err + } + if name != "" { + if ns == namespace && n == name { + return se, nil + } else { + continue + } + } + } + + return result, nil +} + +func (w *WatchCache) AddIndexer(clusterName string, indexers *cache.Indexers) { + w.Lock() + defer w.Unlock() + w.stores[clusterName] = cache.NewIndexer(storeElementKey, storeElementIndexers(indexers)) +} diff --git a/pkg/storage/options/options.go b/pkg/storage/options/options.go index bb2b493e8..388d3e33f 100644 --- a/pkg/storage/options/options.go +++ b/pkg/storage/options/options.go @@ -7,6 +7,7 @@ import ( "github.com/spf13/pflag" _ "github.com/clusterpedia-io/clusterpedia/pkg/storage/internalstorage" + "github.com/clusterpedia-io/clusterpedia/pkg/storage/memorystorage" ) type StorageOptions struct { @@ -23,6 +24,10 @@ func (o *StorageOptions) Validate() []error { return nil } + if o.Name == memorystorage.StorageName { + return nil + } + var errors []error if info, err := os.Stat(o.ConfigPath); err != nil { errors = append(errors, err) diff --git a/pkg/storage/storage.go b/pkg/storage/storage.go index da0ebb971..b968fcc70 100644 --- a/pkg/storage/storage.go +++ b/pkg/storage/storage.go @@ -7,6 +7,7 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" internal "github.com/clusterpedia-io/api/clusterpedia" + "github.com/clusterpedia-io/clusterpedia/pkg/storage/memorystorage/watchcache" ) type StorageFactory interface { @@ -42,6 +43,9 @@ type ResourceStorageConfig struct { Codec runtime.Codec StorageVersion schema.GroupVersion MemoryVersion schema.GroupVersion + WatchCache *watchcache.WatchCache + Cluster string + Namespaced bool } type storageRecoverableExceptionError struct { diff --git a/pkg/synchromanager/clustersynchro/cluster_synchro.go b/pkg/synchromanager/clustersynchro/cluster_synchro.go index 89d166535..30b09aced 100644 --- a/pkg/synchromanager/clustersynchro/cluster_synchro.go +++ b/pkg/synchromanager/clustersynchro/cluster_synchro.go @@ -302,8 +302,10 @@ func (s *ClusterSynchro) setSyncResources() { if _, ok := s.storageResourceSynchros.Load(storageGVR); ok { continue } + resourceConfig := config.storageConfig + resourceConfig.Cluster = s.name - resourceStorage, err := s.storage.NewResourceStorage(config.storageConfig) + resourceStorage, err := s.storage.NewResourceStorage(resourceConfig) if err != nil { klog.ErrorS(err, "Failed to create resource storage", "cluster", s.name, "storage resource", storageGVR) updateSyncConditions(storageGVR, clusterv1alpha2.ResourceSyncStatusPending, "SynchroCreateFailed", fmt.Sprintf("new resource storage failed: %s", err)) diff --git a/pkg/synchromanager/clustersynchro/resource_negotiator.go b/pkg/synchromanager/clustersynchro/resource_negotiator.go index e0ad66d96..6158788f0 100644 --- a/pkg/synchromanager/clustersynchro/resource_negotiator.go +++ b/pkg/synchromanager/clustersynchro/resource_negotiator.go @@ -117,6 +117,7 @@ func (negotiator *ResourceNegotiator) NegotiateSyncResources(syncResources []clu } 
storageConfig, err := negotiator.resourceStorageConfig.NewConfig(syncGVR) + if err != nil { syncCondition.Reason = "SynchroCreateFailed" syncCondition.Message = fmt.Sprintf("new resource storage config failed: %s", err) @@ -124,6 +125,7 @@ func (negotiator *ResourceNegotiator) NegotiateSyncResources(syncResources []clu continue } + storageConfig.Namespaced = apiResource.Namespaced storageGVR := storageConfig.StorageGroupResource.WithVersion(storageConfig.StorageVersion.Version) syncCondition.StorageVersion = storageGVR.Version if syncGR != storageConfig.StorageGroupResource { From 4c5b6b3b99f374df1b97a25c2dc0d980b357d034 Mon Sep 17 00:00:00 2001 From: duanmeng Date: Thu, 8 Sep 2022 18:38:19 +0800 Subject: [PATCH 4/7] complete and optimize the feature for memory storage Co-authored-by: hanweisen Co-authored-by: wuyingjun Co-authored-by: zhangyongxi Signed-off-by: duanmeng --- .../memorystorage/memory_resource_storage.go | 78 ++++++++++++++++++- pkg/storage/memorystorage/memory_storage.go | 10 +++ .../memorystorage/watchcache/watch_cache.go | 31 ++++++++ 3 files changed, 117 insertions(+), 2 deletions(-) diff --git a/pkg/storage/memorystorage/memory_resource_storage.go b/pkg/storage/memorystorage/memory_resource_storage.go index 6c544f09f..18ec0a4b7 100644 --- a/pkg/storage/memorystorage/memory_resource_storage.go +++ b/pkg/storage/memorystorage/memory_resource_storage.go @@ -1,9 +1,14 @@ package memorystorage import ( + "bytes" "context" + "fmt" + "reflect" "sync" + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/conversion" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/watch" @@ -86,7 +91,26 @@ func (s *ResourceStorage) Delete(ctx context.Context, cluster string, obj runtim } func (s *ResourceStorage) Get(ctx context.Context, cluster, namespace, name string, into runtime.Object) error { - // todo + var buffer bytes.Buffer + se, err := s.watchCache.WaitUntilFreshAndGet(cluster, namespace, name) + if err != nil { + return err + } + + object := se.Object + err = s.Codec.Encode(object, &buffer) + if err != nil { + return err + } + + obj, _, err := s.Codec.Decode(buffer.Bytes(), nil, into) + if err != nil { + return err + } + + if obj != into { + return fmt.Errorf("Failed to decode resource, into is %T", into) + } return nil } @@ -101,6 +125,56 @@ func (s *ResourceStorage) newClusterWatchEvent(eventType watch.EventType, obj ru } func (s *ResourceStorage) List(ctx context.Context, listObject runtime.Object, opts *internal.ListOptions) error { - // todo + var buffer bytes.Buffer + objects := s.watchCache.WaitUntilFreshAndList(opts) + + if len(objects) == 0 { + return nil + } + + listPtr, err := meta.GetItemsPtr(listObject) + if err != nil { + return err + } + + v, err := conversion.EnforcePtr(listPtr) + if err != nil || v.Kind() != reflect.Slice { + return fmt.Errorf("need ptr to slice: %v", err) + } + + expected := reflect.New(v.Type().Elem()).Interface().(runtime.Object) + seen := map[string]struct{}{} + accessor := meta.NewAccessor() + deduplicated := make([]runtime.Object, 0, len(objects)) + for _, object := range objects { + buffer.Reset() + obj := object.Object + err = s.Codec.Encode(obj, &buffer) + if err != nil { + return err + } + + obj, _, err := s.Codec.Decode(buffer.Bytes(), nil, expected.DeepCopyObject()) + if err != nil { + return err + } + + name, err := accessor.Name(obj) + if err != nil { + return err + } + + if _, ok := seen[name]; !ok { + seen[name] = struct{}{} + deduplicated = append(deduplicated, obj) + } + 
} + + slice := reflect.MakeSlice(v.Type(), len(deduplicated), len(deduplicated)) + for i, obj := range deduplicated { + slice.Index(i).Set(reflect.ValueOf(obj).Elem()) + } + + v.Set(slice) return nil } diff --git a/pkg/storage/memorystorage/memory_storage.go b/pkg/storage/memorystorage/memory_storage.go index 24ba8d16a..dbd9c65c1 100644 --- a/pkg/storage/memorystorage/memory_storage.go +++ b/pkg/storage/memorystorage/memory_storage.go @@ -59,10 +59,20 @@ func (s *StorageFactory) GetResourceVersions(ctx context.Context, cluster string } func (s *StorageFactory) CleanCluster(ctx context.Context, cluster string) error { + storages.Lock() + defer storages.Unlock() + for _, rs := range storages.resourceStorages { + rs.watchCache.DeleteIndexer(cluster) + } return nil } func (s *StorageFactory) CleanClusterResource(ctx context.Context, cluster string, gvr schema.GroupVersionResource) error { + storages.Lock() + defer storages.Unlock() + if rs, ok := storages.resourceStorages[gvr]; ok { + rs.watchCache.DeleteIndexer(cluster) + } return nil } diff --git a/pkg/storage/memorystorage/watchcache/watch_cache.go b/pkg/storage/memorystorage/watchcache/watch_cache.go index 7751e98e2..e7489e4ce 100644 --- a/pkg/storage/memorystorage/watchcache/watch_cache.go +++ b/pkg/storage/memorystorage/watchcache/watch_cache.go @@ -290,3 +290,34 @@ func (w *WatchCache) AddIndexer(clusterName string, indexers *cache.Indexers) { defer w.Unlock() w.stores[clusterName] = cache.NewIndexer(storeElementKey, storeElementIndexers(indexers)) } + +func (w *WatchCache) DeleteIndexer(clusterName string) { + w.Lock() + defer w.Unlock() + + if _, ok := w.stores[clusterName]; !ok { + return + } + + delete(w.stores, clusterName) + + //clear cache + w.startIndex = 0 + w.endIndex = 0 +} + +func (w *WatchCache) ClearWatchCache(clusterName string) { + w.Lock() + defer w.Unlock() + + if _, ok := w.stores[clusterName]; !ok { + return + } + + delete(w.stores, clusterName) + w.stores[clusterName] = cache.NewIndexer(storeElementKey, storeElementIndexers(nil)) + + //clear cache + w.startIndex = 0 + w.endIndex = 0 +} From 46e794aa76002c7d4c2346077bf66681e833de06 Mon Sep 17 00:00:00 2001 From: wuyingjun Date: Thu, 8 Sep 2022 19:02:57 +0800 Subject: [PATCH 5/7] encode and save resourceVersion of object in event Co-authored-by: duanmeng Co-authored-by: hanweisen Co-authored-by: zhangyongxi Signed-off-by: wuyingjun --- .../memorystorage/memory_resource_storage.go | 93 ++++++++++++++- pkg/storage/memorystorage/memory_storage.go | 5 + .../watchcache/cluster_resource_version.go | 112 ++++++++++++++++++ .../memorystorage/watchcache/watch_cache.go | 13 +- 4 files changed, 214 insertions(+), 9 deletions(-) create mode 100644 pkg/storage/memorystorage/watchcache/cluster_resource_version.go diff --git a/pkg/storage/memorystorage/memory_resource_storage.go b/pkg/storage/memorystorage/memory_resource_storage.go index 18ec0a4b7..d93e04106 100644 --- a/pkg/storage/memorystorage/memory_resource_storage.go +++ b/pkg/storage/memorystorage/memory_resource_storage.go @@ -11,10 +11,12 @@ import ( "k8s.io/apimachinery/pkg/conversion" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/watch" "k8s.io/klog/v2" internal "github.com/clusterpedia-io/api/clusterpedia" + "github.com/clusterpedia-io/clusterpedia/pkg/kubeapiserver/resourcescheme" "github.com/clusterpedia-io/clusterpedia/pkg/storage" cache 
"github.com/clusterpedia-io/clusterpedia/pkg/storage/memorystorage/watchcache" ) @@ -39,26 +41,68 @@ type ClusterWatchEvent struct { ClusterName string } -//nolint type ResourceStorage struct { sync.RWMutex Codec runtime.Codec watchCache *cache.WatchCache stopCh chan struct{} + crvSynchro *cache.ClusterResourceVersionSynchro incoming chan ClusterWatchEvent storageConfig *storage.ResourceStorageConfig } -//nolint func (s *ResourceStorage) convertEvent(event *ClusterWatchEvent) error { - s.Lock() - s.Unlock() klog.V(10).Infof("event: %s", event) + s.crvSynchro.UpdateClusterResourceVersion(&event.Event, event.ClusterName) + + err := s.encodeEvent(&event.Event) + if err != nil { + return fmt.Errorf("encode event failed: %v", err) + } s.incoming <- *event return nil } +func (s *ResourceStorage) dispatchEvents() { + for { + select { + case cwe, ok := <-s.incoming: + if !ok { + continue + } + + switch cwe.Event.Type { + case watch.Added: + err := s.watchCache.Add(cwe.Event.Object, cwe.ClusterName) + if err != nil { + utilruntime.HandleError(fmt.Errorf("unable to add watch event object (%#v) to store: %v", cwe.Event.Object, err)) + } + case watch.Modified: + err := s.watchCache.Update(cwe.Event.Object, cwe.ClusterName) + if err != nil { + utilruntime.HandleError(fmt.Errorf("unable to update watch event object (%#v) to store: %v", cwe.Event.Object, err)) + } + case watch.Deleted: + // TODO: Will any consumers need access to the "last known + // state", which is passed in event.Object? If so, may need + // to change this. + err := s.watchCache.Delete(cwe.Event.Object, cwe.ClusterName) + if err != nil { + utilruntime.HandleError(fmt.Errorf("unable to delete watch event object (%#v) from store: %v", cwe.Event.Object, err)) + } + case watch.Error: + s.watchCache.ClearWatchCache(cwe.ClusterName) + default: + utilruntime.HandleError(fmt.Errorf("unable to understand watch event %#v", cwe.Event)) + continue + } + case <-s.stopCh: + return + } + } +} + func (s *ResourceStorage) GetStorageConfig() *storage.ResourceStorageConfig { return s.storageConfig } @@ -126,8 +170,15 @@ func (s *ResourceStorage) newClusterWatchEvent(eventType watch.EventType, obj ru func (s *ResourceStorage) List(ctx context.Context, listObject runtime.Object, opts *internal.ListOptions) error { var buffer bytes.Buffer - objects := s.watchCache.WaitUntilFreshAndList(opts) + objects, readResourceVersion, err := s.watchCache.WaitUntilFreshAndList(opts) + if err != nil { + return err + } + list, err := meta.ListAccessor(listObject) + if err != nil { + return err + } if len(objects) == 0 { return nil } @@ -175,6 +226,38 @@ func (s *ResourceStorage) List(ctx context.Context, listObject runtime.Object, o slice.Index(i).Set(reflect.ValueOf(obj).Elem()) } + list.SetResourceVersion(readResourceVersion.GetClusterResourceVersion()) v.Set(slice) return nil } + +func (s *ResourceStorage) encodeEvent(event *watch.Event) error { + var buffer bytes.Buffer + + gk := event.Object.GetObjectKind().GroupVersionKind().GroupKind() + config := s.storageConfig + if ok := resourcescheme.LegacyResourceScheme.IsGroupRegistered(gk.Group); ok { + dest, err := resourcescheme.LegacyResourceScheme.New(config.MemoryVersion.WithKind(gk.Kind)) + if err != nil { + return err + } + + object := event.Object + err = config.Codec.Encode(object, &buffer) + if err != nil { + return err + } + + obj, _, err := config.Codec.Decode(buffer.Bytes(), nil, dest) + if err != nil { + return err + } + + if obj != dest { + return err + } + event.Object = dest + } + + return nil +} diff 
--git a/pkg/storage/memorystorage/memory_storage.go b/pkg/storage/memorystorage/memory_storage.go index dbd9c65c1..5cdf2069a 100644 --- a/pkg/storage/memorystorage/memory_storage.go +++ b/pkg/storage/memorystorage/memory_storage.go @@ -33,6 +33,8 @@ func (s *StorageFactory) NewResourceStorage(config *storage.ResourceStorageConfi resourceStorage.watchCache.AddIndexer(config.Cluster, nil) } + resourceStorage.crvSynchro.SetClusterResourceVersion(config.Cluster, "0") + return resourceStorage, nil } @@ -41,10 +43,13 @@ func (s *StorageFactory) NewResourceStorage(config *storage.ResourceStorageConfi resourceStorage := &ResourceStorage{ incoming: make(chan ClusterWatchEvent, 100), Codec: config.Codec, + crvSynchro: cache.NewClusterResourceVersionSynchro(config.Cluster), watchCache: watchCache, storageConfig: config, } + go resourceStorage.dispatchEvents() + storages.resourceStorages[gvr] = resourceStorage return resourceStorage, nil diff --git a/pkg/storage/memorystorage/watchcache/cluster_resource_version.go b/pkg/storage/memorystorage/watchcache/cluster_resource_version.go new file mode 100644 index 000000000..3ca13bb03 --- /dev/null +++ b/pkg/storage/memorystorage/watchcache/cluster_resource_version.go @@ -0,0 +1,112 @@ +package watchcache + +import ( + "encoding/base64" + "fmt" + "sync" + + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/util/json" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/klog/v2" +) + +// ClusterResourceVersion holds the pediaCluster name and its latest resourceVersion +type ClusterResourceVersion struct { + rvmap map[string]string +} + +func NewClusterResourceVersion(cluster string) *ClusterResourceVersion { + return &ClusterResourceVersion{ + rvmap: map[string]string{cluster: "0"}, + } +} + +func NewClusterResourceVersionFromString(rv string) (*ClusterResourceVersion, error) { + result := &ClusterResourceVersion{ + rvmap: map[string]string{}, + } + if rv == "0" || rv == "" { + return result, nil + } + + decoded, err := base64.RawURLEncoding.DecodeString(rv) + if err != nil { + return nil, fmt.Errorf("base64 decode failed: %v", err) + } + + err = json.Unmarshal(decoded, &result.rvmap) + if err != nil { + return nil, fmt.Errorf("json decode failed: %v", err) + } + + return result, nil +} + +// GetClusterResourceVersion return a base64 encode string of ClusterResourceVersion +func (crv *ClusterResourceVersion) GetClusterResourceVersion() string { + bytes, err := json.Marshal(crv.rvmap) + if err != nil { + klog.Errorf("base64 encode failed: %v", err) + return "" + } + return base64.RawURLEncoding.EncodeToString(bytes) +} + +func (crv *ClusterResourceVersion) IsEqual(another *ClusterResourceVersion) bool { + if len(crv.rvmap) != len(another.rvmap) { + return false + } + for key, value := range crv.rvmap { + if another.rvmap[key] != value { + return false + } + } + return true +} + +func (crv *ClusterResourceVersion) IsEmpty() bool { + return len(crv.rvmap) == 0 +} + +type ClusterResourceVersionSynchro struct { + crv *ClusterResourceVersion + + sync.RWMutex +} + +func NewClusterResourceVersionSynchro(cluster string) *ClusterResourceVersionSynchro { + return &ClusterResourceVersionSynchro{ + crv: NewClusterResourceVersion(cluster), + } +} + +// UpdateClusterResourceVersion update the resourceVersion in ClusterResourceVersionSynchro to the latest +func (crvs *ClusterResourceVersionSynchro) UpdateClusterResourceVersion(event *watch.Event, cluster string) { + crvs.Lock() + defer crvs.Unlock() + + crv := crvs.crv + accessor := meta.NewAccessor() + rv, _ := 
accessor.ResourceVersion(event.Object) + crv.rvmap[cluster] = rv + + bytes, err := json.Marshal(crv.rvmap) + if err != nil { + klog.Errorf("base64 encode failed: %v", err) + return + } + + err = accessor.SetResourceVersion(event.Object, base64.RawURLEncoding.EncodeToString(bytes)) + if err != nil { + klog.Warningf("set resourceVersion failed: %v, may be it's a clear watch cache order event", err) + return + } +} + +func (crvs *ClusterResourceVersionSynchro) SetClusterResourceVersion(clusterName string, resourceVersion string) { + crvs.Lock() + defer crvs.Unlock() + + crvs.crv.rvmap[clusterName] = resourceVersion +} diff --git a/pkg/storage/memorystorage/watchcache/watch_cache.go b/pkg/storage/memorystorage/watchcache/watch_cache.go index e7489e4ce..32d90c206 100644 --- a/pkg/storage/memorystorage/watchcache/watch_cache.go +++ b/pkg/storage/memorystorage/watchcache/watch_cache.go @@ -59,6 +59,9 @@ type WatchCache struct { // NOTE: We assume that is thread-safe. stores map[string]cache.Indexer + // ResourceVersion up to which the watchCache is propagated. + resourceVersion *ClusterResourceVersion + //eventHandler func(*watchCacheEvent) WatchersLock sync.RWMutex @@ -215,7 +218,7 @@ func (w *WatchCache) Delete(obj interface{}, clusterName string) error { } // WaitUntilFreshAndList returns list of pointers to objects. -func (w *WatchCache) WaitUntilFreshAndList(opts *internal.ListOptions) []*StoreElement { +func (w *WatchCache) WaitUntilFreshAndList(opts *internal.ListOptions) ([]*StoreElement, *ClusterResourceVersion, error) { w.RLock() defer w.RUnlock() @@ -233,8 +236,10 @@ func (w *WatchCache) WaitUntilFreshAndList(opts *internal.ListOptions) []*StoreE for _, store := range w.stores { for _, obj := range store.List() { se := obj.(*StoreElement) - ns, _ := accessor.Namespace(se.Object) - + ns, err := accessor.Namespace(se.Object) + if err != nil { + return result, w.resourceVersion, err + } if opts.Namespaces != nil { if slices.Contains(opts.Namespaces, ns) { result = append(result, se) @@ -246,7 +251,7 @@ func (w *WatchCache) WaitUntilFreshAndList(opts *internal.ListOptions) []*StoreE result = append(result, se) } } - return result + return result, w.resourceVersion, nil } // WaitUntilFreshAndGet returns list of pointers to objects. 
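For readers of the patch above: the resourceVersion that the memory storage now reports is not a single cluster's value but an opaque, base64url-encoded JSON map of pediaCluster name to that cluster's native resourceVersion, built by ClusterResourceVersion. The self-contained sketch below shows what that string looks like and how it round-trips; the cluster names and versions are made-up values, and the standard library json package is used purely for illustration (the patch itself uses k8s.io/apimachinery/pkg/util/json).

package main

import (
	"encoding/base64"
	"encoding/json"
	"fmt"
)

func main() {
	// Assumed per-cluster resourceVersions, keyed by pediaCluster name.
	rvmap := map[string]string{"cluster-1": "12345", "cluster-2": "67890"}

	// Encode the map the same way GetClusterResourceVersion does:
	// JSON marshal, then base64 RawURLEncoding.
	raw, _ := json.Marshal(rvmap)
	encoded := base64.RawURLEncoding.EncodeToString(raw)
	fmt.Println("resourceVersion returned to clients:", encoded)

	// Decode it back, mirroring NewClusterResourceVersionFromString.
	decoded, _ := base64.RawURLEncoding.DecodeString(encoded)
	back := map[string]string{}
	_ = json.Unmarshal(decoded, &back)
	fmt.Println("per-cluster resourceVersions:", back)
}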
From 021c56aa85f45dbdad038c71ac60d7706fcbcbd1 Mon Sep 17 00:00:00 2001 From: zhangyongxi Date: Thu, 8 Sep 2022 19:15:05 +0800 Subject: [PATCH 6/7] support watch resources with client-go or kubectl Co-authored-by: duanmeng Co-authored-by: wuyingjun Co-authored-by: hanweisen Signed-off-by: zhangyongxi --- pkg/kubeapiserver/apiserver.go | 1 + pkg/kubeapiserver/resource_handler.go | 2 + pkg/kubeapiserver/resourcerest/storage.go | 61 ++++++++ .../memorystorage/memory_resource_storage.go | 9 ++ pkg/storage/memorystorage/memory_storage.go | 8 + .../memorystorage/watchcache/cache_watcher.go | 140 ++++++++++++++++++ .../watchcache/cluster_resource_version.go | 9 ++ .../memorystorage/watchcache/watch_cache.go | 128 +++++++++++++++- pkg/utils/watch/watchevent.go | 29 ++++ 9 files changed, 379 insertions(+), 8 deletions(-) create mode 100644 pkg/storage/memorystorage/watchcache/cache_watcher.go create mode 100644 pkg/utils/watch/watchevent.go diff --git a/pkg/kubeapiserver/apiserver.go b/pkg/kubeapiserver/apiserver.go index 420eeb8c3..5bd7a2003 100644 --- a/pkg/kubeapiserver/apiserver.go +++ b/pkg/kubeapiserver/apiserver.go @@ -35,6 +35,7 @@ func init() { &metav1.APIGroupList{}, &metav1.APIGroup{}, &metav1.APIResourceList{}, + &metav1.WatchEvent{}, ) } diff --git a/pkg/kubeapiserver/resource_handler.go b/pkg/kubeapiserver/resource_handler.go index 94d2522c8..748fda430 100644 --- a/pkg/kubeapiserver/resource_handler.go +++ b/pkg/kubeapiserver/resource_handler.go @@ -118,6 +118,8 @@ func (r *ResourceHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { handler = handlers.GetResource(storage, reqScope) case "list": handler = handlers.ListResource(storage, nil, reqScope, false, r.minRequestTimeout) + case "watch": + handler = handlers.ListResource(storage, storage, reqScope, true, r.minRequestTimeout) default: responsewriters.ErrorNegotiated( apierrors.NewMethodNotSupported(gvr.GroupResource(), requestInfo.Verb), diff --git a/pkg/kubeapiserver/resourcerest/storage.go b/pkg/kubeapiserver/resourcerest/storage.go index 3cffdb3bd..220a43570 100644 --- a/pkg/kubeapiserver/resourcerest/storage.go +++ b/pkg/kubeapiserver/resourcerest/storage.go @@ -9,6 +9,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/watch" genericrequest "k8s.io/apiserver/pkg/endpoints/request" genericfeatures "k8s.io/apiserver/pkg/features" "k8s.io/apiserver/pkg/registry/rest" @@ -20,8 +21,10 @@ import ( "github.com/clusterpedia-io/api/clusterpedia/v1beta1" "github.com/clusterpedia-io/clusterpedia/pkg/kubeapiserver/printers" "github.com/clusterpedia-io/clusterpedia/pkg/storage" + cache "github.com/clusterpedia-io/clusterpedia/pkg/storage/memorystorage/watchcache" "github.com/clusterpedia-io/clusterpedia/pkg/utils/negotiation" "github.com/clusterpedia-io/clusterpedia/pkg/utils/request" + utilwatch "github.com/clusterpedia-io/clusterpedia/pkg/utils/watch" ) type RESTStorage struct { @@ -38,6 +41,7 @@ type RESTStorage struct { var _ rest.Lister = &RESTStorage{} var _ rest.Getter = &RESTStorage{} +var _ rest.Watcher = &RESTStorage{} func (s *RESTStorage) New() runtime.Object { return s.NewFunc() @@ -121,3 +125,60 @@ func (s *RESTStorage) ConvertToTable(ctx context.Context, object runtime.Object, return printers.NewDefaultTableConvertor(s.DefaultQualifiedResource).ConvertToTable(ctx, object, tableOptions) } + +func (s *RESTStorage) Watch(ctx context.Context, options *metainternalversion.ListOptions) (watch.Interface, error) { 
+ resourceversion := options.ResourceVersion + watchRV, err := cache.NewClusterResourceVersionFromString(resourceversion) + if err != nil { + // To match the uncached watch implementation, once we have passed authn/authz/admission, + // and successfully parsed a resource version, other errors must fail with a watch event of type ERROR, + // rather than a directly returned error. + return newErrWatcher(err), nil + } + + watcher := cache.NewCacheWatcher(100) + watchCache := s.Storage.GetStorageConfig().WatchCache + watchCache.Lock() + defer watchCache.Unlock() + + initEvents, err := watchCache.GetAllEventsSinceThreadUnsafe(watchRV) + if err != nil { + // To match the uncached watch implementation, once we have passed authn/authz/admission, + // and successfully parsed a resource version, other errors must fail with a watch event of type ERROR, + // rather than a directly returned error. + return newErrWatcher(err), nil + } + + func() { + watchCache.WatchersLock.Lock() + defer watchCache.WatchersLock.Unlock() + + watchCache.WatchersBuffer = append(watchCache.WatchersBuffer, watcher) + }() + + go watcher.Process(ctx, initEvents) + return watcher, nil +} + +type errWatcher struct { + result chan watch.Event +} + +func newErrWatcher(err error) *errWatcher { + errEvent := utilwatch.NewErrorEvent(err) + + // Create a watcher with room for a single event, populate it, and close the channel + watcher := &errWatcher{result: make(chan watch.Event, 1)} + watcher.result <- errEvent + close(watcher.result) + + return watcher +} + +func (c *errWatcher) ResultChan() <-chan watch.Event { + return c.result +} + +func (c *errWatcher) Stop() { + // no-op +} diff --git a/pkg/storage/memorystorage/memory_resource_storage.go b/pkg/storage/memorystorage/memory_resource_storage.go index d93e04106..2a65ca69e 100644 --- a/pkg/storage/memorystorage/memory_resource_storage.go +++ b/pkg/storage/memorystorage/memory_resource_storage.go @@ -97,12 +97,21 @@ func (s *ResourceStorage) dispatchEvents() { utilruntime.HandleError(fmt.Errorf("unable to understand watch event %#v", cwe.Event)) continue } + s.dispatchEvent(&cwe.Event) case <-s.stopCh: return } } } +func (s *ResourceStorage) dispatchEvent(event *watch.Event) { + s.watchCache.WatchersLock.RLock() + defer s.watchCache.WatchersLock.RUnlock() + for _, watcher := range s.watchCache.WatchersBuffer { + watcher.NonblockingAdd(event) + } +} + func (s *ResourceStorage) GetStorageConfig() *storage.ResourceStorageConfig { return s.storageConfig } diff --git a/pkg/storage/memorystorage/memory_storage.go b/pkg/storage/memorystorage/memory_storage.go index 5cdf2069a..f7853c731 100644 --- a/pkg/storage/memorystorage/memory_storage.go +++ b/pkg/storage/memorystorage/memory_storage.go @@ -2,12 +2,14 @@ package memorystorage import ( "context" + "fmt" "k8s.io/apimachinery/pkg/runtime/schema" internal "github.com/clusterpedia-io/api/clusterpedia" "github.com/clusterpedia-io/clusterpedia/pkg/storage" cache "github.com/clusterpedia-io/clusterpedia/pkg/storage/memorystorage/watchcache" + utilwatch "github.com/clusterpedia-io/clusterpedia/pkg/utils/watch" ) type StorageFactory struct { @@ -68,6 +70,9 @@ func (s *StorageFactory) CleanCluster(ctx context.Context, cluster string) error defer storages.Unlock() for _, rs := range storages.resourceStorages { rs.watchCache.DeleteIndexer(cluster) + // If a pediacluster is deleted from clusterpedia,then the informer of client-go should be list and watch again + errorEvent := utilwatch.NewErrorEvent(fmt.Errorf("PediaCluster %s is deleted", 
cluster)) + rs.dispatchEvent(&errorEvent) } return nil } @@ -77,6 +82,9 @@ func (s *StorageFactory) CleanClusterResource(ctx context.Context, cluster strin defer storages.Unlock() if rs, ok := storages.resourceStorages[gvr]; ok { rs.watchCache.DeleteIndexer(cluster) + // If a gvr is deleted from clusterpedia,then the informer of client-go should be list and watch again + errorEvent := utilwatch.NewErrorEvent(fmt.Errorf("GVR %v is deleted", gvr)) + rs.dispatchEvent(&errorEvent) } return nil } diff --git a/pkg/storage/memorystorage/watchcache/cache_watcher.go b/pkg/storage/memorystorage/watchcache/cache_watcher.go new file mode 100644 index 000000000..8c853b81c --- /dev/null +++ b/pkg/storage/memorystorage/watchcache/cache_watcher.go @@ -0,0 +1,140 @@ +package watchcache + +import ( + "context" + "time" + + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/watch" +) + +// CacheWatcher implements watch.Interface +type CacheWatcher struct { + input chan *watch.Event + result chan watch.Event + done chan struct{} + stopped bool + forget func() +} + +func NewCacheWatcher(chanSize int) *CacheWatcher { + return &CacheWatcher{ + input: make(chan *watch.Event, chanSize), + result: make(chan watch.Event, chanSize), + done: make(chan struct{}), + stopped: false, + forget: func() {}, + } +} + +// ResultChan implements watch.Interface. +func (c *CacheWatcher) ResultChan() <-chan watch.Event { + return c.result +} + +// Stop implements watch.Interface. +func (c *CacheWatcher) Stop() { + c.forget() +} + +func (c *CacheWatcher) StopThreadUnsafe() { + if !c.stopped { + c.stopped = true + close(c.done) + close(c.input) + } +} + +func (c *CacheWatcher) NonblockingAdd(event *watch.Event) bool { + select { + case c.input <- event: + return true + default: + return false + } +} + +// Nil timer means that add will not block (if it can't send event immediately, it will break the watcher) +func (c *CacheWatcher) Add(event *watch.Event, timer *time.Timer) bool { + // Try to send the event immediately, without blocking. + if c.NonblockingAdd(event) { + return true + } + + closeFunc := func() { + // This means that we couldn't send event to that watcher. + // Since we don't want to block on it infinitely, + // we simply terminate it. + //klog.V(1).Infof("Forcing watcher close due to unresponsiveness: %v", c.objectType.String()) + c.forget() + } + + if timer == nil { + closeFunc() + return false + } + + // OK, block sending, but only until timer fires. + select { + case c.input <- event: + return true + case <-timer.C: + closeFunc() + return false + } +} + +func (c *CacheWatcher) sendWatchCacheEvent(event *watch.Event) { + //watchEvent := c.convertToWatchEvent(event) + watchEvent := event + if watchEvent == nil { + // Watcher is not interested in that object. + return + } + + // We need to ensure that if we put event X to the c.result, all + // previous events were already put into it before, no matter whether + // c.done is close or not. + // Thus we cannot simply select from c.done and c.result and this + // would give us non-determinism. + // At the same time, we don't want to block infinitely on putting + // to c.result, when c.done is already closed. + + // This ensures that with c.done already close, we at most once go + // into the next select after this. With that, no matter which + // statement we choose there, we will deliver only consecutive + // events. 
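Note (illustrative, not part of the patch): the two select statements below implement the guarantee described in the comment above — check c.done first without blocking, then block on the send while still honoring c.done. A minimal, standalone sketch of the same pattern, with purely illustrative names:

package main

import "fmt"

// deliver sends ev on result unless done is already closed, and abandons the
// send if done closes while the consumer is not ready.
func deliver(done <-chan struct{}, result chan<- string, ev string) {
	// Fast path: if shutdown has already begun, do not queue anything new.
	select {
	case <-done:
		return
	default:
	}

	// Slow path: block on the send, but give up as soon as done closes so a
	// stalled consumer cannot wedge the producer forever.
	select {
	case result <- ev:
	case <-done:
	}
}

func main() {
	done := make(chan struct{})
	result := make(chan string, 1)

	deliver(done, result, "Added")
	fmt.Println(<-result) // prints: Added

	close(done)
	deliver(done, result, "Modified") // dropped: shutdown already started
}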
+ select { + case <-c.done: + return + default: + } + + select { + case c.result <- *watchEvent: + case <-c.done: + } +} + +// Process send the events which stored in watchCache into the result channel,and select the event from input channel into result channel continuously. +func (c *CacheWatcher) Process(ctx context.Context, initEvents []*watch.Event) { + defer utilruntime.HandleCrash() + + for _, event := range initEvents { + c.sendWatchCacheEvent(event) + } + + defer close(c.result) + defer c.Stop() + for { + select { + case event, ok := <-c.input: + if !ok { + return + } + c.sendWatchCacheEvent(event) + case <-ctx.Done(): + return + } + } +} diff --git a/pkg/storage/memorystorage/watchcache/cluster_resource_version.go b/pkg/storage/memorystorage/watchcache/cluster_resource_version.go index 3ca13bb03..abc7ec0be 100644 --- a/pkg/storage/memorystorage/watchcache/cluster_resource_version.go +++ b/pkg/storage/memorystorage/watchcache/cluster_resource_version.go @@ -53,6 +53,15 @@ func (crv *ClusterResourceVersion) GetClusterResourceVersion() string { return base64.RawURLEncoding.EncodeToString(bytes) } +// GetClusterResourceVersionFromEvent return a ClusterResourceVersion from watch event +func GetClusterResourceVersionFromEvent(event *watch.Event) (*ClusterResourceVersion, error) { + accessor, err := meta.Accessor(event.Object) + if err != nil { + return nil, fmt.Errorf("unable to understand watch event %#v", event) + } + return NewClusterResourceVersionFromString(accessor.GetResourceVersion()) +} + func (crv *ClusterResourceVersion) IsEqual(another *ClusterResourceVersion) bool { if len(crv.rvmap) != len(another.rvmap) { return false diff --git a/pkg/storage/memorystorage/watchcache/watch_cache.go b/pkg/storage/memorystorage/watchcache/watch_cache.go index 32d90c206..8b569e677 100644 --- a/pkg/storage/memorystorage/watchcache/watch_cache.go +++ b/pkg/storage/memorystorage/watchcache/watch_cache.go @@ -5,6 +5,7 @@ import ( "fmt" "sync" + apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" @@ -66,7 +67,12 @@ type WatchCache struct { WatchersLock sync.RWMutex - IsNamespaced bool + // watchersBuffer is a list of watchers potentially interested in currently + // dispatched event. + WatchersBuffer []*CacheWatcher + // blockedWatchers is a list of watchers whose buffer is currently full. + BlockedWatchers []*CacheWatcher + IsNamespaced bool } type keyFunc func(runtime.Object) (string, error) @@ -160,7 +166,7 @@ func (w *WatchCache) GetStores() map[string]cache.Indexer { return w.stores } -func (w *WatchCache) processEvent(event watch.Event, updateFunc func(*StoreElement) error) error { +func (w *WatchCache) processEvent(event watch.Event, resourceVersion *ClusterResourceVersion, updateFunc func(*StoreElement) error) error { key, err := w.KeyFunc(event.Object) if err != nil { return fmt.Errorf("couldn't compute key: %v", err) @@ -173,6 +179,8 @@ func (w *WatchCache) processEvent(event watch.Event, updateFunc func(*StoreEleme } } + wcEvent := event + if err := func() error { // TODO: We should consider moving this lock below after the watchCacheEvent // is created. 
In such situation, the only problematic scenario is Replace( @@ -181,6 +189,8 @@ func (w *WatchCache) processEvent(event watch.Event, updateFunc func(*StoreEleme w.Lock() defer w.Unlock() + w.updateCache(&wcEvent) + w.resourceVersion = resourceVersion return updateFunc(elem) }(); err != nil { return err @@ -191,30 +201,70 @@ func (w *WatchCache) processEvent(event watch.Event, updateFunc func(*StoreEleme // Add takes runtime.Object as an argument. func (w *WatchCache) Add(obj interface{}, clusterName string) error { - event := watch.Event{Type: watch.Added, Object: obj.(runtime.Object)} + object, resourceVersion, err := w.objectToVersionedRuntimeObject(obj) + if err != nil { + return err + } + event := watch.Event{Type: watch.Added, Object: object} f := func(elem *StoreElement) error { return w.stores[clusterName].Add(elem) } - return w.processEvent(event, f) + return w.processEvent(event, resourceVersion, f) } // Update takes runtime.Object as an argument. func (w *WatchCache) Update(obj interface{}, clusterName string) error { - event := watch.Event{Type: watch.Modified, Object: obj.(runtime.Object)} + object, resourceVersion, err := w.objectToVersionedRuntimeObject(obj) + if err != nil { + return err + } + event := watch.Event{Type: watch.Modified, Object: object} f := func(elem *StoreElement) error { return w.stores[clusterName].Update(elem) } - return w.processEvent(event, f) + return w.processEvent(event, resourceVersion, f) } // Delete takes runtime.Object as an argument. func (w *WatchCache) Delete(obj interface{}, clusterName string) error { - event := watch.Event{Type: watch.Deleted, Object: obj.(runtime.Object)} + object, resourceVersion, err := w.objectToVersionedRuntimeObject(obj) + if err != nil { + return err + } + event := watch.Event{Type: watch.Deleted, Object: object} f := func(elem *StoreElement) error { return w.stores[clusterName].Delete(elem) } - return w.processEvent(event, f) + return w.processEvent(event, resourceVersion, f) +} + +func (w *WatchCache) objectToVersionedRuntimeObject(obj interface{}) (runtime.Object, *ClusterResourceVersion, error) { + object, ok := obj.(runtime.Object) + if !ok { + return nil, &ClusterResourceVersion{}, fmt.Errorf("obj does not implement runtime.Object interface: %v", obj) + } + + resourceVersion, err := meta.NewAccessor().ResourceVersion(object) + if err != nil { + return nil, &ClusterResourceVersion{}, err + } + + clusterRvStore, err := NewClusterResourceVersionFromString(resourceVersion) + if err != nil { + return nil, &ClusterResourceVersion{}, err + } + return object, clusterRvStore, nil +} + +// Assumes that lock is already held for write. +func (w *WatchCache) updateCache(event *watch.Event) { + if w.endIndex == w.startIndex+w.capacity { + // Cache is full - remove the oldest element. + w.startIndex++ + } + w.cache[w.endIndex%w.capacity] = event + w.endIndex++ } // WaitUntilFreshAndList returns list of pointers to objects. @@ -290,6 +340,68 @@ func (w *WatchCache) WaitUntilFreshAndGet(cluster, namespace, name string) (*Sto return result, nil } +// GetAllEventsSinceThreadUnsafe returns watch event from slice window in watchCache by the resourceVersion +func (w *WatchCache) GetAllEventsSinceThreadUnsafe(resourceVersion *ClusterResourceVersion) ([]*watch.Event, error) { + size := w.endIndex - w.startIndex + oldestObj := w.cache[w.startIndex%w.capacity] + if resourceVersion.IsEmpty() { + // resourceVersion = 0 means that we don't require any specific starting point + // and we would like to start watching from ~now. 
+ // However, to keep backward compatibility, we additionally need to return the + // current state and only then start watching from that point. + // + // TODO: In v2 api, we should stop returning the current state - #13969. + var allItems []interface{} + for _, store := range w.stores { + allItems = append(allItems, store.List()...) + } + result := make([]*watch.Event, len(allItems)) + for i, item := range allItems { + elem, ok := item.(*StoreElement) + if !ok { + return nil, fmt.Errorf("not a storeElement: %v", elem) + } + result[i] = &watch.Event{ + Type: watch.Added, + Object: elem.Object, + } + } + return result, nil + } + + var index int + var founded bool + for index = 0; w.startIndex+index < w.endIndex; index++ { + rv, err := GetClusterResourceVersionFromEvent(w.cache[(w.startIndex+index)%w.capacity]) + if err != nil { + return nil, err + } + if rv.IsEqual(resourceVersion) { + founded = true + index++ + break + } + } + + var result []*watch.Event + if !founded { + oldest, err := GetClusterResourceVersionFromEvent(oldestObj) + if err != nil { + return nil, err + } + return nil, apierrors.NewResourceExpired(fmt.Sprintf("too old resource version: %#v (%#v)", resourceVersion, oldest)) + } else { + i := 0 + result = make([]*watch.Event, size-index) + for ; w.startIndex+index < w.endIndex; index++ { + result[i] = w.cache[(w.startIndex+index)%w.capacity] + i++ + } + } + + return result, nil +} + func (w *WatchCache) AddIndexer(clusterName string, indexers *cache.Indexers) { w.Lock() defer w.Unlock() diff --git a/pkg/utils/watch/watchevent.go b/pkg/utils/watch/watchevent.go new file mode 100644 index 000000000..1e566699e --- /dev/null +++ b/pkg/utils/watch/watchevent.go @@ -0,0 +1,29 @@ +package watch + +import ( + "net/http" + + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/watch" +) + +// NewErrorEvent return a watch event of error status with an error +func NewErrorEvent(err error) watch.Event { + errEvent := watch.Event{Type: watch.Error} + switch err := err.(type) { + case runtime.Object: + errEvent.Object = err + case *apierrors.StatusError: + errEvent.Object = &err.ErrStatus + default: + errEvent.Object = &metav1.Status{ + Status: metav1.StatusFailure, + Message: err.Error(), + Reason: metav1.StatusReasonInternalError, + Code: http.StatusInternalServerError, + } + } + return errEvent +} From ff2054451c1dea2f30650ec6a205268867e4d3ea Mon Sep 17 00:00:00 2001 From: zhangyongxi Date: Fri, 9 Sep 2022 16:55:08 +0800 Subject: [PATCH 7/7] code optimization for ListAndWatch Co-authored-by: duanmeng Co-authored-by: wuyingjun Co-authored-by: hanweisen Signed-off-by: zhangyongxi --- Makefile | 2 +- cmd/apiserver/app/apiserver.go | 2 +- cmd/apiserver/app/options/options.go | 7 +- .../app/binding_apiserver.go | 23 ++- .../clusterpedia_binding_apiserver_rbac.yaml | 24 +++ deploy/clusterpedia_apiserver_rbac.yaml | 3 - pkg/apiserver/apiserver.go | 11 -- .../clusterpedia/collectionresources/rest.go | 2 +- .../clusterresource_controller.go | 2 +- pkg/kubeapiserver/resourcerest/storage.go | 57 +----- pkg/kubeapiserver/restmanager.go | 20 +-- .../storageconfig/storageconfig_factory.go | 12 +- pkg/storage/internalstorage/register.go | 4 + .../internalstorage/resource_storage.go | 6 + pkg/storage/internalstorage/storage.go | 4 + .../memorystorage/memory_resource_storage.go | 169 ++++++++---------- pkg/storage/memorystorage/memory_storage.go | 67 ++++--- pkg/storage/memorystorage/register.go | 5 
+- .../watchcache/cluster_resource_version.go | 27 ++- .../memorystorage/watchcache/watch_cache.go | 141 +++++++++------ pkg/storage/options/options.go | 4 +- pkg/storage/storage.go | 7 +- .../clustersynchro/cluster_synchro.go | 5 +- .../clustersynchro/resource_negotiator.go | 4 +- pkg/synchromanager/clustersynchro_manager.go | 7 + 25 files changed, 325 insertions(+), 290 deletions(-) create mode 100644 deploy/binding-apiserver/clusterpedia_binding_apiserver_rbac.yaml diff --git a/Makefile b/Makefile index 5975ae93a..499515c52 100644 --- a/Makefile +++ b/Makefile @@ -94,7 +94,7 @@ apiserver: .PHONY: binding-apiserver binding-apiserver: CGO_ENABLED=0 GOOS=$(GOOS) GOARCH=$(GOARCH) go build \ - -ldflags $(LDFLAGS) \ + -ldflags "$(LDFLAGS)" \ -o bin/binding-apiserver \ cmd/binding-apiserver/main.go diff --git a/cmd/apiserver/app/apiserver.go b/cmd/apiserver/app/apiserver.go index 541b2a363..9dd16a869 100644 --- a/cmd/apiserver/app/apiserver.go +++ b/cmd/apiserver/app/apiserver.go @@ -32,7 +32,7 @@ func NewClusterPediaServerCommand(ctx context.Context) *cobra.Command { } cliflag.PrintFlags(cmd.Flags()) - config, err := opts.Config(false) + config, err := opts.Config() if err != nil { return err } diff --git a/cmd/apiserver/app/options/options.go b/cmd/apiserver/app/options/options.go index 326b1ce84..c133326a7 100644 --- a/cmd/apiserver/app/options/options.go +++ b/cmd/apiserver/app/options/options.go @@ -75,7 +75,7 @@ func (o *ClusterPediaServerOptions) Validate() error { return utilerrors.NewAggregate(errors) } -func (o *ClusterPediaServerOptions) Config(bindingSyncController bool) (*apiserver.Config, error) { +func (o *ClusterPediaServerOptions) Config() (*apiserver.Config, error) { if err := o.Validate(); err != nil { return nil, err } @@ -109,9 +109,8 @@ func (o *ClusterPediaServerOptions) Config(bindingSyncController bool) (*apiserv } return &apiserver.Config{ - GenericConfig: genericConfig, - StorageFactory: storage, - BindingSyncController: bindingSyncController, + GenericConfig: genericConfig, + StorageFactory: storage, }, nil } diff --git a/cmd/binding-apiserver/app/binding_apiserver.go b/cmd/binding-apiserver/app/binding_apiserver.go index 1b8ed1e55..5701a83b5 100644 --- a/cmd/binding-apiserver/app/binding_apiserver.go +++ b/cmd/binding-apiserver/app/binding_apiserver.go @@ -2,6 +2,7 @@ package app import ( "context" + "fmt" "github.com/spf13/cobra" "k8s.io/apimachinery/pkg/util/runtime" @@ -13,6 +14,8 @@ import ( "k8s.io/component-base/term" "github.com/clusterpedia-io/clusterpedia/cmd/apiserver/app/options" + "github.com/clusterpedia-io/clusterpedia/pkg/generated/clientset/versioned" + "github.com/clusterpedia-io/clusterpedia/pkg/synchromanager" clusterpediafeature "github.com/clusterpedia-io/clusterpedia/pkg/utils/feature" "github.com/clusterpedia-io/clusterpedia/pkg/version/verflag" ) @@ -32,12 +35,28 @@ func NewClusterPediaServerCommand(ctx context.Context) *cobra.Command { } cliflag.PrintFlags(cmd.Flags()) - config, err := opts.Config(true) + config, err := opts.Config() if err != nil { return err } - server, err := config.Complete().New() + completedConfig := config.Complete() + if completedConfig.ClientConfig == nil { + return fmt.Errorf("CompletedConfig.New() called with config.ClientConfig == nil") + } + if completedConfig.StorageFactory == nil { + return fmt.Errorf("CompletedConfig.New() called with config.StorageFactory == nil") + } + + crdclient, err := versioned.NewForConfig(completedConfig.ClientConfig) + if err != nil { + return err + } + + synchromanager := 
synchromanager.NewManager(crdclient, config.StorageFactory) + go synchromanager.Run(1, ctx.Done()) + + server, err := completedConfig.New() if err != nil { return err } diff --git a/deploy/binding-apiserver/clusterpedia_binding_apiserver_rbac.yaml b/deploy/binding-apiserver/clusterpedia_binding_apiserver_rbac.yaml new file mode 100644 index 000000000..3ab2ec2bc --- /dev/null +++ b/deploy/binding-apiserver/clusterpedia_binding_apiserver_rbac.yaml @@ -0,0 +1,24 @@ +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: clusterpedia +rules: + - apiGroups: ['*'] + resources: ['*'] + verbs: ["*"] + - nonResourceURLs: ['*'] + verbs: ["get"] + +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRoleBinding +metadata: + name: clusterpedia +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: ClusterRole + name: clusterpedia +subjects: + - kind: ServiceAccount + name: clusterpedia-binding-apiserver + namespace: clusterpedia-system diff --git a/deploy/clusterpedia_apiserver_rbac.yaml b/deploy/clusterpedia_apiserver_rbac.yaml index caa6735b8..c5e40237e 100644 --- a/deploy/clusterpedia_apiserver_rbac.yaml +++ b/deploy/clusterpedia_apiserver_rbac.yaml @@ -28,6 +28,3 @@ subjects: - kind: ServiceAccount name: clusterpedia-controller-manager namespace: clusterpedia-system - - kind: ServiceAccount - name: clusterpedia-binding-apiserver - namespace: clusterpedia-system \ No newline at end of file diff --git a/pkg/apiserver/apiserver.go b/pkg/apiserver/apiserver.go index 41bc564d9..59c2ee60b 100644 --- a/pkg/apiserver/apiserver.go +++ b/pkg/apiserver/apiserver.go @@ -26,7 +26,6 @@ import ( informers "github.com/clusterpedia-io/clusterpedia/pkg/generated/informers/externalversions" "github.com/clusterpedia-io/clusterpedia/pkg/kubeapiserver" "github.com/clusterpedia-io/clusterpedia/pkg/storage" - "github.com/clusterpedia-io/clusterpedia/pkg/synchromanager" "github.com/clusterpedia-io/clusterpedia/pkg/utils/filters" ) @@ -65,8 +64,6 @@ type Config struct { GenericConfig *genericapiserver.RecommendedConfig StorageFactory storage.StorageFactory - // BindingSyncController means whether apiserver or binding_apiserver should process and sync events as clustersynchro_manager - BindingSyncController bool } type ClusterPediaServer struct { @@ -78,8 +75,6 @@ type completedConfig struct { ClientConfig *clientrest.Config StorageFactory storage.StorageFactory - // BindingSyncController means whether apiserver or binding_apiserver should process and sync events as clustersynchro_manager - BindingSyncController bool } // CompletedConfig embeds a private pointer that cannot be instantiated outside of this package. 
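Note (illustrative, not part of the patch): with the synchro manager now constructed and started directly by the binding-apiserver command (see the binding_apiserver.go hunk above), the generic apiserver no longer needs a BindingSyncController switch, which is why the field is dropped from Config and completedConfig here and from Complete()/New() below. Once a binding-apiserver is running, the watch support added in PATCH 6/7 can be exercised with kubectl as well as with client-go; a hypothetical invocation, where the kubeconfig pointing at a Clusterpedia endpoint is an assumption and not something defined in these patches:

    kubectl --kubeconfig ./clusterpedia.kubeconfig get pods -A --watch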
@@ -93,7 +88,6 @@ func (cfg *Config) Complete() CompletedConfig { cfg.GenericConfig.Complete(), cfg.GenericConfig.ClientConfig, cfg.StorageFactory, - cfg.BindingSyncController, } c.GenericConfig.Version = &version.Info{ @@ -167,11 +161,6 @@ func (config completedConfig) New() (*ClusterPediaServer, error) { clusterpediaInformerFactory.Start(context.StopCh) clusterpediaInformerFactory.WaitForCacheSync(context.StopCh) - if config.BindingSyncController { - synchromanager := synchromanager.NewManager(crdclient, config.StorageFactory) - synchromanager.Run(1, context.StopCh) - } - return nil }) diff --git a/pkg/apiserver/registry/clusterpedia/collectionresources/rest.go b/pkg/apiserver/registry/clusterpedia/collectionresources/rest.go index 7abeb47e1..b2c1d47b0 100644 --- a/pkg/apiserver/registry/clusterpedia/collectionresources/rest.go +++ b/pkg/apiserver/registry/clusterpedia/collectionresources/rest.go @@ -51,7 +51,7 @@ func NewREST(serializer runtime.NegotiatedSerializer, factory storage.StorageFac for irt := range cr.ResourceTypes { rt := &cr.ResourceTypes[irt] if rt.Resource != "" { - config, err := configFactory.NewConfig(rt.GroupResource().WithVersion("")) + config, err := configFactory.NewConfig(rt.GroupResource().WithVersion(""), false) if err != nil { continue } diff --git a/pkg/kubeapiserver/clusterresource_controller.go b/pkg/kubeapiserver/clusterresource_controller.go index 8f23b1796..b974851db 100644 --- a/pkg/kubeapiserver/clusterresource_controller.go +++ b/pkg/kubeapiserver/clusterresource_controller.go @@ -83,7 +83,7 @@ func (c *ClusterResourceController) updateClusterResources(cluster *clusterv1alp return } - discoveryapis := c.restManager.LoadResources(resources, cluster.Name) + discoveryapis := c.restManager.LoadResources(resources) c.discoveryManager.SetClusterGroupResource(cluster.Name, discoveryapis) c.clusterresources[cluster.Name] = resources diff --git a/pkg/kubeapiserver/resourcerest/storage.go b/pkg/kubeapiserver/resourcerest/storage.go index 220a43570..8420f665d 100644 --- a/pkg/kubeapiserver/resourcerest/storage.go +++ b/pkg/kubeapiserver/resourcerest/storage.go @@ -21,10 +21,8 @@ import ( "github.com/clusterpedia-io/api/clusterpedia/v1beta1" "github.com/clusterpedia-io/clusterpedia/pkg/kubeapiserver/printers" "github.com/clusterpedia-io/clusterpedia/pkg/storage" - cache "github.com/clusterpedia-io/clusterpedia/pkg/storage/memorystorage/watchcache" "github.com/clusterpedia-io/clusterpedia/pkg/utils/negotiation" "github.com/clusterpedia-io/clusterpedia/pkg/utils/request" - utilwatch "github.com/clusterpedia-io/clusterpedia/pkg/utils/watch" ) type RESTStorage struct { @@ -127,58 +125,5 @@ func (s *RESTStorage) ConvertToTable(ctx context.Context, object runtime.Object, } func (s *RESTStorage) Watch(ctx context.Context, options *metainternalversion.ListOptions) (watch.Interface, error) { - resourceversion := options.ResourceVersion - watchRV, err := cache.NewClusterResourceVersionFromString(resourceversion) - if err != nil { - // To match the uncached watch implementation, once we have passed authn/authz/admission, - // and successfully parsed a resource version, other errors must fail with a watch event of type ERROR, - // rather than a directly returned error. 
- return newErrWatcher(err), nil - } - - watcher := cache.NewCacheWatcher(100) - watchCache := s.Storage.GetStorageConfig().WatchCache - watchCache.Lock() - defer watchCache.Unlock() - - initEvents, err := watchCache.GetAllEventsSinceThreadUnsafe(watchRV) - if err != nil { - // To match the uncached watch implementation, once we have passed authn/authz/admission, - // and successfully parsed a resource version, other errors must fail with a watch event of type ERROR, - // rather than a directly returned error. - return newErrWatcher(err), nil - } - - func() { - watchCache.WatchersLock.Lock() - defer watchCache.WatchersLock.Unlock() - - watchCache.WatchersBuffer = append(watchCache.WatchersBuffer, watcher) - }() - - go watcher.Process(ctx, initEvents) - return watcher, nil -} - -type errWatcher struct { - result chan watch.Event -} - -func newErrWatcher(err error) *errWatcher { - errEvent := utilwatch.NewErrorEvent(err) - - // Create a watcher with room for a single event, populate it, and close the channel - watcher := &errWatcher{result: make(chan watch.Event, 1)} - watcher.result <- errEvent - close(watcher.result) - - return watcher -} - -func (c *errWatcher) ResultChan() <-chan watch.Event { - return c.result -} - -func (c *errWatcher) Stop() { - // no-op + return s.Storage.Watch(ctx, options) } diff --git a/pkg/kubeapiserver/restmanager.go b/pkg/kubeapiserver/restmanager.go index b259ae738..0e6cc657d 100644 --- a/pkg/kubeapiserver/restmanager.go +++ b/pkg/kubeapiserver/restmanager.go @@ -101,7 +101,7 @@ func (m *RESTManager) GetRESTResourceInfo(gvr schema.GroupVersionResource) RESTR return infos[gvr] } -func (m *RESTManager) LoadResources(infos ResourceInfoMap, cluster string) map[schema.GroupResource]discovery.ResourceDiscoveryAPI { +func (m *RESTManager) LoadResources(infos ResourceInfoMap) map[schema.GroupResource]discovery.ResourceDiscoveryAPI { apigroups := m.groups.Load().(map[string]metav1.APIGroup) apiresources := m.resources.Load().(map[schema.GroupResource]metav1.APIResource) restinfos := m.restResourceInfos.Load().(map[schema.GroupVersionResource]RESTResourceInfo) @@ -175,7 +175,7 @@ func (m *RESTManager) LoadResources(infos ResourceInfoMap, cluster string) map[s m.addAPIResourcesLocked(addedAPIResources) } if len(addedInfos) != 0 { - m.addRESTResourceInfosLocked(addedInfos, cluster) + m.addRESTResourceInfosLocked(addedInfos) } return apis } @@ -208,7 +208,7 @@ func (m *RESTManager) addAPIResourcesLocked(addedResources map[schema.GroupResou m.resources.Store(resources) } -func (m *RESTManager) addRESTResourceInfosLocked(addedInfos map[schema.GroupVersionResource]RESTResourceInfo, cluster string) { +func (m *RESTManager) addRESTResourceInfosLocked(addedInfos map[schema.GroupVersionResource]RESTResourceInfo) { restinfos := m.restResourceInfos.Load().(map[schema.GroupVersionResource]RESTResourceInfo) infos := make(map[schema.GroupVersionResource]RESTResourceInfo, len(restinfos)+len(addedInfos)) @@ -225,9 +225,9 @@ func (m *RESTManager) addRESTResourceInfosLocked(addedInfos map[schema.GroupVers var err error var storage *resourcerest.RESTStorage if resourcescheme.LegacyResourceScheme.IsGroupRegistered(gvr.Group) { - storage, err = m.genLegacyResourceRESTStorage(gvr, info.APIResource.Kind, cluster) + storage, err = m.genLegacyResourceRESTStorage(gvr, info.APIResource.Kind, info.APIResource.Namespaced) } else { - storage, err = m.genCustomResourceRESTStorage(gvr, info.APIResource.Kind, cluster) + storage, err = m.genCustomResourceRESTStorage(gvr, info.APIResource.Kind, 
info.APIResource.Namespaced) } if err != nil { klog.ErrorS(err, "Failed to gen resource rest storage", "gvr", gvr, "kind", info.APIResource.Kind) @@ -263,12 +263,11 @@ func (m *RESTManager) addRESTResourceInfosLocked(addedInfos map[schema.GroupVers m.restResourceInfos.Store(infos) } -func (m *RESTManager) genLegacyResourceRESTStorage(gvr schema.GroupVersionResource, kind, cluster string) (*resourcerest.RESTStorage, error) { - storageConfig, err := m.resourcetSorageConfig.NewLegacyResourceConfig(gvr.GroupResource()) +func (m *RESTManager) genLegacyResourceRESTStorage(gvr schema.GroupVersionResource, kind string, namespaced bool) (*resourcerest.RESTStorage, error) { + storageConfig, err := m.resourcetSorageConfig.NewLegacyResourceConfig(gvr.GroupResource(), namespaced) if err != nil { return nil, err } - storageConfig.Cluster = cluster resourceStorage, err := m.storageFactory.NewResourceStorage(storageConfig) if err != nil { @@ -291,12 +290,11 @@ func (m *RESTManager) genLegacyResourceRESTStorage(gvr schema.GroupVersionResour }, nil } -func (m *RESTManager) genCustomResourceRESTStorage(gvr schema.GroupVersionResource, kind, cluster string) (*resourcerest.RESTStorage, error) { - storageConfig, err := m.resourcetSorageConfig.NewCustomResourceConfig(gvr) +func (m *RESTManager) genCustomResourceRESTStorage(gvr schema.GroupVersionResource, kind string, namespaced bool) (*resourcerest.RESTStorage, error) { + storageConfig, err := m.resourcetSorageConfig.NewCustomResourceConfig(gvr, namespaced) if err != nil { return nil, err } - storageConfig.Cluster = cluster resourceStorage, err := m.storageFactory.NewResourceStorage(storageConfig) if err != nil { diff --git a/pkg/kubeapiserver/storageconfig/storageconfig_factory.go b/pkg/kubeapiserver/storageconfig/storageconfig_factory.go index a14ebb583..40b0d769d 100644 --- a/pkg/kubeapiserver/storageconfig/storageconfig_factory.go +++ b/pkg/kubeapiserver/storageconfig/storageconfig_factory.go @@ -59,14 +59,14 @@ func (g *StorageConfigFactory) GetStorageGroupResource(groupResource schema.Grou return groupResource } -func (g *StorageConfigFactory) NewConfig(gvr schema.GroupVersionResource) (*storage.ResourceStorageConfig, error) { +func (g *StorageConfigFactory) NewConfig(gvr schema.GroupVersionResource, namespaced bool) (*storage.ResourceStorageConfig, error) { if resourcescheme.LegacyResourceScheme.IsGroupRegistered(gvr.Group) { - return g.NewLegacyResourceConfig(gvr.GroupResource()) + return g.NewLegacyResourceConfig(gvr.GroupResource(), namespaced) } - return g.NewCustomResourceConfig(gvr) + return g.NewCustomResourceConfig(gvr, namespaced) } -func (g *StorageConfigFactory) NewCustomResourceConfig(gvr schema.GroupVersionResource) (*storage.ResourceStorageConfig, error) { +func (g *StorageConfigFactory) NewCustomResourceConfig(gvr schema.GroupVersionResource, namespaced bool) (*storage.ResourceStorageConfig, error) { version := gvr.GroupVersion() codec := versioning.NewCodec( resourcescheme.UnstructuredCodecs, @@ -85,10 +85,11 @@ func (g *StorageConfigFactory) NewCustomResourceConfig(gvr schema.GroupVersionRe Codec: codec, StorageVersion: version, MemoryVersion: version, + Namespaced: namespaced, }, nil } -func (g *StorageConfigFactory) NewLegacyResourceConfig(gr schema.GroupResource) (*storage.ResourceStorageConfig, error) { +func (g *StorageConfigFactory) NewLegacyResourceConfig(gr schema.GroupResource, namespaced bool) (*storage.ResourceStorageConfig, error) { chosenStorageResource := g.GetStorageGroupResource(gr) storageVersion, err := 
g.legacyResourceEncodingConfig.StorageEncodingFor(chosenStorageResource) @@ -117,5 +118,6 @@ func (g *StorageConfigFactory) NewLegacyResourceConfig(gr schema.GroupResource) Codec: codec, StorageVersion: codecConfig.StorageVersion, MemoryVersion: memoryVersion, + Namespaced: namespaced, }, nil } diff --git a/pkg/storage/internalstorage/register.go b/pkg/storage/internalstorage/register.go index db68e1339..d577750bf 100644 --- a/pkg/storage/internalstorage/register.go +++ b/pkg/storage/internalstorage/register.go @@ -29,6 +29,10 @@ func init() { } func NewStorageFactory(configPath string) (storage.StorageFactory, error) { + if configPath == "" { + return nil, fmt.Errorf("configPath should not be empty") + } + cfg := &Config{} if err := configor.Load(cfg, configPath); err != nil { return nil, err diff --git a/pkg/storage/internalstorage/resource_storage.go b/pkg/storage/internalstorage/resource_storage.go index ce29da11b..663a6e784 100644 --- a/pkg/storage/internalstorage/resource_storage.go +++ b/pkg/storage/internalstorage/resource_storage.go @@ -10,12 +10,14 @@ import ( "gorm.io/gorm" "k8s.io/apimachinery/pkg/api/meta" + metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/conversion" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/watch" genericstorage "k8s.io/apiserver/pkg/storage" internal "github.com/clusterpedia-io/api/clusterpedia" @@ -272,6 +274,10 @@ func (s *ResourceStorage) List(ctx context.Context, listObject runtime.Object, o return nil } +func (s *ResourceStorage) Watch(ctx context.Context, options *metainternalversion.ListOptions) (watch.Interface, error) { + return nil, nil +} + func applyListOptionsToResourceQuery(db *gorm.DB, query *gorm.DB, opts *internal.ListOptions) (int64, *int64, *gorm.DB, error) { applyFn := func(query *gorm.DB, opts *internal.ListOptions) (*gorm.DB, error) { query, err := applyOwnerToResourceQuery(db, query, opts) diff --git a/pkg/storage/internalstorage/storage.go b/pkg/storage/internalstorage/storage.go index dbf7bc5ca..5ac987f66 100644 --- a/pkg/storage/internalstorage/storage.go +++ b/pkg/storage/internalstorage/storage.go @@ -84,3 +84,7 @@ func (s *StorageFactory) GetCollectionResources(ctx context.Context) ([]*interna } return crs, nil } + +func (s *StorageFactory) PrepareCluster(cluster string) error { + return nil +} diff --git a/pkg/storage/memorystorage/memory_resource_storage.go b/pkg/storage/memorystorage/memory_resource_storage.go index 2a65ca69e..0680db34f 100644 --- a/pkg/storage/memorystorage/memory_resource_storage.go +++ b/pkg/storage/memorystorage/memory_resource_storage.go @@ -8,17 +8,17 @@ import ( "sync" "k8s.io/apimachinery/pkg/api/meta" + metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion" "k8s.io/apimachinery/pkg/conversion" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/watch" - "k8s.io/klog/v2" internal "github.com/clusterpedia-io/api/clusterpedia" - "github.com/clusterpedia-io/clusterpedia/pkg/kubeapiserver/resourcescheme" "github.com/clusterpedia-io/clusterpedia/pkg/storage" cache "github.com/clusterpedia-io/clusterpedia/pkg/storage/memorystorage/watchcache" + utilwatch "github.com/clusterpedia-io/clusterpedia/pkg/utils/watch" ) var ( @@ -46,100 +46,54 @@ type ResourceStorage 
struct { Codec runtime.Codec watchCache *cache.WatchCache - stopCh chan struct{} - crvSynchro *cache.ClusterResourceVersionSynchro + CrvSynchro *cache.ClusterResourceVersionSynchro incoming chan ClusterWatchEvent storageConfig *storage.ResourceStorageConfig } -func (s *ResourceStorage) convertEvent(event *ClusterWatchEvent) error { - klog.V(10).Infof("event: %s", event) - s.crvSynchro.UpdateClusterResourceVersion(&event.Event, event.ClusterName) - - err := s.encodeEvent(&event.Event) - if err != nil { - return fmt.Errorf("encode event failed: %v", err) - } - s.incoming <- *event - return nil -} - -func (s *ResourceStorage) dispatchEvents() { - for { - select { - case cwe, ok := <-s.incoming: - if !ok { - continue - } - - switch cwe.Event.Type { - case watch.Added: - err := s.watchCache.Add(cwe.Event.Object, cwe.ClusterName) - if err != nil { - utilruntime.HandleError(fmt.Errorf("unable to add watch event object (%#v) to store: %v", cwe.Event.Object, err)) - } - case watch.Modified: - err := s.watchCache.Update(cwe.Event.Object, cwe.ClusterName) - if err != nil { - utilruntime.HandleError(fmt.Errorf("unable to update watch event object (%#v) to store: %v", cwe.Event.Object, err)) - } - case watch.Deleted: - // TODO: Will any consumers need access to the "last known - // state", which is passed in event.Object? If so, may need - // to change this. - err := s.watchCache.Delete(cwe.Event.Object, cwe.ClusterName) - if err != nil { - utilruntime.HandleError(fmt.Errorf("unable to delete watch event object (%#v) from store: %v", cwe.Event.Object, err)) - } - case watch.Error: - s.watchCache.ClearWatchCache(cwe.ClusterName) - default: - utilruntime.HandleError(fmt.Errorf("unable to understand watch event %#v", cwe.Event)) - continue - } - s.dispatchEvent(&cwe.Event) - case <-s.stopCh: - return - } - } -} - -func (s *ResourceStorage) dispatchEvent(event *watch.Event) { - s.watchCache.WatchersLock.RLock() - defer s.watchCache.WatchersLock.RUnlock() - for _, watcher := range s.watchCache.WatchersBuffer { - watcher.NonblockingAdd(event) - } -} - func (s *ResourceStorage) GetStorageConfig() *storage.ResourceStorageConfig { return s.storageConfig } func (s *ResourceStorage) Create(ctx context.Context, cluster string, obj runtime.Object) error { - event := s.newClusterWatchEvent(watch.Added, obj, cluster) - err := s.convertEvent(event) + resourceVersion, err := s.CrvSynchro.UpdateClusterResourceVersion(obj, cluster) if err != nil { return err } + + err = s.watchCache.Add(obj, cluster, resourceVersion, s.storageConfig.Codec, s.storageConfig.MemoryVersion) + if err != nil { + utilruntime.HandleError(fmt.Errorf("unable to add watch event object (%#v) to store: %v", obj, err)) + } + return nil } func (s *ResourceStorage) Update(ctx context.Context, cluster string, obj runtime.Object) error { - event := s.newClusterWatchEvent(watch.Modified, obj, cluster) - err := s.convertEvent(event) + resourceVersion, err := s.CrvSynchro.UpdateClusterResourceVersion(obj, cluster) if err != nil { return err } + + err = s.watchCache.Update(obj, cluster, resourceVersion, s.storageConfig.Codec, s.storageConfig.MemoryVersion) + if err != nil { + utilruntime.HandleError(fmt.Errorf("unable to add watch event object (%#v) to store: %v", obj, err)) + } + return nil } func (s *ResourceStorage) Delete(ctx context.Context, cluster string, obj runtime.Object) error { - event := s.newClusterWatchEvent(watch.Deleted, obj, cluster) - err := s.convertEvent(event) + resourceVersion, err := s.CrvSynchro.UpdateClusterResourceVersion(obj, 
cluster) if err != nil { return err } + + err = s.watchCache.Delete(obj, cluster, resourceVersion, s.storageConfig.Codec, s.storageConfig.MemoryVersion) + if err != nil { + utilruntime.HandleError(fmt.Errorf("unable to add watch event object (%#v) to store: %v", obj, err)) + } + return nil } @@ -167,6 +121,7 @@ func (s *ResourceStorage) Get(ctx context.Context, cluster, namespace, name stri return nil } +//nolint func (s *ResourceStorage) newClusterWatchEvent(eventType watch.EventType, obj runtime.Object, cluster string) *ClusterWatchEvent { return &ClusterWatchEvent{ ClusterName: cluster, @@ -240,33 +195,59 @@ func (s *ResourceStorage) List(ctx context.Context, listObject runtime.Object, o return nil } -func (s *ResourceStorage) encodeEvent(event *watch.Event) error { - var buffer bytes.Buffer +func (s *ResourceStorage) Watch(ctx context.Context, options *metainternalversion.ListOptions) (watch.Interface, error) { + resourceversion := options.ResourceVersion + watchRV, err := cache.NewClusterResourceVersionFromString(resourceversion) + if err != nil { + // To match the uncached watch implementation, once we have passed authn/authz/admission, + // and successfully parsed a resource version, other errors must fail with a watch event of type ERROR, + // rather than a directly returned error. + return newErrWatcher(err), nil + } - gk := event.Object.GetObjectKind().GroupVersionKind().GroupKind() - config := s.storageConfig - if ok := resourcescheme.LegacyResourceScheme.IsGroupRegistered(gk.Group); ok { - dest, err := resourcescheme.LegacyResourceScheme.New(config.MemoryVersion.WithKind(gk.Kind)) - if err != nil { - return err - } + watcher := cache.NewCacheWatcher(100) + watchCache := s.watchCache + watchCache.Lock() + defer watchCache.Unlock() - object := event.Object - err = config.Codec.Encode(object, &buffer) - if err != nil { - return err - } + initEvents, err := watchCache.GetAllEventsSinceThreadUnsafe(watchRV) + if err != nil { + // To match the uncached watch implementation, once we have passed authn/authz/admission, + // and successfully parsed a resource version, other errors must fail with a watch event of type ERROR, + // rather than a directly returned error. 
+ return newErrWatcher(err), nil + } - obj, _, err := config.Codec.Decode(buffer.Bytes(), nil, dest) - if err != nil { - return err - } + func() { + watchCache.WatchersLock.Lock() + defer watchCache.WatchersLock.Unlock() - if obj != dest { - return err - } - event.Object = dest - } + watchCache.WatchersBuffer = append(watchCache.WatchersBuffer, watcher) + }() - return nil + go watcher.Process(ctx, initEvents) + return watcher, nil +} + +type errWatcher struct { + result chan watch.Event +} + +func newErrWatcher(err error) *errWatcher { + errEvent := utilwatch.NewErrorEvent(err) + + // Create a watcher with room for a single event, populate it, and close the channel + watcher := &errWatcher{result: make(chan watch.Event, 1)} + watcher.result <- errEvent + close(watcher.result) + + return watcher +} + +func (c *errWatcher) ResultChan() <-chan watch.Event { + return c.result +} + +func (c *errWatcher) Stop() { + // no-op } diff --git a/pkg/storage/memorystorage/memory_storage.go b/pkg/storage/memorystorage/memory_storage.go index f7853c731..8ac3b5384 100644 --- a/pkg/storage/memorystorage/memory_storage.go +++ b/pkg/storage/memorystorage/memory_storage.go @@ -2,17 +2,16 @@ package memorystorage import ( "context" - "fmt" "k8s.io/apimachinery/pkg/runtime/schema" internal "github.com/clusterpedia-io/api/clusterpedia" "github.com/clusterpedia-io/clusterpedia/pkg/storage" cache "github.com/clusterpedia-io/clusterpedia/pkg/storage/memorystorage/watchcache" - utilwatch "github.com/clusterpedia-io/clusterpedia/pkg/utils/watch" ) type StorageFactory struct { + clusters map[string]bool } func (s *StorageFactory) NewResourceStorage(config *storage.ResourceStorageConfig) (storage.ResourceStorage, error) { @@ -25,36 +24,48 @@ func (s *StorageFactory) NewResourceStorage(config *storage.ResourceStorageConfi Resource: config.GroupResource.Resource, } - if resourceStorage, ok := storages.resourceStorages[gvr]; ok { + resourceStorage, ok := storages.resourceStorages[gvr] + if ok { watchCache := resourceStorage.watchCache - if config.Namespaced { + if config.Namespaced && !watchCache.IsNamespaced { watchCache.KeyFunc = cache.GetKeyFunc(gvr, config.Namespaced) watchCache.IsNamespaced = true } - if _, ok := watchCache.GetStores()[config.Cluster]; !ok { - resourceStorage.watchCache.AddIndexer(config.Cluster, nil) + } else { + watchCache := cache.NewWatchCache(100, gvr, config.Namespaced) + resourceStorage = &ResourceStorage{ + incoming: make(chan ClusterWatchEvent, 100), + Codec: config.Codec, + watchCache: watchCache, + storageConfig: config, } - resourceStorage.crvSynchro.SetClusterResourceVersion(config.Cluster, "0") - - return resourceStorage, nil + storages.resourceStorages[gvr] = resourceStorage } - watchCache := cache.NewWatchCache(100, gvr, config.Namespaced, config.Cluster) - config.WatchCache = watchCache - resourceStorage := &ResourceStorage{ - incoming: make(chan ClusterWatchEvent, 100), - Codec: config.Codec, - crvSynchro: cache.NewClusterResourceVersionSynchro(config.Cluster), - watchCache: watchCache, - storageConfig: config, + for cluster := range s.clusters { + resourceStorage.watchCache.AddIndexer(cluster, nil) + + if resourceStorage.CrvSynchro == nil { + resourceStorage.CrvSynchro = cache.NewClusterResourceVersionSynchro(cluster) + } else { + resourceStorage.CrvSynchro.SetClusterResourceVersion(cluster, "0") + } } - go resourceStorage.dispatchEvents() + return resourceStorage, nil +} - storages.resourceStorages[gvr] = resourceStorage +func (s *StorageFactory) PrepareCluster(cluster string) 
error { + storages.Lock() + defer storages.Unlock() - return resourceStorage, nil + if _, ok := s.clusters[cluster]; ok { + return nil + } + + s.clusters[cluster] = true + return nil } func (s *StorageFactory) NewCollectionResourceStorage(cr *internal.CollectionResource) (storage.CollectionResourceStorage, error) { @@ -69,11 +80,11 @@ func (s *StorageFactory) CleanCluster(ctx context.Context, cluster string) error storages.Lock() defer storages.Unlock() for _, rs := range storages.resourceStorages { - rs.watchCache.DeleteIndexer(cluster) - // If a pediacluster is deleted from clusterpedia,then the informer of client-go should be list and watch again - errorEvent := utilwatch.NewErrorEvent(fmt.Errorf("PediaCluster %s is deleted", cluster)) - rs.dispatchEvent(&errorEvent) + rs.CrvSynchro.RemoveCluster(cluster) + rs.watchCache.CleanCluster(cluster) + delete(s.clusters, cluster) } + return nil } @@ -81,11 +92,11 @@ func (s *StorageFactory) CleanClusterResource(ctx context.Context, cluster strin storages.Lock() defer storages.Unlock() if rs, ok := storages.resourceStorages[gvr]; ok { - rs.watchCache.DeleteIndexer(cluster) - // If a gvr is deleted from clusterpedia,then the informer of client-go should be list and watch again - errorEvent := utilwatch.NewErrorEvent(fmt.Errorf("GVR %v is deleted", gvr)) - rs.dispatchEvent(&errorEvent) + rs.CrvSynchro.RemoveCluster(cluster) + rs.watchCache.CleanCluster(cluster) + delete(s.clusters, cluster) } + return nil } diff --git a/pkg/storage/memorystorage/register.go b/pkg/storage/memorystorage/register.go index e3cc3e206..6c3e20088 100644 --- a/pkg/storage/memorystorage/register.go +++ b/pkg/storage/memorystorage/register.go @@ -13,5 +13,8 @@ func init() { } func NewStorageFactory(_ string) (storage.StorageFactory, error) { - return &StorageFactory{}, nil + storageFactory := &StorageFactory{ + clusters: make(map[string]bool), + } + return storageFactory, nil } diff --git a/pkg/storage/memorystorage/watchcache/cluster_resource_version.go b/pkg/storage/memorystorage/watchcache/cluster_resource_version.go index abc7ec0be..d62b032c5 100644 --- a/pkg/storage/memorystorage/watchcache/cluster_resource_version.go +++ b/pkg/storage/memorystorage/watchcache/cluster_resource_version.go @@ -6,6 +6,7 @@ import ( "sync" "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/json" "k8s.io/apimachinery/pkg/watch" "k8s.io/klog/v2" @@ -91,31 +92,41 @@ func NewClusterResourceVersionSynchro(cluster string) *ClusterResourceVersionSyn } // UpdateClusterResourceVersion update the resourceVersion in ClusterResourceVersionSynchro to the latest -func (crvs *ClusterResourceVersionSynchro) UpdateClusterResourceVersion(event *watch.Event, cluster string) { +func (crvs *ClusterResourceVersionSynchro) UpdateClusterResourceVersion(obj runtime.Object, cluster string) (*ClusterResourceVersion, error) { crvs.Lock() defer crvs.Unlock() crv := crvs.crv accessor := meta.NewAccessor() - rv, _ := accessor.ResourceVersion(event.Object) + rv, _ := accessor.ResourceVersion(obj) crv.rvmap[cluster] = rv bytes, err := json.Marshal(crv.rvmap) if err != nil { - klog.Errorf("base64 encode failed: %v", err) - return + return nil, fmt.Errorf("base64 encode failed: %v", err) } - err = accessor.SetResourceVersion(event.Object, base64.RawURLEncoding.EncodeToString(bytes)) + version := base64.RawURLEncoding.EncodeToString(bytes) + err = accessor.SetResourceVersion(obj, version) if err != nil { - klog.Warningf("set resourceVersion failed: %v, may be it's a clear 
watch cache order event", err) - return + return nil, fmt.Errorf("set resourceVersion failed: %v, may be it's a clear watch cache order event", err) } + + return NewClusterResourceVersionFromString(version) } func (crvs *ClusterResourceVersionSynchro) SetClusterResourceVersion(clusterName string, resourceVersion string) { crvs.Lock() defer crvs.Unlock() - crvs.crv.rvmap[clusterName] = resourceVersion + if _, ok := crvs.crv.rvmap[clusterName]; !ok { + crvs.crv.rvmap[clusterName] = resourceVersion + } +} + +func (crvs *ClusterResourceVersionSynchro) RemoveCluster(clusterName string) { + crvs.Lock() + defer crvs.Unlock() + + delete(crvs.crv.rvmap, clusterName) } diff --git a/pkg/storage/memorystorage/watchcache/watch_cache.go b/pkg/storage/memorystorage/watchcache/watch_cache.go index 8b569e677..348491662 100644 --- a/pkg/storage/memorystorage/watchcache/watch_cache.go +++ b/pkg/storage/memorystorage/watchcache/watch_cache.go @@ -1,6 +1,7 @@ package watchcache import ( + "bytes" "context" "fmt" "sync" @@ -18,6 +19,8 @@ import ( "k8s.io/utils/strings/slices" internal "github.com/clusterpedia-io/api/clusterpedia" + "github.com/clusterpedia-io/clusterpedia/pkg/kubeapiserver/resourcescheme" + utilwatch "github.com/clusterpedia-io/clusterpedia/pkg/utils/watch" ) // StoreElement keeping the structs of resource in k8s(key, object, labels, fields). @@ -146,7 +149,7 @@ func storeElementObject(obj interface{}) (runtime.Object, error) { return elem.Object, nil } -func NewWatchCache(capacity int, gvr schema.GroupVersionResource, isNamespaced bool, newCluster string) *WatchCache { +func NewWatchCache(capacity int, gvr schema.GroupVersionResource, isNamespaced bool) *WatchCache { wc := &WatchCache{ capacity: capacity, KeyFunc: GetKeyFunc(gvr, isNamespaced), @@ -158,7 +161,6 @@ func NewWatchCache(capacity int, gvr schema.GroupVersionResource, isNamespaced b IsNamespaced: isNamespaced, } - wc.AddIndexer(newCluster, nil) return wc } @@ -200,61 +202,66 @@ func (w *WatchCache) processEvent(event watch.Event, resourceVersion *ClusterRes } // Add takes runtime.Object as an argument. -func (w *WatchCache) Add(obj interface{}, clusterName string) error { - object, resourceVersion, err := w.objectToVersionedRuntimeObject(obj) - if err != nil { - return err - } - event := watch.Event{Type: watch.Added, Object: object} - +func (w *WatchCache) Add(obj runtime.Object, clusterName string, resourceVersion *ClusterResourceVersion, + codec runtime.Codec, memoryVersion schema.GroupVersion) error { f := func(elem *StoreElement) error { return w.stores[clusterName].Add(elem) } - return w.processEvent(event, resourceVersion, f) -} + object, err := encodeEvent(obj, codec, memoryVersion) + if err != nil { + return err + } -// Update takes runtime.Object as an argument. -func (w *WatchCache) Update(obj interface{}, clusterName string) error { - object, resourceVersion, err := w.objectToVersionedRuntimeObject(obj) + event := watch.Event{Type: watch.Added, Object: object} + err = w.processEvent(event, resourceVersion, f) if err != nil { return err } - event := watch.Event{Type: watch.Modified, Object: object} + w.dispatchEvent(&event) + return nil +} + +// Update takes runtime.Object as an argument. 
+func (w *WatchCache) Update(obj runtime.Object, clusterName string, resourceVersion *ClusterResourceVersion,
+	codec runtime.Codec, memoryVersion schema.GroupVersion) error {
 	f := func(elem *StoreElement) error { return w.stores[clusterName].Update(elem) }
-	return w.processEvent(event, resourceVersion, f)
-}
+	object, err := encodeEvent(obj, codec, memoryVersion)
+	if err != nil {
+		return err
+	}
 
-// Delete takes runtime.Object as an argument.
-func (w *WatchCache) Delete(obj interface{}, clusterName string) error {
-	object, resourceVersion, err := w.objectToVersionedRuntimeObject(obj)
+	event := watch.Event{Type: watch.Modified, Object: object}
+	err = w.processEvent(event, resourceVersion, f)
 	if err != nil {
 		return err
 	}
 
-	event := watch.Event{Type: watch.Deleted, Object: object}
-	f := func(elem *StoreElement) error { return w.stores[clusterName].Delete(elem) }
-	return w.processEvent(event, resourceVersion, f)
+	w.dispatchEvent(&event)
+	return nil
 }
 
-func (w *WatchCache) objectToVersionedRuntimeObject(obj interface{}) (runtime.Object, *ClusterResourceVersion, error) {
-	object, ok := obj.(runtime.Object)
-	if !ok {
-		return nil, &ClusterResourceVersion{}, fmt.Errorf("obj does not implement runtime.Object interface: %v", obj)
+// Delete takes runtime.Object as an argument.
+func (w *WatchCache) Delete(obj runtime.Object, clusterName string, resourceVersion *ClusterResourceVersion,
+	codec runtime.Codec, memoryVersion schema.GroupVersion) error {
+	f := func(elem *StoreElement) error {
+		return w.stores[clusterName].Delete(elem)
 	}
-
-	resourceVersion, err := meta.NewAccessor().ResourceVersion(object)
+	object, err := encodeEvent(obj, codec, memoryVersion)
 	if err != nil {
-		return nil, &ClusterResourceVersion{}, err
+		return err
 	}
 
-	clusterRvStore, err := NewClusterResourceVersionFromString(resourceVersion)
+	event := watch.Event{Type: watch.Deleted, Object: object}
+	err = w.processEvent(event, resourceVersion, f)
 	if err != nil {
-		return nil, &ClusterResourceVersion{}, err
+		return err
 	}
-	return object, clusterRvStore, nil
+
+	w.dispatchEvent(&event)
+	return nil
 }
 
 // Assumes that lock is already held for write.
@@ -343,7 +350,6 @@ func (w *WatchCache) WaitUntilFreshAndGet(cluster, namespace, name string) (*Sto
 // GetAllEventsSinceThreadUnsafe returns watch event from slice window in watchCache by the resourceVersion
 func (w *WatchCache) GetAllEventsSinceThreadUnsafe(resourceVersion *ClusterResourceVersion) ([]*watch.Event, error) {
 	size := w.endIndex - w.startIndex
-	oldestObj := w.cache[w.startIndex%w.capacity]
 	if resourceVersion.IsEmpty() {
 		// resourceVersion = 0 means that we don't require any specific starting point
 		// and we would like to start watching from ~now.
@@ -385,11 +391,7 @@ func (w *WatchCache) GetAllEventsSinceThreadUnsafe(resourceVersion *ClusterResou
 
 	var result []*watch.Event
 	if !founded {
-		oldest, err := GetClusterResourceVersionFromEvent(oldestObj)
-		if err != nil {
-			return nil, err
-		}
-		return nil, apierrors.NewResourceExpired(fmt.Sprintf("too old resource version: %#v (%#v)", resourceVersion, oldest))
+		return nil, apierrors.NewResourceExpired("resource version not found")
 	} else {
 		i := 0
 		result = make([]*watch.Event, size-index)
@@ -405,15 +407,18 @@ func (w *WatchCache) GetAllEventsSinceThreadUnsafe(resourceVersion *ClusterResou
 func (w *WatchCache) AddIndexer(clusterName string, indexers *cache.Indexers) {
 	w.Lock()
 	defer w.Unlock()
-	w.stores[clusterName] = cache.NewIndexer(storeElementKey, storeElementIndexers(indexers))
+
+	if _, ok := w.stores[clusterName]; !ok {
+		w.stores[clusterName] = cache.NewIndexer(storeElementKey, storeElementIndexers(indexers))
+	}
 }
 
-func (w *WatchCache) DeleteIndexer(clusterName string) {
+func (w *WatchCache) DeleteIndexer(clusterName string) bool {
 	w.Lock()
 	defer w.Unlock()
 
 	if _, ok := w.stores[clusterName]; !ok {
-		return
+		return false
 	}
 
 	delete(w.stores, clusterName)
@@ -421,20 +426,54 @@ func (w *WatchCache) DeleteIndexer(clusterName string) {
 	//clear cache
 	w.startIndex = 0
 	w.endIndex = 0
+
+	return true
 }
 
-func (w *WatchCache) ClearWatchCache(clusterName string) {
-	w.Lock()
-	defer w.Unlock()
+func (w *WatchCache) dispatchEvent(event *watch.Event) {
+	w.WatchersLock.RLock()
+	defer w.WatchersLock.RUnlock()
+	for _, watcher := range w.WatchersBuffer {
+		watcher.NonblockingAdd(event)
+	}
+}
 
-	if _, ok := w.stores[clusterName]; !ok {
+func (w *WatchCache) CleanCluster(cluster string) {
+	if !w.DeleteIndexer(cluster) {
 		return
 	}
 
-	delete(w.stores, clusterName)
-	w.stores[clusterName] = cache.NewIndexer(storeElementKey, storeElementIndexers(nil))
+	errorEvent := utilwatch.NewErrorEvent(fmt.Errorf("cluster %s has been clean", cluster))
+	w.dispatchEvent(&errorEvent)
+}
 
-	//clear cache
-	w.startIndex = 0
-	w.endIndex = 0
+func encodeEvent(obj runtime.Object, codec runtime.Codec, memoryVersion schema.GroupVersion) (runtime.Object, error) {
+	var buffer bytes.Buffer
+
+	//gvk := obj.GetObjectKind().GroupVersionKind()
+	gk := obj.GetObjectKind().GroupVersionKind().GroupKind()
+	if ok := resourcescheme.LegacyResourceScheme.IsGroupRegistered(gk.Group); !ok {
+		return obj, nil
+	}
+
+	dest, err := resourcescheme.LegacyResourceScheme.New(memoryVersion.WithKind(gk.Kind))
+	if err != nil {
+		return nil, err
+	}
+
+	err = codec.Encode(obj, &buffer)
+	if err != nil {
+		return nil, err
+	}
+
+	object, _, err := codec.Decode(buffer.Bytes(), nil, dest)
+	if err != nil {
+		return nil, err
+	}
+
+	if object != dest {
+		return nil, err
+	}
+
+	return dest, nil
 }
diff --git a/pkg/storage/options/options.go b/pkg/storage/options/options.go
index 388d3e33f..61c77682a 100644
--- a/pkg/storage/options/options.go
+++ b/pkg/storage/options/options.go
@@ -7,7 +7,7 @@ import (
 	"github.com/spf13/pflag"
 
 	_ "github.com/clusterpedia-io/clusterpedia/pkg/storage/internalstorage"
-	"github.com/clusterpedia-io/clusterpedia/pkg/storage/memorystorage"
+	_ "github.com/clusterpedia-io/clusterpedia/pkg/storage/memorystorage"
 )
 
 type StorageOptions struct {
@@ -24,7 +24,7 @@ func (o *StorageOptions) Validate() []error {
 		return nil
 	}
 
-	if o.Name == memorystorage.StorageName {
+	if o.ConfigPath == "" {
 		return nil
 	}
 
diff --git a/pkg/storage/storage.go b/pkg/storage/storage.go
index b968fcc70..0c7fa1d3d 100644
--- a/pkg/storage/storage.go
+++ b/pkg/storage/storage.go
@@ -3,15 +3,17 @@ package storage
 import (
 	"context"
 
+	metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion"
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/runtime/schema"
+	"k8s.io/apimachinery/pkg/watch"
 
 	internal "github.com/clusterpedia-io/api/clusterpedia"
-	"github.com/clusterpedia-io/clusterpedia/pkg/storage/memorystorage/watchcache"
 )
 
 type StorageFactory interface {
 	GetResourceVersions(ctx context.Context, cluster string) (map[schema.GroupVersionResource]map[string]interface{}, error)
+	PrepareCluster(cluster string) error
 
 	CleanCluster(ctx context.Context, cluster string) error
 	CleanClusterResource(ctx context.Context, cluster string, gvr schema.GroupVersionResource) error
@@ -26,6 +28,7 @@ type ResourceStorage interface {
 
 	Get(ctx context.Context, cluster, namespace, name string, obj runtime.Object) error
 	List(ctx context.Context, listObj runtime.Object, opts *internal.ListOptions) error
+	Watch(ctx context.Context, options *metainternalversion.ListOptions) (watch.Interface, error)
 
 	Create(ctx context.Context, cluster string, obj runtime.Object) error
 	Update(ctx context.Context, cluster string, obj runtime.Object) error
@@ -43,8 +46,6 @@ type ResourceStorageConfig struct {
 	Codec          runtime.Codec
 	StorageVersion schema.GroupVersion
 	MemoryVersion  schema.GroupVersion
-	WatchCache     *watchcache.WatchCache
-	Cluster        string
 	Namespaced     bool
 }
 
diff --git a/pkg/synchromanager/clustersynchro/cluster_synchro.go b/pkg/synchromanager/clustersynchro/cluster_synchro.go
index 30b09aced..d159777e4 100644
--- a/pkg/synchromanager/clustersynchro/cluster_synchro.go
+++ b/pkg/synchromanager/clustersynchro/cluster_synchro.go
@@ -275,7 +275,6 @@ func (s *ClusterSynchro) setSyncResources() {
 	if syncResources == nil {
 		return
 	}
-
 	groupResourceStatus, storageResourceSyncConfigs := s.resourceNegotiator.NegotiateSyncResources(syncResources)
 
 	lastGroupResourceStatus := s.groupResourceStatus.Load().(*GroupResourceStatus)
@@ -302,10 +301,8 @@ func (s *ClusterSynchro) setSyncResources() {
 		if _, ok := s.storageResourceSynchros.Load(storageGVR); ok {
 			continue
 		}
 
-		resourceConfig := config.storageConfig
-		resourceConfig.Cluster = s.name
-		resourceStorage, err := s.storage.NewResourceStorage(resourceConfig)
+		resourceStorage, err := s.storage.NewResourceStorage(config.storageConfig)
 		if err != nil {
 			klog.ErrorS(err, "Failed to create resource storage", "cluster", s.name, "storage resource", storageGVR)
 			updateSyncConditions(storageGVR, clusterv1alpha2.ResourceSyncStatusPending, "SynchroCreateFailed", fmt.Sprintf("new resource storage failed: %s", err))
diff --git a/pkg/synchromanager/clustersynchro/resource_negotiator.go b/pkg/synchromanager/clustersynchro/resource_negotiator.go
index 6158788f0..c89794c26 100644
--- a/pkg/synchromanager/clustersynchro/resource_negotiator.go
+++ b/pkg/synchromanager/clustersynchro/resource_negotiator.go
@@ -116,8 +116,7 @@ func (negotiator *ResourceNegotiator) NegotiateSyncResources(syncResources []clu
 			Reason: "SynchroCreating",
 		}
 
-		storageConfig, err := negotiator.resourceStorageConfig.NewConfig(syncGVR)
-
+		storageConfig, err := negotiator.resourceStorageConfig.NewConfig(syncGVR, apiResource.Namespaced)
 		if err != nil {
 			syncCondition.Reason = "SynchroCreateFailed"
 			syncCondition.Message = fmt.Sprintf("new resource storage config failed: %s", err)
@@ -125,7 +124,6 @@ func (negotiator *ResourceNegotiator) NegotiateSyncResources(syncResources []clu
 			continue
 		}
 
-		storageConfig.Namespaced = apiResource.Namespaced
 		storageGVR := storageConfig.StorageGroupResource.WithVersion(storageConfig.StorageVersion.Version)
 		syncCondition.StorageVersion = storageGVR.Version
 		if syncGR != storageConfig.StorageGroupResource {
diff --git a/pkg/synchromanager/clustersynchro_manager.go b/pkg/synchromanager/clustersynchro_manager.go
index 21231fdf5..b17cc4920 100644
--- a/pkg/synchromanager/clustersynchro_manager.go
+++ b/pkg/synchromanager/clustersynchro_manager.go
@@ -356,6 +356,13 @@ func (manager *Manager) reconcileCluster(cluster *clusterv1alpha2.PediaCluster)
 		}
 		return controller.NoRequeueResult
 	}
+
+	err = manager.storage.PrepareCluster(cluster.Name)
+	if err != nil {
+		klog.ErrorS(err, "Failed to prepare cluster", "cluster", cluster.Name)
+		return controller.NoRequeueResult
+	}
+
 	manager.synchroWaitGroup.StartWithChannel(manager.stopCh, synchro.Run)
 
 	manager.synchrolock.Lock()