Skip to content

Commit

Permalink
Adding support for weighted k8s service
Browse files Browse the repository at this point in the history
  • Loading branch information
srahul3 committed Jun 7, 2023
1 parent 0c28b9b commit fdef617
Show file tree
Hide file tree
Showing 3 changed files with 158 additions and 0 deletions.
6 changes: 6 additions & 0 deletions control-plane/catalog/to-consul/annotation.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,10 @@ const (
// annotationServiceMetaPrefix is the prefix for setting meta key/value
// for a service. The remainder of the key is the meta key.
annotationServiceMetaPrefix = "consul.hashicorp.com/service-meta-"

// annotationServiceWeight is the key of the annotation that determines
// the traffic weight of the service which is spanned over multiple k8s cluster.
// e.g. Service `backend` in k8s cluster `A` receives 25% of the traffic
// compared to same `backend` service in k8s cluster `B`
annotationServiceWeight = "consul.hashicorp.com/service-weight"
)
40 changes: 40 additions & 0 deletions control-plane/catalog/to-consul/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -511,6 +511,18 @@ func (t *ServiceResource) generateRegistrations(key string) {
r.Service = &rs
r.Service.ID = serviceID(r.Service.Service, ip)
r.Service.Address = ip
// Adding information about service weight.
// Overrides the existing weight if present
if weight, ok := svc.Annotations[annotationServiceWeight]; ok || weight != "" {
err := t.setServiceWeight(weight, len(ips), r)
if err != nil {
t.Log.Debug("assertion: The service with ",
"key", key,
"service", baseService.Service,
"namespace", baseService.Namespace,
"expects a positive integer value, however observed value was not integer: ", weight)
}
}
t.consulMap[key] = append(t.consulMap[key], &r)
}

Expand Down Expand Up @@ -547,6 +559,20 @@ func (t *ServiceResource) generateRegistrations(key string) {
r.Service.ID = serviceID(r.Service.Service, addr)
r.Service.Address = addr

// Adding information about service weight.
// Overrides the existing weight if present
if weight, ok := svc.Annotations[annotationServiceWeight]; ok || weight != "" {
ingress_count := len(svc.Status.LoadBalancer.Ingress)
err := t.setServiceWeight(weight, ingress_count, r)
if err != nil {
t.Log.Debug("assertion: The service with ",
"key", key,
"service", baseService.Service,
"namespace", baseService.Namespace,
"expects a positive integer value, however observed value was not integer: ", weight)
}
}

t.consulMap[key] = append(t.consulMap[key], &r)
}
}
Expand Down Expand Up @@ -641,6 +667,20 @@ func (t *ServiceResource) generateRegistrations(key string) {
}
}

// Sets the passing service weight
func (t *ServiceResource) setServiceWeight(weight string, appsCount int, r consulapi.CatalogRegistration) error {
// error validation if the input param is a number
weightI, err := strconv.Atoi(weight)
if err != nil {
return err
} else if weightI > 1 {
r.Service.Weights = consulapi.AgentWeights{
Passing: weightI / appsCount,
}
}
return nil
}

func (t *ServiceResource) registerServiceInstance(
baseNode consulapi.CatalogRegistration,
baseService consulapi.AgentService,
Expand Down
112 changes: 112 additions & 0 deletions control-plane/catalog/to-consul/resource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,118 @@ func TestServiceResource_createDelete(t *testing.T) {
})
}

// Test that Loadbalancer service weight is set from service annotation.
func TestServiceWeight_ingress(t *testing.T) {
t.Parallel()
client := fake.NewSimpleClientset()
syncer := newTestSyncer()
serviceResource := defaultServiceResource(client, syncer)

// Start the controller
closer := controller.TestControllerRun(&serviceResource)
defer closer()

// Insert an LB service
svc := lbService("foo", metav1.NamespaceDefault, "1.2.3.4")
svc.Annotations[annotationServiceWeight] = "22"
svc.Status.LoadBalancer.Ingress = append(
svc.Status.LoadBalancer.Ingress,
corev1.LoadBalancerIngress{IP: "3.3.3.3"},
)

svc.Status.LoadBalancer.Ingress = append(
svc.Status.LoadBalancer.Ingress,
corev1.LoadBalancerIngress{IP: "4.4.4.4"},
)

_, err := client.CoreV1().Services(metav1.NamespaceDefault).Create(context.Background(), svc, metav1.CreateOptions{})
require.NoError(t, err)

// Verify what we got
retry.Run(t, func(r *retry.R) {
syncer.Lock()
defer syncer.Unlock()
actual := syncer.Registrations
require.Len(r, actual, 3)
require.Equal(r, "foo", actual[1].Service.Service)
require.Equal(r, "3.3.3.3", actual[1].Service.Address)
require.Equal(r, 7, actual[1].Service.Weights.Passing)
require.Equal(r, "foo", actual[2].Service.Service)
require.Equal(r, "4.4.4.4", actual[2].Service.Address)
require.Equal(r, 7, actual[2].Service.Weights.Passing)
require.NotEqual(r, actual[1].Service.ID, actual[2].Service.ID)
})
}

// Test that Loadbalancer service weight is set from service annotation.
func TestServiceWeight_externalIP(t *testing.T) {
t.Parallel()
client := fake.NewSimpleClientset()
syncer := newTestSyncer()
serviceResource := defaultServiceResource(client, syncer)

// Start the controller
closer := controller.TestControllerRun(&serviceResource)
defer closer()

// Insert an LB service
svc := lbService("foo", metav1.NamespaceDefault, "1.2.3.4")
svc.Annotations[annotationServiceWeight] = "22"
svc.Spec.ExternalIPs = []string{"3.3.3.3", "4.4.4.4"}

_, err := client.CoreV1().Services(metav1.NamespaceDefault).Create(context.Background(), svc, metav1.CreateOptions{})
require.NoError(t, err)

// Verify what we got
retry.Run(t, func(r *retry.R) {
syncer.Lock()
defer syncer.Unlock()
actual := syncer.Registrations
require.Len(r, actual, 2)
require.Equal(r, "foo", actual[0].Service.Service)
require.Equal(r, "3.3.3.3", actual[0].Service.Address)
require.Equal(r, 11, actual[0].Service.Weights.Passing)
require.Equal(r, "foo", actual[1].Service.Service)
require.Equal(r, "4.4.4.4", actual[1].Service.Address)
require.Equal(r, 11, actual[1].Service.Weights.Passing)
require.NotEqual(r, actual[0].Service.ID, actual[1].Service.ID)
})
}

func TestServiceWeight_nonIntWeight(t *testing.T) {
t.Parallel()
client := fake.NewSimpleClientset()
syncer := newTestSyncer()
serviceResource := defaultServiceResource(client, syncer)

// Start the controller
closer := controller.TestControllerRun(&serviceResource)
defer closer()

// Insert an LB service
svc := lbService("foo", metav1.NamespaceDefault, "1.2.3.4")
svc.Annotations[annotationServiceWeight] = "non-int"
svc.Spec.ExternalIPs = []string{"3.3.3.3", "4.4.4.4"}

_, err := client.CoreV1().Services(metav1.NamespaceDefault).Create(context.Background(), svc, metav1.CreateOptions{})
require.NoError(t, err)

// Verify what we got
retry.Run(t, func(r *retry.R) {
syncer.Lock()
defer syncer.Unlock()
actual := syncer.Registrations
require.Len(r, actual, 2)
require.Equal(r, "foo", actual[0].Service.Service)
require.Equal(r, "3.3.3.3", actual[0].Service.Address)
require.Equal(r, 0, actual[0].Service.Weights.Passing)
require.Equal(r, "foo", actual[1].Service.Service)
require.Equal(r, "4.4.4.4", actual[1].Service.Address)
require.Equal(r, 0, actual[1].Service.Weights.Passing)
require.NotEqual(r, actual[0].Service.ID, actual[1].Service.ID)
})
}

// Test that we're default enabled.
func TestServiceResource_defaultEnable(t *testing.T) {
t.Parallel()
Expand Down

0 comments on commit fdef617

Please sign in to comment.