Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backport of Adding support for weighted k8s service into release/1.0.x #2403

3 changes: 3 additions & 0 deletions .changelog/2293.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:feature
sync-catalog: add ability to support weighted loadbalancing by service annotation `consul.hashicorp.com/service-weight: <number>`
```
6 changes: 6 additions & 0 deletions control-plane/catalog/to-consul/annotation.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,10 @@ const (
// annotationServiceMetaPrefix is the prefix for setting meta key/value
// for a service. The remainder of the key is the meta key.
annotationServiceMetaPrefix = "consul.hashicorp.com/service-meta-"

// annotationServiceWeight is the key of the annotation that determines
// the traffic weight of the service which is spanned over multiple k8s cluster.
// e.g. Service `backend` in k8s cluster `A` receives 25% of the traffic
// compared to same `backend` service in k8s cluster `B`.
annotationServiceWeight = "consul.hashicorp.com/service-weight"
)
41 changes: 41 additions & 0 deletions control-plane/catalog/to-consul/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -508,6 +508,19 @@ func (t *ServiceResource) generateRegistrations(key string) {
r.Service = &rs
r.Service.ID = serviceID(r.Service.Service, ip)
r.Service.Address = ip
// Adding information about service weight.
// Overrides the existing weight if present
if weight, ok := svc.Annotations[annotationServiceWeight]; ok && weight != "" {
weightI, err := getServiceWeight(weight)
if err == nil {
r.Service.Weights = consulapi.AgentWeights{
Passing: weightI,
}
} else {
t.Log.Debug("[generateRegistrations] service weight err: ", err)
}
}

t.consulMap[key] = append(t.consulMap[key], &r)
}

Expand Down Expand Up @@ -544,6 +557,19 @@ func (t *ServiceResource) generateRegistrations(key string) {
r.Service.ID = serviceID(r.Service.Service, addr)
r.Service.Address = addr

// Adding information about service weight.
// Overrides the existing weight if present
if weight, ok := svc.Annotations[annotationServiceWeight]; ok && weight != "" {
weightI, err := getServiceWeight(weight)
if err == nil {
r.Service.Weights = consulapi.AgentWeights{
Passing: weightI,
}
} else {
t.Log.Debug("[generateRegistrations] service weight err: ", err)
}
}

t.consulMap[key] = append(t.consulMap[key], &r)
}
}
Expand Down Expand Up @@ -996,3 +1022,18 @@ func (t *ServiceResource) isIngressService(key string) bool {
func consulHealthCheckID(k8sNS string, serviceID string) string {
return fmt.Sprintf("%s/%s", k8sNS, serviceID)
}

// Calculates the passing service weight.
func getServiceWeight(weight string) (int, error) {
// error validation if the input param is a number
weightI, err := strconv.Atoi(weight)
if err != nil {
return -1, err
}

if weightI <= 1 {
return -1, fmt.Errorf("expecting the service annotation %s value to be greater than 1", annotationServiceWeight)
}

return weightI, nil
}
133 changes: 133 additions & 0 deletions control-plane/catalog/to-consul/resource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,139 @@ func TestServiceResource_createDelete(t *testing.T) {
})
}

// Test that Loadbalancer service weight is set from service annotation.
func TestServiceWeight_ingress(t *testing.T) {
t.Parallel()
client := fake.NewSimpleClientset()
syncer := newTestSyncer()
serviceResource := defaultServiceResource(client, syncer)

// Start the controller
closer := controller.TestControllerRun(&serviceResource)
defer closer()

// Insert an LB service
svc := lbService("foo", metav1.NamespaceDefault, "1.2.3.4")
svc.Annotations[annotationServiceWeight] = "22"
svc.Status.LoadBalancer.Ingress = append(
svc.Status.LoadBalancer.Ingress,
corev1.LoadBalancerIngress{IP: "3.3.3.3"},
)

svc.Status.LoadBalancer.Ingress = append(
svc.Status.LoadBalancer.Ingress,
corev1.LoadBalancerIngress{IP: "4.4.4.4"},
)

_, err := client.CoreV1().Services(metav1.NamespaceDefault).Create(context.Background(), svc, metav1.CreateOptions{})
require.NoError(t, err)

// Verify what we got
retry.Run(t, func(r *retry.R) {
syncer.Lock()
defer syncer.Unlock()
actual := syncer.Registrations
require.Len(r, actual, 3)
require.Equal(r, "foo", actual[1].Service.Service)
require.Equal(r, "3.3.3.3", actual[1].Service.Address)
require.Equal(r, 22, actual[1].Service.Weights.Passing)
require.Equal(r, "foo", actual[2].Service.Service)
require.Equal(r, "4.4.4.4", actual[2].Service.Address)
require.Equal(r, 22, actual[2].Service.Weights.Passing)
require.NotEqual(r, actual[1].Service.ID, actual[2].Service.ID)
})
}

// Test that Loadbalancer service weight is set from service annotation.
func TestServiceWeight_externalIP(t *testing.T) {
t.Parallel()
client := fake.NewSimpleClientset()
syncer := newTestSyncer()
serviceResource := defaultServiceResource(client, syncer)

// Start the controller
closer := controller.TestControllerRun(&serviceResource)
defer closer()

// Insert an LB service
svc := lbService("foo", metav1.NamespaceDefault, "1.2.3.4")
svc.Annotations[annotationServiceWeight] = "22"
svc.Spec.ExternalIPs = []string{"3.3.3.3", "4.4.4.4"}

_, err := client.CoreV1().Services(metav1.NamespaceDefault).Create(context.Background(), svc, metav1.CreateOptions{})
require.NoError(t, err)

// Verify what we got
retry.Run(t, func(r *retry.R) {
syncer.Lock()
defer syncer.Unlock()
actual := syncer.Registrations
require.Len(r, actual, 2)
require.Equal(r, "foo", actual[0].Service.Service)
require.Equal(r, "3.3.3.3", actual[0].Service.Address)
require.Equal(r, 22, actual[0].Service.Weights.Passing)
require.Equal(r, "foo", actual[1].Service.Service)
require.Equal(r, "4.4.4.4", actual[1].Service.Address)
require.Equal(r, 22, actual[1].Service.Weights.Passing)
require.NotEqual(r, actual[0].Service.ID, actual[1].Service.ID)
})
}

// Test service weight.
func TestServiceWeight(t *testing.T) {
t.Parallel()
cases := map[string]struct {
Weight string
ExpectError bool
ExtectedWeight int
}{
"external-IP": {
Weight: "22",
ExpectError: false,
ExtectedWeight: 22,
},
"non-int-weight": {
Weight: "non-int",
ExpectError: true,
ExtectedWeight: 0,
},
"one-weight": {
Weight: "1",
ExpectError: true,
ExtectedWeight: 0,
},
"zero-weight": {
Weight: "0",
ExpectError: true,
ExtectedWeight: 0,
},
"negative-weight": {
Weight: "-2",
ExpectError: true,
ExtectedWeight: 0,
},
"greater-than-100-is-allowed": {
Weight: "1000",
ExpectError: false,
ExtectedWeight: 1000,
},
}

for name, c := range cases {
t.Run(name, func(tt *testing.T) {
weightI, err := getServiceWeight(c.Weight)
// Verify what we got
retry.Run(tt, func(r *retry.R) {
if c.ExpectError {
require.Error(r, err)
} else {
require.Equal(r, c.ExtectedWeight, weightI)
}
})
})
}
}

// Test that we're default enabled.
func TestServiceResource_defaultEnable(t *testing.T) {
t.Parallel()
Expand Down