Skip to content

Commit

Permalink
Use new K8s clients provider in Flux plugin (#5379)
Browse files Browse the repository at this point in the history
* Added new clients provider to Flux plugin

Signed-off-by: Rafa Castelblanque <rcastelblanq@vmware.com>

* Added qps and burst to in-cluster client

Signed-off-by: Rafa Castelblanque <rcastelblanq@vmware.com>

Signed-off-by: Rafa Castelblanque <rcastelblanq@vmware.com>
  • Loading branch information
castelblanque authored Sep 28, 2022
1 parent 0dcc8e9 commit 32749bc
Show file tree
Hide file tree
Showing 7 changed files with 167 additions and 82 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ type NamespacedResourceWatcherCacheConfig struct {
Gvr schema.GroupVersionResource
// this ClientGetter is for running out-of-request interactions with the Kubernetes API server,
// such as watching for resource changes
ClientGetter clientgetter.BackgroundClientGetterFunc
ClientGetter clientgetter.FixedClusterClientProviderInterface
// 'OnAddFunc' hook is called when an object comes about and the cache does not have a
// corresponding entry. Note this maybe happen as a result of a newly created k8s object
// or a modified object for which there was no entry in the cache
Expand Down
4 changes: 3 additions & 1 deletion cmd/kubeapps-apis/plugins/fluxv2/packages/v1alpha1/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (

// RegisterWithGRPCServer enables a plugin to register with a gRPC server
// returning the server implementation.
//
//nolint:deadcode
func RegisterWithGRPCServer(opts pluginsv1alpha1.GRPCPluginRegistrationOptions) (interface{}, error) {
log.Info("+fluxv2 RegisterWithGRPCServer")
Expand All @@ -25,7 +26,7 @@ func RegisterWithGRPCServer(opts pluginsv1alpha1.GRPCPluginRegistrationOptions)
// 'Shutdown' hook
stopCh := make(chan struct{})

svr, err := NewServer(opts.ConfigGetter, opts.ClustersConfig.KubeappsClusterName, stopCh, opts.PluginConfigPath)
svr, err := NewServer(opts.ConfigGetter, opts.ClustersConfig.KubeappsClusterName, stopCh, opts.PluginConfigPath, opts.ClientQPS, opts.ClientBurst)
if err != nil {
return nil, err
}
Expand All @@ -36,6 +37,7 @@ func RegisterWithGRPCServer(opts pluginsv1alpha1.GRPCPluginRegistrationOptions)

// RegisterHTTPHandlerFromEndpoint enables a plugin to register an http
// handler to translate to the gRPC request.
//
//nolint:deadcode
func RegisterHTTPHandlerFromEndpoint(ctx context.Context, mux *runtime.ServeMux, endpoint string, opts []grpc.DialOption) error {
log.Info("+fluxv2 RegisterHTTPHandlerFromEndpoint")
Expand Down
16 changes: 8 additions & 8 deletions cmd/kubeapps-apis/plugins/fluxv2/packages/v1alpha1/repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,9 +147,9 @@ func (s *Server) filterReadyReposByName(repoList []sourcev1.HelmRepository, matc
}

// Notes:
// 1. with flux, an available package may be from a repo in any namespace accessible to the caller
// 2. can't rely on cache as a real source of truth for key names
// because redis may evict cache entries due to memory pressure to make room for new ones
// 1. with flux, an available package may be from a repo in any namespace accessible to the caller
// 2. can't rely on cache as a real source of truth for key names
// because redis may evict cache entries due to memory pressure to make room for new ones
func (s *Server) getChartsForRepos(ctx context.Context, match []string) (map[string][]models.Chart, error) {
repoList, err := s.listReposInAllNamespaces(ctx)
if err != nil {
Expand Down Expand Up @@ -551,7 +551,9 @@ func (s *Server) createKubeappsManagedRepoSecret(
// using owner references on the secret so that it can be
// (1) cleaned up automatically and/or
// (2) enable some control (ie. if I add a secret manually
// via kubectl before running kubeapps, it won't get deleted just
//
// via kubectl before running kubeapps, it won't get deleted just
//
// because Kubeapps is deleting it)?
// see https://github.com/vmware-tanzu/kubeapps/pull/4630#discussion_r861446394 for details
func (s *Server) setOwnerReferencesForRepoSecret(
Expand Down Expand Up @@ -753,11 +755,9 @@ func (s *Server) deleteRepo(ctx context.Context, repoRef *corev1.PackageReposito
}
}

//
// implements plug-in specific cache-related functionality
//
type repoEventSink struct {
clientGetter clientgetter.BackgroundClientGetterFunc
clientGetter clientgetter.FixedClusterClientProviderInterface
chartCache *cache.ChartCache // chartCache maybe nil only in unit tests
}

Expand Down Expand Up @@ -1282,7 +1282,7 @@ func getRepoTlsConfigAndAuthWithUserManagedSecrets(secret *apiv1.Secret) (*corev

// TODO (gfichtenolt) Per slack discussion
// In fact, keeping the existing API might mean we could return exactly what it already does today
//(i.e. all secrets) if called with an extra explicit option (includeSecrets=true in the request
// (i.e. all secrets) if called with an extra explicit option (includeSecrets=true in the request
// message, not sure, similar to kubectl config view --raw) and by default the secrets are REDACTED
// as you mention? This would mean clients will by default see only REDACTED secrets,
// but can request the full sensitive data when necessary?
Expand Down
12 changes: 6 additions & 6 deletions cmd/kubeapps-apis/plugins/fluxv2/packages/v1alpha1/repo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2466,9 +2466,9 @@ func (s *Server) redisMockExpectGetFromRepoCache(mock redismock.ClientMock, filt
}

func (s *Server) redisMockSetValueForRepo(mock redismock.ClientMock, repo sourcev1.HelmRepository, oldValue []byte) (key string, bytes []byte, err error) {
backgroundClientGetter := func(ctx context.Context) (clientgetter.ClientInterfaces, error) {
return s.clientGetter(ctx, s.kubeappsCluster)
}
backgroundClientGetter := &clientgetter.FixedClusterClientProvider{ClientsFunc: func(ctx context.Context) (*clientgetter.ClientGetter, error) {
return s.clientGetter.GetClients(ctx, s.kubeappsCluster)
}}
sink := repoEventSink{
clientGetter: backgroundClientGetter,
chartCache: nil,
Expand Down Expand Up @@ -2502,9 +2502,9 @@ func redisMockSetValueForRepo(mock redismock.ClientMock, key string, newValue, o
}

func (s *Server) redisKeyValueForRepo(r sourcev1.HelmRepository) (key string, byteArray []byte, err error) {
cg := func(ctx context.Context) (clientgetter.ClientInterfaces, error) {
return s.clientGetter(ctx, s.kubeappsCluster)
}
cg := &clientgetter.FixedClusterClientProvider{ClientsFunc: func(ctx context.Context) (*clientgetter.ClientGetter, error) {
return s.clientGetter.GetClients(ctx, s.kubeappsCluster)
}}
sinkNoChartCache := repoEventSink{clientGetter: cg}
return sinkNoChartCache.redisKeyValueForRepo(r)
}
Expand Down
27 changes: 15 additions & 12 deletions cmd/kubeapps-apis/plugins/fluxv2/packages/v1alpha1/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,10 @@ type Server struct {
// non-test implementation.
// It is meant for in-band interactions (i.e. in the context of a caller)
// with k8s API server
clientGetter clientgetter.ClientGetterFunc
clientGetter clientgetter.ClientProviderInterface
// for interactions with k8s API server in the context of
// kubeapps-internal-kubeappsapis service account
serviceAccountClientGetter clientgetter.BackgroundClientGetterFunc
serviceAccountClientGetter clientgetter.FixedClusterClientProviderInterface

actionConfigGetter clientgetter.HelmActionConfigGetterFunc

Expand All @@ -63,7 +63,7 @@ type Server struct {

// NewServer returns a Server automatically configured with a function to obtain
// the k8s client config.
func NewServer(configGetter core.KubernetesConfigGetter, kubeappsCluster string, stopCh <-chan struct{}, pluginConfigPath string) (*Server, error) {
func NewServer(configGetter core.KubernetesConfigGetter, kubeappsCluster string, stopCh <-chan struct{}, pluginConfigPath string, clientQPS float32, clientBurst int) (*Server, error) {
log.Infof("+fluxv2 NewServer(kubeappsCluster: [%v], pluginConfigPath: [%s]",
kubeappsCluster, pluginConfigPath)

Expand Down Expand Up @@ -94,8 +94,7 @@ func NewServer(configGetter core.KubernetesConfigGetter, kubeappsCluster string,
log.Fatalf("%s", err)
}

backgroundClientGetter := clientgetter.NewBackgroundClientGetter(
configGetter, clientgetter.Options{Scheme: scheme})
backgroundClientGetter := clientgetter.NewBackgroundClientProvider(clientgetter.Options{Scheme: scheme}, clientQPS, clientBurst)

s := repoEventSink{
clientGetter: backgroundClientGetter,
Expand Down Expand Up @@ -128,9 +127,12 @@ func NewServer(configGetter core.KubernetesConfigGetter, kubeappsCluster string,
"repoCache", repoCacheConfig, redisCli, stopCh, false); err != nil {
return nil, err
} else {
clientProvider, err := clientgetter.NewClientProvider(configGetter, clientgetter.Options{Scheme: scheme})
if err != nil {
log.Fatalf("%s", err)
}
return &Server{
clientGetter: clientgetter.NewClientGetter(
configGetter, clientgetter.Options{Scheme: scheme}),
clientGetter: clientProvider,
serviceAccountClientGetter: backgroundClientGetter,
actionConfigGetter: clientgetter.NewHelmActionConfigGetter(
configGetter, kubeappsCluster),
Expand Down Expand Up @@ -644,12 +646,13 @@ func (s *Server) SetUserManagedSecrets(ctx context.Context, request *v1alpha1.Se
// aka an "out-of-band" interaction and use cases when the user wants something
// done explicitly, aka "in-band" interaction
func (s *Server) newRepoEventSink() repoEventSink {
cg := func(ctx context.Context) (clientgetter.ClientInterfaces, error) {
return s.clientGetter(ctx, s.kubeappsCluster)
}

// notice a bit of inconsistency here, we are using s.clientGetter
// (i.e. the context of the incoming request) to read the secret
cg := &clientgetter.FixedClusterClientProvider{ClientsFunc: func(ctx context.Context) (*clientgetter.ClientGetter, error) {
return s.clientGetter.GetClients(ctx, s.kubeappsCluster)
}}

// notice a bit of inconsistency here, we are using the context
// of the incoming request to read the secret
// as opposed to s.repoCache.clientGetter (which uses the context of
// User "system:serviceaccount:kubeapps:kubeapps-internal-kubeappsapis")
// which is what is used when the repo is being processed/indexed.
Expand Down
38 changes: 17 additions & 21 deletions cmd/kubeapps-apis/plugins/fluxv2/packages/v1alpha1/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ package main
import (
"context"
"io"
apiext "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
"k8s.io/client-go/kubernetes"
"reflect"
"strings"
"testing"
Expand Down Expand Up @@ -233,14 +235,11 @@ func newServerWithRepos(t *testing.T, repos []sourcev1.HelmRepository, charts []

apiextIfc := apiextfake.NewSimpleClientset(fluxHelmRepositoryCRD)
ctrlClient := newCtrlClient(repos, nil, nil)
clientGetter := func(context.Context, string) (clientgetter.ClientInterfaces, error) {
return clientgetter.
NewBuilder().
WithTyped(typedClient).
WithApiExt(apiextIfc).
WithControllerRuntime(&ctrlClient).
Build(), nil
}
clientGetter := clientgetter.NewFixedClientProvider(&clientgetter.ClientGetter{
ApiExt: func() (apiext.Interface, error) { return apiextIfc, nil },
Typed: func() (kubernetes.Interface, error) { return typedClient, nil },
ControllerRuntime: func() (ctrlclient.WithWatch, error) { return &ctrlClient, nil },
})
return newServer(t, clientGetter, nil, repos, charts)
}

Expand All @@ -255,14 +254,11 @@ func newServerWithChartsAndReleases(t *testing.T, actionConfig *action.Configura

apiextIfc := apiextfake.NewSimpleClientset(fluxHelmRepositoryCRD)
ctrlClient := newCtrlClient(nil, charts, releases)
clientGetter := func(context.Context, string) (clientgetter.ClientInterfaces, error) {
return clientgetter.
NewBuilder().
WithApiExt(apiextIfc).
WithTyped(typedClient).
WithControllerRuntime(&ctrlClient).
Build(), nil
}
clientGetter := clientgetter.NewFixedClientProvider(&clientgetter.ClientGetter{
ApiExt: func() (apiext.Interface, error) { return apiextIfc, nil },
Typed: func() (kubernetes.Interface, error) { return typedClient, nil },
ControllerRuntime: func() (ctrlclient.WithWatch, error) { return &ctrlClient, nil },
})
return newServer(t, clientGetter, actionConfig, nil, nil)
}

Expand Down Expand Up @@ -318,7 +314,7 @@ func newHelmActionConfig(t *testing.T, namespace string, rels []helmReleaseStub)
// (unlike charts or releases) is that repos are treated special because
// a new instance of a Server object is only returned once the cache has been synced with indexed repos
func newServer(t *testing.T,
clientGetter clientgetter.ClientGetterFunc,
clientGetter clientgetter.ClientProviderInterface,
actionConfig *action.Configuration,
repos []sourcev1.HelmRepository,
charts []testSpecChartWithUrl) (*Server, redismock.ClientMock, error) {
Expand All @@ -332,14 +328,14 @@ func newServer(t *testing.T,
if clientGetter != nil {
// if client getter returns an error, FLUSHDB call does not take place, because
// newCacheWithRedisClient() raises an error before redisCli.FlushDB() call
if _, err := clientGetter(context.TODO(), ""); err == nil {
if _, err := clientGetter.GetClients(context.TODO(), ""); err == nil {
mock.ExpectFlushDB().SetVal("OK")
}
}

backgroundClientGetter := func(ctx context.Context) (clientgetter.ClientInterfaces, error) {
return clientGetter(ctx, KubeappsCluster)
}
backgroundClientGetter := &clientgetter.FixedClusterClientProvider{ClientsFunc: func(ctx context.Context) (*clientgetter.ClientGetter, error) {
return clientGetter.GetClients(ctx, KubeappsCluster)
}}

sink := repoEventSink{
clientGetter: backgroundClientGetter,
Expand Down
Loading

0 comments on commit 32749bc

Please sign in to comment.