Skip to content

Commit

Permalink
Add memory storage for supporting ListAndWatch
Browse files Browse the repository at this point in the history
Co-authored-by: duanmeng <duanmeng_yewu@cmss.chinamobile.com>
Co-authored-by: wuyingjun <wuyingjun_yewu@cmss.chinamobile.com>
Co-authored-by: hanweisen <hanweisen_yewu@cmss.chinamobile.com>
Signed-off-by: zhangyongxi <zhangyongxi_yewu@cmss.chinamobile.com>
  • Loading branch information
4 people committed Sep 7, 2022
1 parent e43ea17 commit 0e6e290
Show file tree
Hide file tree
Showing 25 changed files with 1,432 additions and 16 deletions.
48 changes: 45 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ ifeq ($(LATEST_TAG),$(shell git describe --abbrev=0 --tags))
VERSION=$(LATEST_TAG)
endif

all: apiserver clustersynchro-manager controller-manager
all: apiserver binding-apiserver clustersynchro-manager controller-manager

gen-clusterconfigs:
./hack/gen-clusterconfigs.sh
Expand Down Expand Up @@ -69,6 +69,13 @@ apiserver:
-o bin/apiserver \
cmd/apiserver/main.go

.PHONY: binding-apiserver
binding-apiserver:
CGO_ENABLED=0 GOOS=$(GOOS) GOARCH=$(GOARCH) go build \
-ldflags $(LDFLAGS) \
-o bin/binding-apiserver \
cmd/binding-apiserver/main.go

.PHONY: clustersynchro-manager
clustersynchro-manager:
CGO_ENABLED=0 GOOS=$(GOOS) GOARCH=$(GOARCH) go build \
Expand All @@ -84,7 +91,7 @@ controller-manager:
cmd/controller-manager/main.go

.PHONY: images
images: image-apiserver image-clustersynchro-manager image-controller-manager
images: image-apiserver image-binding-apiserver image-clustersynchro-manager image-controller-manager

image-apiserver:
GOOS="linux" $(MAKE) apiserver
Expand All @@ -94,6 +101,15 @@ image-apiserver:
--load \
--build-arg BASEIMAGE=$(BASEIMAGE) \
--build-arg BINNAME=apiserver .

image-binding-apiserver:
GOOS="linux" $(MAKE) binding-apiserver
docker buildx build \
-t ${REGISTRY}/binding-apiserver-$(GOARCH):$(VERSION) \
--platform=linux/$(GOARCH) \
--load \
--build-arg BASEIMAGE=$(BASEIMAGE) \
--build-arg BINNAME=binding-apiserver .

image-clustersynchro-manager:
GOOS="linux" $(MAKE) clustersynchro-manager
Expand All @@ -114,7 +130,7 @@ image-controller-manager:
--build-arg BINNAME=controller-manager .

.PHONY: push-images
push-images: push-apiserver-image push-clustersynchro-manager-image push-controller-manager-image
push-images: push-apiserver-image push-binding-apiserver-image push-clustersynchro-manager-image push-controller-manager-image

# clean manifest https://github.com/docker/cli/issues/954#issuecomment-586722447
push-apiserver-image: clean-apiserver-manifest
Expand All @@ -138,6 +154,28 @@ push-apiserver-image: clean-apiserver-manifest
docker manifest push $(REGISTRY)/apiserver:latest; \
fi;

# clean manifest https://github.com/docker/cli/issues/954#issuecomment-586722447
push-binding-apiserver-image: clean-binding-apiserver-manifest
set -e; \
images=""; \
for arch in $(RELEASE_ARCHS); do \
GOARCH=$$arch $(MAKE) image-binding-apiserver; \
image=$(REGISTRY)/binding-apiserver-$$arch:$(VERSION); \
docker push $$image; \
images="$$images $$image"; \
if [ $(VERSION) != latest ]; then \
latest_image=$(REGISTRY)/binding-apiserver-$$arch:latest; \
docker tag $$image $$latest_image; \
docker push $$latest_image; \
fi; \
done; \
docker manifest create $(REGISTRY)/binding-apiserver:$(VERSION) $$images; \
docker manifest push $(REGISTRY)/binding-apiserver:$(VERSION); \
if [ $(VERSION) != latest ]; then \
docker manifest create $(REGISTRY)/binding-apiserver:latest $$images; \
docker manifest push $(REGISTRY)/binding-apiserver:latest; \
fi;

# clean manifest https://github.com/docker/cli/issues/954#issuecomment-586722447
push-clustersynchro-manager-image: clean-clustersynchro-manager-manifest
set -e; \
Expand Down Expand Up @@ -190,6 +228,10 @@ clean-apiserver-manifest:
docker manifest rm $(REGISTRY)/apiserver:$(VERSION) 2>/dev/null;\
docker manifest rm $(REGISTRY)/apiserver:latest 2>/dev/null; exit 0

clean-binding-apiserver-manifest:
docker manifest rm $(REGISTRY)/binding-apiserver:$(VERSION) 2>/dev/null;\
docker manifest rm $(REGISTRY)/binding-apiserver:latest 2>/dev/null; exit 0

clean-clustersynchro-manager-manifest:
docker manifest rm $(REGISTRY)/clustersynchro-manager:$(VERSION) 2>/dev/null;\
docker manifest rm $(REGISTRY)/clustersynchro-manager:latest 2>/dev/null; exit 0
Expand Down
2 changes: 1 addition & 1 deletion cmd/apiserver/app/apiserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func NewClusterPediaServerCommand(ctx context.Context) *cobra.Command {
}
cliflag.PrintFlags(cmd.Flags())

config, err := opts.Config()
config, err := opts.Config(false)
if err != nil {
return err
}
Expand Down
17 changes: 14 additions & 3 deletions cmd/apiserver/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,12 @@ package options
import (
"fmt"
"net"
"net/http"
"strings"

utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apiserver/pkg/admission/plugin/namespace/lifecycle"
genericrequest "k8s.io/apiserver/pkg/endpoints/request"
genericapiserver "k8s.io/apiserver/pkg/server"
genericoptions "k8s.io/apiserver/pkg/server/options"
"k8s.io/apiserver/pkg/util/feature"
Expand Down Expand Up @@ -72,7 +75,7 @@ func (o *ClusterPediaServerOptions) Validate() error {
return utilerrors.NewAggregate(errors)
}

func (o *ClusterPediaServerOptions) Config() (*apiserver.Config, error) {
func (o *ClusterPediaServerOptions) Config(bindingSyncController bool) (*apiserver.Config, error) {
if err := o.Validate(); err != nil {
return nil, err
}
Expand All @@ -91,6 +94,13 @@ func (o *ClusterPediaServerOptions) Config() (*apiserver.Config, error) {
o.Admission.DisablePlugins = append(o.Admission.DisablePlugins, lifecycle.PluginName)

genericConfig := genericapiserver.NewRecommendedConfig(apiserver.Codecs)

// todo
// support watch to LongRunningFunc
genericConfig.LongRunningFunc = func(r *http.Request, requestInfo *genericrequest.RequestInfo) bool {
return strings.Contains(r.RequestURI, "watch")
}

// genericConfig.OpenAPIConfig = genericapiserver.DefaultOpenAPIConfig(openapi.GetOpenAPIDefinitions, openapi.NewDefinitionNamer(apiserver.Scheme))
// genericConfig.OpenAPIConfig.Info.Title = openAPITitle
// genericConfig.OpenAPIConfig.Info.Version= openAPIVersion
Expand All @@ -100,8 +110,9 @@ func (o *ClusterPediaServerOptions) Config() (*apiserver.Config, error) {
}

return &apiserver.Config{
GenericConfig: genericConfig,
StorageFactory: storage,
GenericConfig: genericConfig,
StorageFactory: storage,
BindingSyncController: bindingSyncController,
}, nil
}

Expand Down
80 changes: 80 additions & 0 deletions cmd/binding-apiserver/app/binding_apiserver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package app

import (
"context"

"github.com/spf13/cobra"
"k8s.io/apimachinery/pkg/util/runtime"
genericfeatures "k8s.io/apiserver/pkg/features"
cliflag "k8s.io/component-base/cli/flag"
"k8s.io/component-base/cli/globalflag"
"k8s.io/component-base/featuregate"
"k8s.io/component-base/logs"
"k8s.io/component-base/term"

"github.com/clusterpedia-io/clusterpedia/cmd/apiserver/app/options"
clusterpediafeature "github.com/clusterpedia-io/clusterpedia/pkg/utils/feature"
"github.com/clusterpedia-io/clusterpedia/pkg/version/verflag"
)

func NewClusterPediaServerCommand(ctx context.Context) *cobra.Command {
opts := options.NewServerOptions()

cmd := &cobra.Command{
Use: "clusterpedia-apiserver",
RunE: func(cmd *cobra.Command, args []string) error {
verflag.PrintAndExitIfRequested()

// Activate logging as soon as possible, after that
// show flags with the final logging configuration.
if err := opts.Logs.ValidateAndApply(clusterpediafeature.FeatureGate); err != nil {
return err
}
cliflag.PrintFlags(cmd.Flags())

config, err := opts.Config(true)
if err != nil {
return err
}

server, err := config.Complete().New()
if err != nil {
return err
}

if err := server.Run(ctx); err != nil {
return err
}
return nil
},
}

namedFlagSets := opts.Flags()
verflag.AddFlags(namedFlagSets.FlagSet("global"))
globalflag.AddGlobalFlags(namedFlagSets.FlagSet("global"), cmd.Name(), logs.SkipLoggingConfigurationFlags())
clusterpediafeature.MutableFeatureGate.AddFlag(namedFlagSets.FlagSet("mutable feature gate"))

fs := cmd.Flags()
for _, f := range namedFlagSets.FlagSets {
fs.AddFlagSet(f)
}

cols, _, _ := term.TerminalSize(cmd.OutOrStdout())
cliflag.SetUsageAndHelpFunc(cmd, namedFlagSets, cols)
return cmd
}

func init() {
runtime.Must(logs.AddFeatureGates(clusterpediafeature.MutableFeatureGate))

// The feature gate `RemainingItemCount` should default to false
// https://github.com/clusterpedia-io/clusterpedia/issues/196
gates := clusterpediafeature.MutableFeatureGate.GetAll()
gate := gates[genericfeatures.RemainingItemCount]
gate.Default = false
gates[genericfeatures.RemainingItemCount] = gate

clusterpediafeature.MutableFeatureGate = featuregate.NewFeatureGate()
runtime.Must(clusterpediafeature.MutableFeatureGate.Add(gates))
clusterpediafeature.FeatureGate = clusterpediafeature.MutableFeatureGate
}
18 changes: 18 additions & 0 deletions cmd/binding-apiserver/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package main

import (
"os"

apiserver "k8s.io/apiserver/pkg/server"
"k8s.io/component-base/cli"
_ "k8s.io/component-base/logs/json/register" // for JSON log format registration

"github.com/clusterpedia-io/clusterpedia/cmd/binding-apiserver/app"
)

func main() {
ctx := apiserver.SetupSignalContext()
command := app.NewClusterPediaServerCommand(ctx)
code := cli.Run(command)
os.Exit(code)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
apiVersion: apiregistration.k8s.io/v1
kind: APIService
metadata:
name: v1beta1.clusterpedia.io
spec:
insecureSkipTLSVerify: true
group: clusterpedia.io
groupPriorityMinimum: 1000
versionPriority: 100
service:
name: clusterpedia-binding-apiserver
namespace: clusterpedia-system
version: v1beta1
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
apiVersion: v1
kind: ServiceAccount
metadata:
name: clusterpedia-binding-apiserver
namespace: clusterpedia-system
---
apiVersion: v1
kind: Service
metadata:
name: clusterpedia-binding-apiserver
namespace: clusterpedia-system
spec:
ports:
- port: 443
protocol: TCP
targetPort: 443
selector:
app: clusterpedia-binding-apiserver
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: clusterpedia-binding-apiserver
namespace: clusterpedia-system
labels:
app: clusterpedia-binding-apiserver
spec:
replicas: 1
selector:
matchLabels:
app: clusterpedia-binding-apiserver
template:
metadata:
labels:
app: clusterpedia-binding-apiserver
spec:
containers:
- name: binding-apiserver
image: ghcr.io/clusterpedia-io/clusterpedia/binding-apiserver:v0.4.1
command:
- /usr/local/bin/binding-apiserver
- --secure-port=443
- --storage-name=memory
- -v=3
serviceAccountName: clusterpedia-binding-apiserver
3 changes: 3 additions & 0 deletions deploy/clusterpedia_apiserver_rbac.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,6 @@ subjects:
- kind: ServiceAccount
name: clusterpedia-controller-manager
namespace: clusterpedia-system
- kind: ServiceAccount
name: clusterpedia-binding-apiserver
namespace: clusterpedia-system
12 changes: 12 additions & 0 deletions pkg/apiserver/apiserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
informers "github.com/clusterpedia-io/clusterpedia/pkg/generated/informers/externalversions"
"github.com/clusterpedia-io/clusterpedia/pkg/kubeapiserver"
"github.com/clusterpedia-io/clusterpedia/pkg/storage"
"github.com/clusterpedia-io/clusterpedia/pkg/synchromanager"
"github.com/clusterpedia-io/clusterpedia/pkg/utils/filters"
)

Expand Down Expand Up @@ -64,6 +65,8 @@ type Config struct {
GenericConfig *genericapiserver.RecommendedConfig

StorageFactory storage.StorageFactory
// BindingSyncController means whether apiserver or binding_apiserver should process and sync events as clustersynchro_manager
BindingSyncController bool
}

type ClusterPediaServer struct {
Expand All @@ -75,6 +78,8 @@ type completedConfig struct {

ClientConfig *clientrest.Config
StorageFactory storage.StorageFactory
// BindingSyncController means whether apiserver or binding_apiserver should process and sync events as clustersynchro_manager
BindingSyncController bool
}

// CompletedConfig embeds a private pointer that cannot be instantiated outside of this package.
Expand All @@ -88,6 +93,7 @@ func (cfg *Config) Complete() CompletedConfig {
cfg.GenericConfig.Complete(),
cfg.GenericConfig.ClientConfig,
cfg.StorageFactory,
cfg.BindingSyncController,
}

c.GenericConfig.Version = &version.Info{
Expand Down Expand Up @@ -160,6 +166,12 @@ func (config completedConfig) New() (*ClusterPediaServer, error) {
genericServer.AddPostStartHookOrDie("start-clusterpedia-informers", func(context genericapiserver.PostStartHookContext) error {
clusterpediaInformerFactory.Start(context.StopCh)
clusterpediaInformerFactory.WaitForCacheSync(context.StopCh)

if config.BindingSyncController {
synchromanager := synchromanager.NewManager(crdclient, config.StorageFactory)
synchromanager.Run(1, context.StopCh)
}

return nil
})

Expand Down
1 change: 1 addition & 0 deletions pkg/kubeapiserver/apiserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ func init() {
&metav1.APIGroupList{},
&metav1.APIGroup{},
&metav1.APIResourceList{},
&metav1.WatchEvent{},
)
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/kubeapiserver/clusterresource_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func (c *ClusterResourceController) updateClusterResources(cluster *clusterv1alp
return
}

discoveryapis := c.restManager.LoadResources(resources)
discoveryapis := c.restManager.LoadResources(resources, cluster.Name)
c.discoveryManager.SetClusterGroupResource(cluster.Name, discoveryapis)

c.clusterresources[cluster.Name] = resources
Expand Down
2 changes: 2 additions & 0 deletions pkg/kubeapiserver/resource_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@ func (r *ResourceHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
handler = handlers.GetResource(storage, reqScope)
case "list":
handler = handlers.ListResource(storage, nil, reqScope, false, r.minRequestTimeout)
case "watch":
handler = handlers.ListResource(storage, storage, reqScope, true, r.minRequestTimeout)
default:
responsewriters.ErrorNegotiated(
apierrors.NewMethodNotSupported(gvr.GroupResource(), requestInfo.Verb),
Expand Down
Loading

0 comments on commit 0e6e290

Please sign in to comment.