Skip to content

Commit

Permalink
resolve comments
Browse files Browse the repository at this point in the history
Co-authored-by: duanmeng <duanmeng_yewu@cmss.chinamobile.com>
Co-authored-by: wuyingjun <wuyingjun_yewu@cmss.chinamobile.com>
Co-authored-by: hanweisen <hanweisen_yewu@cmss.chinamobile.com>
Signed-off-by: zhangyongxi <zhangyongxi_yewu@cmss.chinamobile.com>
  • Loading branch information
4 people committed Sep 16, 2022
1 parent 33f0745 commit 6cea7c2
Show file tree
Hide file tree
Showing 21 changed files with 311 additions and 283 deletions.
2 changes: 1 addition & 1 deletion cmd/apiserver/app/apiserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func NewClusterPediaServerCommand(ctx context.Context) *cobra.Command {
}
cliflag.PrintFlags(cmd.Flags())

config, err := opts.Config(false)
config, err := opts.Config()
if err != nil {
return err
}
Expand Down
7 changes: 3 additions & 4 deletions cmd/apiserver/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (o *ClusterPediaServerOptions) Validate() error {
return utilerrors.NewAggregate(errors)
}

func (o *ClusterPediaServerOptions) Config(bindingSyncController bool) (*apiserver.Config, error) {
func (o *ClusterPediaServerOptions) Config() (*apiserver.Config, error) {
if err := o.Validate(); err != nil {
return nil, err
}
Expand Down Expand Up @@ -109,9 +109,8 @@ func (o *ClusterPediaServerOptions) Config(bindingSyncController bool) (*apiserv
}

return &apiserver.Config{
GenericConfig: genericConfig,
StorageFactory: storage,
BindingSyncController: bindingSyncController,
GenericConfig: genericConfig,
StorageFactory: storage,
}, nil
}

Expand Down
23 changes: 21 additions & 2 deletions cmd/binding-apiserver/app/binding_apiserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package app

import (
"context"
"fmt"

"github.com/spf13/cobra"
"k8s.io/apimachinery/pkg/util/runtime"
Expand All @@ -13,6 +14,8 @@ import (
"k8s.io/component-base/term"

"github.com/clusterpedia-io/clusterpedia/cmd/apiserver/app/options"
"github.com/clusterpedia-io/clusterpedia/pkg/generated/clientset/versioned"
"github.com/clusterpedia-io/clusterpedia/pkg/synchromanager"
clusterpediafeature "github.com/clusterpedia-io/clusterpedia/pkg/utils/feature"
"github.com/clusterpedia-io/clusterpedia/pkg/version/verflag"
)
Expand All @@ -32,12 +35,28 @@ func NewClusterPediaServerCommand(ctx context.Context) *cobra.Command {
}
cliflag.PrintFlags(cmd.Flags())

config, err := opts.Config(true)
config, err := opts.Config()
if err != nil {
return err
}

server, err := config.Complete().New()
completedConfig := config.Complete()
if completedConfig.ClientConfig == nil {
return fmt.Errorf("CompletedConfig.New() called with config.ClientConfig == nil")
}
if completedConfig.StorageFactory == nil {
return fmt.Errorf("CompletedConfig.New() called with config.StorageFactory == nil")
}

crdclient, err := versioned.NewForConfig(completedConfig.ClientConfig)
if err != nil {
return err
}

synchromanager := synchromanager.NewManager(crdclient, config.StorageFactory)
go synchromanager.Run(1, ctx.Done())

server, err := completedConfig.New()
if err != nil {
return err
}
Expand Down
24 changes: 24 additions & 0 deletions deploy/binding-apiserver/clusterpedia_binding_apiserver_rbac.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: clusterpedia
rules:
- apiGroups: ['*']
resources: ['*']
verbs: ["*"]
- nonResourceURLs: ['*']
verbs: ["get"]

---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: clusterpedia
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: ClusterRole
name: clusterpedia
subjects:
- kind: ServiceAccount
name: clusterpedia-binding-apiserver
namespace: clusterpedia-system
3 changes: 0 additions & 3 deletions deploy/clusterpedia_apiserver_rbac.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,3 @@ subjects:
- kind: ServiceAccount
name: clusterpedia-controller-manager
namespace: clusterpedia-system
- kind: ServiceAccount
name: clusterpedia-binding-apiserver
namespace: clusterpedia-system
19 changes: 0 additions & 19 deletions pkg/apiserver/apiserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package apiserver

import (
"context"
"fmt"
"net/http"

metainternal "k8s.io/apimachinery/pkg/apis/meta/internalversion"
Expand All @@ -26,7 +25,6 @@ import (
informers "github.com/clusterpedia-io/clusterpedia/pkg/generated/informers/externalversions"
"github.com/clusterpedia-io/clusterpedia/pkg/kubeapiserver"
"github.com/clusterpedia-io/clusterpedia/pkg/storage"
"github.com/clusterpedia-io/clusterpedia/pkg/synchromanager"
"github.com/clusterpedia-io/clusterpedia/pkg/utils/filters"
)

Expand Down Expand Up @@ -65,8 +63,6 @@ type Config struct {
GenericConfig *genericapiserver.RecommendedConfig

StorageFactory storage.StorageFactory
// BindingSyncController means whether apiserver or binding_apiserver should process and sync events as clustersynchro_manager
BindingSyncController bool
}

type ClusterPediaServer struct {
Expand All @@ -78,8 +74,6 @@ type completedConfig struct {

ClientConfig *clientrest.Config
StorageFactory storage.StorageFactory
// BindingSyncController means whether apiserver or binding_apiserver should process and sync events as clustersynchro_manager
BindingSyncController bool
}

// CompletedConfig embeds a private pointer that cannot be instantiated outside of this package.
Expand All @@ -93,7 +87,6 @@ func (cfg *Config) Complete() CompletedConfig {
cfg.GenericConfig.Complete(),
cfg.GenericConfig.ClientConfig,
cfg.StorageFactory,
cfg.BindingSyncController,
}

c.GenericConfig.Version = &version.Info{
Expand All @@ -105,13 +98,6 @@ func (cfg *Config) Complete() CompletedConfig {
}

func (config completedConfig) New() (*ClusterPediaServer, error) {
if config.ClientConfig == nil {
return nil, fmt.Errorf("CompletedConfig.New() called with config.ClientConfig == nil")
}
if config.StorageFactory == nil {
return nil, fmt.Errorf("CompletedConfig.New() called with config.StorageFactory == nil")
}

discoveryClient, err := discovery.NewDiscoveryClientForConfig(config.ClientConfig)
if err != nil {
return nil, err
Expand Down Expand Up @@ -167,11 +153,6 @@ func (config completedConfig) New() (*ClusterPediaServer, error) {
clusterpediaInformerFactory.Start(context.StopCh)
clusterpediaInformerFactory.WaitForCacheSync(context.StopCh)

if config.BindingSyncController {
synchromanager := synchromanager.NewManager(crdclient, config.StorageFactory)
synchromanager.Run(1, context.StopCh)
}

return nil
})

Expand Down
2 changes: 1 addition & 1 deletion pkg/kubeapiserver/clusterresource_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func (c *ClusterResourceController) updateClusterResources(cluster *clusterv1alp
return
}

discoveryapis := c.restManager.LoadResources(resources, cluster.Name)
discoveryapis := c.restManager.LoadResources(resources)
c.discoveryManager.SetClusterGroupResource(cluster.Name, discoveryapis)

c.clusterresources[cluster.Name] = resources
Expand Down
57 changes: 1 addition & 56 deletions pkg/kubeapiserver/resourcerest/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,8 @@ import (
"github.com/clusterpedia-io/api/clusterpedia/v1beta1"
"github.com/clusterpedia-io/clusterpedia/pkg/kubeapiserver/printers"
"github.com/clusterpedia-io/clusterpedia/pkg/storage"
cache "github.com/clusterpedia-io/clusterpedia/pkg/storage/memorystorage/watchcache"
"github.com/clusterpedia-io/clusterpedia/pkg/utils/negotiation"
"github.com/clusterpedia-io/clusterpedia/pkg/utils/request"
utilwatch "github.com/clusterpedia-io/clusterpedia/pkg/utils/watch"
)

type RESTStorage struct {
Expand Down Expand Up @@ -127,58 +125,5 @@ func (s *RESTStorage) ConvertToTable(ctx context.Context, object runtime.Object,
}

func (s *RESTStorage) Watch(ctx context.Context, options *metainternalversion.ListOptions) (watch.Interface, error) {
resourceversion := options.ResourceVersion
watchRV, err := cache.NewClusterResourceVersionFromString(resourceversion)
if err != nil {
// To match the uncached watch implementation, once we have passed authn/authz/admission,
// and successfully parsed a resource version, other errors must fail with a watch event of type ERROR,
// rather than a directly returned error.
return newErrWatcher(err), nil
}

watcher := cache.NewCacheWatcher(100)
watchCache := s.Storage.GetStorageConfig().WatchCache
watchCache.Lock()
defer watchCache.Unlock()

initEvents, err := watchCache.GetAllEventsSinceThreadUnsafe(watchRV)
if err != nil {
// To match the uncached watch implementation, once we have passed authn/authz/admission,
// and successfully parsed a resource version, other errors must fail with a watch event of type ERROR,
// rather than a directly returned error.
return newErrWatcher(err), nil
}

func() {
watchCache.WatchersLock.Lock()
defer watchCache.WatchersLock.Unlock()

watchCache.WatchersBuffer = append(watchCache.WatchersBuffer, watcher)
}()

go watcher.Process(ctx, initEvents)
return watcher, nil
}

type errWatcher struct {
result chan watch.Event
}

func newErrWatcher(err error) *errWatcher {
errEvent := utilwatch.NewErrorEvent(err)

// Create a watcher with room for a single event, populate it, and close the channel
watcher := &errWatcher{result: make(chan watch.Event, 1)}
watcher.result <- errEvent
close(watcher.result)

return watcher
}

func (c *errWatcher) ResultChan() <-chan watch.Event {
return c.result
}

func (c *errWatcher) Stop() {
// no-op
return s.Storage.Watch(ctx, options)
}
16 changes: 7 additions & 9 deletions pkg/kubeapiserver/restmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func (m *RESTManager) GetRESTResourceInfo(gvr schema.GroupVersionResource) RESTR
return infos[gvr]
}

func (m *RESTManager) LoadResources(infos ResourceInfoMap, cluster string) map[schema.GroupResource]discovery.ResourceDiscoveryAPI {
func (m *RESTManager) LoadResources(infos ResourceInfoMap) map[schema.GroupResource]discovery.ResourceDiscoveryAPI {
apigroups := m.groups.Load().(map[string]metav1.APIGroup)
apiresources := m.resources.Load().(map[schema.GroupResource]metav1.APIResource)
restinfos := m.restResourceInfos.Load().(map[schema.GroupVersionResource]RESTResourceInfo)
Expand Down Expand Up @@ -175,7 +175,7 @@ func (m *RESTManager) LoadResources(infos ResourceInfoMap, cluster string) map[s
m.addAPIResourcesLocked(addedAPIResources)
}
if len(addedInfos) != 0 {
m.addRESTResourceInfosLocked(addedInfos, cluster)
m.addRESTResourceInfosLocked(addedInfos)
}
return apis
}
Expand Down Expand Up @@ -208,7 +208,7 @@ func (m *RESTManager) addAPIResourcesLocked(addedResources map[schema.GroupResou
m.resources.Store(resources)
}

func (m *RESTManager) addRESTResourceInfosLocked(addedInfos map[schema.GroupVersionResource]RESTResourceInfo, cluster string) {
func (m *RESTManager) addRESTResourceInfosLocked(addedInfos map[schema.GroupVersionResource]RESTResourceInfo) {
restinfos := m.restResourceInfos.Load().(map[schema.GroupVersionResource]RESTResourceInfo)

infos := make(map[schema.GroupVersionResource]RESTResourceInfo, len(restinfos)+len(addedInfos))
Expand All @@ -225,9 +225,9 @@ func (m *RESTManager) addRESTResourceInfosLocked(addedInfos map[schema.GroupVers
var err error
var storage *resourcerest.RESTStorage
if resourcescheme.LegacyResourceScheme.IsGroupRegistered(gvr.Group) {
storage, err = m.genLegacyResourceRESTStorage(gvr, info.APIResource.Kind, cluster)
storage, err = m.genLegacyResourceRESTStorage(gvr, info.APIResource.Kind)
} else {
storage, err = m.genCustomResourceRESTStorage(gvr, info.APIResource.Kind, cluster)
storage, err = m.genCustomResourceRESTStorage(gvr, info.APIResource.Kind)
}
if err != nil {
klog.ErrorS(err, "Failed to gen resource rest storage", "gvr", gvr, "kind", info.APIResource.Kind)
Expand Down Expand Up @@ -263,12 +263,11 @@ func (m *RESTManager) addRESTResourceInfosLocked(addedInfos map[schema.GroupVers
m.restResourceInfos.Store(infos)
}

func (m *RESTManager) genLegacyResourceRESTStorage(gvr schema.GroupVersionResource, kind, cluster string) (*resourcerest.RESTStorage, error) {
func (m *RESTManager) genLegacyResourceRESTStorage(gvr schema.GroupVersionResource, kind string) (*resourcerest.RESTStorage, error) {
storageConfig, err := m.resourcetSorageConfig.NewLegacyResourceConfig(gvr.GroupResource())
if err != nil {
return nil, err
}
storageConfig.Cluster = cluster

resourceStorage, err := m.storageFactory.NewResourceStorage(storageConfig)
if err != nil {
Expand All @@ -291,12 +290,11 @@ func (m *RESTManager) genLegacyResourceRESTStorage(gvr schema.GroupVersionResour
}, nil
}

func (m *RESTManager) genCustomResourceRESTStorage(gvr schema.GroupVersionResource, kind, cluster string) (*resourcerest.RESTStorage, error) {
func (m *RESTManager) genCustomResourceRESTStorage(gvr schema.GroupVersionResource, kind string) (*resourcerest.RESTStorage, error) {
storageConfig, err := m.resourcetSorageConfig.NewCustomResourceConfig(gvr)
if err != nil {
return nil, err
}
storageConfig.Cluster = cluster

resourceStorage, err := m.storageFactory.NewResourceStorage(storageConfig)
if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions pkg/storage/internalstorage/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ func init() {
}

func NewStorageFactory(configPath string) (storage.StorageFactory, error) {
if configPath == "" {
return nil, fmt.Errorf("configPath should not be empty")
}

cfg := &Config{}
if err := configor.Load(cfg, configPath); err != nil {
return nil, err
Expand Down
6 changes: 6 additions & 0 deletions pkg/storage/internalstorage/resource_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,14 @@ import (

"gorm.io/gorm"
"k8s.io/apimachinery/pkg/api/meta"
metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/conversion"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/watch"
genericstorage "k8s.io/apiserver/pkg/storage"

internal "github.com/clusterpedia-io/api/clusterpedia"
Expand Down Expand Up @@ -272,6 +274,10 @@ func (s *ResourceStorage) List(ctx context.Context, listObject runtime.Object, o
return nil
}

func (s *ResourceStorage) Watch(ctx context.Context, options *metainternalversion.ListOptions) (watch.Interface, error) {
return nil, nil
}

func applyListOptionsToResourceQuery(db *gorm.DB, query *gorm.DB, opts *internal.ListOptions) (int64, *int64, *gorm.DB, error) {
applyFn := func(query *gorm.DB, opts *internal.ListOptions) (*gorm.DB, error) {
query, err := applyOwnerToResourceQuery(db, query, opts)
Expand Down
4 changes: 4 additions & 0 deletions pkg/storage/internalstorage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,3 +84,7 @@ func (s *StorageFactory) GetCollectionResources(ctx context.Context) ([]*interna
}
return crs, nil
}

func (s *StorageFactory) PrepareCluster(cluster string) error {
return nil
}
Loading

0 comments on commit 6cea7c2

Please sign in to comment.