From 5b11676b4de3425d178cb4d30d1c102f617a8c40 Mon Sep 17 00:00:00 2001 From: zhangyongxi Date: Thu, 11 Aug 2022 10:36:07 +0800 Subject: [PATCH] Add memory storage for supporting ListAndWatch Co-authored-by: duanmeng Co-authored-by: wuyingjun Co-authored-by: hanweisen Signed-off-by: zhangyongxi --- Makefile | 48 +- cmd/apiserver/app/apiserver.go | 2 +- cmd/apiserver/app/options/options.go | 17 +- .../app/binding_apiserver.go | 80 ++++ cmd/binding-apiserver/main.go | 18 + ...terpedia_binding_apiserver_apiservice.yaml | 13 + ...terpedia_binding_apiserver_deployment.yaml | 45 ++ deploy/clusterpedia_apiserver_rbac.yaml | 3 + pkg/apiserver/apiserver.go | 12 + pkg/kubeapiserver/apiserver.go | 1 + .../clusterresource_controller.go | 2 +- pkg/kubeapiserver/resource_handler.go | 2 + pkg/kubeapiserver/resourcerest/storage.go | 63 +++ pkg/kubeapiserver/restmanager.go | 16 +- .../memorystorage/memory_resource_storage.go | 274 +++++++++++ pkg/storage/memorystorage/memory_storage.go | 92 ++++ pkg/storage/memorystorage/register.go | 17 + .../memorystorage/watchcache/cache_watcher.go | 140 ++++++ .../watchcache/cluster_resource_version.go | 121 +++++ .../memorystorage/watchcache/watch_cache.go | 440 ++++++++++++++++++ pkg/storage/options/options.go | 5 + pkg/storage/storage.go | 4 + .../clustersynchro/cluster_synchro.go | 4 +- .../clustersynchro/resource_negotiator.go | 2 + pkg/utils/watch/watchevent.go | 29 ++ 25 files changed, 1434 insertions(+), 16 deletions(-) create mode 100644 cmd/binding-apiserver/app/binding_apiserver.go create mode 100644 cmd/binding-apiserver/main.go create mode 100644 deploy/binding-apiserver/clusterpedia_binding_apiserver_apiservice.yaml create mode 100644 deploy/binding-apiserver/clusterpedia_binding_apiserver_deployment.yaml create mode 100644 pkg/storage/memorystorage/memory_resource_storage.go create mode 100644 pkg/storage/memorystorage/memory_storage.go create mode 100644 pkg/storage/memorystorage/register.go create mode 100644 pkg/storage/memorystorage/watchcache/cache_watcher.go create mode 100644 pkg/storage/memorystorage/watchcache/cluster_resource_version.go create mode 100644 pkg/storage/memorystorage/watchcache/watch_cache.go create mode 100644 pkg/utils/watch/watchevent.go diff --git a/Makefile b/Makefile index ea355230d..41254a5ed 100644 --- a/Makefile +++ b/Makefile @@ -26,7 +26,7 @@ ifeq ($(LATEST_TAG),$(shell git describe --abbrev=0 --tags)) VERSION=$(LATEST_TAG) endif -all: apiserver clustersynchro-manager controller-manager +all: apiserver binding-apiserver clustersynchro-manager controller-manager gen-clusterconfigs: ./hack/gen-clusterconfigs.sh @@ -69,6 +69,13 @@ apiserver: -o bin/apiserver \ cmd/apiserver/main.go +.PHONY: binding-apiserver +binding-apiserver: + CGO_ENABLED=0 GOOS=$(GOOS) GOARCH=$(GOARCH) go build \ + -ldflags $(LDFLAGS) \ + -o bin/binding-apiserver \ + cmd/binding-apiserver/main.go + .PHONY: clustersynchro-manager clustersynchro-manager: CGO_ENABLED=0 GOOS=$(GOOS) GOARCH=$(GOARCH) go build \ @@ -84,7 +91,7 @@ controller-manager: cmd/controller-manager/main.go .PHONY: images -images: image-apiserver image-clustersynchro-manager image-controller-manager +images: image-apiserver image-binding-apiserver image-clustersynchro-manager image-controller-manager image-apiserver: GOOS="linux" $(MAKE) apiserver @@ -94,6 +101,15 @@ image-apiserver: --load \ --build-arg BASEIMAGE=$(BASEIMAGE) \ --build-arg BINNAME=apiserver . 
+ +image-binding-apiserver: + GOOS="linux" $(MAKE) binding-apiserver + docker buildx build \ + -t ${REGISTRY}/binding-apiserver-$(GOARCH):$(VERSION) \ + --platform=linux/$(GOARCH) \ + --load \ + --build-arg BASEIMAGE=$(BASEIMAGE) \ + --build-arg BINNAME=binding-apiserver . image-clustersynchro-manager: GOOS="linux" $(MAKE) clustersynchro-manager @@ -114,7 +130,7 @@ image-controller-manager: --build-arg BINNAME=controller-manager . .PHONY: push-images -push-images: push-apiserver-image push-clustersynchro-manager-image push-controller-manager-image +push-images: push-apiserver-image push-binding-apiserver-image push-clustersynchro-manager-image push-controller-manager-image # clean manifest https://github.com/docker/cli/issues/954#issuecomment-586722447 push-apiserver-image: clean-apiserver-manifest @@ -138,6 +154,28 @@ push-apiserver-image: clean-apiserver-manifest docker manifest push $(REGISTRY)/apiserver:latest; \ fi; +# clean manifest https://github.com/docker/cli/issues/954#issuecomment-586722447 +push-binding-apiserver-image: clean-binding-apiserver-manifest + set -e; \ + images=""; \ + for arch in $(RELEASE_ARCHS); do \ + GOARCH=$$arch $(MAKE) image-binding-apiserver; \ + image=$(REGISTRY)/binding-apiserver-$$arch:$(VERSION); \ + docker push $$image; \ + images="$$images $$image"; \ + if [ $(VERSION) != latest ]; then \ + latest_image=$(REGISTRY)/binding-apiserver-$$arch:latest; \ + docker tag $$image $$latest_image; \ + docker push $$latest_image; \ + fi; \ + done; \ + docker manifest create $(REGISTRY)/binding-apiserver:$(VERSION) $$images; \ + docker manifest push $(REGISTRY)/binding-apiserver:$(VERSION); \ + if [ $(VERSION) != latest ]; then \ + docker manifest create $(REGISTRY)/binding-apiserver:latest $$images; \ + docker manifest push $(REGISTRY)/binding-apiserver:latest; \ + fi; + # clean manifest https://github.com/docker/cli/issues/954#issuecomment-586722447 push-clustersynchro-manager-image: clean-clustersynchro-manager-manifest set -e; \ @@ -190,6 +228,10 @@ clean-apiserver-manifest: docker manifest rm $(REGISTRY)/apiserver:$(VERSION) 2>/dev/null;\ docker manifest rm $(REGISTRY)/apiserver:latest 2>/dev/null; exit 0 +clean-binding-apiserver-manifest: + docker manifest rm $(REGISTRY)/binding-apiserver:$(VERSION) 2>/dev/null;\ + docker manifest rm $(REGISTRY)/binding-apiserver:latest 2>/dev/null; exit 0 + clean-clustersynchro-manager-manifest: docker manifest rm $(REGISTRY)/clustersynchro-manager:$(VERSION) 2>/dev/null;\ docker manifest rm $(REGISTRY)/clustersynchro-manager:latest 2>/dev/null; exit 0 diff --git a/cmd/apiserver/app/apiserver.go b/cmd/apiserver/app/apiserver.go index 9dd16a869..541b2a363 100644 --- a/cmd/apiserver/app/apiserver.go +++ b/cmd/apiserver/app/apiserver.go @@ -32,7 +32,7 @@ func NewClusterPediaServerCommand(ctx context.Context) *cobra.Command { } cliflag.PrintFlags(cmd.Flags()) - config, err := opts.Config() + config, err := opts.Config(false) if err != nil { return err } diff --git a/cmd/apiserver/app/options/options.go b/cmd/apiserver/app/options/options.go index 94eaa5035..9c5182e20 100644 --- a/cmd/apiserver/app/options/options.go +++ b/cmd/apiserver/app/options/options.go @@ -3,9 +3,12 @@ package options import ( "fmt" "net" + "net/http" + "strings" utilerrors "k8s.io/apimachinery/pkg/util/errors" "k8s.io/apiserver/pkg/admission/plugin/namespace/lifecycle" + genericrequest "k8s.io/apiserver/pkg/endpoints/request" genericapiserver "k8s.io/apiserver/pkg/server" genericoptions "k8s.io/apiserver/pkg/server/options" 
"k8s.io/apiserver/pkg/util/feature" @@ -72,7 +75,7 @@ func (o *ClusterPediaServerOptions) Validate() error { return utilerrors.NewAggregate(errors) } -func (o *ClusterPediaServerOptions) Config() (*apiserver.Config, error) { +func (o *ClusterPediaServerOptions) Config(bindingSyncController bool) (*apiserver.Config, error) { if err := o.Validate(); err != nil { return nil, err } @@ -91,6 +94,13 @@ func (o *ClusterPediaServerOptions) Config() (*apiserver.Config, error) { o.Admission.DisablePlugins = append(o.Admission.DisablePlugins, lifecycle.PluginName) genericConfig := genericapiserver.NewRecommendedConfig(apiserver.Codecs) + + // todo + // support watch to LongRunningFunc + genericConfig.LongRunningFunc = func(r *http.Request, requestInfo *genericrequest.RequestInfo) bool { + return strings.Contains(r.RequestURI, "watch") + } + // genericConfig.OpenAPIConfig = genericapiserver.DefaultOpenAPIConfig(openapi.GetOpenAPIDefinitions, openapi.NewDefinitionNamer(apiserver.Scheme)) // genericConfig.OpenAPIConfig.Info.Title = openAPITitle // genericConfig.OpenAPIConfig.Info.Version= openAPIVersion @@ -100,8 +110,9 @@ func (o *ClusterPediaServerOptions) Config() (*apiserver.Config, error) { } return &apiserver.Config{ - GenericConfig: genericConfig, - StorageFactory: storage, + GenericConfig: genericConfig, + StorageFactory: storage, + BindingSyncController: bindingSyncController, }, nil } diff --git a/cmd/binding-apiserver/app/binding_apiserver.go b/cmd/binding-apiserver/app/binding_apiserver.go new file mode 100644 index 000000000..1b8ed1e55 --- /dev/null +++ b/cmd/binding-apiserver/app/binding_apiserver.go @@ -0,0 +1,80 @@ +package app + +import ( + "context" + + "github.com/spf13/cobra" + "k8s.io/apimachinery/pkg/util/runtime" + genericfeatures "k8s.io/apiserver/pkg/features" + cliflag "k8s.io/component-base/cli/flag" + "k8s.io/component-base/cli/globalflag" + "k8s.io/component-base/featuregate" + "k8s.io/component-base/logs" + "k8s.io/component-base/term" + + "github.com/clusterpedia-io/clusterpedia/cmd/apiserver/app/options" + clusterpediafeature "github.com/clusterpedia-io/clusterpedia/pkg/utils/feature" + "github.com/clusterpedia-io/clusterpedia/pkg/version/verflag" +) + +func NewClusterPediaServerCommand(ctx context.Context) *cobra.Command { + opts := options.NewServerOptions() + + cmd := &cobra.Command{ + Use: "clusterpedia-apiserver", + RunE: func(cmd *cobra.Command, args []string) error { + verflag.PrintAndExitIfRequested() + + // Activate logging as soon as possible, after that + // show flags with the final logging configuration. 
+ if err := opts.Logs.ValidateAndApply(clusterpediafeature.FeatureGate); err != nil { + return err + } + cliflag.PrintFlags(cmd.Flags()) + + config, err := opts.Config(true) + if err != nil { + return err + } + + server, err := config.Complete().New() + if err != nil { + return err + } + + if err := server.Run(ctx); err != nil { + return err + } + return nil + }, + } + + namedFlagSets := opts.Flags() + verflag.AddFlags(namedFlagSets.FlagSet("global")) + globalflag.AddGlobalFlags(namedFlagSets.FlagSet("global"), cmd.Name(), logs.SkipLoggingConfigurationFlags()) + clusterpediafeature.MutableFeatureGate.AddFlag(namedFlagSets.FlagSet("mutable feature gate")) + + fs := cmd.Flags() + for _, f := range namedFlagSets.FlagSets { + fs.AddFlagSet(f) + } + + cols, _, _ := term.TerminalSize(cmd.OutOrStdout()) + cliflag.SetUsageAndHelpFunc(cmd, namedFlagSets, cols) + return cmd +} + +func init() { + runtime.Must(logs.AddFeatureGates(clusterpediafeature.MutableFeatureGate)) + + // The feature gate `RemainingItemCount` should default to false + // https://github.com/clusterpedia-io/clusterpedia/issues/196 + gates := clusterpediafeature.MutableFeatureGate.GetAll() + gate := gates[genericfeatures.RemainingItemCount] + gate.Default = false + gates[genericfeatures.RemainingItemCount] = gate + + clusterpediafeature.MutableFeatureGate = featuregate.NewFeatureGate() + runtime.Must(clusterpediafeature.MutableFeatureGate.Add(gates)) + clusterpediafeature.FeatureGate = clusterpediafeature.MutableFeatureGate +} diff --git a/cmd/binding-apiserver/main.go b/cmd/binding-apiserver/main.go new file mode 100644 index 000000000..1a0ff0093 --- /dev/null +++ b/cmd/binding-apiserver/main.go @@ -0,0 +1,18 @@ +package main + +import ( + "os" + + apiserver "k8s.io/apiserver/pkg/server" + "k8s.io/component-base/cli" + _ "k8s.io/component-base/logs/json/register" // for JSON log format registration + + "github.com/clusterpedia-io/clusterpedia/cmd/binding-apiserver/app" +) + +func main() { + ctx := apiserver.SetupSignalContext() + command := app.NewClusterPediaServerCommand(ctx) + code := cli.Run(command) + os.Exit(code) +} diff --git a/deploy/binding-apiserver/clusterpedia_binding_apiserver_apiservice.yaml b/deploy/binding-apiserver/clusterpedia_binding_apiserver_apiservice.yaml new file mode 100644 index 000000000..3f04bbe41 --- /dev/null +++ b/deploy/binding-apiserver/clusterpedia_binding_apiserver_apiservice.yaml @@ -0,0 +1,13 @@ +apiVersion: apiregistration.k8s.io/v1 +kind: APIService +metadata: + name: v1beta1.clusterpedia.io +spec: + insecureSkipTLSVerify: true + group: clusterpedia.io + groupPriorityMinimum: 1000 + versionPriority: 100 + service: + name: clusterpedia-binding-apiserver + namespace: clusterpedia-system + version: v1beta1 diff --git a/deploy/binding-apiserver/clusterpedia_binding_apiserver_deployment.yaml b/deploy/binding-apiserver/clusterpedia_binding_apiserver_deployment.yaml new file mode 100644 index 000000000..75e47f139 --- /dev/null +++ b/deploy/binding-apiserver/clusterpedia_binding_apiserver_deployment.yaml @@ -0,0 +1,45 @@ +apiVersion: v1 +kind: ServiceAccount +metadata: + name: clusterpedia-binding-apiserver + namespace: clusterpedia-system +--- +apiVersion: v1 +kind: Service +metadata: + name: clusterpedia-binding-apiserver + namespace: clusterpedia-system +spec: + ports: + - port: 443 + protocol: TCP + targetPort: 443 + selector: + app: clusterpedia-binding-apiserver +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: clusterpedia-binding-apiserver + namespace: clusterpedia-system + 
labels:
+    app: clusterpedia-binding-apiserver
+spec:
+  replicas: 1
+  selector:
+    matchLabels:
+      app: clusterpedia-binding-apiserver
+  template:
+    metadata:
+      labels:
+        app: clusterpedia-binding-apiserver
+    spec:
+      containers:
+      - name: binding-apiserver
+        image: ghcr.io/clusterpedia-io/clusterpedia/binding-apiserver:v0.4.1
+        command:
+          - /usr/local/bin/binding-apiserver
+          - --secure-port=443
+          - --storage-name=memory
+          - -v=3
+      serviceAccountName: clusterpedia-binding-apiserver
diff --git a/deploy/clusterpedia_apiserver_rbac.yaml b/deploy/clusterpedia_apiserver_rbac.yaml
index c5e40237e..7d0ace2f3 100644
--- a/deploy/clusterpedia_apiserver_rbac.yaml
+++ b/deploy/clusterpedia_apiserver_rbac.yaml
@@ -28,3 +28,6 @@ subjects:
   - kind: ServiceAccount
     name: clusterpedia-controller-manager
     namespace: clusterpedia-system
+  - kind: ServiceAccount
+    name: clusterpedia-binding-apiserver
+    namespace: clusterpedia-system
diff --git a/pkg/apiserver/apiserver.go b/pkg/apiserver/apiserver.go
index d6f27685f..41bc564d9 100644
--- a/pkg/apiserver/apiserver.go
+++ b/pkg/apiserver/apiserver.go
@@ -26,6 +26,7 @@ import (
 	informers "github.com/clusterpedia-io/clusterpedia/pkg/generated/informers/externalversions"
 	"github.com/clusterpedia-io/clusterpedia/pkg/kubeapiserver"
 	"github.com/clusterpedia-io/clusterpedia/pkg/storage"
+	"github.com/clusterpedia-io/clusterpedia/pkg/synchromanager"
 	"github.com/clusterpedia-io/clusterpedia/pkg/utils/filters"
 )
 
@@ -64,6 +65,8 @@ type Config struct {
 	GenericConfig *genericapiserver.RecommendedConfig
 
 	StorageFactory storage.StorageFactory
+	// BindingSyncController indicates whether this apiserver should also run the cluster synchro manager itself (as the binding-apiserver does) instead of relying on a separate clustersynchro-manager
+	BindingSyncController bool
 }
 
 type ClusterPediaServer struct {
@@ -75,6 +78,8 @@ type completedConfig struct {
 	ClientConfig   *clientrest.Config
 	StorageFactory storage.StorageFactory
+	// BindingSyncController indicates whether this apiserver should also run the cluster synchro manager itself (as the binding-apiserver does) instead of relying on a separate clustersynchro-manager
+	BindingSyncController bool
 }
 
 // CompletedConfig embeds a private pointer that cannot be instantiated outside of this package.
@@ -88,6 +93,7 @@ func (cfg *Config) Complete() CompletedConfig { cfg.GenericConfig.Complete(), cfg.GenericConfig.ClientConfig, cfg.StorageFactory, + cfg.BindingSyncController, } c.GenericConfig.Version = &version.Info{ @@ -160,6 +166,12 @@ func (config completedConfig) New() (*ClusterPediaServer, error) { genericServer.AddPostStartHookOrDie("start-clusterpedia-informers", func(context genericapiserver.PostStartHookContext) error { clusterpediaInformerFactory.Start(context.StopCh) clusterpediaInformerFactory.WaitForCacheSync(context.StopCh) + + if config.BindingSyncController { + synchromanager := synchromanager.NewManager(crdclient, config.StorageFactory) + synchromanager.Run(1, context.StopCh) + } + return nil }) diff --git a/pkg/kubeapiserver/apiserver.go b/pkg/kubeapiserver/apiserver.go index 420eeb8c3..5bd7a2003 100644 --- a/pkg/kubeapiserver/apiserver.go +++ b/pkg/kubeapiserver/apiserver.go @@ -35,6 +35,7 @@ func init() { &metav1.APIGroupList{}, &metav1.APIGroup{}, &metav1.APIResourceList{}, + &metav1.WatchEvent{}, ) } diff --git a/pkg/kubeapiserver/clusterresource_controller.go b/pkg/kubeapiserver/clusterresource_controller.go index b974851db..8f23b1796 100644 --- a/pkg/kubeapiserver/clusterresource_controller.go +++ b/pkg/kubeapiserver/clusterresource_controller.go @@ -83,7 +83,7 @@ func (c *ClusterResourceController) updateClusterResources(cluster *clusterv1alp return } - discoveryapis := c.restManager.LoadResources(resources) + discoveryapis := c.restManager.LoadResources(resources, cluster.Name) c.discoveryManager.SetClusterGroupResource(cluster.Name, discoveryapis) c.clusterresources[cluster.Name] = resources diff --git a/pkg/kubeapiserver/resource_handler.go b/pkg/kubeapiserver/resource_handler.go index 94d2522c8..748fda430 100644 --- a/pkg/kubeapiserver/resource_handler.go +++ b/pkg/kubeapiserver/resource_handler.go @@ -118,6 +118,8 @@ func (r *ResourceHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { handler = handlers.GetResource(storage, reqScope) case "list": handler = handlers.ListResource(storage, nil, reqScope, false, r.minRequestTimeout) + case "watch": + handler = handlers.ListResource(storage, storage, reqScope, true, r.minRequestTimeout) default: responsewriters.ErrorNegotiated( apierrors.NewMethodNotSupported(gvr.GroupResource(), requestInfo.Verb), diff --git a/pkg/kubeapiserver/resourcerest/storage.go b/pkg/kubeapiserver/resourcerest/storage.go index 3cffdb3bd..520346b57 100644 --- a/pkg/kubeapiserver/resourcerest/storage.go +++ b/pkg/kubeapiserver/resourcerest/storage.go @@ -9,6 +9,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/watch" genericrequest "k8s.io/apiserver/pkg/endpoints/request" genericfeatures "k8s.io/apiserver/pkg/features" "k8s.io/apiserver/pkg/registry/rest" @@ -20,8 +21,10 @@ import ( "github.com/clusterpedia-io/api/clusterpedia/v1beta1" "github.com/clusterpedia-io/clusterpedia/pkg/kubeapiserver/printers" "github.com/clusterpedia-io/clusterpedia/pkg/storage" + cache "github.com/clusterpedia-io/clusterpedia/pkg/storage/memorystorage/watchcache" "github.com/clusterpedia-io/clusterpedia/pkg/utils/negotiation" "github.com/clusterpedia-io/clusterpedia/pkg/utils/request" + utilwatch "github.com/clusterpedia-io/clusterpedia/pkg/utils/watch" ) type RESTStorage struct { @@ -38,6 +41,7 @@ type RESTStorage struct { var _ rest.Lister = &RESTStorage{} var _ rest.Getter = &RESTStorage{} +var _ rest.Watcher = &RESTStorage{} func (s 
*RESTStorage) New() runtime.Object { return s.NewFunc() @@ -121,3 +125,62 @@ func (s *RESTStorage) ConvertToTable(ctx context.Context, object runtime.Object, return printers.NewDefaultTableConvertor(s.DefaultQualifiedResource).ConvertToTable(ctx, object, tableOptions) } + +func (s *RESTStorage) Watch(ctx context.Context, options *metainternalversion.ListOptions) (watch.Interface, error) { + resourceversion := options.ResourceVersion + watchRV, err := cache.NewClusterResourceVersionFromString(resourceversion) + if err != nil { + // To match the uncached watch implementation, once we have passed authn/authz/admission, + // and successfully parsed a resource version, other errors must fail with a watch event of type ERROR, + // rather than a directly returned error. + return newErrWatcher(err), nil + } + + watcher := cache.NewCacheWatcher(100) + watchCache := s.Storage.GetStorageConfig().WatchCache + watchCache.Lock() + defer watchCache.Unlock() + + initEvents, err := watchCache.GetAllEventsSinceThreadUnsafe(watchRV) + if err != nil { + // To match the uncached watch implementation, once we have passed authn/authz/admission, + // and successfully parsed a resource version, other errors must fail with a watch event of type ERROR, + // rather than a directly returned error. + return newErrWatcher(err), nil + } + + func() { + watchCache.WatchersLock.Lock() + defer watchCache.WatchersLock.Unlock() + + watchCache.WatchersBuffer = append(watchCache.WatchersBuffer, watcher) + }() + + go watcher.Process(ctx, initEvents) + return watcher, nil +} + +type errWatcher struct { + result chan watch.Event +} + +func newErrWatcher(err error) *errWatcher { + errEvent := utilwatch.NewErrorEvent(err) + + // Create a watcher with room for a single event, populate it, and close the channel + watcher := &errWatcher{result: make(chan watch.Event, 1)} + watcher.result <- errEvent + close(watcher.result) + + return watcher +} + +// ResultChan implements Interface +func (c *errWatcher) ResultChan() <-chan watch.Event { + return c.result +} + +// Stop implements Interface +func (c *errWatcher) Stop() { + // no-op +} diff --git a/pkg/kubeapiserver/restmanager.go b/pkg/kubeapiserver/restmanager.go index 52649bc20..b259ae738 100644 --- a/pkg/kubeapiserver/restmanager.go +++ b/pkg/kubeapiserver/restmanager.go @@ -101,7 +101,7 @@ func (m *RESTManager) GetRESTResourceInfo(gvr schema.GroupVersionResource) RESTR return infos[gvr] } -func (m *RESTManager) LoadResources(infos ResourceInfoMap) map[schema.GroupResource]discovery.ResourceDiscoveryAPI { +func (m *RESTManager) LoadResources(infos ResourceInfoMap, cluster string) map[schema.GroupResource]discovery.ResourceDiscoveryAPI { apigroups := m.groups.Load().(map[string]metav1.APIGroup) apiresources := m.resources.Load().(map[schema.GroupResource]metav1.APIResource) restinfos := m.restResourceInfos.Load().(map[schema.GroupVersionResource]RESTResourceInfo) @@ -175,7 +175,7 @@ func (m *RESTManager) LoadResources(infos ResourceInfoMap) map[schema.GroupResou m.addAPIResourcesLocked(addedAPIResources) } if len(addedInfos) != 0 { - m.addRESTResourceInfosLocked(addedInfos) + m.addRESTResourceInfosLocked(addedInfos, cluster) } return apis } @@ -208,7 +208,7 @@ func (m *RESTManager) addAPIResourcesLocked(addedResources map[schema.GroupResou m.resources.Store(resources) } -func (m *RESTManager) addRESTResourceInfosLocked(addedInfos map[schema.GroupVersionResource]RESTResourceInfo) { +func (m *RESTManager) addRESTResourceInfosLocked(addedInfos 
map[schema.GroupVersionResource]RESTResourceInfo, cluster string) { restinfos := m.restResourceInfos.Load().(map[schema.GroupVersionResource]RESTResourceInfo) infos := make(map[schema.GroupVersionResource]RESTResourceInfo, len(restinfos)+len(addedInfos)) @@ -225,9 +225,9 @@ func (m *RESTManager) addRESTResourceInfosLocked(addedInfos map[schema.GroupVers var err error var storage *resourcerest.RESTStorage if resourcescheme.LegacyResourceScheme.IsGroupRegistered(gvr.Group) { - storage, err = m.genLegacyResourceRESTStorage(gvr, info.APIResource.Kind) + storage, err = m.genLegacyResourceRESTStorage(gvr, info.APIResource.Kind, cluster) } else { - storage, err = m.genCustomResourceRESTStorage(gvr, info.APIResource.Kind) + storage, err = m.genCustomResourceRESTStorage(gvr, info.APIResource.Kind, cluster) } if err != nil { klog.ErrorS(err, "Failed to gen resource rest storage", "gvr", gvr, "kind", info.APIResource.Kind) @@ -263,11 +263,12 @@ func (m *RESTManager) addRESTResourceInfosLocked(addedInfos map[schema.GroupVers m.restResourceInfos.Store(infos) } -func (m *RESTManager) genLegacyResourceRESTStorage(gvr schema.GroupVersionResource, kind string) (*resourcerest.RESTStorage, error) { +func (m *RESTManager) genLegacyResourceRESTStorage(gvr schema.GroupVersionResource, kind, cluster string) (*resourcerest.RESTStorage, error) { storageConfig, err := m.resourcetSorageConfig.NewLegacyResourceConfig(gvr.GroupResource()) if err != nil { return nil, err } + storageConfig.Cluster = cluster resourceStorage, err := m.storageFactory.NewResourceStorage(storageConfig) if err != nil { @@ -290,11 +291,12 @@ func (m *RESTManager) genLegacyResourceRESTStorage(gvr schema.GroupVersionResour }, nil } -func (m *RESTManager) genCustomResourceRESTStorage(gvr schema.GroupVersionResource, kind string) (*resourcerest.RESTStorage, error) { +func (m *RESTManager) genCustomResourceRESTStorage(gvr schema.GroupVersionResource, kind, cluster string) (*resourcerest.RESTStorage, error) { storageConfig, err := m.resourcetSorageConfig.NewCustomResourceConfig(gvr) if err != nil { return nil, err } + storageConfig.Cluster = cluster resourceStorage, err := m.storageFactory.NewResourceStorage(storageConfig) if err != nil { diff --git a/pkg/storage/memorystorage/memory_resource_storage.go b/pkg/storage/memorystorage/memory_resource_storage.go new file mode 100644 index 000000000..2f4864c9e --- /dev/null +++ b/pkg/storage/memorystorage/memory_resource_storage.go @@ -0,0 +1,274 @@ +package memorystorage + +import ( + "bytes" + "context" + "fmt" + "reflect" + "sync" + + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/conversion" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/klog/v2" + + internal "github.com/clusterpedia-io/api/clusterpedia" + "github.com/clusterpedia-io/clusterpedia/pkg/kubeapiserver/resourcescheme" + "github.com/clusterpedia-io/clusterpedia/pkg/storage" + cache "github.com/clusterpedia-io/clusterpedia/pkg/storage/memorystorage/watchcache" +) + +var ( + storages *resourceStorages +) + +type resourceStorages struct { + sync.RWMutex + resourceStorages map[schema.GroupVersionResource]*ResourceStorage +} + +func init() { + storages = &resourceStorages{ + resourceStorages: make(map[schema.GroupVersionResource]*ResourceStorage), + } +} + +type ClusterWatchEvent struct { + Event watch.Event + ClusterName string +} + +type ResourceStorage struct { + sync.RWMutex + + Codec runtime.Codec + 
watchCache    *cache.WatchCache
+	stopCh        chan struct{}
+	crvSynchro    *cache.ClusterResourceVersionSynchro
+	incoming      chan ClusterWatchEvent
+	storageConfig *storage.ResourceStorageConfig
+}
+
+func (s *ResourceStorage) convertEvent(event *ClusterWatchEvent) error {
+	s.Lock()
+	defer s.Unlock()
+	klog.V(10).Infof("event: %v", event)
+	s.crvSynchro.UpdateClusterResourceVersion(&event.Event, event.ClusterName)
+
+	err := s.encodeEvent(&event.Event)
+	if err != nil {
+		return fmt.Errorf("encode event failed: %v", err)
+	}
+	s.incoming <- *event
+	return nil
+}
+
+func (s *ResourceStorage) dispatchEvents() {
+	for {
+		select {
+		case cwe, ok := <-s.incoming:
+			if !ok {
+				continue
+			}
+
+			switch cwe.Event.Type {
+			case watch.Added:
+				err := s.watchCache.Add(cwe.Event.Object, cwe.ClusterName)
+				if err != nil {
+					utilruntime.HandleError(fmt.Errorf("unable to add watch event object (%#v) to store: %v", cwe.Event.Object, err))
+				}
+			case watch.Modified:
+				err := s.watchCache.Update(cwe.Event.Object, cwe.ClusterName)
+				if err != nil {
+					utilruntime.HandleError(fmt.Errorf("unable to update watch event object (%#v) to store: %v", cwe.Event.Object, err))
+				}
+			case watch.Deleted:
+				// TODO: Will any consumers need access to the "last known
+				// state", which is passed in event.Object? If so, may need
+				// to change this.
+				err := s.watchCache.Delete(cwe.Event.Object, cwe.ClusterName)
+				if err != nil {
+					utilruntime.HandleError(fmt.Errorf("unable to delete watch event object (%#v) from store: %v", cwe.Event.Object, err))
+				}
+			case watch.Error:
+				s.watchCache.ClearWatchCache(cwe.ClusterName)
+			default:
+				utilruntime.HandleError(fmt.Errorf("unable to understand watch event %#v", cwe.Event))
+				continue
+			}
+			s.dispatchEvent(&cwe.Event)
+		case <-s.stopCh:
+			return
+		}
+	}
+}
+
+func (s *ResourceStorage) dispatchEvent(event *watch.Event) {
+	s.watchCache.WatchersLock.RLock()
+	defer s.watchCache.WatchersLock.RUnlock()
+	for _, watcher := range s.watchCache.WatchersBuffer {
+		watcher.NonblockingAdd(event)
+	}
+}
+
+func (s *ResourceStorage) GetStorageConfig() *storage.ResourceStorageConfig {
+	return s.storageConfig
+}
+
+func (s *ResourceStorage) Create(ctx context.Context, cluster string, obj runtime.Object) error {
+	event := s.newClusterWatchEvent(watch.Added, obj, cluster)
+	err := s.convertEvent(event)
+	if err != nil {
+		return err
+	}
+	return nil
+}
+
+func (s *ResourceStorage) Update(ctx context.Context, cluster string, obj runtime.Object) error {
+	event := s.newClusterWatchEvent(watch.Modified, obj, cluster)
+	err := s.convertEvent(event)
+	if err != nil {
+		return err
+	}
+	return nil
+}
+
+func (s *ResourceStorage) Delete(ctx context.Context, cluster string, obj runtime.Object) error {
+	event := s.newClusterWatchEvent(watch.Deleted, obj, cluster)
+	err := s.convertEvent(event)
+	if err != nil {
+		return err
+	}
+	return nil
+}
+
+func (s *ResourceStorage) Get(ctx context.Context, cluster, namespace, name string, into runtime.Object) error {
+	var buffer bytes.Buffer
+	se, err := s.watchCache.WaitUntilFreshAndGet(cluster, namespace, name)
+	if err != nil {
+		return err
+	}
+
+	object := se.Object
+	err = s.Codec.Encode(object, &buffer)
+	if err != nil {
+		return err
+	}
+
+	obj, _, err := s.Codec.Decode(buffer.Bytes(), nil, into)
+	if err != nil {
+		return err
+	}
+
+	if obj != into {
+		return fmt.Errorf("failed to decode resource: into is %T", into)
+	}
+	return nil
+}
+
+func (s *ResourceStorage) newClusterWatchEvent(eventType watch.EventType, obj runtime.Object, cluster string) *ClusterWatchEvent {
+
return &ClusterWatchEvent{ + ClusterName: cluster, + Event: watch.Event{ + Type: eventType, + Object: obj, + }, + } +} + +func (s *ResourceStorage) List(ctx context.Context, listObject runtime.Object, opts *internal.ListOptions) error { + var buffer bytes.Buffer + objects, readResourceVersion, err := s.watchCache.WaitUntilFreshAndList(opts) + if err != nil { + return err + } + + list, err := meta.ListAccessor(listObject) + if err != nil { + return err + } + if len(objects) == 0 { + return nil + } + + listPtr, err := meta.GetItemsPtr(listObject) + if err != nil { + return err + } + + v, err := conversion.EnforcePtr(listPtr) + if err != nil || v.Kind() != reflect.Slice { + return fmt.Errorf("need ptr to slice: %v", err) + } + + expected := reflect.New(v.Type().Elem()).Interface().(runtime.Object) + seen := map[string]struct{}{} + accessor := meta.NewAccessor() + deduplicated := make([]runtime.Object, 0, len(objects)) + for _, object := range objects { + buffer.Reset() + obj := object.Object + err = s.Codec.Encode(obj, &buffer) + if err != nil { + return err + } + + obj, _, err := s.Codec.Decode(buffer.Bytes(), nil, expected.DeepCopyObject()) + if err != nil { + return err + } + + name, err := accessor.Name(obj) + if err != nil { + return err + } + + if _, ok := seen[name]; !ok { + seen[name] = struct{}{} + deduplicated = append(deduplicated, obj) + } + } + + slice := reflect.MakeSlice(v.Type(), len(deduplicated), len(deduplicated)) + for i, obj := range deduplicated { + slice.Index(i).Set(reflect.ValueOf(obj).Elem()) + } + + list.SetResourceVersion(readResourceVersion.GetClusterResourceVersion()) + v.Set(slice) + return nil +} + +func (s *ResourceStorage) encodeEvent(event *watch.Event) error { + var buffer bytes.Buffer + + gk := event.Object.GetObjectKind().GroupVersionKind().GroupKind() + config := s.storageConfig + if ok := resourcescheme.LegacyResourceScheme.IsGroupRegistered(gk.Group); ok { + dest, err := resourcescheme.LegacyResourceScheme.New(config.MemoryVersion.WithKind(gk.Kind)) + if err != nil { + return err + } + + object := event.Object + err = config.Codec.Encode(object, &buffer) + if err != nil { + return err + } + + obj, _, err := config.Codec.Decode(buffer.Bytes(), nil, dest) + if err != nil { + return err + } + + if obj != dest { + return err + } + event.Object = dest + } + + return nil +} diff --git a/pkg/storage/memorystorage/memory_storage.go b/pkg/storage/memorystorage/memory_storage.go new file mode 100644 index 000000000..de2306bbf --- /dev/null +++ b/pkg/storage/memorystorage/memory_storage.go @@ -0,0 +1,92 @@ +package memorystorage + +import ( + "context" + "fmt" + internal "github.com/clusterpedia-io/api/clusterpedia" + "github.com/clusterpedia-io/clusterpedia/pkg/storage" + cache "github.com/clusterpedia-io/clusterpedia/pkg/storage/memorystorage/watchcache" + utilwatch "github.com/clusterpedia-io/clusterpedia/pkg/utils/watch" + "k8s.io/apimachinery/pkg/runtime/schema" +) + +type StorageFactory struct { +} + +func (s *StorageFactory) NewResourceStorage(config *storage.ResourceStorageConfig) (storage.ResourceStorage, error) { + storages.Lock() + defer storages.Unlock() + + gvr := schema.GroupVersionResource{ + Group: config.GroupResource.Group, + Version: config.StorageVersion.Version, + Resource: config.GroupResource.Resource, + } + + if resourceStorage, ok := storages.resourceStorages[gvr]; ok { + watchCache := resourceStorage.watchCache + if config.Namespaced { + watchCache.KeyFunc = cache.GetKeyFunc(gvr, config.Namespaced) + watchCache.IsNamespaced = true + } + 
if _, ok := watchCache.GetStores()[config.Cluster]; !ok {
+			resourceStorage.watchCache.AddIndexer(config.Cluster, nil)
+		}
+
+		resourceStorage.crvSynchro.SetClusterResourceVersion(config.Cluster, "0")
+
+		return resourceStorage, nil
+	}
+
+	watchCache := cache.NewWatchCache(100, gvr, config.Namespaced, config.Cluster)
+	config.WatchCache = watchCache
+	resourceStorage := &ResourceStorage{
+		incoming:      make(chan ClusterWatchEvent, 100),
+		Codec:         config.Codec,
+		crvSynchro:    cache.NewClusterResourceVersionSynchro(config.Cluster),
+		watchCache:    watchCache,
+		storageConfig: config,
+	}
+
+	go resourceStorage.dispatchEvents()
+
+	storages.resourceStorages[gvr] = resourceStorage
+
+	return resourceStorage, nil
+}
+
+func (s *StorageFactory) NewCollectionResourceStorage(cr *internal.CollectionResource) (storage.CollectionResourceStorage, error) {
+	return nil, nil
+}
+
+func (s *StorageFactory) GetResourceVersions(ctx context.Context, cluster string) (map[schema.GroupVersionResource]map[string]interface{}, error) {
+	return nil, nil
+}
+
+func (s *StorageFactory) CleanCluster(ctx context.Context, cluster string) error {
+	storages.Lock()
+	defer storages.Unlock()
+	for _, rs := range storages.resourceStorages {
+		rs.watchCache.DeleteIndexer(cluster)
+		// If a PediaCluster is deleted from Clusterpedia, client-go informers watching it should relist and rewatch
+		errorEvent := utilwatch.NewErrorEvent(fmt.Errorf("PediaCluster %s is deleted", cluster))
+		rs.dispatchEvent(&errorEvent)
+	}
+	return nil
+}
+
+func (s *StorageFactory) CleanClusterResource(ctx context.Context, cluster string, gvr schema.GroupVersionResource) error {
+	storages.Lock()
+	defer storages.Unlock()
+	if rs, ok := storages.resourceStorages[gvr]; ok {
+		rs.watchCache.DeleteIndexer(cluster)
+		// If a GVR is deleted from Clusterpedia, client-go informers watching it should relist and rewatch
+		errorEvent := utilwatch.NewErrorEvent(fmt.Errorf("GVR %v is deleted", gvr))
+		rs.dispatchEvent(&errorEvent)
+	}
+	return nil
+}
+
+func (s *StorageFactory) GetCollectionResources(ctx context.Context) ([]*internal.CollectionResource, error) {
+	return nil, nil
+}
diff --git a/pkg/storage/memorystorage/register.go b/pkg/storage/memorystorage/register.go
new file mode 100644
index 000000000..e3cc3e206
--- /dev/null
+++ b/pkg/storage/memorystorage/register.go
@@ -0,0 +1,17 @@
+package memorystorage
+
+import (
+	"github.com/clusterpedia-io/clusterpedia/pkg/storage"
+)
+
+const (
+	StorageName = "memory"
+)
+
+func init() {
+	storage.RegisterStorageFactoryFunc(StorageName, NewStorageFactory)
+}
+
+func NewStorageFactory(_ string) (storage.StorageFactory, error) {
+	return &StorageFactory{}, nil
+}
diff --git a/pkg/storage/memorystorage/watchcache/cache_watcher.go b/pkg/storage/memorystorage/watchcache/cache_watcher.go
new file mode 100644
index 000000000..543e5b2ac
--- /dev/null
+++ b/pkg/storage/memorystorage/watchcache/cache_watcher.go
@@ -0,0 +1,140 @@
+package watchcache
+
+import (
+	"context"
+	"time"
+
+	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
+	"k8s.io/apimachinery/pkg/watch"
+)
+
+// CacheWatcher implements watch.Interface
+type CacheWatcher struct {
+	input   chan *watch.Event
+	result  chan watch.Event
+	done    chan struct{}
+	stopped bool
+	forget  func()
+}
+
+func NewCacheWatcher(chanSize int) *CacheWatcher {
+	return &CacheWatcher{
+		input:   make(chan *watch.Event, chanSize),
+		result:  make(chan watch.Event, chanSize),
+		done:    make(chan struct{}),
+		stopped: false,
+		forget:  func() {},
+	}
+}
+
+// Implements watch.Interface.
+func (c *CacheWatcher) ResultChan() <-chan watch.Event { + return c.result +} + +// Implements watch.Interface. +func (c *CacheWatcher) Stop() { + c.forget() +} + +func (c *CacheWatcher) StopThreadUnsafe() { + if !c.stopped { + c.stopped = true + close(c.done) + close(c.input) + } +} + +func (c *CacheWatcher) NonblockingAdd(event *watch.Event) bool { + select { + case c.input <- event: + return true + default: + return false + } +} + +// Nil timer means that add will not block (if it can't send event immediately, it will break the watcher) +func (c *CacheWatcher) Add(event *watch.Event, timer *time.Timer) bool { + // Try to send the event immediately, without blocking. + if c.NonblockingAdd(event) { + return true + } + + closeFunc := func() { + // This means that we couldn't send event to that watcher. + // Since we don't want to block on it infinitely, + // we simply terminate it. + //klog.V(1).Infof("Forcing watcher close due to unresponsiveness: %v", c.objectType.String()) + c.forget() + } + + if timer == nil { + closeFunc() + return false + } + + // OK, block sending, but only until timer fires. + select { + case c.input <- event: + return true + case <-timer.C: + closeFunc() + return false + } +} + +func (c *CacheWatcher) sendWatchCacheEvent(event *watch.Event) { + //watchEvent := c.convertToWatchEvent(event) + watchEvent := event + if watchEvent == nil { + // Watcher is not interested in that object. + return + } + + // We need to ensure that if we put event X to the c.result, all + // previous events were already put into it before, no matter whether + // c.done is close or not. + // Thus we cannot simply select from c.done and c.result and this + // would give us non-determinism. + // At the same time, we don't want to block infinitely on putting + // to c.result, when c.done is already closed. + + // This ensures that with c.done already close, we at most once go + // into the next select after this. With that, no matter which + // statement we choose there, we will deliver only consecutive + // events. + select { + case <-c.done: + return + default: + } + + select { + case c.result <- *watchEvent: + case <-c.done: + } +} + +// Process send the events which stored in watchCache into the result channel,and select the event from input channel into result channel continuously. 
+func (c *CacheWatcher) Process(ctx context.Context, initEvents []*watch.Event) { + defer utilruntime.HandleCrash() + + for _, event := range initEvents { + c.sendWatchCacheEvent(event) + } + + defer close(c.result) + defer c.Stop() + for { + select { + case event, ok := <-c.input: + if !ok { + return + } + c.sendWatchCacheEvent(event) + case <-ctx.Done(): + return + } + } +} diff --git a/pkg/storage/memorystorage/watchcache/cluster_resource_version.go b/pkg/storage/memorystorage/watchcache/cluster_resource_version.go new file mode 100644 index 000000000..abc7ec0be --- /dev/null +++ b/pkg/storage/memorystorage/watchcache/cluster_resource_version.go @@ -0,0 +1,121 @@ +package watchcache + +import ( + "encoding/base64" + "fmt" + "sync" + + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/util/json" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/klog/v2" +) + +// ClusterResourceVersion holds the pediaCluster name and its latest resourceVersion +type ClusterResourceVersion struct { + rvmap map[string]string +} + +func NewClusterResourceVersion(cluster string) *ClusterResourceVersion { + return &ClusterResourceVersion{ + rvmap: map[string]string{cluster: "0"}, + } +} + +func NewClusterResourceVersionFromString(rv string) (*ClusterResourceVersion, error) { + result := &ClusterResourceVersion{ + rvmap: map[string]string{}, + } + if rv == "0" || rv == "" { + return result, nil + } + + decoded, err := base64.RawURLEncoding.DecodeString(rv) + if err != nil { + return nil, fmt.Errorf("base64 decode failed: %v", err) + } + + err = json.Unmarshal(decoded, &result.rvmap) + if err != nil { + return nil, fmt.Errorf("json decode failed: %v", err) + } + + return result, nil +} + +// GetClusterResourceVersion return a base64 encode string of ClusterResourceVersion +func (crv *ClusterResourceVersion) GetClusterResourceVersion() string { + bytes, err := json.Marshal(crv.rvmap) + if err != nil { + klog.Errorf("base64 encode failed: %v", err) + return "" + } + return base64.RawURLEncoding.EncodeToString(bytes) +} + +// GetClusterResourceVersionFromEvent return a ClusterResourceVersion from watch event +func GetClusterResourceVersionFromEvent(event *watch.Event) (*ClusterResourceVersion, error) { + accessor, err := meta.Accessor(event.Object) + if err != nil { + return nil, fmt.Errorf("unable to understand watch event %#v", event) + } + return NewClusterResourceVersionFromString(accessor.GetResourceVersion()) +} + +func (crv *ClusterResourceVersion) IsEqual(another *ClusterResourceVersion) bool { + if len(crv.rvmap) != len(another.rvmap) { + return false + } + for key, value := range crv.rvmap { + if another.rvmap[key] != value { + return false + } + } + return true +} + +func (crv *ClusterResourceVersion) IsEmpty() bool { + return len(crv.rvmap) == 0 +} + +type ClusterResourceVersionSynchro struct { + crv *ClusterResourceVersion + + sync.RWMutex +} + +func NewClusterResourceVersionSynchro(cluster string) *ClusterResourceVersionSynchro { + return &ClusterResourceVersionSynchro{ + crv: NewClusterResourceVersion(cluster), + } +} + +// UpdateClusterResourceVersion update the resourceVersion in ClusterResourceVersionSynchro to the latest +func (crvs *ClusterResourceVersionSynchro) UpdateClusterResourceVersion(event *watch.Event, cluster string) { + crvs.Lock() + defer crvs.Unlock() + + crv := crvs.crv + accessor := meta.NewAccessor() + rv, _ := accessor.ResourceVersion(event.Object) + crv.rvmap[cluster] = rv + + bytes, err := json.Marshal(crv.rvmap) + if err != nil { + klog.Errorf("base64 encode failed: 
%v", err) + return + } + + err = accessor.SetResourceVersion(event.Object, base64.RawURLEncoding.EncodeToString(bytes)) + if err != nil { + klog.Warningf("set resourceVersion failed: %v, may be it's a clear watch cache order event", err) + return + } +} + +func (crvs *ClusterResourceVersionSynchro) SetClusterResourceVersion(clusterName string, resourceVersion string) { + crvs.Lock() + defer crvs.Unlock() + + crvs.crv.rvmap[clusterName] = resourceVersion +} diff --git a/pkg/storage/memorystorage/watchcache/watch_cache.go b/pkg/storage/memorystorage/watchcache/watch_cache.go new file mode 100644 index 000000000..d7f5d4a00 --- /dev/null +++ b/pkg/storage/memorystorage/watchcache/watch_cache.go @@ -0,0 +1,440 @@ +package watchcache + +import ( + "context" + "fmt" + "sync" + + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/watch" + genericapirequest "k8s.io/apiserver/pkg/endpoints/request" + "k8s.io/apiserver/pkg/registry/generic/registry" + "k8s.io/client-go/tools/cache" + "k8s.io/utils/strings/slices" + + internal "github.com/clusterpedia-io/api/clusterpedia" +) + +// StoreElement keeping the structs of resource in k8s(key, object, labels, fields). +type StoreElement struct { + Key string + Object runtime.Object + Labels labels.Set + Fields fields.Set +} + +// WatchCache implements a Store interface. +// However, it depends on the elements implementing runtime.Object interface. +// +// WatchCache is a "sliding window" (with a limited capacity) of objects +// observed from a watch. +type WatchCache struct { + sync.RWMutex + + // Maximum size of history window. + capacity int + + // KeyFunc is used to get a key in the underlying storage for a given object. + KeyFunc func(runtime.Object) (string, error) + + // getAttrsFunc is used to get labels and fields of an object. + getAttrsFunc func(runtime.Object) (labels.Set, fields.Set, error) + + // cache is used a cyclic buffer - its first element (with the smallest + // resourceVersion) is defined by startIndex, its last element is defined + // by endIndex (if cache is full it will be startIndex + capacity). + // Both startIndex and endIndex can be greater than buffer capacity - + // you should always apply modulo capacity to get an index in cache array. + cache []*watch.Event + startIndex int + endIndex int + + // store will effectively support LIST operation from the "end of cache + // history" i.e. from the moment just after the newest cached watched event. + // It is necessary to effectively allow clients to start watching at now. + // NOTE: We assume that is thread-safe. + stores map[string]cache.Indexer + + // ResourceVersion up to which the watchCache is propagated. + resourceVersion *ClusterResourceVersion + + //eventHandler func(*watchCacheEvent) + + WatchersLock sync.RWMutex + + // watchersBuffer is a list of watchers potentially interested in currently + // dispatched event. + WatchersBuffer []*CacheWatcher + // blockedWatchers is a list of watchers whose buffer is currently full. 
+ BlockedWatchers []*CacheWatcher + IsNamespaced bool +} + +type keyFunc func(runtime.Object) (string, error) + +func GetKeyFunc(gvr schema.GroupVersionResource, isNamespaced bool) keyFunc { + prefix := gvr.Group + "/" + gvr.Resource + + var KeyFunc func(ctx context.Context, name string) (string, error) + if isNamespaced { + KeyFunc = func(ctx context.Context, name string) (string, error) { + return registry.NamespaceKeyFunc(ctx, prefix, name) + } + } else { + KeyFunc = func(ctx context.Context, name string) (string, error) { + return registry.NoNamespaceKeyFunc(ctx, prefix, name) + } + } + + // We adapt the store's keyFunc so that we can use it with the StorageDecorator + // without making any assumptions about where objects are stored in etcd + kc := func(obj runtime.Object) (string, error) { + accessor, err := meta.Accessor(obj) + if err != nil { + return "", err + } + + if isNamespaced { + return KeyFunc(genericapirequest.WithNamespace(genericapirequest.NewContext(), accessor.GetNamespace()), accessor.GetName()) + } + + return KeyFunc(genericapirequest.NewContext(), accessor.GetName()) + } + + return kc +} + +func storeElementKey(obj interface{}) (string, error) { + elem, ok := obj.(*StoreElement) + if !ok { + return "", fmt.Errorf("not a storeElement: %v", obj) + } + return elem.Key, nil +} + +func storeElementIndexers(indexers *cache.Indexers) cache.Indexers { + if indexers == nil { + return cache.Indexers{} + } + ret := cache.Indexers{} + for indexName, indexFunc := range *indexers { + ret[indexName] = storeElementIndexFunc(indexFunc) + } + return ret +} + +func storeElementIndexFunc(objIndexFunc cache.IndexFunc) cache.IndexFunc { + return func(obj interface{}) (strings []string, e error) { + seo, err := storeElementObject(obj) + if err != nil { + return nil, err + } + return objIndexFunc(seo) + } +} + +func storeElementObject(obj interface{}) (runtime.Object, error) { + elem, ok := obj.(*StoreElement) + if !ok { + return nil, fmt.Errorf("not a storeElement: %v", obj) + } + return elem.Object, nil +} + +func NewWatchCache(capacity int, gvr schema.GroupVersionResource, isNamespaced bool, newCluster string) *WatchCache { + wc := &WatchCache{ + capacity: capacity, + KeyFunc: GetKeyFunc(gvr, isNamespaced), + getAttrsFunc: nil, + cache: make([]*watch.Event, capacity), + startIndex: 0, + endIndex: 0, + stores: make(map[string]cache.Indexer), + IsNamespaced: isNamespaced, + } + + wc.AddIndexer(newCluster, nil) + return wc +} + +func (w *WatchCache) GetStores() map[string]cache.Indexer { + return w.stores +} + +func (w *WatchCache) processEvent(event watch.Event, resourceVersion *ClusterResourceVersion, updateFunc func(*StoreElement) error) error { + key, err := w.KeyFunc(event.Object) + if err != nil { + return fmt.Errorf("couldn't compute key: %v", err) + } + elem := &StoreElement{Key: key, Object: event.Object} + if w.getAttrsFunc != nil { + elem.Labels, elem.Fields, err = w.getAttrsFunc(event.Object) + if err != nil { + return err + } + } + + wcEvent := event + + if err := func() error { + // TODO: We should consider moving this lock below after the watchCacheEvent + // is created. In such situation, the only problematic scenario is Replace( + // happening after getting object from store and before acquiring a lock. + // Maybe introduce another lock for this purpose. + w.Lock() + defer w.Unlock() + + w.updateCache(&wcEvent) + w.resourceVersion = resourceVersion + return updateFunc(elem) + }(); err != nil { + return err + } + + return nil +} + +// Add takes runtime.Object as an argument. 
+func (w *WatchCache) Add(obj interface{}, clusterName string) error { + object, resourceVersion, err := w.objectToVersionedRuntimeObject(obj) + if err != nil { + return err + } + event := watch.Event{Type: watch.Added, Object: object} + + f := func(elem *StoreElement) error { + return w.stores[clusterName].Add(elem) + } + return w.processEvent(event, resourceVersion, f) +} + +// Update takes runtime.Object as an argument. +func (w *WatchCache) Update(obj interface{}, clusterName string) error { + object, resourceVersion, err := w.objectToVersionedRuntimeObject(obj) + if err != nil { + return err + } + event := watch.Event{Type: watch.Modified, Object: object} + + f := func(elem *StoreElement) error { + return w.stores[clusterName].Update(elem) + } + return w.processEvent(event, resourceVersion, f) +} + +// Delete takes runtime.Object as an argument. +func (w *WatchCache) Delete(obj interface{}, clusterName string) error { + object, resourceVersion, err := w.objectToVersionedRuntimeObject(obj) + if err != nil { + return err + } + event := watch.Event{Type: watch.Deleted, Object: object} + + f := func(elem *StoreElement) error { return w.stores[clusterName].Delete(elem) } + return w.processEvent(event, resourceVersion, f) +} + +func (w *WatchCache) objectToVersionedRuntimeObject(obj interface{}) (runtime.Object, *ClusterResourceVersion, error) { + object, ok := obj.(runtime.Object) + if !ok { + return nil, &ClusterResourceVersion{}, fmt.Errorf("obj does not implement runtime.Object interface: %v", obj) + } + + resourceVersion, err := meta.NewAccessor().ResourceVersion(object) + if err != nil { + return nil, &ClusterResourceVersion{}, err + } + + clusterRvStore, err := NewClusterResourceVersionFromString(resourceVersion) + if err != nil { + return nil, &ClusterResourceVersion{}, err + } + return object, clusterRvStore, nil +} + +// Assumes that lock is already held for write. +func (w *WatchCache) updateCache(event *watch.Event) { + if w.endIndex == w.startIndex+w.capacity { + // Cache is full - remove the oldest element. + w.startIndex++ + } + w.cache[w.endIndex%w.capacity] = event + w.endIndex++ +} + +// WaitUntilFreshAndList returns list of pointers to objects. +func (w *WatchCache) WaitUntilFreshAndList(opts *internal.ListOptions) ([]*StoreElement, *ClusterResourceVersion, error) { + w.RLock() + defer w.RUnlock() + + /* // This isn't the place where we do "final filtering" - only some "prefiltering" is happening here. So the only + // requirement here is to NOT miss anything that should be returned. We can return as many non-matching items as we + // want - they will be filtered out later. The fact that we return less things is only further performance improvement. + // TODO: if multiple indexes match, return the one with the fewest items, so as to do as much filtering as possible. 
+ for _, matchValue := range matchValues { + if result, err := w.store.ByIndex(matchValue.IndexName, matchValue.Value); err == nil { + return result, w.resourceVersion, nil + } + }*/ + var result []*StoreElement + accessor := meta.NewAccessor() + for _, store := range w.stores { + for _, obj := range store.List() { + se := obj.(*StoreElement) + ns, err := accessor.Namespace(se.Object) + if err != nil { + return result, w.resourceVersion, err + } + if opts.Namespaces != nil { + if slices.Contains(opts.Namespaces, ns) { + result = append(result, se) + continue + } else { + continue + } + } + result = append(result, se) + } + } + return result, w.resourceVersion, nil +} + +// WaitUntilFreshAndGet returns list of pointers to objects. +func (w *WatchCache) WaitUntilFreshAndGet(cluster, namespace, name string) (*StoreElement, error) { + w.RLock() + defer w.RUnlock() + + var result *StoreElement + accessor := meta.NewAccessor() + if _, ok := w.stores[cluster]; !ok { + return nil, fmt.Errorf("cluster %s is not existed", cluster) + } + + for _, obj := range w.stores[cluster].List() { + se := obj.(*StoreElement) + ns, err := accessor.Namespace(se.Object) + if err != nil { + return result, err + } + if namespace != "" && namespace != ns { + continue + } + n, err := accessor.Name(se.Object) + if err != nil { + return result, err + } + if name != "" { + if ns == namespace && n == name { + return se, nil + } else { + continue + } + } + } + + return result, nil +} + +// GetAllEventsSinceThreadUnsafe returns watch event from slice window in watchCache by the resourceVersion +func (w *WatchCache) GetAllEventsSinceThreadUnsafe(resourceVersion *ClusterResourceVersion) ([]*watch.Event, error) { + size := w.endIndex - w.startIndex + oldestObj := w.cache[w.startIndex%w.capacity] + if resourceVersion.IsEmpty() { + // resourceVersion = 0 means that we don't require any specific starting point + // and we would like to start watching from ~now. + // However, to keep backward compatibility, we additionally need to return the + // current state and only then start watching from that point. + // + // TODO: In v2 api, we should stop returning the current state - #13969. + var allItems []interface{} + for _, store := range w.stores { + allItems = append(allItems, store.List()...) 
+ } + result := make([]*watch.Event, len(allItems)) + for i, item := range allItems { + elem, ok := item.(*StoreElement) + if !ok { + return nil, fmt.Errorf("not a storeElement: %v", elem) + } + result[i] = &watch.Event{ + Type: watch.Added, + Object: elem.Object, + } + } + return result, nil + } + + var index int + var founded bool + for index = 0; w.startIndex+index < w.endIndex; index++ { + rv, err := GetClusterResourceVersionFromEvent(w.cache[(w.startIndex+index)%w.capacity]) + if err != nil { + return nil, err + } + if rv.IsEqual(resourceVersion) { + founded = true + index++ + break + } + } + + var result []*watch.Event + if !founded { + oldest, err := GetClusterResourceVersionFromEvent(oldestObj) + if err != nil { + return nil, err + } + return nil, errors.NewResourceExpired(fmt.Sprintf("too old resource version: %#v (%#v)", resourceVersion, oldest)) + } else { + i := 0 + result = make([]*watch.Event, size-index) + for ; w.startIndex+index < w.endIndex; index++ { + result[i] = w.cache[(w.startIndex+index)%w.capacity] + i++ + } + } + + return result, nil +} + +func (w *WatchCache) AddIndexer(clusterName string, indexers *cache.Indexers) { + w.Lock() + defer w.Unlock() + w.stores[clusterName] = cache.NewIndexer(storeElementKey, storeElementIndexers(indexers)) +} + +func (w *WatchCache) DeleteIndexer(clusterName string) { + w.Lock() + defer w.Unlock() + + if _, ok := w.stores[clusterName]; !ok { + return + } + + delete(w.stores, clusterName) + + //clear cache + w.startIndex = 0 + w.endIndex = 0 +} + +func (w *WatchCache) ClearWatchCache(clusterName string) { + w.Lock() + defer w.Unlock() + + if _, ok := w.stores[clusterName]; !ok { + return + } + + delete(w.stores, clusterName) + w.stores[clusterName] = cache.NewIndexer(storeElementKey, storeElementIndexers(nil)) + + //clear cache + w.startIndex = 0 + w.endIndex = 0 +} diff --git a/pkg/storage/options/options.go b/pkg/storage/options/options.go index bb2b493e8..388d3e33f 100644 --- a/pkg/storage/options/options.go +++ b/pkg/storage/options/options.go @@ -7,6 +7,7 @@ import ( "github.com/spf13/pflag" _ "github.com/clusterpedia-io/clusterpedia/pkg/storage/internalstorage" + "github.com/clusterpedia-io/clusterpedia/pkg/storage/memorystorage" ) type StorageOptions struct { @@ -23,6 +24,10 @@ func (o *StorageOptions) Validate() []error { return nil } + if o.Name == memorystorage.StorageName { + return nil + } + var errors []error if info, err := os.Stat(o.ConfigPath); err != nil { errors = append(errors, err) diff --git a/pkg/storage/storage.go b/pkg/storage/storage.go index d099767fa..d1b2df59b 100644 --- a/pkg/storage/storage.go +++ b/pkg/storage/storage.go @@ -7,6 +7,7 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" internal "github.com/clusterpedia-io/api/clusterpedia" + "github.com/clusterpedia-io/clusterpedia/pkg/storage/memorystorage/watchcache" ) type StorageFactory interface { @@ -42,4 +43,7 @@ type ResourceStorageConfig struct { Codec runtime.Codec StorageVersion schema.GroupVersion MemoryVersion schema.GroupVersion + WatchCache *watchcache.WatchCache + Cluster string + Namespaced bool } diff --git a/pkg/synchromanager/clustersynchro/cluster_synchro.go b/pkg/synchromanager/clustersynchro/cluster_synchro.go index f71dc0300..d47fee746 100644 --- a/pkg/synchromanager/clustersynchro/cluster_synchro.go +++ b/pkg/synchromanager/clustersynchro/cluster_synchro.go @@ -306,8 +306,10 @@ func (s *ClusterSynchro) setSyncResources() { if _, ok := s.storageResourceSynchros.Load(storageGVR); ok { continue } + resourceConfig := 
config.storageConfig + resourceConfig.Cluster = s.name - resourceStorage, err := s.storage.NewResourceStorage(config.storageConfig) + resourceStorage, err := s.storage.NewResourceStorage(resourceConfig) if err != nil { klog.ErrorS(err, "Failed to create resource storage", "cluster", s.name, "storage resource", storageGVR) updateSyncConditions(storageGVR, clusterv1alpha2.ResourceSyncStatusPending, "SynchroCreateFailed", fmt.Sprintf("new resource storage failed: %s", err)) diff --git a/pkg/synchromanager/clustersynchro/resource_negotiator.go b/pkg/synchromanager/clustersynchro/resource_negotiator.go index e0ad66d96..6158788f0 100644 --- a/pkg/synchromanager/clustersynchro/resource_negotiator.go +++ b/pkg/synchromanager/clustersynchro/resource_negotiator.go @@ -117,6 +117,7 @@ func (negotiator *ResourceNegotiator) NegotiateSyncResources(syncResources []clu } storageConfig, err := negotiator.resourceStorageConfig.NewConfig(syncGVR) + if err != nil { syncCondition.Reason = "SynchroCreateFailed" syncCondition.Message = fmt.Sprintf("new resource storage config failed: %s", err) @@ -124,6 +125,7 @@ func (negotiator *ResourceNegotiator) NegotiateSyncResources(syncResources []clu continue } + storageConfig.Namespaced = apiResource.Namespaced storageGVR := storageConfig.StorageGroupResource.WithVersion(storageConfig.StorageVersion.Version) syncCondition.StorageVersion = storageGVR.Version if syncGR != storageConfig.StorageGroupResource { diff --git a/pkg/utils/watch/watchevent.go b/pkg/utils/watch/watchevent.go new file mode 100644 index 000000000..1e566699e --- /dev/null +++ b/pkg/utils/watch/watchevent.go @@ -0,0 +1,29 @@ +package watch + +import ( + "net/http" + + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/watch" +) + +// NewErrorEvent return a watch event of error status with an error +func NewErrorEvent(err error) watch.Event { + errEvent := watch.Event{Type: watch.Error} + switch err := err.(type) { + case runtime.Object: + errEvent.Object = err + case *apierrors.StatusError: + errEvent.Object = &err.ErrStatus + default: + errEvent.Object = &metav1.Status{ + Status: metav1.StatusFailure, + Message: err.Error(), + Reason: metav1.StatusReasonInternalError, + Code: http.StatusInternalServerError, + } + } + return errEvent +}
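
Note on the resourceVersion format used by the memory storage above: GetClusterResourceVersion encodes a JSON map of {cluster name: that cluster's native resourceVersion} with raw (unpadded) base64url, and NewClusterResourceVersionFromString reverses it, treating "" and "0" as "start from the current state". Below is a minimal, self-contained sketch of that round trip using only the Go standard library; the cluster names and version numbers are made up for illustration and are not taken from this patch.

package main

import (
	"encoding/base64"
	"encoding/json"
	"fmt"
)

func main() {
	// Hypothetical per-cluster resourceVersions; real values come from each
	// member cluster and are tracked by ClusterResourceVersionSynchro.
	rvmap := map[string]string{"cluster-1": "12345", "cluster-2": "67890"}

	// Encode the same way GetClusterResourceVersion does: JSON, then raw base64url.
	raw, err := json.Marshal(rvmap)
	if err != nil {
		panic(err)
	}
	encoded := base64.RawURLEncoding.EncodeToString(raw)
	fmt.Println("composite resourceVersion:", encoded)

	// Decode the way NewClusterResourceVersionFromString does.
	decoded := map[string]string{}
	plain, err := base64.RawURLEncoding.DecodeString(encoded)
	if err != nil {
		panic(err)
	}
	if err := json.Unmarshal(plain, &decoded); err != nil {
		panic(err)
	}
	fmt.Printf("per-cluster resourceVersions: %v\n", decoded)
}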