Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

operator: Add Pod annotations with node topology labels to support zone aware scheduling #9503

Merged
merged 21 commits into from
Jul 14, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 135 additions & 0 deletions operator/controllers/loki/lokistack_zone_labeling_controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
package controllers

import (
"context"

"github.com/ViaQ/logerr/v2/kverrors"
"github.com/go-logr/logr"

"github.com/grafana/loki/operator/internal/external/k8s"
"github.com/grafana/loki/operator/internal/handlers"

corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"

ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/source"
)

var createOrUpdatePred = builder.WithPredicates(predicate.Funcs{
xperimental marked this conversation as resolved.
Show resolved Hide resolved
UpdateFunc: func(e event.UpdateEvent) bool { return true },
CreateFunc: func(e event.CreateEvent) bool { return true },
xperimental marked this conversation as resolved.
Show resolved Hide resolved
DeleteFunc: func(e event.DeleteEvent) bool { return false },
GenericFunc: func(e event.GenericEvent) bool { return false },
})

// LokiStackZoneAwarePodReconciler watches all the loki component pods and updates the pod annotations with the topology node labels.
type LokiStackZoneAwarePodReconciler struct {
client.Client
Log logr.Logger
}

// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
//
// For more details, check Reconcile and its Result here:
// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.11.0/pkg/reconcile
func (r *LokiStackZoneAwarePodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
// managed, err := state.IsManaged(ctx, req, r.Client)
// if err != nil {
// return ctrl.Result{}, err
// }
// if !managed {
// r.Log.Info("Skipping reconciliation for unmanaged LokiStack resource", "name", req.String())
// // Stop requeueing for unmanaged LokiStack custom resources
// return ctrl.Result{}, nil
// }

// var stack lokiv1.LokiStack

var lokiPod corev1.Pod

if err := r.Client.Get(ctx, req.NamespacedName, &lokiPod); err != nil {
if apierrors.IsNotFound(err) {
return ctrl.Result{}, nil
}
return ctrl.Result{}, kverrors.Wrap(err, "failed to lookup lokistack", "name", req.NamespacedName)
xperimental marked this conversation as resolved.
Show resolved Hide resolved
}

// if len(stack.Spec.Replication.Zones) > 0 {
labels := lokiPod.GetLabels()
for key, value := range labels {
if key == "loki.grafana.com/zoneaware" {
xperimental marked this conversation as resolved.
Show resolved Hide resolved
err := handlers.AnnotatePodsWithNodeLabels(ctx, r.Log, r.Client, lokiPod, value)
if err != nil {
return ctrl.Result{}, err
}
}
}

return ctrl.Result{}, nil
}

// SetupWithManager sets up the controller with the Manager.
func (r *LokiStackZoneAwarePodReconciler) SetupWithManager(mgr ctrl.Manager) error {
b := ctrl.NewControllerManagedBy(mgr)
return r.buildController(k8s.NewCtrlBuilder(b))
}

func (r *LokiStackZoneAwarePodReconciler) buildController(bld k8s.Builder) error {
return bld.
// For(&lokiv1.LokiStack{}).
Named("LokiPod").
xperimental marked this conversation as resolved.
Show resolved Hide resolved
Watches(&source.Kind{Type: &corev1.Pod{}}, &handler.EnqueueRequestForObject{}, createOrUpdatePred).
Complete(r)
}

/* func (r *LokiStackZoneAwarePodReconciler) enqueueForPodBinding() handler.EventHandler {
ctx := context.TODO()
return handler.EnqueueRequestsFromMapFunc(func(obj client.Object) []reconcile.Request {
//lokiPod := &corev1.Pod{}
labels := obj.GetLabels()
var requests []reconcile.Request

for key, value := range labels {
if key == "loki.grafana.com/zoneaware" && value == "enabled" {
requests = append(requests, reconcile.Request{
NamespacedName: types.NamespacedName{
Namespace: stack.Namespace,
Name: stack.Name,
},
})
}
}
r.Log.Info("Enqueued requests for LokiStack because Zone Aware Pod was created or updated", "LokiStack", stack.Name, "Pod", obj.GetName())
return requests



// if err := r.Client.List(ctx, lokiStacks); err != nil {
// r.Log.Error(err, "Error getting LokiStack resources in event handler")
// return nil
// }

// var requests []reconcile.Request

// for _, stack := range lokiStacks.Items {
// if obj.GetNamespace() == stack.Namespace && len(stack.Spec.Replication.Zones) > 0 {
// requests = append(requests, reconcile.Request{
// NamespacedName: types.NamespacedName{
// Namespace: stack.Namespace,
// Name: stack.Name,
// },
// })
// r.Log.Info("Enqueued requests for LokiStack because Zone Aware Pod was created or updated", "LokiStack", stack.Name, "Pod", obj.GetName())

// return requests
// }
// }
})
} */
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package controllers

import (
"testing"

"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/source"

"github.com/grafana/loki/operator/internal/external/k8s/k8sfakes"
)

func TestLokiStackZoneAwarePodController_RegisterWatchedResources(t *testing.T) {
k := &k8sfakes.FakeClient{}

// Require owned resources
type test struct {
index int
watchesCallsCount int
src source.Source
pred builder.OwnsOption
}
table := []test{
{
src: &source.Kind{Type: &corev1.Pod{}},
index: 0,
watchesCallsCount: 1,
pred: createOrUpdatePred,
},
}
for _, tst := range table {
b := &k8sfakes.FakeBuilder{}
b.NamedReturns(b)
b.WatchesReturns(b)

c := &LokiStackZoneAwarePodReconciler{Client: k}
err := c.buildController(b)
require.NoError(t, err)

// Require Watches-calls for all watches resources
require.Equal(t, tst.watchesCallsCount, b.WatchesCallCount())

src, _, opts := b.WatchesArgsForCall(tst.index)
require.Equal(t, tst.src, src)
require.Equal(t, tst.pred, opts[0])
}
}
Loading