Deploy es-kube-controllers in a multi-tenant environment (#3142)
* Deploy es-kube-controllers in a multi-tenant environment

* [CODE REVIEW] Cosmetic changes

* [CODE REVIEW] Do not write KubeControllerConfig

* [CODE REVIEW] Update comments

* [CODE REVIEW] Update comments

* [CODE REVIEW] Update tests

* [CODE REVIEW] Typo
asincu authored Mar 8, 2024
1 parent 7e8a795 commit 319da40
Showing 4 changed files with 403 additions and 74 deletions.
148 changes: 100 additions & 48 deletions pkg/controller/logstorage/kubecontrollers/es_kube_controllers.go
@@ -21,6 +21,8 @@ import (

esv1 "github.com/elastic/cloud-on-k8s/v2/pkg/apis/elasticsearch/v1"

v3 "github.com/tigera/api/pkg/apis/projectcalico/v3"
core "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
@@ -32,15 +34,14 @@ import (
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

v3 "github.com/tigera/api/pkg/apis/projectcalico/v3"

operatorv1 "github.com/tigera/operator/api/v1"
"github.com/tigera/operator/pkg/common"
"github.com/tigera/operator/pkg/controller/certificatemanager"
"github.com/tigera/operator/pkg/controller/k8sapi"
"github.com/tigera/operator/pkg/controller/logstorage/initializer"
"github.com/tigera/operator/pkg/controller/options"
"github.com/tigera/operator/pkg/controller/status"
"github.com/tigera/operator/pkg/controller/tenancy"
"github.com/tigera/operator/pkg/controller/utils"
"github.com/tigera/operator/pkg/controller/utils/imageset"
"github.com/tigera/operator/pkg/ctrlruntime"
@@ -61,6 +62,7 @@ type ESKubeControllersController struct {
clusterDomain string
usePSP bool
elasticExternal bool
multiTenant bool
tierWatchReady *utils.ReadyFlag
}

@@ -69,17 +71,14 @@ func Add(mgr manager.Manager, opts options.AddOptions) error {
return nil
}

-	if opts.MultiTenant {
-		return nil
-	}

// Create the reconciler
r := &ESKubeControllersController{
client: mgr.GetClient(),
scheme: mgr.GetScheme(),
clusterDomain: opts.ClusterDomain,
status: status.New(mgr.GetClient(), initializer.TigeraStatusLogStorageKubeController, opts.KubernetesVersion),
elasticExternal: opts.ElasticExternal,
multiTenant: opts.MultiTenant,
tierWatchReady: &utils.ReadyFlag{},
}
r.status.Run(opts.ShutdownContext)
@@ -90,8 +89,12 @@ func Add(mgr manager.Manager, opts options.AddOptions) error {
return err
}

-	// Determine how to handle watch events for cluster-scoped resources.
+	// Determine how to handle watch events for cluster-scoped resources. For multi-tenant clusters,
+	// we should update all tenants whenever one changes. For single-tenant clusters, we can just queue the object.
var eventHandler handler.EventHandler = &handler.EnqueueRequestForObject{}
if opts.MultiTenant {
eventHandler = utils.EnqueueAllTenants(mgr.GetClient())
}
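
For context, a tenant fan-out handler such as `utils.EnqueueAllTenants` can be built on controller-runtime's `EnqueueRequestsFromMapFunc`. The sketch below is an assumption about the general shape only, not the operator's actual helper:

```go
package sketch

import (
	"context"

	operatorv1 "github.com/tigera/operator/api/v1"
	"k8s.io/apimachinery/pkg/types"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/handler"
	"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

// Hypothetical sketch: map any event on a cluster-scoped resource to one
// reconcile.Request per Tenant, so every tenant gets re-rendered.
func enqueueAllTenants(c client.Client) handler.EventHandler {
	return handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, _ client.Object) []reconcile.Request {
		tenants := operatorv1.TenantList{}
		if err := c.List(ctx, &tenants); err != nil {
			return nil
		}
		var requests []reconcile.Request
		for _, t := range tenants.Items {
			requests = append(requests, reconcile.Request{
				NamespacedName: types.NamespacedName{Name: t.Name, Namespace: t.Namespace},
			})
		}
		return requests
	})
}
```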

// Configure watches for operator.tigera.io APIs this controller cares about.
if err = c.WatchObject(&operatorv1.LogStorage{}, eventHandler); err != nil {
@@ -109,32 +112,40 @@ func Add(mgr manager.Manager, opts options.AddOptions) error {
if err = utils.AddTigeraStatusWatch(c, initializer.TigeraStatusLogStorageKubeController); err != nil {
return fmt.Errorf("logstorage-controller failed to watch logstorage Tigerastatus: %w", err)
}
if opts.MultiTenant {
if err = c.WatchObject(&operatorv1.Tenant{}, &handler.EnqueueRequestForObject{}); err != nil {
return fmt.Errorf("log-storage-kubecontrollers failed to watch Tenant resource: %w", err)
}
}

// Watch secrets this controller cares about.
secretsToWatch := []string{
render.TigeraElasticsearchGatewaySecret,
monitor.PrometheusClientTLSSecretName,
}
-	for _, ns := range []string{common.OperatorNamespace(), render.ElasticsearchNamespace} {
+
+	// Determine namespaces to watch.
+	_, _, namespacesToWatch := tenancy.GetWatchNamespaces(r.multiTenant, render.ElasticsearchNamespace)
+	for _, ns := range namespacesToWatch {
for _, name := range secretsToWatch {
if err := utils.AddSecretsWatch(c, name, ns); err != nil {
return fmt.Errorf("log-storage-kubecontrollers failed to watch Secret: %w", err)
}
}
}
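
The `tenancy.GetWatchNamespaces` call above decides where secret watches are installed. A minimal sketch of its assumed contract, consistent with the single-tenant loop it replaces (which watched the operator and Elasticsearch namespaces) and reusing the `common` package already imported in this file; the real helper in pkg/controller/tenancy may differ in detail:

```go
// Hypothetical sketch of the assumed GetWatchNamespaces contract.
func getWatchNamespaces(multiTenant bool, appNamespace string) (install, truth string, watch []string) {
	if multiTenant {
		// Tenant namespaces aren't known up front, so watch cluster-wide
		// ("" means all namespaces to controller-runtime watches).
		return "", common.OperatorNamespace(), []string{""}
	}
	// Single-tenant: the fixed operator and application namespaces.
	return appNamespace, common.OperatorNamespace(), []string{common.OperatorNamespace(), appNamespace}
}
```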

-	// Catch if something modifies the resources that this controller consumes.
-	if err := utils.AddServiceWatch(c, render.ElasticsearchServiceName, render.ElasticsearchNamespace); err != nil {
-		return fmt.Errorf("log-storage-kubecontrollers failed to watch the Service resource: %w", err)
-	}
-	if err := utils.AddServiceWatch(c, esgateway.ServiceName, render.ElasticsearchNamespace); err != nil {
-		return fmt.Errorf("log-storage-kubecontrollers failed to watch the Service resource: %w", err)
-	}
-	if err := utils.AddConfigMapWatch(c, certificatemanagement.TrustedCertConfigMapName, common.CalicoNamespace, &handler.EnqueueRequestForObject{}); err != nil {
-		return fmt.Errorf("log-storage-kubecontrollers failed to watch the ConfigMap resource: %w", err)
-	}
-	if err := utils.AddConfigMapWatch(c, certificatemanagement.TrustedCertConfigMapName, render.ElasticsearchNamespace, &handler.EnqueueRequestForObject{}); err != nil {
-		return fmt.Errorf("log-storage-kubecontrollers failed to watch the ConfigMap resource: %w", err)
-	}
+	// The namespace(s) we need to monitor depend upon what tenancy mode we're running in.
+	// For single-tenant, everything is installed in the calico-system namespace.
+	// Make a helper for determining which namespaces to use based on tenancy mode.
+	esKubeControllersNamespace := utils.NewNamespaceHelper(opts.MultiTenant, common.CalicoNamespace, "")
+	if err := utils.AddConfigMapWatch(c, certificatemanagement.TrustedCertConfigMapName, esKubeControllersNamespace.InstallNamespace(), &handler.EnqueueRequestForObject{}); err != nil {
+		return fmt.Errorf("log-storage-kubecontrollers failed to watch the ConfigMap resource: %w", err)
+	}
+	if err := utils.AddDeploymentWatch(c, esgateway.DeploymentName, esKubeControllersNamespace.InstallNamespace()); err != nil {
+		return fmt.Errorf("log-storage-kubecontrollers failed to watch the Deployment resource: %w", err)
+	}
+	if err := utils.AddDeploymentWatch(c, kubecontrollers.EsKubeController, esKubeControllersNamespace.InstallNamespace()); err != nil {
+		return fmt.Errorf("log-storage-kubecontrollers failed to watch the Deployment resource: %w", err)
+	}

// Perform periodic reconciliation. This acts as a backstop to catch reconcile issues,
@@ -149,25 +160,58 @@ func Add(mgr manager.Manager, opts options.AddOptions) error {
return fmt.Errorf("log-storage-kubecontrollers failed to establish a connection to k8s: %w", err)
}

if !opts.MultiTenant {
// Catch if something modifies the resources that this controller consumes.
if err := utils.AddServiceWatch(c, render.ElasticsearchServiceName, render.ElasticsearchNamespace); err != nil {
return fmt.Errorf("log-storage-kubecontrollers failed to watch the Service resource: %w", err)
}
if err := utils.AddServiceWatch(c, esgateway.ServiceName, render.ElasticsearchNamespace); err != nil {
return fmt.Errorf("log-storage-kubecontrollers failed to watch the Service resource: %w", err)
}
if err := utils.AddConfigMapWatch(c, certificatemanagement.TrustedCertConfigMapName, render.ElasticsearchNamespace, &handler.EnqueueRequestForObject{}); err != nil {
return fmt.Errorf("log-storage-kubecontrollers failed to watch the ConfigMap resource: %w", err)
}
go utils.WaitToAddNetworkPolicyWatches(c, k8sClient, log, []types.NamespacedName{
{Name: esgateway.PolicyName, Namespace: render.ElasticsearchNamespace},
})
}

// Start goroutines to establish watches against projectcalico.org/v3 resources.
go utils.WaitToAddTierWatch(networkpolicy.TigeraComponentTierName, c, k8sClient, log, r.tierWatchReady)
go utils.WaitToAddNetworkPolicyWatches(c, k8sClient, log, []types.NamespacedName{
-		{Name: esgateway.PolicyName, Namespace: render.ElasticsearchNamespace},
-		{Name: kubecontrollers.EsKubeControllerNetworkPolicyName, Namespace: common.CalicoNamespace},
+		{Name: kubecontrollers.EsKubeControllerNetworkPolicyName, Namespace: esKubeControllersNamespace.InstallNamespace()},
})

return nil
}
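
`utils.NewNamespaceHelper` appears throughout this change. Inferred from its call sites here, its behavior might look like the following sketch; this is an assumption, not the operator's exact code:

```go
// Hypothetical sketch of NamespaceHelper semantics inferred from call sites:
// single-tenant mode installs into a fixed namespace and reads "truth" from
// the operator namespace; multi-tenant mode scopes both to the tenant's namespace.
type namespaceHelper struct {
	multiTenant bool
	fixedNS     string // e.g. common.CalicoNamespace in single-tenant mode
	tenantNS    string // the reconcile request's namespace in multi-tenant mode
}

func (h namespaceHelper) InstallNamespace() string {
	if h.multiTenant {
		return h.tenantNS
	}
	return h.fixedNS
}

func (h namespaceHelper) TruthNamespace() string {
	if h.multiTenant {
		return h.tenantNS
	}
	return common.OperatorNamespace()
}
```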

func (r *ESKubeControllersController) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) {
-	helper := utils.NewNamespaceHelper(false, common.CalicoNamespace, request.Namespace)
+	helper := utils.NewNamespaceHelper(r.multiTenant, common.CalicoNamespace, request.Namespace)
reqLogger := log.WithValues("Request.Namespace", request.Namespace, "Request.Name", request.Name, "installNS", helper.InstallNamespace(), "truthNS", helper.TruthNamespace())
reqLogger.Info("Reconciling LogStorage - ESKubeControllers")

// We skip requests without a namespace specified in multi-tenant setups.
if r.multiTenant && request.Namespace == "" {
return reconcile.Result{}, nil
}

// When running in multi-tenant mode, we need to install es-kubecontrollers in tenant Namespaces. However, the LogStorage
// resource is still cluster-scoped (since ES is a cluster-wide resource), so we need to look elsewhere to determine
// which tenant namespaces require an es-kubecontrollers instance. We use the tenant API to determine the set of
// namespaces that should have an es-kubecontrollers.
tenant, _, err := utils.GetTenant(ctx, r.multiTenant, r.client, request.Namespace)
if errors.IsNotFound(err) {
reqLogger.V(1).Info("No Tenant in this Namespace, skip")
return reconcile.Result{}, nil
} else if err != nil {
r.status.SetDegraded(operatorv1.ResourceReadError, "An error occurred while querying Tenant", err, reqLogger)
return reconcile.Result{}, err
}
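
`utils.GetTenant` gates the rest of the reconcile: in multi-tenant mode a Tenant must exist in the request's namespace, while in single-tenant mode there is no Tenant at all. A hedged sketch of that lookup, reusing this file's imports; the object name "default" and the return values are assumptions:

```go
// Hypothetical sketch of the GetTenant gate. Assumes the Tenant object in
// each tenant namespace is named "default"; returns (nil, "", nil) outside
// multi-tenant mode so single-tenant reconciles proceed without a Tenant.
func getTenant(ctx context.Context, multiTenant bool, c client.Client, ns string) (*operatorv1.Tenant, string, error) {
	if !multiTenant {
		return nil, "", nil
	}
	tenant := operatorv1.Tenant{}
	if err := c.Get(ctx, types.NamespacedName{Name: "default", Namespace: ns}, &tenant); err != nil {
		return nil, "", err // A NotFound error makes the caller skip this namespace.
	}
	return &tenant, tenant.Spec.ID, nil
}
```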

// Get LogStorage resource.
logStorage := &operatorv1.LogStorage{}
key := utils.DefaultTSEEInstanceKey
-	err := r.client.Get(ctx, key, logStorage)
+	err = r.client.Get(ctx, key, logStorage)
if err != nil {
if errors.IsNotFound(err) {
r.status.OnCRNotFound()
@@ -244,16 +288,21 @@ func (r *ESKubeControllersController) Reconcile(ctx context.Context, request rec
}
}

-	// Get secrets needed for kube-controllers to talk to elastic.
-	kubeControllersUserSecret, err := utils.GetSecret(ctx, r.client, kubecontrollers.ElasticsearchKubeControllersUserSecret, helper.TruthNamespace())
-	if err != nil {
-		r.status.SetDegraded(operatorv1.ResourceReadError, "Failed to get kube controllers gateway secret", err, reqLogger)
-		return reconcile.Result{}, err
-	}
+	// Get secrets needed for kube-controllers to talk to elastic. This is needed for zero-tenant and
+	// single-tenant clusters that deploy es-kube-controllers and need to talk to es-gateway.
+	var kubeControllersUserSecret *core.Secret
+	if !r.multiTenant {
+		kubeControllersUserSecret, err = utils.GetSecret(ctx, r.client, kubecontrollers.ElasticsearchKubeControllersUserSecret, helper.TruthNamespace())
+		if err != nil {
+			r.status.SetDegraded(operatorv1.ResourceReadError, "Failed to get kube controllers gateway secret", err, reqLogger)
+			return reconcile.Result{}, err
+		}
+	}

// Collect the certificates we need to provision es-kube-controllers. These will have been provisioned already by the ES secrets controller.
opts := []certificatemanager.Option{
certificatemanager.WithLogger(reqLogger),
certificatemanager.WithTenant(tenant),
}
cm, err := certificatemanager.Create(r.client, install, r.clusterDomain, helper.TruthNamespace(), opts...)
if err != nil {
@@ -270,40 +319,42 @@ func (r *ESKubeControllersController) Reconcile(ctx context.Context, request rec
return reconcile.Result{}, err
}

-	gwNSHelper := utils.NewSingleTenantNamespaceHelper(render.ElasticsearchNamespace)
-	// Query the trusted bundle from the namespace.
-	gwTrustedBundle, err := cm.LoadTrustedBundle(ctx, r.client, gwNSHelper.InstallNamespace())
-	if err != nil {
-		r.status.SetDegraded(operatorv1.ResourceReadError, fmt.Sprintf("Error getting trusted bundle in %s", gwNSHelper.InstallNamespace()), err, reqLogger)
-		return reconcile.Result{}, err
-	}

pullSecrets, err := utils.GetNetworkingPullSecrets(install, r.client)
if err != nil {
r.status.SetDegraded(operatorv1.ResourceReadError, "An error occurring while retrieving the pull secrets", err, reqLogger)
return reconcile.Result{}, err
}

	// ESGateway is required in order for kube-controllers to operate successfully, since es-kube-controllers talks to ES
-	// via this gateway. However, in multi-tenant mode we disable the elasticsearch controller and so this isn't needed.
-	if err := r.createESGateway(
-		ctx,
-		gwNSHelper,
-		install,
-		variant,
-		pullSecrets,
-		hdler,
-		reqLogger,
-		gwTrustedBundle,
-		r.usePSP,
-	); err != nil {
-		return reconcile.Result{}, err
-	}
+	// via this gateway. However, in multi-tenant mode, es-kube-controllers doesn't talk to elasticsearch,
+	// so this is only needed in single-tenant and zero-tenant clusters.
+	if !r.multiTenant {
+		gwNSHelper := utils.NewSingleTenantNamespaceHelper(render.ElasticsearchNamespace)
+		// Query the trusted bundle from the namespace.
+		gwTrustedBundle, err := cm.LoadTrustedBundle(ctx, r.client, gwNSHelper.InstallNamespace())
+		if err != nil {
+			r.status.SetDegraded(operatorv1.ResourceReadError, fmt.Sprintf("Error getting trusted bundle in %s", gwNSHelper.InstallNamespace()), err, reqLogger)
+			return reconcile.Result{}, err
+		}
+		if err := r.createESGateway(
+			ctx,
+			gwNSHelper,
+			install,
+			variant,
+			pullSecrets,
+			hdler,
+			reqLogger,
+			gwTrustedBundle,
+			r.usePSP,
+		); err != nil {
+			return reconcile.Result{}, err
+		}
+	}

// Query the trusted bundle from the namespace.
trustedBundle, err := cm.LoadTrustedBundle(ctx, r.client, helper.InstallNamespace())
if err != nil {
r.status.SetDegraded(operatorv1.ResourceReadError, fmt.Sprintf("Error getting trusted bundle in %s", gwNSHelper.InstallNamespace()), err, reqLogger)
r.status.SetDegraded(operatorv1.ResourceReadError, fmt.Sprintf("Error getting trusted bundle in %s", helper.InstallNamespace()), err, reqLogger)
return reconcile.Result{}, err
}

@@ -324,6 +375,7 @@ func (r *ESKubeControllersController) Reconcile(ctx context.Context, request rec
TrustedBundle: trustedBundle,
Namespace: helper.InstallNamespace(),
BindingNamespaces: namespaces,
Tenant: tenant,
}
esKubeControllerComponents := kubecontrollers.NewElasticsearchKubeControllers(&kubeControllersCfg)

