Deploy es-kube-controllers in a multi-tenant environment
asincu committed Feb 8, 2024
1 parent c68c5cc commit 07465e7
Showing 4 changed files with 387 additions and 58 deletions.
114 changes: 83 additions & 31 deletions pkg/controller/logstorage/kubecontrollers/es_kube_controllers.go
@@ -19,6 +19,10 @@ import (
"fmt"
"time"

"github.com/tigera/operator/pkg/controller/tenancy"

core "k8s.io/api/core/v1"

v3 "github.com/tigera/api/pkg/apis/projectcalico/v3"
"github.com/tigera/operator/pkg/render/common/networkpolicy"
"github.com/tigera/operator/pkg/render/logstorage/esgateway"
@@ -61,6 +65,7 @@ type ESKubeControllersController struct {
clusterDomain string
usePSP bool
elasticExternal bool
multiTenant bool
tierWatchReady *utils.ReadyFlag
}

@@ -69,17 +74,14 @@ func Add(mgr manager.Manager, opts options.AddOptions) error {
return nil
}

if opts.MultiTenant {
return nil
}

// Create the reconciler
r := &ESKubeControllersController{
client: mgr.GetClient(),
scheme: mgr.GetScheme(),
clusterDomain: opts.ClusterDomain,
status: status.New(mgr.GetClient(), "log-storage-kubecontrollers", opts.KubernetesVersion),
elasticExternal: opts.ElasticExternal,
multiTenant: opts.MultiTenant,
tierWatchReady: &utils.ReadyFlag{},
}
r.status.Run(opts.ShutdownContext)
@@ -90,8 +92,12 @@ func Add(mgr manager.Manager, opts options.AddOptions) error {
return err
}

// Determine how to handle watch events for cluster-scoped resources.
// Determine how to handle watch events for cluster-scoped resources. For multi-tenant clusters,
// we should update all tenants whenever one changes. For single-tenant clusters, we can just queue the object.
var eventHandler handler.EventHandler = &handler.EnqueueRequestForObject{}
if opts.MultiTenant {
eventHandler = utils.EnqueueAllTenants(mgr.GetClient())
}
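For context, the multi-tenant branch above swaps the default per-object handler for one that fans a single cluster-scoped event out to every tenant. The sketch below is not part of this change; it shows the general idea, assuming controller-runtime's handler.EnqueueRequestsFromMapFunc and the operator's TenantList type. The real helper is utils.EnqueueAllTenants and its implementation may differ.

// Sketch only: fan a watch event out to one reconcile.Request per Tenant, so that
// a change to a cluster-scoped resource re-reconciles every tenant namespace.
package utils

import (
	"context"

	operatorv1 "github.com/tigera/operator/api/v1"
	"k8s.io/apimachinery/pkg/types"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/handler"
	"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

func enqueueAllTenants(c client.Client) handler.EventHandler {
	return handler.EnqueueRequestsFromMapFunc(func(obj client.Object) []reconcile.Request {
		tenants := operatorv1.TenantList{}
		if err := c.List(context.Background(), &tenants); err != nil {
			// On failure, enqueue nothing; the periodic reconcile acts as a backstop.
			return nil
		}
		requests := []reconcile.Request{}
		for _, t := range tenants.Items {
			requests = append(requests, reconcile.Request{
				NamespacedName: types.NamespacedName{Name: t.Name, Namespace: t.Namespace},
			})
		}
		return requests
	})
}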

// Configure watches for operator.tigera.io APIs this controller cares about.
if err = c.Watch(&source.Kind{Type: &operatorv1.LogStorage{}}, eventHandler); err != nil {
@@ -109,13 +115,27 @@ func Add(mgr manager.Manager, opts options.AddOptions) error {
if err = utils.AddTigeraStatusWatch(c, "log-storage-kubecontrollers"); err != nil {
return fmt.Errorf("logstorage-controller failed to watch logstorage Tigerastatus: %w", err)
}
if opts.MultiTenant {
if err = c.Watch(&source.Kind{Type: &operatorv1.Tenant{}}, &handler.EnqueueRequestForObject{}); err != nil {
return fmt.Errorf("log-storage-kubecontrollers failed to watch Tenant resource: %w", err)
}
}

// The namespace(s) we need to monitor depend upon what tenancy mode we're running in.
// For single-tenant, everything is installed in the calico-system namespace.
// Make a helper for determining which namespaces to use based on tenancy mode.
esGatewayHelper := utils.NewNamespaceHelper(opts.MultiTenant, render.ElasticsearchNamespace, "")
helper := utils.NewNamespaceHelper(opts.MultiTenant, common.CalicoNamespace, "")

// Watch secrets this controller cares about.
secretsToWatch := []string{
render.TigeraElasticsearchGatewaySecret,
monitor.PrometheusClientTLSSecretName,
}
for _, ns := range []string{common.OperatorNamespace(), render.ElasticsearchNamespace} {

// Determine namespaces to watch.
_, _, namespacesToWatch := tenancy.GetWatchNamespaces(r.multiTenant, render.ElasticsearchNamespace)
for _, ns := range namespacesToWatch {
for _, name := range secretsToWatch {
if err := utils.AddSecretsWatch(c, name, ns); err != nil {
return fmt.Errorf("log-storage-kubecontrollers failed to watch Secret: %w", err)
@@ -124,15 +144,21 @@ func Add(mgr manager.Manager, opts options.AddOptions) error {
}
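The namespace wiring above relies on two helpers. The code below is not part of this change; it is a rough illustration of their behaviour, with the real logic living in utils.NewNamespaceHelper and tenancy.GetWatchNamespaces, and the operator namespace name treated as an assumption. In single-tenant mode components install into a fixed namespace and configuration is read from the operator namespace; in multi-tenant mode both resolve to the tenant's namespace and watches cover all namespaces.

// Illustrative approximation only; not the operator's actual implementation.
package utils

type namespaceHelper struct {
	multiTenant    bool
	singleTenantNS string // e.g. render.ElasticsearchNamespace or common.CalicoNamespace
	multiTenantNS  string // the tenant namespace (empty at watch-setup time)
}

// installNamespace is where this controller renders its components.
func (h namespaceHelper) installNamespace() string {
	if h.multiTenant {
		return h.multiTenantNS
	}
	return h.singleTenantNS
}

// truthNamespace is where user-facing configuration is read from.
func (h namespaceHelper) truthNamespace() string {
	if h.multiTenant {
		return h.multiTenantNS
	}
	return "tigera-operator"
}

// watchNamespaces mirrors the idea behind tenancy.GetWatchNamespaces: in
// multi-tenant mode watch all namespaces (""), otherwise watch the operator
// namespace and the install namespace.
func watchNamespaces(multiTenant bool, installNS string) []string {
	if multiTenant {
		return []string{""}
	}
	return []string{"tigera-operator", installNS}
}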

// Catch if something modifies the resources that this controller consumes.
if err := utils.AddServiceWatch(c, render.ElasticsearchServiceName, render.ElasticsearchNamespace); err != nil {
if err := utils.AddServiceWatch(c, render.ElasticsearchServiceName, esGatewayHelper.InstallNamespace()); err != nil {
return fmt.Errorf("log-storage-kubecontrollers failed to watch the Service resource: %w", err)
}
if err := utils.AddServiceWatch(c, esgateway.ServiceName, render.ElasticsearchNamespace); err != nil {
if err := utils.AddServiceWatch(c, esgateway.ServiceName, esGatewayHelper.InstallNamespace()); err != nil {
return fmt.Errorf("log-storage-kubecontrollers failed to watch the Service resource: %w", err)
}
if err := utils.AddConfigMapWatch(c, certificatemanagement.TrustedCertConfigMapName, common.CalicoNamespace, &handler.EnqueueRequestForObject{}); err != nil {
if err := utils.AddConfigMapWatch(c, certificatemanagement.TrustedCertConfigMapName, helper.InstallNamespace(), &handler.EnqueueRequestForObject{}); err != nil {
return fmt.Errorf("log-storage-kubecontrollers failed to watch the Service resource: %w", err)
}
if err := utils.AddDeploymentWatch(c, esgateway.DeploymentName, helper.InstallNamespace()); err != nil {
return fmt.Errorf("log-storage-access-controller failed to watch the Service resource: %w", err)
}
if err := utils.AddDeploymentWatch(c, kubecontrollers.EsKubeController, helper.InstallNamespace()); err != nil {
return fmt.Errorf("log-storage-access-controller failed to watch the Service resource: %w", err)
}

// Perform periodic reconciliation. This acts as a backstop to catch reconcile issues,
// and also makes sure we spot when things change that might not trigger a reconciliation.
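One common controller-runtime way to implement this kind of periodic backstop is a ticker feeding a channel source, sketched below. This is not part of this change: the operator has its own helper for this, so the function shape and the use of the default LogStorage name ("tigera-secure") are assumptions for illustration.

// Sketch only: trigger a reconcile every `period`, even if nothing we watch changes.
package utils

import (
	"time"

	operatorv1 "github.com/tigera/operator/api/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"sigs.k8s.io/controller-runtime/pkg/controller"
	"sigs.k8s.io/controller-runtime/pkg/event"
	"sigs.k8s.io/controller-runtime/pkg/handler"
	"sigs.k8s.io/controller-runtime/pkg/source"
)

func addPeriodicReconcile(c controller.Controller, period time.Duration) error {
	events := make(chan event.GenericEvent)
	go func() {
		ticker := time.NewTicker(period)
		defer ticker.Stop()
		for range ticker.C {
			// Enqueue the default LogStorage instance on every tick.
			events <- event.GenericEvent{Object: &operatorv1.LogStorage{
				ObjectMeta: metav1.ObjectMeta{Name: "tigera-secure"},
			}}
		}
	}()
	return c.Watch(&source.Channel{Source: events}, &handler.EnqueueRequestForObject{})
}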
@@ -149,22 +175,40 @@ func Add(mgr manager.Manager, opts options.AddOptions) error {
// Start goroutines to establish watches against projectcalico.org/v3 resources.
go utils.WaitToAddTierWatch(networkpolicy.TigeraComponentTierName, c, k8sClient, log, r.tierWatchReady)
go utils.WaitToAddNetworkPolicyWatches(c, k8sClient, log, []types.NamespacedName{
{Name: esgateway.PolicyName, Namespace: render.ElasticsearchNamespace},
{Name: kubecontrollers.EsKubeControllerNetworkPolicyName, Namespace: common.CalicoNamespace},
{Name: esgateway.PolicyName, Namespace: esGatewayHelper.InstallNamespace()},
{Name: kubecontrollers.EsKubeControllerNetworkPolicyName, Namespace: helper.InstallNamespace()},
})

return nil
}

func (r *ESKubeControllersController) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) {
helper := utils.NewNamespaceHelper(false, common.CalicoNamespace, request.Namespace)
helper := utils.NewNamespaceHelper(r.multiTenant, common.CalicoNamespace, request.Namespace)
reqLogger := log.WithValues("Request.Namespace", request.Namespace, "Request.Name", request.Name, "installNS", helper.InstallNamespace(), "truthNS", helper.TruthNamespace())
reqLogger.Info("Reconciling LogStorage - ESKubeControllers")

// We skip requests without a namespace specified in multi-tenant setups.
if r.multiTenant && request.Namespace == "" {
return reconcile.Result{}, nil
}

// When running in multi-tenant mode, we install an es-kube-controllers instance in each tenant's Namespace. However,
// the LogStorage resource is still cluster-scoped (since ES is a cluster-wide resource), so it can't tell us which
// Namespaces need one. Instead, we use the Tenant API to determine the set of Namespaces that should run an
// es-kube-controllers instance.
tenant, _, err := utils.GetTenant(ctx, r.multiTenant, r.client, request.Namespace)
if errors.IsNotFound(err) {
reqLogger.V(1).Info("No Tenant in this Namespace, skip")
return reconcile.Result{}, nil
} else if err != nil {
r.status.SetDegraded(operatorv1.ResourceReadError, "An error occurred while querying Tenant", err, reqLogger)
return reconcile.Result{}, err
}
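For reference, a tenant lookup along these lines boils down to a namespaced Get of the Tenant CR. The sketch below is not part of this change and is a simplified stand-in for utils.GetTenant (which also returns the tenant ID); the "default" Tenant name and the single-tenant behaviour are assumptions based on how it is used here.

// Sketch only: resolve the Tenant that owns a given namespace.
package utils

import (
	"context"

	operatorv1 "github.com/tigera/operator/api/v1"
	"k8s.io/apimachinery/pkg/types"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

func getTenant(ctx context.Context, multiTenant bool, c client.Client, ns string) (*operatorv1.Tenant, error) {
	if !multiTenant {
		// Single-tenant clusters have no Tenant CR; callers treat nil as "no tenant".
		return nil, nil
	}
	tenant := operatorv1.Tenant{}
	if err := c.Get(ctx, types.NamespacedName{Name: "default", Namespace: ns}, &tenant); err != nil {
		// A NotFound error means this namespace has no Tenant and should be skipped.
		return nil, err
	}
	return &tenant, nil
}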

// Get LogStorage resource.
logStorage := &operatorv1.LogStorage{}
key := utils.DefaultTSEEInstanceKey
err := r.client.Get(ctx, key, logStorage)
err = r.client.Get(ctx, key, logStorage)
if err != nil {
if errors.IsNotFound(err) {
r.status.OnCRNotFound()
@@ -242,15 +286,19 @@ func (r *ESKubeControllersController) Reconcile(ctx context.Context, request rec
}

// Get secrets needed for kube-controllers to talk to elastic.
kubeControllersUserSecret, err := utils.GetSecret(ctx, r.client, kubecontrollers.ElasticsearchKubeControllersUserSecret, helper.TruthNamespace())
if err != nil {
r.status.SetDegraded(operatorv1.ResourceReadError, "Failed to get kube controllers gateway secret", err, reqLogger)
return reconcile.Result{}, err
var kubeControllersUserSecret *core.Secret
if !r.multiTenant {
kubeControllersUserSecret, err = utils.GetSecret(ctx, r.client, kubecontrollers.ElasticsearchKubeControllersUserSecret, helper.TruthNamespace())
if err != nil {
r.status.SetDegraded(operatorv1.ResourceReadError, "Failed to get kube controllers gateway secret", err, reqLogger)
return reconcile.Result{}, err
}
}

// Collect the certificates we need to provision es-kube-controllers. These will have been provisioned already by the ES secrets controller.
opts := []certificatemanager.Option{
certificatemanager.WithLogger(reqLogger),
certificatemanager.WithTenant(tenant),
}
cm, err := certificatemanager.Create(r.client, install, r.clusterDomain, helper.TruthNamespace(), opts...)
if err != nil {
Expand Down Expand Up @@ -280,20 +328,23 @@ func (r *ESKubeControllersController) Reconcile(ctx context.Context, request rec
return reconcile.Result{}, err
}

// ESGateway is required in order for kube-controllers to operate successfully, since es-kube-controllers talks to ES
// via this gateway. However, in multi-tenant mode we disable the elasticsearch controller and so this isn't needed.
if err := r.createESGateway(
ctx,
utils.NewSingleTenantNamespaceHelper(render.ElasticsearchNamespace),
install,
variant,
pullSecrets,
hdler,
reqLogger,
trustedBundle,
r.usePSP,
); err != nil {
return reconcile.Result{}, err
if !r.multiTenant {
// ESGateway is required in order for kube-controllers to operate successfully, since es-kube-controllers talks to ES
// via this gateway. However, in multi-tenant mode, es-kube-controllers doesn't talk to elasticsearch,
// so this is only needed in single-tenant clusters.
if err := r.createESGateway(
ctx,
utils.NewSingleTenantNamespaceHelper(render.ElasticsearchNamespace),
install,
variant,
pullSecrets,
hdler,
reqLogger,
trustedBundle,
r.usePSP,
); err != nil {
return reconcile.Result{}, err
}
}

// Determine the namespaces to which we must bind the cluster role.
@@ -313,6 +364,7 @@ func (r *ESKubeControllersController) Reconcile(ctx context.Context, request rec
TrustedBundle: trustedBundle,
Namespace: helper.InstallNamespace(),
BindingNamespaces: namespaces,
Tenant: tenant,
}
esKubeControllerComponents := kubecontrollers.NewElasticsearchKubeControllers(&kubeControllersCfg)
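Since the kube-controllers ClusterRole must be bound in every namespace that runs an instance, the set of binding namespaces differs by mode. The sketch below is not part of this change and is a hypothetical illustration of how that set could be computed; the operator's actual logic may differ: the install namespace in single-tenant mode, or every namespace that contains a Tenant in multi-tenant mode.

// Hypothetical sketch of computing ClusterRoleBinding namespaces; not the operator's code.
package kubecontrollers

import (
	"context"

	operatorv1 "github.com/tigera/operator/api/v1"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

func bindingNamespaces(ctx context.Context, multiTenant bool, c client.Client, installNS string) ([]string, error) {
	if !multiTenant {
		// Single-tenant: only the one namespace where es-kube-controllers runs.
		return []string{installNS}, nil
	}
	// Multi-tenant: bind in every namespace that hosts a Tenant.
	tenants := operatorv1.TenantList{}
	if err := c.List(ctx, &tenants); err != nil {
		return nil, err
	}
	namespaces := []string{}
	for _, t := range tenants.Items {
		namespaces = append(namespaces, t.Namespace)
	}
	return namespaces, nil
}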

111 changes: 95 additions & 16 deletions pkg/controller/logstorage/kubecontrollers/es_kube_controllers_test.go
@@ -1,4 +1,4 @@
// Copyright (c) 2023 Tigera, Inc. All rights reserved.
// Copyright (c) 2023,2024 Tigera, Inc. All rights reserved.

// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -18,9 +18,10 @@ import (
"context"
"fmt"

v3 "github.com/tigera/api/pkg/apis/projectcalico/v3"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"

controllerruntime "sigs.k8s.io/controller-runtime"
v3 "github.com/tigera/api/pkg/apis/projectcalico/v3"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
Expand Down Expand Up @@ -83,6 +84,7 @@ func NewControllerWithShims(
status: status,
clusterDomain: opts.ClusterDomain,
tierWatchReady: tierWatchReady,
multiTenant: multiTenant,
}
r.status.Run(opts.ShutdownContext)
return r, nil
@@ -281,19 +283,6 @@ var _ = Describe("LogStorage ES kube-controllers controller", func() {
Expect(kc.Image).To(Equal(fmt.Sprintf("some.registry.org/%s@%s", components.ComponentTigeraKubeControllers.Image, "sha256:kubecontrollershash")))
})

It("should not create es-kube-controller controller in multi-tenant mode", func() {
mgr, err := controllerruntime.NewManager(nil, controllerruntime.Options{})
Expect(err).To(Not(BeNil()))

options := options.AddOptions{
MultiTenant: true,
EnterpriseCRDExists: true,
}

err = Add(mgr, options)
Expect(err).To(BeNil())
})

Context("External ES mode", func() {
BeforeEach(func() {
// Delete the Elasticsearch CR. This is created for ECK only.
@@ -351,4 +340,94 @@ var _ = Describe("LogStorage ES kube-controllers controller", func() {
Expect(test.GetResource(cli, &dep)).To(BeNil())
})
})

Context("Multi-tenant", func() {
var (
tenantNS string
tenant *operatorv1.Tenant
)

BeforeEach(func() {
tenantNS = "tenant-namespace"
Expect(cli.Create(ctx, &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: tenantNS}})).ShouldNot(HaveOccurred())

// Create the Tenant object.
tenant = &operatorv1.Tenant{}
tenant.Name = "default"
tenant.Namespace = tenantNS
tenant.Spec.ID = "test-tenant-id"
tenant.Spec.Indices = []operatorv1.Index{}
Expect(cli.Create(ctx, tenant)).ShouldNot(HaveOccurred())

// Create a CA secret for the test, and create its KeyPair.
opts := []certificatemanager.Option{
certificatemanager.AllowCACreation(),
certificatemanager.WithTenant(tenant),
}
cm, err := certificatemanager.Create(cli, &install.Spec, dns.DefaultClusterDomain, tenantNS, opts...)
Expect(err).ShouldNot(HaveOccurred())
Expect(cli.Create(ctx, cm.KeyPair().Secret(tenantNS))).ShouldNot(HaveOccurred())
bundle := cm.CreateTrustedBundle()
Expect(cli.Create(ctx, bundle.ConfigMap(tenantNS))).ShouldNot(HaveOccurred())

// Create the reconciler for the tests.
r, err = NewControllerWithShims(cli, scheme, mockStatus, operatorv1.ProviderNone, dns.DefaultClusterDomain, true, readyFlag)
Expect(err).ShouldNot(HaveOccurred())
})

It("should wait for the tenant CA to be provisioned", func() {
// Delete the CA secret for this test.
caSecret := &corev1.Secret{}
caSecret.Name = certificatemanagement.TenantCASecretName
caSecret.Namespace = tenantNS
Expect(cli.Delete(ctx, caSecret)).ShouldNot(HaveOccurred())

// Run the reconciler.
_, err := r.Reconcile(ctx, reconcile.Request{NamespacedName: types.NamespacedName{Name: "default", Namespace: tenantNS}})
Expect(err).Should(HaveOccurred())
Expect(err.Error()).Should(ContainSubstring("CA secret"))
})

It("should not reconcile any resources if no Namespace was given", func() {
// Run the reconciler, passing in a Request with no Namespace. It should return successfully.
_, err := r.Reconcile(ctx, reconcile.Request{NamespacedName: types.NamespacedName{Name: "default"}})
Expect(err).ShouldNot(HaveOccurred())

// Check that nothing was installed on the cluster.
dep := appsv1.Deployment{
TypeMeta: metav1.TypeMeta{Kind: "Deployment", APIVersion: "apps/v1"},
ObjectMeta: metav1.ObjectMeta{
Name: kubecontrollers.EsKubeController,
Namespace: tenantNS,
},
}
err = cli.Get(ctx, types.NamespacedName{Name: dep.Name, Namespace: dep.Namespace}, &dep)
Expect(err).Should(HaveOccurred())
Expect(errors.IsNotFound(err)).Should(BeTrue())

// Check that OnCRFound was not called.
mockStatus.AssertNotCalled(GinkgoT(), "OnCRFound")
})

It("should reconcile resources for a cluster", func() {
// Run the reconciler.
result, err := r.Reconcile(ctx, reconcile.Request{NamespacedName: types.NamespacedName{Name: "default", Namespace: tenantNS}})
Expect(err).ShouldNot(HaveOccurred())
Expect(result).Should(Equal(successResult))

// SetDegraded should not have been called.
mockStatus.AssertNumberOfCalls(GinkgoT(), "SetDegraded", 0)

// Check that kube-controllers was created as expected. We don't need to check every resource in detail, since
// the render package has its own tests which cover this in more detail.
dep := appsv1.Deployment{
TypeMeta: metav1.TypeMeta{Kind: "Deployment", APIVersion: "apps/v1"},
ObjectMeta: metav1.ObjectMeta{
Name: kubecontrollers.EsKubeController,
Namespace: tenantNS,
},
}
Expect(test.GetResource(cli, &dep)).To(BeNil())
})
})
})
