Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Deploy es-kube-controllers in a multi-tenant environment #3142

Merged
merged 7 commits into from
Mar 8, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
148 changes: 100 additions & 48 deletions pkg/controller/logstorage/kubecontrollers/es_kube_controllers.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (

esv1 "github.com/elastic/cloud-on-k8s/v2/pkg/apis/elasticsearch/v1"

v3 "github.com/tigera/api/pkg/apis/projectcalico/v3"
core "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
Expand All @@ -32,15 +34,14 @@ import (
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

v3 "github.com/tigera/api/pkg/apis/projectcalico/v3"

operatorv1 "github.com/tigera/operator/api/v1"
"github.com/tigera/operator/pkg/common"
"github.com/tigera/operator/pkg/controller/certificatemanager"
"github.com/tigera/operator/pkg/controller/k8sapi"
"github.com/tigera/operator/pkg/controller/logstorage/initializer"
"github.com/tigera/operator/pkg/controller/options"
"github.com/tigera/operator/pkg/controller/status"
"github.com/tigera/operator/pkg/controller/tenancy"
"github.com/tigera/operator/pkg/controller/utils"
"github.com/tigera/operator/pkg/controller/utils/imageset"
"github.com/tigera/operator/pkg/ctrlruntime"
Expand All @@ -61,6 +62,7 @@ type ESKubeControllersController struct {
clusterDomain string
usePSP bool
elasticExternal bool
multiTenant bool
tierWatchReady *utils.ReadyFlag
}

Expand All @@ -69,17 +71,14 @@ func Add(mgr manager.Manager, opts options.AddOptions) error {
return nil
}

if opts.MultiTenant {
return nil
}

// Create the reconciler
r := &ESKubeControllersController{
client: mgr.GetClient(),
scheme: mgr.GetScheme(),
clusterDomain: opts.ClusterDomain,
status: status.New(mgr.GetClient(), initializer.TigeraStatusLogStorageKubeController, opts.KubernetesVersion),
elasticExternal: opts.ElasticExternal,
multiTenant: opts.MultiTenant,
tierWatchReady: &utils.ReadyFlag{},
}
r.status.Run(opts.ShutdownContext)
Expand All @@ -90,8 +89,12 @@ func Add(mgr manager.Manager, opts options.AddOptions) error {
return err
}

// Determine how to handle watch events for cluster-scoped resources.
// Determine how to handle watch events for cluster-scoped resources. For multi-tenant clusters,
// we should update all tenants whenever one changes. For single-tenant clusters, we can just queue the object.
var eventHandler handler.EventHandler = &handler.EnqueueRequestForObject{}
if opts.MultiTenant {
eventHandler = utils.EnqueueAllTenants(mgr.GetClient())
}

// Configure watches for operator.tigera.io APIs this controller cares about.
if err = c.WatchObject(&operatorv1.LogStorage{}, eventHandler); err != nil {
Expand All @@ -109,32 +112,40 @@ func Add(mgr manager.Manager, opts options.AddOptions) error {
if err = utils.AddTigeraStatusWatch(c, initializer.TigeraStatusLogStorageKubeController); err != nil {
return fmt.Errorf("logstorage-controller failed to watch logstorage Tigerastatus: %w", err)
}
if opts.MultiTenant {
if err = c.WatchObject(&operatorv1.Tenant{}, &handler.EnqueueRequestForObject{}); err != nil {
return fmt.Errorf("log-storage-kubecontrollers failed to watch Tenant resource: %w", err)
}
}

// Watch secrets this controller cares about.
secretsToWatch := []string{
render.TigeraElasticsearchGatewaySecret,
monitor.PrometheusClientTLSSecretName,
}
for _, ns := range []string{common.OperatorNamespace(), render.ElasticsearchNamespace} {

// Determine namespaces to watch.
_, _, namespacesToWatch := tenancy.GetWatchNamespaces(r.multiTenant, render.ElasticsearchNamespace)
for _, ns := range namespacesToWatch {
for _, name := range secretsToWatch {
if err := utils.AddSecretsWatch(c, name, ns); err != nil {
return fmt.Errorf("log-storage-kubecontrollers failed to watch Secret: %w", err)
}
}
}

// Catch if something modifies the resources that this controller consumes.
if err := utils.AddServiceWatch(c, render.ElasticsearchServiceName, render.ElasticsearchNamespace); err != nil {
return fmt.Errorf("log-storage-kubecontrollers failed to watch the Service resource: %w", err)
}
if err := utils.AddServiceWatch(c, esgateway.ServiceName, render.ElasticsearchNamespace); err != nil {
// The namespace(s) we need to monitor depend upon what tenancy mode we're running in.
// For single-tenant, everything is installed in the calico-system namespace.
// Make a helper for determining which namespaces to use based on tenancy mode.
esKubeControllersNamespace := utils.NewNamespaceHelper(opts.MultiTenant, common.CalicoNamespace, "")
if err := utils.AddConfigMapWatch(c, certificatemanagement.TrustedCertConfigMapName, esKubeControllersNamespace.InstallNamespace(), &handler.EnqueueRequestForObject{}); err != nil {
return fmt.Errorf("log-storage-kubecontrollers failed to watch the Service resource: %w", err)
}
if err := utils.AddConfigMapWatch(c, certificatemanagement.TrustedCertConfigMapName, common.CalicoNamespace, &handler.EnqueueRequestForObject{}); err != nil {
return fmt.Errorf("log-storage-kubecontrollers failed to watch the ConfigMap resource: %w", err)
if err := utils.AddDeploymentWatch(c, esgateway.DeploymentName, esKubeControllersNamespace.InstallNamespace()); err != nil {
return fmt.Errorf("log-storage-access-controller failed to watch the Service resource: %w", err)
}
if err := utils.AddConfigMapWatch(c, certificatemanagement.TrustedCertConfigMapName, render.ElasticsearchNamespace, &handler.EnqueueRequestForObject{}); err != nil {
return fmt.Errorf("log-storage-kubecontrollers failed to watch the ConfigMap resource: %w", err)
if err := utils.AddDeploymentWatch(c, kubecontrollers.EsKubeController, esKubeControllersNamespace.InstallNamespace()); err != nil {
return fmt.Errorf("log-storage-access-controller failed to watch the Service resource: %w", err)
}

// Perform periodic reconciliation. This acts as a backstop to catch reconcile issues,
Expand All @@ -149,25 +160,58 @@ func Add(mgr manager.Manager, opts options.AddOptions) error {
return fmt.Errorf("log-storage-kubecontrollers failed to establish a connection to k8s: %w", err)
}

if !opts.MultiTenant {
// Catch if something modifies the resources that this controller consumes.
if err := utils.AddServiceWatch(c, render.ElasticsearchServiceName, render.ElasticsearchNamespace); err != nil {
return fmt.Errorf("log-storage-kubecontrollers failed to watch the Service resource: %w", err)
}
if err := utils.AddServiceWatch(c, esgateway.ServiceName, render.ElasticsearchNamespace); err != nil {
return fmt.Errorf("log-storage-kubecontrollers failed to watch the Service resource: %w", err)
}
if err := utils.AddConfigMapWatch(c, certificatemanagement.TrustedCertConfigMapName, render.ElasticsearchNamespace, &handler.EnqueueRequestForObject{}); err != nil {
return fmt.Errorf("log-storage-kubecontrollers failed to watch the ConfigMap resource: %w", err)
}
go utils.WaitToAddNetworkPolicyWatches(c, k8sClient, log, []types.NamespacedName{
{Name: esgateway.PolicyName, Namespace: render.ElasticsearchNamespace},
})
}

// Start goroutines to establish watches against projectcalico.org/v3 resources.
go utils.WaitToAddTierWatch(networkpolicy.TigeraComponentTierName, c, k8sClient, log, r.tierWatchReady)
go utils.WaitToAddNetworkPolicyWatches(c, k8sClient, log, []types.NamespacedName{
{Name: esgateway.PolicyName, Namespace: render.ElasticsearchNamespace},
{Name: kubecontrollers.EsKubeControllerNetworkPolicyName, Namespace: common.CalicoNamespace},
{Name: kubecontrollers.EsKubeControllerNetworkPolicyName, Namespace: esKubeControllersNamespace.InstallNamespace()},
})

return nil
}

func (r *ESKubeControllersController) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) {
helper := utils.NewNamespaceHelper(false, common.CalicoNamespace, request.Namespace)
helper := utils.NewNamespaceHelper(r.multiTenant, common.CalicoNamespace, request.Namespace)
reqLogger := log.WithValues("Request.Namespace", request.Namespace, "Request.Name", request.Name, "installNS", helper.InstallNamespace(), "truthNS", helper.TruthNamespace())
reqLogger.Info("Reconciling LogStorage - ESKubeControllers")

// We skip requests without a namespace specified in multi-tenant setups.
if r.multiTenant && request.Namespace == "" {
return reconcile.Result{}, nil
}

// When running in multi-tenant mode, we need to install es-kubecontrollers in tenant Namespaces. However, the LogStorage
// resource is still cluster-scoped (since ES is a cluster-wide resource), so we need to look elsewhere to determine
// which tenant namespaces require an es-kubecontrollers instance. We use the tenant API to determine the set of
// namespaces that should have an es-kubecontrollers.
tenant, _, err := utils.GetTenant(ctx, r.multiTenant, r.client, request.Namespace)
if errors.IsNotFound(err) {
reqLogger.V(1).Info("No Tenant in this Namespace, skip")
return reconcile.Result{}, nil
} else if err != nil {
r.status.SetDegraded(operatorv1.ResourceReadError, "An error occurred while querying Tenant", err, reqLogger)
return reconcile.Result{}, err
}

// Get LogStorage resource.
logStorage := &operatorv1.LogStorage{}
key := utils.DefaultTSEEInstanceKey
err := r.client.Get(ctx, key, logStorage)
err = r.client.Get(ctx, key, logStorage)
if err != nil {
if errors.IsNotFound(err) {
r.status.OnCRNotFound()
Expand Down Expand Up @@ -244,16 +288,21 @@ func (r *ESKubeControllersController) Reconcile(ctx context.Context, request rec
}
}

// Get secrets needed for kube-controllers to talk to elastic.
kubeControllersUserSecret, err := utils.GetSecret(ctx, r.client, kubecontrollers.ElasticsearchKubeControllersUserSecret, helper.TruthNamespace())
if err != nil {
r.status.SetDegraded(operatorv1.ResourceReadError, "Failed to get kube controllers gateway secret", err, reqLogger)
return reconcile.Result{}, err
// Get secrets needed for kube-controllers to talk to elastic. This is needed for zero-tenants and single-tenants
// that deploy eskube controllers and need to talk to es-gateway
var kubeControllersUserSecret *core.Secret
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This needs a comment explaining that this secret is only used for talking to es-gateway, and that when running in multi-tenant mode es-kube-controllers doesn't talk to es-gateway.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typo: "eskube controllers" should be "es-kube-controllers"

if !r.multiTenant {
kubeControllersUserSecret, err = utils.GetSecret(ctx, r.client, kubecontrollers.ElasticsearchKubeControllersUserSecret, helper.TruthNamespace())
if err != nil {
r.status.SetDegraded(operatorv1.ResourceReadError, "Failed to get kube controllers gateway secret", err, reqLogger)
return reconcile.Result{}, err
}
}

// Collect the certificates we need to provision es-kube-controllers. These will have been provisioned already by the ES secrets controller.
opts := []certificatemanager.Option{
certificatemanager.WithLogger(reqLogger),
certificatemanager.WithTenant(tenant),
}
cm, err := certificatemanager.Create(r.client, install, r.clusterDomain, helper.TruthNamespace(), opts...)
if err != nil {
Expand All @@ -270,40 +319,42 @@ func (r *ESKubeControllersController) Reconcile(ctx context.Context, request rec
return reconcile.Result{}, err
}

gwNSHelper := utils.NewSingleTenantNamespaceHelper(render.ElasticsearchNamespace)
// Query the trusted bundle from the namespace.
gwTrustedBundle, err := cm.LoadTrustedBundle(ctx, r.client, gwNSHelper.InstallNamespace())
if err != nil {
r.status.SetDegraded(operatorv1.ResourceReadError, fmt.Sprintf("Error getting trusted bundle in %s", gwNSHelper.InstallNamespace()), err, reqLogger)
return reconcile.Result{}, err
}

pullSecrets, err := utils.GetNetworkingPullSecrets(install, r.client)
if err != nil {
r.status.SetDegraded(operatorv1.ResourceReadError, "An error occurring while retrieving the pull secrets", err, reqLogger)
return reconcile.Result{}, err
}

// ESGateway is required in order for kube-controllers to operate successfully, since es-kube-controllers talks to ES
// via this gateway. However, in multi-tenant mode we disable the elasticsearch controller and so this isn't needed.
if err := r.createESGateway(
ctx,
gwNSHelper,
install,
variant,
pullSecrets,
hdler,
reqLogger,
gwTrustedBundle,
r.usePSP,
); err != nil {
return reconcile.Result{}, err
// via this gateway. However, in multi-tenant mode, es-kube-controllers doesn't talk to elasticsearch,
// so this is only needed in single-tenant clusters and zero tenants clusters
if !r.multiTenant {
gwNSHelper := utils.NewSingleTenantNamespaceHelper(render.ElasticsearchNamespace)
// Query the trusted bundle from the namespace.
gwTrustedBundle, err := cm.LoadTrustedBundle(ctx, r.client, gwNSHelper.InstallNamespace())
if err != nil {
r.status.SetDegraded(operatorv1.ResourceReadError, fmt.Sprintf("Error getting trusted bundle in %s", gwNSHelper.InstallNamespace()), err, reqLogger)
return reconcile.Result{}, err
}
if err := r.createESGateway(
ctx,
gwNSHelper,
install,
variant,
pullSecrets,
hdler,
reqLogger,
gwTrustedBundle,
r.usePSP,
); err != nil {
return reconcile.Result{}, err
}
}

// Query the trusted bundle from the namespace.
trustedBundle, err := cm.LoadTrustedBundle(ctx, r.client, helper.InstallNamespace())
if err != nil {
r.status.SetDegraded(operatorv1.ResourceReadError, fmt.Sprintf("Error getting trusted bundle in %s", gwNSHelper.InstallNamespace()), err, reqLogger)
r.status.SetDegraded(operatorv1.ResourceReadError, fmt.Sprintf("Error getting trusted bundle in %s", helper.InstallNamespace()), err, reqLogger)
return reconcile.Result{}, err
}

Expand All @@ -324,6 +375,7 @@ func (r *ESKubeControllersController) Reconcile(ctx context.Context, request rec
TrustedBundle: trustedBundle,
Namespace: helper.InstallNamespace(),
BindingNamespaces: namespaces,
Tenant: tenant,
}
esKubeControllerComponents := kubecontrollers.NewElasticsearchKubeControllers(&kubeControllersCfg)

Expand Down
Loading
Loading