From 3136b91f4a3ad7de0cde5948c2af063002efecc4 Mon Sep 17 00:00:00 2001 From: Tom Pantelis Date: Tue, 24 Sep 2024 14:21:06 -0400 Subject: [PATCH] Handle conflicting service ports when merging per MCS spec The spec states that if the properties of service ports between clusters don't match, the clusterset service should expose the union of the ports. We're doing this however we're not properly handle conflicts as per the spec. If multiple clusters have a matching port by name but the other properties don't match, it should be considered a conflict and the conflict resolution policy should be applied, ie the port from the cluster with the oldest export timestamp should be used. When merging, we need to sort the ports by cluster timestamp and use just the port name when computing the union rather than all the port properties as we did before. Also when checking for conflicts, we should also check the AppProtocol property. Signed-off-by: Tom Pantelis --- coredns/resolver/service_info.go | 3 +- pkg/agent/controller/agent.go | 10 +-- .../controller/clusterip_service_test.go | 16 +++++ pkg/agent/controller/endpoint_slice.go | 10 ++- pkg/agent/controller/service_import.go | 25 ++------ .../controller/service_import_aggregator.go | 64 +++++++++++++++++-- 6 files changed, 94 insertions(+), 34 deletions(-) diff --git a/coredns/resolver/service_info.go b/coredns/resolver/service_info.go index abdea075..f3c8c985 100644 --- a/coredns/resolver/service_info.go +++ b/coredns/resolver/service_info.go @@ -22,6 +22,7 @@ import ( "fmt" "github.com/submariner-io/admiral/pkg/slices" + "k8s.io/utils/ptr" mcsv1a1 "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1" ) @@ -44,7 +45,7 @@ func (si *serviceInfo) mergePorts() { si.ports = info.endpointRecords[0].Ports } else { si.ports = slices.Intersect(si.ports, info.endpointRecords[0].Ports, func(p mcsv1a1.ServicePort) string { - return fmt.Sprintf("%s%s%d", p.Name, p.Protocol, p.Port) + return fmt.Sprintf("%s:%s:%d:%s", p.Name, p.Protocol, p.Port, ptr.Deref(p.AppProtocol, "")) }) } } diff --git a/pkg/agent/controller/agent.go b/pkg/agent/controller/agent.go index d00f0509..ab276be1 100644 --- a/pkg/agent/controller/agent.go +++ b/pkg/agent/controller/agent.go @@ -41,6 +41,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime" validations "k8s.io/apimachinery/pkg/util/validation" + "k8s.io/utils/ptr" logf "sigs.k8s.io/controller-runtime/pkg/log" mcsv1a1 "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1" ) @@ -123,7 +124,8 @@ func New(spec *AgentSpecification, syncerConf broker.SyncerConfig, agentConfig A agentController.endpointSliceController, err = newEndpointSliceController(spec, syncerConf, agentController.serviceExportClient, agentController.serviceSyncer, func(serviceName string, serviceNamespace string) *mcsv1a1.ServiceImport { obj, found, _ := agentController.serviceImportController.remoteSyncer.GetResource( - brokerAggregatedServiceImportName(serviceName, serviceNamespace), syncerConf.BrokerNamespace) + brokerAggregatedServiceImportName(serviceName, serviceNamespace), + agentController.endpointSliceController.syncer.GetBrokerNamespace()) if !found { return nil } @@ -479,9 +481,9 @@ func (c converter) toServicePorts(from []discovery.EndpointPort) []mcsv1a1.Servi to := make([]mcsv1a1.ServicePort, len(from)) for i := range from { to[i] = mcsv1a1.ServicePort{ - Name: *from[i].Name, - Protocol: *from[i].Protocol, - Port: *from[i].Port, + Name: ptr.Deref(from[i].Name, ""), + Protocol: ptr.Deref(from[i].Protocol, ""), + Port: ptr.Deref(from[i].Port, 0), AppProtocol: from[i].AppProtocol, } } diff --git a/pkg/agent/controller/clusterip_service_test.go b/pkg/agent/controller/clusterip_service_test.go index 0fe57c08..a16bb460 100644 --- a/pkg/agent/controller/clusterip_service_test.go +++ b/pkg/agent/controller/clusterip_service_test.go @@ -572,6 +572,22 @@ func testClusterIPServiceInTwoClusters() { }) }) + Context("with conflicting ports", func() { + BeforeEach(func() { + t.cluster2.service.Spec.Ports = []corev1.ServicePort{t.cluster1.service.Spec.Ports[0], toServicePort(port3)} + t.cluster2.service.Spec.Ports[0].Port++ + t.aggregatedServicePorts = []mcsv1a1.ServicePort{port1, port2, port3} + }) + + It("should correctly set the ports in the aggregated ServiceImport and set the Conflict status condition", func() { + t.awaitNonHeadlessServiceExported(&t.cluster1, &t.cluster2) + + condition := newServiceExportConflictCondition(controller.PortConflictReason) + t.cluster1.awaitServiceExportCondition(condition) + t.cluster2.awaitServiceExportCondition(condition) + }) + }) + Context("with differing service types", func() { BeforeEach(func() { t.cluster2.service.Spec.ClusterIP = corev1.ClusterIPNone diff --git a/pkg/agent/controller/endpoint_slice.go b/pkg/agent/controller/endpoint_slice.go index 564c92b2..11dfda71 100644 --- a/pkg/agent/controller/endpoint_slice.go +++ b/pkg/agent/controller/endpoint_slice.go @@ -39,6 +39,7 @@ import ( k8slabels "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/utils/ptr" "k8s.io/utils/set" mcsv1a1 "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1" ) @@ -243,6 +244,10 @@ func (c *EndpointSliceController) checkForConflicts(_, name, namespace string) ( mcsv1a1.LabelServiceName: name, })) + servicePortKey := func(p mcsv1a1.ServicePort) string { + return fmt.Sprintf("%s:%s:%d:%s", p.Name, p.Protocol, p.Port, ptr.Deref(p.AppProtocol, "")) + } + var prevServicePorts []mcsv1a1.ServicePort var intersectedServicePorts []mcsv1a1.ServicePort clusterNames := set.New[string]() @@ -271,7 +276,7 @@ func (c *EndpointSliceController) checkForConflicts(_, name, namespace string) ( if conflict { aggregatedSI := c.aggregatedServiceImportGetter(name, namespace) if aggregatedSI == nil { - return false, nil + return true, nil } exposedOp := "intersection" @@ -321,7 +326,8 @@ func (c *EndpointSliceController) enqueueForConflictCheck(ctx context.Context, e func servicePortsToString(p []mcsv1a1.ServicePort) string { s := make([]string, len(p)) for i := range p { - s[i] = fmt.Sprintf("[name: %s, protocol: %s, port: %v]", p[i].Name, p[i].Protocol, p[i].Port) + s[i] = fmt.Sprintf("[name: %s, protocol: %s, port: %v, appProtocol: %q]", p[i].Name, p[i].Protocol, p[i].Port, + ptr.Deref(p[i].AppProtocol, "")) } return strings.Join(s, ", ") diff --git a/pkg/agent/controller/service_import.go b/pkg/agent/controller/service_import.go index e54018f7..c988ccdf 100644 --- a/pkg/agent/controller/service_import.go +++ b/pkg/agent/controller/service_import.go @@ -25,7 +25,6 @@ import ( "net" "reflect" "strconv" - "strings" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" @@ -699,28 +698,12 @@ func (c *ServiceImportController) localServiceImportLister(transform func(si *mc } func findClusterWithOldestTimestamp(from map[string]string) string { - oldest := int64(math.MaxInt64) - foundCluster := "" - - for k, v := range from { - cluster, found := strings.CutPrefix(k, timestampAnnotationPrefix) - if !found { - continue - } - - t, err := strconv.ParseInt(v, 10, 64) - if err != nil { - logger.Warningf("Invalid timestamp annotation value %q for cluster %q", v, cluster) - continue - } - - if t < oldest || (t == oldest && cluster < foundCluster) { - foundCluster = cluster - oldest = t - } + names := getClusterNamesOrderedByTimestamp(from) + if len(names) > 0 { + return names[0] } - return foundCluster + return "" } func toSessionAffinityConfigString(c *corev1.SessionAffinityConfig) string { diff --git a/pkg/agent/controller/service_import_aggregator.go b/pkg/agent/controller/service_import_aggregator.go index afb34b19..76d68339 100644 --- a/pkg/agent/controller/service_import_aggregator.go +++ b/pkg/agent/controller/service_import_aggregator.go @@ -20,7 +20,9 @@ package controller import ( "context" - "fmt" + goslices "slices" + "strconv" + "strings" "github.com/pkg/errors" "github.com/submariner-io/admiral/pkg/log" @@ -76,14 +78,25 @@ func (a *ServiceImportAggregator) setServicePorts(ctx context.Context, si *mcsv1 serviceNamespace, serviceName) } - si.Spec.Ports = make([]mcsv1a1.ServicePort, 0) + portsByCluster := map[string][]mcsv1a1.ServicePort{} for i := range list.Items { eps := a.converter.toEndpointSlice(&list.Items[i]) - si.Spec.Ports = slices.Union(si.Spec.Ports, a.converter.toServicePorts(eps.Ports), servicePortKey) + portsByCluster[eps.Labels[constants.MCSLabelSourceCluster]] = a.converter.toServicePorts(eps.Ports) + } + + // Sort the clusters by their ServiceExport timestamps stored in the ServiceImport annotations so conflicting ports are + // resolved by taking the oldest as per the MCS spec's conflict resolution policy. + + si.Spec.Ports = make([]mcsv1a1.ServicePort, 0) + for _, clusterName := range getClusterNamesOrderedByTimestamp(si.Annotations) { + ports := portsByCluster[clusterName] + si.Spec.Ports = slices.Union(si.Spec.Ports, ports, func(p mcsv1a1.ServicePort) string { + return p.Name + }) } - logger.V(log.DEBUG).Infof("Calculated ports for aggregated ServiceImport %q: %#v", si.Name, si.Spec.Ports) + logger.V(log.DEBUG).Infof("Calculated ports for aggregated ServiceImport %q: %s", si.Name, servicePortsToString(si.Spec.Ports)) return nil } @@ -152,6 +165,45 @@ func clusterStatusKey(c mcsv1a1.ClusterStatus) string { return c.Cluster } -func servicePortKey(p mcsv1a1.ServicePort) string { - return fmt.Sprintf("%s%s%d", p.Name, p.Protocol, p.Port) +type clusterSortInfo struct { + name string + timestamp int64 +} + +func getClusterNamesOrderedByTimestamp(from map[string]string) []string { + info := make([]clusterSortInfo, 0, len(from)) + + for k, v := range from { + cluster, found := strings.CutPrefix(k, timestampAnnotationPrefix) + if !found { + continue + } + + t, err := strconv.ParseInt(v, 10, 64) + if err != nil { + logger.Warningf("Invalid timestamp annotation value %q for cluster %q", v, cluster) + continue + } + + info = append(info, clusterSortInfo{name: cluster, timestamp: t}) + } + + goslices.SortFunc(info, func(a, b clusterSortInfo) int { + if a.timestamp == b.timestamp { + return strings.Compare(a.name, b.name) + } + + if a.timestamp < b.timestamp { + return -1 + } + + return 1 + }) + + sortedNames := make([]string, len(info)) + for i := range info { + sortedNames[i] = info[i].name + } + + return sortedNames }