Skip to content

Commit

Permalink
PolicyRefs: fix an issue where DeploymentType was not honored
Browse files Browse the repository at this point in the history
  • Loading branch information
mgianluc committed Jul 23, 2024
1 parent d5db7b0 commit c3bfcc0
Show file tree
Hide file tree
Showing 17 changed files with 1,211 additions and 1,091 deletions.
41 changes: 23 additions & 18 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/healthz"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/metrics/filters"
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
"sigs.k8s.io/controller-runtime/pkg/webhook"
Expand Down Expand Up @@ -130,24 +131,8 @@ func main() {
controllers.RegisterFeatures(d, setupLog)

var eventTriggerController controller.Controller
eventTriggerReconciler := &controllers.EventTriggerReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
ConcurrentReconciles: concurrentReconciles,
ShardKey: shardKey,
Mux: sync.Mutex{},
Deployer: d,
Logger: ctrl.Log.WithName("eventTriggerReconciler"),
ClusterMap: make(map[corev1.ObjectReference]*libsveltosset.Set),
ToClusterMap: make(map[types.NamespacedName]*libsveltosset.Set),
EventTriggers: make(map[corev1.ObjectReference]libsveltosv1beta1.Selector),
ClusterLabels: make(map[corev1.ObjectReference]map[string]string),
EventSourceMap: make(map[corev1.ObjectReference]*libsveltosset.Set),
ToEventSourceMap: make(map[types.NamespacedName]*libsveltosset.Set),
EventTriggerMap: make(map[types.NamespacedName]*libsveltosset.Set),
ReferenceMap: make(map[corev1.ObjectReference]*libsveltosset.Set),
ClusterSetMap: make(map[corev1.ObjectReference]*libsveltosset.Set),
}
eventTriggerReconciler := getEventTriggerReconciler(mgr)
eventTriggerReconciler.Deployer = d

eventTriggerController, err = eventTriggerReconciler.SetupWithManager(mgr)
if err != nil {
Expand Down Expand Up @@ -318,3 +303,23 @@ func getDiagnosticsOptions() metricsserver.Options {
FilterProvider: filters.WithAuthenticationAndAuthorization,
}
}

func getEventTriggerReconciler(mgr manager.Manager) *controllers.EventTriggerReconciler {
return &controllers.EventTriggerReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
ConcurrentReconciles: concurrentReconciles,
ShardKey: shardKey,
Mux: sync.Mutex{},
Logger: ctrl.Log.WithName("eventTriggerReconciler"),
ClusterMap: make(map[corev1.ObjectReference]*libsveltosset.Set),
ToClusterMap: make(map[types.NamespacedName]*libsveltosset.Set),
EventTriggers: make(map[corev1.ObjectReference]libsveltosv1beta1.Selector),
ClusterLabels: make(map[corev1.ObjectReference]map[string]string),
EventSourceMap: make(map[corev1.ObjectReference]*libsveltosset.Set),
ToEventSourceMap: make(map[types.NamespacedName]*libsveltosset.Set),
EventTriggerMap: make(map[types.NamespacedName]*libsveltosset.Set),
ReferenceMap: make(map[corev1.ObjectReference]*libsveltosset.Set),
ClusterSetMap: make(map[corev1.ObjectReference]*libsveltosset.Set),
}
}
14 changes: 2 additions & 12 deletions config/rbac/role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,23 +9,13 @@ rules:
resources:
- configmaps
verbs:
- create
- delete
- get
- list
- update
- watch
- '*'
- apiGroups:
- ""
resources:
- secrets
verbs:
- create
- delete
- get
- list
- update
- watch
- '*'
- apiGroups:
- apiextensions.k8s.io
resources:
Expand Down
192 changes: 177 additions & 15 deletions controllers/eventreport_collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,12 @@ import (
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/projectsveltos/event-manager/api/v1beta1"
libsveltosv1beta1 "github.com/projectsveltos/libsveltos/api/v1beta1"
"github.com/projectsveltos/libsveltos/lib/clusterproxy"
logs "github.com/projectsveltos/libsveltos/lib/logsettings"
libsveltosset "github.com/projectsveltos/libsveltos/lib/set"
libsveltostemplate "github.com/projectsveltos/libsveltos/lib/template"
)

const (
Expand Down Expand Up @@ -98,6 +101,72 @@ func removeEventReportsFromCluster(ctx context.Context, c client.Client, cluster
return nil
}

// buildEventTriggersForEventSourceMap builds a map:
// key => eventSource name;
// values => slice of the EventTriggers referencing it
// This map is built one per cluster as EventSource can be expressed as a template and instantiated using
// cluster namespace, name and type
func buildEventTriggersForEventSourceMap(cluster *corev1.ObjectReference, eventTriggers *v1beta1.EventTriggerList,
) (map[string][]*v1beta1.EventTrigger, error) {

clusterType := clusterproxy.GetClusterType(cluster)

eventSourceMap := map[string][]*v1beta1.EventTrigger{}

for i := range eventTriggers.Items {
et := &eventTriggers.Items[i]

eventSourceName, err := libsveltostemplate.GetReferenceResourceName(cluster.Namespace, cluster.Name,
string(clusterType), et.Spec.EventSourceName)
if err != nil {
return nil, err
}

s := eventSourceMap[eventSourceName]
if s == nil {
s = make([]*v1beta1.EventTrigger, 0)
eventSourceMap[eventSourceName] = s
}

s = append(s, et)
eventSourceMap[eventSourceName] = s
}

return eventSourceMap, nil
}

func addEventTriggerMatchingCluster(et *v1beta1.EventTrigger,
eventTriggerMap map[string]libsveltosset.Set) map[string]libsveltosset.Set {

matchingClusters := libsveltosset.Set{}

for i := range et.Status.MatchingClusterRefs {
cluster := &et.Status.MatchingClusterRefs[i]
matchingClusters.Insert(cluster)
}

eventTriggerMap[et.Name] = matchingClusters

return eventTriggerMap
}

// buildClusterForEventTriggerMap builds a map:
// key => eventTrigger name
// values => slice of currently matching clusters
func buildEventTriggersForClusterMap(eventTriggers *v1beta1.EventTriggerList,
) map[string]libsveltosset.Set {

eventTriggerMap := map[string]libsveltosset.Set{}

for i := range eventTriggers.Items {
et := &eventTriggers.Items[i]

eventTriggerMap = addEventTriggerMatchingCluster(et, eventTriggerMap)
}

return eventTriggerMap
}

// Periodically collects EventReports from each CAPI cluster.
func collectEventReports(c client.Client, shardKey string, logger logr.Logger) {
interval := 10 * time.Second
Expand All @@ -108,15 +177,40 @@ func collectEventReports(c client.Client, shardKey string, logger logr.Logger) {

ctx := context.TODO()
for {
logger.V(logs.LogDebug).Info("collecting EventReports")
logger.V(logs.LogDebug).Info("collecting EventTriggers")
// get all EventTriggers
eventTriggers := &v1beta1.EventTriggerList{}
err := c.List(ctx, eventTriggers)
if err != nil {
logger.V(logs.LogInfo).Info(fmt.Sprintf("failed to get eventTriggers: %v", err))
time.Sleep(interval)
continue
}

// build a map eventTrigger: matching clusters
eventTriggerMap := buildEventTriggersForClusterMap(eventTriggers)

logger.V(logs.LogDebug).Info("collecting managed clusters")
clusterList, err := clusterproxy.GetListOfClustersForShardKey(ctx, c, "", shardKey, logger)
if err != nil {
logger.V(logs.LogInfo).Info(fmt.Sprintf("failed to get clusters: %v", err))
time.Sleep(interval)
continue
}

for i := range clusterList {
cluster := &clusterList[i]
err = collectAndProcessEventReportsFromCluster(ctx, c, cluster, logger)

// Build a map of EventTrigger consuming an EventSource. This is built once per cluster
// as EventSourceName in EventTrigger.Spec can be expressed as a template and instantiated
// using cluster namespace, name and type.
eventSourceMap, err := buildEventTriggersForEventSourceMap(cluster, eventTriggers)
if err != nil {
time.Sleep(interval)
continue
}

err = collectAndProcessEventReportsFromCluster(ctx, c, cluster, eventSourceMap, eventTriggerMap, logger)
if err != nil {
logger.V(logs.LogInfo).Info(fmt.Sprintf("failed to collect EventReports from cluster: %s/%s %v",
cluster.Namespace, cluster.Name, err))
Expand All @@ -127,8 +221,9 @@ func collectEventReports(c client.Client, shardKey string, logger logr.Logger) {
}
}

func collectAndProcessEventReportsFromCluster(ctx context.Context, c client.Client,
cluster *corev1.ObjectReference, logger logr.Logger) error {
func collectAndProcessEventReportsFromCluster(ctx context.Context, c client.Client, cluster *corev1.ObjectReference,
eventSourceMap map[string][]*v1beta1.EventTrigger, eventTriggerMap map[string]libsveltosset.Set,
logger logr.Logger) error {

logger = logger.WithValues("cluster", fmt.Sprintf("%s/%s", cluster.Namespace, cluster.Name))
clusterRef := &corev1.ObjectReference{
Expand All @@ -155,6 +250,7 @@ func collectAndProcessEventReportsFromCluster(ctx context.Context, c client.Clie
}

logger.V(logs.LogDebug).Info("collecting EventReports from cluster")

eventReportList := libsveltosv1beta1.EventReportList{}
err = remoteClient.List(ctx, &eventReportList)
if err != nil {
Expand All @@ -164,19 +260,26 @@ func collectAndProcessEventReportsFromCluster(ctx context.Context, c client.Clie
currentEventReports := make(map[string]bool)
for i := range eventReportList.Items {
er := &eventReportList.Items[i]

if shouldIgnore(er) {
continue
}

l := logger.WithValues("eventReport", er.Name)
// First update/delete eventReports in managemnent cluster
if !er.DeletionTimestamp.IsZero() {
logger.V(logs.LogDebug).Info("deleting from management cluster")
err = deleteEventReport(ctx, c, cluster, er, l)
if err != nil {
logger.V(logs.LogInfo).Info(fmt.Sprintf("failed to delete EventReport in management cluster. Err: %v", err))
continue
}
} else {
} else if shouldReprocess(er) {
logger.V(logs.LogDebug).Info("updating in management cluster")
err = updateEventReport(ctx, c, cluster, er, l)
if err != nil {
logger.V(logs.LogInfo).Info(fmt.Sprintf("failed to update EventReport in management cluster. Err: %v", err))
continue
}
// Name in the management cluster is different than name in the managed cluster
eventSourceName := er.Labels[libsveltosv1beta1.EventSourceNameLabel]
Expand All @@ -185,7 +288,9 @@ func collectAndProcessEventReportsFromCluster(ctx context.Context, c client.Clie
currentEventReports[eventReportName] = true
}

logger.V(logs.LogDebug).Info("updating in managed cluster")
updateAllClusterProfiles(ctx, c, cluster, er, eventSourceMap, eventTriggerMap, logger)

logger.V(logs.LogDebug).Info("updating EventReport in the managed cluster")
// Update EventReport Status in managed cluster
phase := libsveltosv1beta1.ReportProcessed
er.Status.Phase = &phase
Expand All @@ -195,8 +300,72 @@ func collectAndProcessEventReportsFromCluster(ctx context.Context, c client.Clie
}
}

return removeEventReportsFromCluster(ctx, c, cluster.Namespace, cluster.Name, clusterproxy.GetClusterType(cluster),
currentEventReports, logger)
return nil
}

// EventReports are collected from managed cluster to the management cluster.
// When an EventReport is collected from a managed cluster and created in the
// management cluster, the label eventreport.projectsveltos.io/cluster-name
// is added. All EventReports found in the management cluster with this
// labels should be ignored as collected from other managed clusters.
func shouldIgnore(er *libsveltosv1beta1.EventReport) bool {
if er.Labels == nil {
return false
}

_, ok := er.Labels[libsveltosv1beta1.EventReportClusterNameLabel]
return ok
}

// If the EventReport in the managed cluster is marked as Processed, ignore it.
func shouldReprocess(er *libsveltosv1beta1.EventReport) bool {
if er.Status.Phase == nil {
return true
}

return *er.Status.Phase != libsveltosv1beta1.ReportProcessed
}

// isEventTriggerMatchingTheCluster returns true if EventTrigger is currently matching
// cluster
func isEventTriggerMatchingTheCluster(et *v1beta1.EventTrigger, cluster *corev1.ObjectReference,
eventTriggerMap map[string]libsveltosset.Set) bool {

matchingClusters := eventTriggerMap[et.Name]
return matchingClusters.Has(cluster)
}

func updateAllClusterProfiles(ctx context.Context, mgmtClient client.Client, cluster *corev1.ObjectReference,
er *libsveltosv1beta1.EventReport, eventSourceMap map[string][]*v1beta1.EventTrigger,
eventTriggerMap map[string]libsveltosset.Set, logger logr.Logger) {

clusterType := clusterproxy.GetClusterType(cluster)

// Get all EventSource from EventReport
eventSourceName := er.Labels[libsveltosv1beta1.EventSourceNameLabel]
logger.V(logs.LogDebug).Info(fmt.Sprintf("eventSource is %s", eventSourceName))

// Get all EventTriggers referencing this EventSource
eventTriggers := eventSourceMap[eventSourceName]

// For each EventTrigger
for i := range eventTriggers {
l := logger.WithValues("eventTrigger", eventTriggers[i].Name)

// If EventTrigger is currently not matching this cluster, ignore this EventReports
if !isEventTriggerMatchingTheCluster(eventTriggers[i], cluster, eventTriggerMap) {
l.V(logs.LogDebug).Info("cluster is not a match anymore. Ignore.")
continue
}

l.V(logs.LogDebug).Info("updating ClusterProfile")
err := updateClusterProfiles(ctx, mgmtClient, cluster.Namespace, cluster.Name, clusterType,
eventTriggers[i], er, logger)
if err != nil {
logger.V(logs.LogInfo).Info(fmt.Sprintf("failed to update ClusterProfile for EventTrigger %s: %v",
eventTriggers[i].GetName(), err))
}
}
}

func deleteEventReport(ctx context.Context, c client.Client, cluster *corev1.ObjectReference,
Expand Down Expand Up @@ -230,13 +399,6 @@ func deleteEventReport(ctx context.Context, c client.Client, cluster *corev1.Obj
func updateEventReport(ctx context.Context, c client.Client, cluster *corev1.ObjectReference,
eventReport *libsveltosv1beta1.EventReport, logger logr.Logger) error {

if eventReport.Spec.ClusterName != "" {
// if ClusterName is set, this is coming from a
// managed cluster. If management cluster is in turn
// managed by another cluster, do not pull those.
return nil
}

if eventReport.Labels == nil {
logger.V(logs.LogInfo).Info(eventReportMalformedLabelError)
return errors.New(eventReportMalformedLabelError)
Expand Down
Loading

0 comments on commit c3bfcc0

Please sign in to comment.