Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding support for weighted k8s service #2293

Merged
merged 13 commits into from
Jun 19, 2023
3 changes: 3 additions & 0 deletions .changelog/2293.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:feature
sync-catalog: add ability to support weighted loadbalancing by service annotation `consul.hashicorp.com/service-weight: <number>`
```
6 changes: 6 additions & 0 deletions control-plane/catalog/to-consul/annotation.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,10 @@ const (
// annotationServiceMetaPrefix is the prefix for setting meta key/value
// for a service. The remainder of the key is the meta key.
annotationServiceMetaPrefix = "consul.hashicorp.com/service-meta-"

// annotationServiceWeight is the key of the annotation that determines
// the traffic weight of the service which is spanned over multiple k8s cluster.
// e.g. Service `backend` in k8s cluster `A` receives 25% of the traffic
// compared to same `backend` service in k8s cluster `B`
annotationServiceWeight = "consul.hashicorp.com/service-weight"
)
49 changes: 49 additions & 0 deletions control-plane/catalog/to-consul/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -511,6 +511,18 @@ func (t *ServiceResource) generateRegistrations(key string) {
r.Service = &rs
r.Service.ID = serviceID(r.Service.Service, ip)
r.Service.Address = ip
// Adding information about service weight.
// Overrides the existing weight if present
if weight, ok := svc.Annotations[annotationServiceWeight]; ok && weight != "" {
err := setServiceWeight(weight, len(ips), &r)
if err != nil {
t.Log.Debug("assertion: The service with ",
"key", key,
"service", baseService.Service,
"namespace", baseService.Namespace,
"expects a positive integer value, however observed value was not integer: ", weight)
}
}
t.consulMap[key] = append(t.consulMap[key], &r)
}

Expand Down Expand Up @@ -547,6 +559,20 @@ func (t *ServiceResource) generateRegistrations(key string) {
r.Service.ID = serviceID(r.Service.Service, addr)
r.Service.Address = addr

// Adding information about service weight.
// Overrides the existing weight if present
if weight, ok := svc.Annotations[annotationServiceWeight]; ok && weight != "" {
Copy link
Contributor

@absolutelightning absolutelightning Jun 8, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these lines 567 - 577 and lines 518 - 528 are exactly the same. can we take it out in a function as well?

Copy link
Contributor Author

@srahul3 srahul3 Jun 8, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The code at both place is constructing the CatalogRegistration object, there is no logic which is duplicate. Read the comment #2293 (comment)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is ok to keep as is. it appears that the only behavior here is an assignment which IMO is not really duplication.

ingress_count := len(svc.Status.LoadBalancer.Ingress)
err := setServiceWeight(weight, ingress_count, &r)
if err != nil {
t.Log.Debug("assertion: The service with ",
"key", key,
"service", baseService.Service,
"namespace", baseService.Namespace,
"expects a positive integer value, however observed value was not integer: ", weight)
}
}

t.consulMap[key] = append(t.consulMap[key], &r)
}
}
Expand Down Expand Up @@ -999,3 +1025,26 @@ func (t *ServiceResource) isIngressService(key string) bool {
func consulHealthCheckID(k8sNS string, serviceID string) string {
return fmt.Sprintf("%s/%s", k8sNS, serviceID)
}

// Sets the passing service weight
func setServiceWeight(weight string, appsCount int, r *consulapi.CatalogRegistration) error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

id change this method to return the service weight or an error, providing the value of the annotation as an input. we should avoid sending in the consul registration into it. it will make this is a cleaner interface that is significantly easier to test.

// error validation if the input param is a number
weightI, err := strconv.Atoi(weight)
if err != nil {
return err
}

if weightI <= 1 {
return nil
}

var perAppWeight = weightI / appsCount
Copy link
Contributor

@absolutelightning absolutelightning Jun 7, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can set weight to whatever value we are getting. We don't need to divide

Copy link
Contributor

@absolutelightning absolutelightning Jun 7, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. One simple reason being - if a customer is setting weight - x for a service - and if they query, they will get another value from APIs which will be divided.
  2. Weight are proportional - if you have two services with weights 10 and 20 respectively, the service with weight 20 would receive twice as much traffic as the service with weight 10. If we divide, we will not get the correct traffic.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yup! we should set the weight that is set on the annotation and we dont need the app count for the reasons @absolutelightning described!

if perAppWeight < 1 {
perAppWeight = 1
}
r.Service.Weights = consulapi.AgentWeights{
Passing: perAppWeight,
}

return nil
}
112 changes: 112 additions & 0 deletions control-plane/catalog/to-consul/resource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,118 @@ func TestServiceResource_createDelete(t *testing.T) {
})
}

// Test that Loadbalancer service weight is set from service annotation.
func TestServiceWeight_ingress(t *testing.T) {
t.Parallel()
client := fake.NewSimpleClientset()
syncer := newTestSyncer()
serviceResource := defaultServiceResource(client, syncer)

// Start the controller
closer := controller.TestControllerRun(&serviceResource)
defer closer()

// Insert an LB service
svc := lbService("foo", metav1.NamespaceDefault, "1.2.3.4")
svc.Annotations[annotationServiceWeight] = "22"
svc.Status.LoadBalancer.Ingress = append(
svc.Status.LoadBalancer.Ingress,
corev1.LoadBalancerIngress{IP: "3.3.3.3"},
)

svc.Status.LoadBalancer.Ingress = append(
svc.Status.LoadBalancer.Ingress,
corev1.LoadBalancerIngress{IP: "4.4.4.4"},
)

_, err := client.CoreV1().Services(metav1.NamespaceDefault).Create(context.Background(), svc, metav1.CreateOptions{})
require.NoError(t, err)

// Verify what we got
retry.Run(t, func(r *retry.R) {
syncer.Lock()
defer syncer.Unlock()
actual := syncer.Registrations
require.Len(r, actual, 3)
require.Equal(r, "foo", actual[1].Service.Service)
require.Equal(r, "3.3.3.3", actual[1].Service.Address)
require.Equal(r, 7, actual[1].Service.Weights.Passing)
require.Equal(r, "foo", actual[2].Service.Service)
require.Equal(r, "4.4.4.4", actual[2].Service.Address)
require.Equal(r, 7, actual[2].Service.Weights.Passing)
require.NotEqual(r, actual[1].Service.ID, actual[2].Service.ID)
})
}

// Test that Loadbalancer service weight is set from service annotation.
func TestServiceWeight_externalIP(t *testing.T) {
t.Parallel()
client := fake.NewSimpleClientset()
syncer := newTestSyncer()
serviceResource := defaultServiceResource(client, syncer)

// Start the controller
closer := controller.TestControllerRun(&serviceResource)
defer closer()

// Insert an LB service
svc := lbService("foo", metav1.NamespaceDefault, "1.2.3.4")
svc.Annotations[annotationServiceWeight] = "22"
svc.Spec.ExternalIPs = []string{"3.3.3.3", "4.4.4.4"}

_, err := client.CoreV1().Services(metav1.NamespaceDefault).Create(context.Background(), svc, metav1.CreateOptions{})
require.NoError(t, err)

// Verify what we got
retry.Run(t, func(r *retry.R) {
syncer.Lock()
defer syncer.Unlock()
actual := syncer.Registrations
require.Len(r, actual, 2)
require.Equal(r, "foo", actual[0].Service.Service)
require.Equal(r, "3.3.3.3", actual[0].Service.Address)
require.Equal(r, 11, actual[0].Service.Weights.Passing)
require.Equal(r, "foo", actual[1].Service.Service)
require.Equal(r, "4.4.4.4", actual[1].Service.Address)
require.Equal(r, 11, actual[1].Service.Weights.Passing)
require.NotEqual(r, actual[0].Service.ID, actual[1].Service.ID)
})
}

func TestServiceWeight_nonIntWeight(t *testing.T) {
t.Parallel()
client := fake.NewSimpleClientset()
syncer := newTestSyncer()
serviceResource := defaultServiceResource(client, syncer)

// Start the controller
closer := controller.TestControllerRun(&serviceResource)
defer closer()

// Insert an LB service
svc := lbService("foo", metav1.NamespaceDefault, "1.2.3.4")
svc.Annotations[annotationServiceWeight] = "non-int"
svc.Spec.ExternalIPs = []string{"3.3.3.3", "4.4.4.4"}

_, err := client.CoreV1().Services(metav1.NamespaceDefault).Create(context.Background(), svc, metav1.CreateOptions{})
require.NoError(t, err)

// Verify what we got
retry.Run(t, func(r *retry.R) {
syncer.Lock()
defer syncer.Unlock()
actual := syncer.Registrations
require.Len(r, actual, 2)
require.Equal(r, "foo", actual[0].Service.Service)
require.Equal(r, "3.3.3.3", actual[0].Service.Address)
require.Equal(r, 0, actual[0].Service.Weights.Passing)
require.Equal(r, "foo", actual[1].Service.Service)
require.Equal(r, "4.4.4.4", actual[1].Service.Address)
require.Equal(r, 0, actual[1].Service.Weights.Passing)
srahul3 marked this conversation as resolved.
Show resolved Hide resolved
require.NotEqual(r, actual[0].Service.ID, actual[1].Service.ID)
})
}

// Test that we're default enabled.
func TestServiceResource_defaultEnable(t *testing.T) {
t.Parallel()
Expand Down