From b9caeb1967d895d3444880f2662c5cf13adbd33a Mon Sep 17 00:00:00 2001 From: hc-github-team-consul-core Date: Wed, 21 Jun 2023 09:52:22 -0400 Subject: [PATCH] Backport of Adding support for weighted k8s service into release/1.2.x (#2402) * backport of commit fdef6171d413bd317cceead06404e69f8d586322 * backport of commit 7d47c0055dcdd13041c0674a9c663d596a761298 * backport of commit 552c0e76cb877723048330c830187fa62941585d * backport of commit 14fa790f3c7bd5daeac304d9c356393377e47009 * backport of commit 9f396c3f4b8c4305bbac71b1ca20f6076f682d68 * backport of commit 3320b62e788dc2c6a40fafdf0ebbeb77a9f6d369 * backport of commit dc3e24eb233f7f23b656d28a2edbc8d443083f4a * backport of commit 3dad9a0ef3d68d2661bede945a61066ea960e161 * backport of commit 95b3f7cff520b1ca46fd8a3f0de4a3e0d081c16d * backport of commit fa54600cfd0ddf272241f4246b4cd580529560d5 --------- Co-authored-by: srahul3 Co-authored-by: Ashwin Venkatesh --- .changelog/2293.txt | 3 + control-plane/catalog/to-consul/annotation.go | 6 + control-plane/catalog/to-consul/resource.go | 41 ++++++ .../catalog/to-consul/resource_test.go | 133 ++++++++++++++++++ 4 files changed, 183 insertions(+) create mode 100644 .changelog/2293.txt diff --git a/.changelog/2293.txt b/.changelog/2293.txt new file mode 100644 index 0000000000..ce6d888bcd --- /dev/null +++ b/.changelog/2293.txt @@ -0,0 +1,3 @@ +```release-note:feature +sync-catalog: add ability to support weighted loadbalancing by service annotation `consul.hashicorp.com/service-weight: ` +``` \ No newline at end of file diff --git a/control-plane/catalog/to-consul/annotation.go b/control-plane/catalog/to-consul/annotation.go index 27e37ca217..edca70b60c 100644 --- a/control-plane/catalog/to-consul/annotation.go +++ b/control-plane/catalog/to-consul/annotation.go @@ -26,4 +26,10 @@ const ( // annotationServiceMetaPrefix is the prefix for setting meta key/value // for a service. The remainder of the key is the meta key. annotationServiceMetaPrefix = "consul.hashicorp.com/service-meta-" + + // annotationServiceWeight is the key of the annotation that determines + // the traffic weight of the service which is spanned over multiple k8s cluster. + // e.g. Service `backend` in k8s cluster `A` receives 25% of the traffic + // compared to same `backend` service in k8s cluster `B`. + annotationServiceWeight = "consul.hashicorp.com/service-weight" ) diff --git a/control-plane/catalog/to-consul/resource.go b/control-plane/catalog/to-consul/resource.go index 6a4e913d80..e02da6c71c 100644 --- a/control-plane/catalog/to-consul/resource.go +++ b/control-plane/catalog/to-consul/resource.go @@ -511,6 +511,19 @@ func (t *ServiceResource) generateRegistrations(key string) { r.Service = &rs r.Service.ID = serviceID(r.Service.Service, ip) r.Service.Address = ip + // Adding information about service weight. + // Overrides the existing weight if present + if weight, ok := svc.Annotations[annotationServiceWeight]; ok && weight != "" { + weightI, err := getServiceWeight(weight) + if err == nil { + r.Service.Weights = consulapi.AgentWeights{ + Passing: weightI, + } + } else { + t.Log.Debug("[generateRegistrations] service weight err: ", err) + } + } + t.consulMap[key] = append(t.consulMap[key], &r) } @@ -547,6 +560,19 @@ func (t *ServiceResource) generateRegistrations(key string) { r.Service.ID = serviceID(r.Service.Service, addr) r.Service.Address = addr + // Adding information about service weight. + // Overrides the existing weight if present + if weight, ok := svc.Annotations[annotationServiceWeight]; ok && weight != "" { + weightI, err := getServiceWeight(weight) + if err == nil { + r.Service.Weights = consulapi.AgentWeights{ + Passing: weightI, + } + } else { + t.Log.Debug("[generateRegistrations] service weight err: ", err) + } + } + t.consulMap[key] = append(t.consulMap[key], &r) } } @@ -999,3 +1025,18 @@ func (t *ServiceResource) isIngressService(key string) bool { func consulHealthCheckID(k8sNS string, serviceID string) string { return fmt.Sprintf("%s/%s", k8sNS, serviceID) } + +// Calculates the passing service weight. +func getServiceWeight(weight string) (int, error) { + // error validation if the input param is a number + weightI, err := strconv.Atoi(weight) + if err != nil { + return -1, err + } + + if weightI <= 1 { + return -1, fmt.Errorf("expecting the service annotation %s value to be greater than 1", annotationServiceWeight) + } + + return weightI, nil +} diff --git a/control-plane/catalog/to-consul/resource_test.go b/control-plane/catalog/to-consul/resource_test.go index 3c01088c0d..3b8fb78497 100644 --- a/control-plane/catalog/to-consul/resource_test.go +++ b/control-plane/catalog/to-consul/resource_test.go @@ -56,6 +56,139 @@ func TestServiceResource_createDelete(t *testing.T) { }) } +// Test that Loadbalancer service weight is set from service annotation. +func TestServiceWeight_ingress(t *testing.T) { + t.Parallel() + client := fake.NewSimpleClientset() + syncer := newTestSyncer() + serviceResource := defaultServiceResource(client, syncer) + + // Start the controller + closer := controller.TestControllerRun(&serviceResource) + defer closer() + + // Insert an LB service + svc := lbService("foo", metav1.NamespaceDefault, "1.2.3.4") + svc.Annotations[annotationServiceWeight] = "22" + svc.Status.LoadBalancer.Ingress = append( + svc.Status.LoadBalancer.Ingress, + corev1.LoadBalancerIngress{IP: "3.3.3.3"}, + ) + + svc.Status.LoadBalancer.Ingress = append( + svc.Status.LoadBalancer.Ingress, + corev1.LoadBalancerIngress{IP: "4.4.4.4"}, + ) + + _, err := client.CoreV1().Services(metav1.NamespaceDefault).Create(context.Background(), svc, metav1.CreateOptions{}) + require.NoError(t, err) + + // Verify what we got + retry.Run(t, func(r *retry.R) { + syncer.Lock() + defer syncer.Unlock() + actual := syncer.Registrations + require.Len(r, actual, 3) + require.Equal(r, "foo", actual[1].Service.Service) + require.Equal(r, "3.3.3.3", actual[1].Service.Address) + require.Equal(r, 22, actual[1].Service.Weights.Passing) + require.Equal(r, "foo", actual[2].Service.Service) + require.Equal(r, "4.4.4.4", actual[2].Service.Address) + require.Equal(r, 22, actual[2].Service.Weights.Passing) + require.NotEqual(r, actual[1].Service.ID, actual[2].Service.ID) + }) +} + +// Test that Loadbalancer service weight is set from service annotation. +func TestServiceWeight_externalIP(t *testing.T) { + t.Parallel() + client := fake.NewSimpleClientset() + syncer := newTestSyncer() + serviceResource := defaultServiceResource(client, syncer) + + // Start the controller + closer := controller.TestControllerRun(&serviceResource) + defer closer() + + // Insert an LB service + svc := lbService("foo", metav1.NamespaceDefault, "1.2.3.4") + svc.Annotations[annotationServiceWeight] = "22" + svc.Spec.ExternalIPs = []string{"3.3.3.3", "4.4.4.4"} + + _, err := client.CoreV1().Services(metav1.NamespaceDefault).Create(context.Background(), svc, metav1.CreateOptions{}) + require.NoError(t, err) + + // Verify what we got + retry.Run(t, func(r *retry.R) { + syncer.Lock() + defer syncer.Unlock() + actual := syncer.Registrations + require.Len(r, actual, 2) + require.Equal(r, "foo", actual[0].Service.Service) + require.Equal(r, "3.3.3.3", actual[0].Service.Address) + require.Equal(r, 22, actual[0].Service.Weights.Passing) + require.Equal(r, "foo", actual[1].Service.Service) + require.Equal(r, "4.4.4.4", actual[1].Service.Address) + require.Equal(r, 22, actual[1].Service.Weights.Passing) + require.NotEqual(r, actual[0].Service.ID, actual[1].Service.ID) + }) +} + +// Test service weight. +func TestServiceWeight(t *testing.T) { + t.Parallel() + cases := map[string]struct { + Weight string + ExpectError bool + ExtectedWeight int + }{ + "external-IP": { + Weight: "22", + ExpectError: false, + ExtectedWeight: 22, + }, + "non-int-weight": { + Weight: "non-int", + ExpectError: true, + ExtectedWeight: 0, + }, + "one-weight": { + Weight: "1", + ExpectError: true, + ExtectedWeight: 0, + }, + "zero-weight": { + Weight: "0", + ExpectError: true, + ExtectedWeight: 0, + }, + "negative-weight": { + Weight: "-2", + ExpectError: true, + ExtectedWeight: 0, + }, + "greater-than-100-is-allowed": { + Weight: "1000", + ExpectError: false, + ExtectedWeight: 1000, + }, + } + + for name, c := range cases { + t.Run(name, func(tt *testing.T) { + weightI, err := getServiceWeight(c.Weight) + // Verify what we got + retry.Run(tt, func(r *retry.R) { + if c.ExpectError { + require.Error(r, err) + } else { + require.Equal(r, c.ExtectedWeight, weightI) + } + }) + }) + } +} + // Test that we're default enabled. func TestServiceResource_defaultEnable(t *testing.T) { t.Parallel()