Skip to content

Commit

Permalink
feature: Add group size environment variable injection
Browse files Browse the repository at this point in the history
  • Loading branch information
googs1025 committed Sep 4, 2024
1 parent d141696 commit 4c09205
Show file tree
Hide file tree
Showing 6 changed files with 63 additions and 20 deletions.
4 changes: 4 additions & 0 deletions api/leaderworkerset/v1/leaderworkerset_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ const (
// address the leader via the headless service.
LwsLeaderAddress string = "LWS_LEADER_ADDRESS"

// Environment variable added to all containers in the LeaderWorkerSet to
// track the size of the LWS group.
LwsGroupSize string = "LWS_GROUP_SIZE"

// Subgroup index tracks which subgroup the pod is part of. It will be added
// as a label to the pod only if LeaderWorkerSet.Spec.SubGroupSize is set.
SubGroupIndexLabelKey string = "leaderworkerset.sigs.k8s.io/subgroup-index"
Expand Down
38 changes: 29 additions & 9 deletions pkg/utils/pod/pod_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"

corev1 "k8s.io/api/core/v1"
"k8s.io/klog/v2"

leaderworkerset "sigs.k8s.io/lws/api/leaderworkerset/v1"
)
Expand Down Expand Up @@ -91,25 +92,31 @@ func getPodConditionFromList(conditions []corev1.PodCondition, conditionType cor
return -1, nil
}

func addEnvVarIfNotExists(c *corev1.Container, e corev1.EnvVar) {
for _, env := range c.Env {
if env.Name == e.Name {
return
func addEnvVarsIfNotExists(c *corev1.Container, e ...corev1.EnvVar) {
for _, newEnv := range e {
exists := false
for _, env := range c.Env {
if env.Name == newEnv.Name {
exists = true
break
}
}
if !exists {
c.Env = append([]corev1.EnvVar{newEnv}, c.Env...)
}
}
c.Env = append([]corev1.EnvVar{e}, c.Env...)
}

// AddLWSVariables adds LWS_LEADER_ADDRESS environment variable to every container.
func AddLWSVariables(pod *corev1.Pod) error {
lwsName, found := pod.Labels[leaderworkerset.SetNameLabelKey]
if !found {
return fmt.Errorf("Failure constructing environment variables, no name label found for pod %v", pod.Name)
return fmt.Errorf("Failure constructing environment variables, no name label found for pod %v", klog.KObj(pod))
}

groupIndex, found := pod.Labels[leaderworkerset.GroupIndexLabelKey]
if !found {
return fmt.Errorf("Failure constructing environment variables, no group index label found for pod %v", pod.Name)
return fmt.Errorf("Failure constructing environment variables, no group index label found for pod %v", klog.KObj(pod))
}

// The headless service name is assumed to be the same as the LWS name.
Expand All @@ -119,11 +126,24 @@ func AddLWSVariables(pod *corev1.Pod) error {
Value: fmt.Sprintf("%s-%s.%s.%s", lwsName, groupIndex, lwsName, pod.ObjectMeta.Namespace),
}

size, found := pod.Annotations[leaderworkerset.SizeAnnotationKey]
if !found {
return fmt.Errorf("Failure constructing environment variables, no size annotation found for pod %v", klog.KObj(pod))
}

// The group size is assumed to be the same as the number of replicas.
sizeEnvVar := corev1.EnvVar{
Name: leaderworkerset.LwsGroupSize,
Value: size,
}

// The order of injection needs attention, see
// https://github.com/kubernetes-sigs/lws/pull/152.
for i := range pod.Spec.Containers {
addEnvVarIfNotExists(&pod.Spec.Containers[i], leaderAddressEnvVar)
addEnvVarsIfNotExists(&pod.Spec.Containers[i], sizeEnvVar, leaderAddressEnvVar)
}
for i := range pod.Spec.InitContainers {
addEnvVarIfNotExists(&pod.Spec.InitContainers[i], leaderAddressEnvVar)
addEnvVarsIfNotExists(&pod.Spec.InitContainers[i], sizeEnvVar, leaderAddressEnvVar)
}

return nil
Expand Down
25 changes: 18 additions & 7 deletions pkg/utils/pod/pod_utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package pod

import (
"strconv"
"testing"

"github.com/google/go-cmp/cmp"
Expand Down Expand Up @@ -104,36 +105,43 @@ func TestAddLWSVariables(t *testing.T) {
name string
pod *corev1.Pod
expectedLwsLeaderAddress string
expectedGroupSize int
}{
{
name: "Leader pod",
pod: testutils.MakePodWithLabels("test-sample", "0", "", "default"),
pod: testutils.MakePodWithLabels("test-sample", "0", "", "default", 2),
expectedLwsLeaderAddress: "test-sample-0.test-sample.default",
expectedGroupSize: 2,
},
{
name: "Worker pod",
pod: testutils.MakePodWithLabels("test-sample", "0", "1", "default"),
pod: testutils.MakePodWithLabels("test-sample", "0", "1", "default", 2),
expectedLwsLeaderAddress: "test-sample-0.test-sample.default",
expectedGroupSize: 2,
},
{
name: "Leader pod, group 1",
pod: testutils.MakePodWithLabels("test-sample", "1", "", "default"),
pod: testutils.MakePodWithLabels("test-sample", "1", "", "default", 2),
expectedLwsLeaderAddress: "test-sample-1.test-sample.default",
expectedGroupSize: 2,
},
{
name: "Worker pod, group 1",
pod: testutils.MakePodWithLabels("test-sample", "1", "3", "default"),
pod: testutils.MakePodWithLabels("test-sample", "1", "3", "default", 2),
expectedLwsLeaderAddress: "test-sample-1.test-sample.default",
expectedGroupSize: 2,
},
{
name: "Leader pod, group 1, non-default namespace",
pod: testutils.MakePodWithLabels("test-sample", "1", "3", "lws"),
pod: testutils.MakePodWithLabels("test-sample", "1", "3", "lws", 2),
expectedLwsLeaderAddress: "test-sample-1.test-sample.lws",
expectedGroupSize: 2,
},
{
name: "Worker pod, group 1, non-default namespace",
pod: testutils.MakePodWithLabels("test-sample", "1", "3", "lws"),
pod: testutils.MakePodWithLabels("test-sample", "1", "3", "lws", 2),
expectedLwsLeaderAddress: "test-sample-1.test-sample.lws",
expectedGroupSize: 2,
},
}

Expand All @@ -152,11 +160,14 @@ func TestAddLWSVariables(t *testing.T) {
if len(container.Env) == 0 {
t.Errorf("Failed to add LWS Variables to container %+v", container)
}

envVar := container.Env[0]
if diff := cmp.Diff(envVar.Value, tc.expectedLwsLeaderAddress); diff != "" {
t.Errorf("Unexpected lws leader address %s", diff)
}
envVar = container.Env[1]
if diff := cmp.Diff(envVar.Value, strconv.Itoa(tc.expectedGroupSize)); diff != "" {
t.Errorf("Unexpected lws group size %s", diff)
}
}
})
}
Expand Down
8 changes: 6 additions & 2 deletions test/integration/webhooks/pod_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -730,7 +730,7 @@ var _ = ginkgo.Describe("leaderworkerset pod defaulting, creation and update", f
return nil
},
}),
ginkgo.Entry("Leader address env var should be populated and is the first env var", &testDefaultingCase{
ginkgo.Entry("Leader env var should be populated and leader address env should be the first env var", &testDefaultingCase{
makePod: func(ns *corev1.Namespace) corev1.Pod {
return corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Expand All @@ -750,12 +750,16 @@ var _ = ginkgo.Describe("leaderworkerset pod defaulting, creation and update", f
},
checkExpectedPod: func(expected corev1.Pod, got corev1.Pod) error {
if !testutils.HasLWSEnvVarsPopulated(got) {
return fmt.Errorf("should expect leader address env var for pod %s", got.Name)
return fmt.Errorf("should expect lws env var for pod %s", got.Name)
}
expectedLeaderAddress := fmt.Sprintf("test-sample-1.test-sample.%s", expected.ObjectMeta.Namespace)
if err := testutils.CheckContainerHasCorrectEnvVar(got, corev1.EnvVar{Name: leaderworkerset.LwsLeaderAddress, Value: expectedLeaderAddress}); err != nil {
return err
}
expectedGroupSize := fmt.Sprintf("%d", 2)
if err := testutils.CheckContainerHasCorrectEnvVar(got, corev1.EnvVar{Name: leaderworkerset.LwsGroupSize, Value: expectedGroupSize}); err != nil {
return err
}
if err := testutils.IsContainerFirstEnvVarLWSLeaderAddress(got); err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion test/testutils/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ func hasAllEnvVarPopulated(pod corev1.Pod, envVars []string) bool {
}

func HasLWSEnvVarsPopulated(pod corev1.Pod) bool {
return hasAllEnvVarPopulated(pod, []string{leaderworkerset.LwsLeaderAddress})
return hasAllEnvVarPopulated(pod, []string{leaderworkerset.LwsLeaderAddress, leaderworkerset.LwsGroupSize})
}

func CheckContainerHasCorrectEnvVar(pod corev1.Pod, expect corev1.EnvVar) error {
Expand Down
6 changes: 5 additions & 1 deletion test/testutils/wrappers.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package testutils

import (
"fmt"
"strconv"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
Expand Down Expand Up @@ -147,7 +148,7 @@ func BuildLeaderWorkerSet(nsName string) *LeaderWorkerSetWrapper {
}
}

func MakePodWithLabels(setName, groupIndex, workerIndex, namespace string) *corev1.Pod {
func MakePodWithLabels(setName, groupIndex, workerIndex, namespace string, size int) *corev1.Pod {
podName := fmt.Sprintf("%s-%s-%s", setName, groupIndex, workerIndex)
if workerIndex == "0" {
podName = fmt.Sprintf("%s-%s", setName, groupIndex)
Expand All @@ -161,6 +162,9 @@ func MakePodWithLabels(setName, groupIndex, workerIndex, namespace string) *core
leaderworkerset.GroupIndexLabelKey: groupIndex,
leaderworkerset.SetNameLabelKey: setName,
},
Annotations: map[string]string{
leaderworkerset.SizeAnnotationKey: strconv.Itoa(size),
},
},
}
}
Expand Down

0 comments on commit 4c09205

Please sign in to comment.