Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature: Add group size environment variable injection #208

Merged
merged 1 commit into from
Sep 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions api/leaderworkerset/v1/leaderworkerset_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ const (
// address the leader via the headless service.
LwsLeaderAddress string = "LWS_LEADER_ADDRESS"

// Environment variable added to all containers in the LeaderWorkerSet to
// track the size of the LWS group.
LwsGroupSize string = "LWS_GROUP_SIZE"

// Subgroup index tracks which subgroup the pod is part of. It will be added
// as a label to the pod only if LeaderWorkerSet.Spec.SubGroupSize is set.
SubGroupIndexLabelKey string = "leaderworkerset.sigs.k8s.io/subgroup-index"
Expand Down
50 changes: 41 additions & 9 deletions pkg/utils/pod/pod_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"

corev1 "k8s.io/api/core/v1"
"k8s.io/klog/v2"

leaderworkerset "sigs.k8s.io/lws/api/leaderworkerset/v1"
)
Expand Down Expand Up @@ -91,25 +92,43 @@ func getPodConditionFromList(conditions []corev1.PodCondition, conditionType cor
return -1, nil
}

func addEnvVarIfNotExists(c *corev1.Container, e corev1.EnvVar) {
// addEnvVarsIfNotExists adds env vars to the container if they don't already exist.
// It takes a slice of existing env vars and appends new env vars to the beginning of it,
// ensuring that the 'firstEnv' and 'e' env vars maintain their order at the front.
// The function then adds any existing env vars from 'c.Env' that are not in 'e' to the newEnvVars slice.
func addEnvVarsIfNotExists(c *corev1.Container, firstEnv corev1.EnvVar, e ...corev1.EnvVar) {
newEnvVars := make([]corev1.EnvVar, 0)

// Add firstEnv to the beginning of the newEnvVars slice
newEnvVars = append(newEnvVars, firstEnv)
newEnvVars = append(newEnvVars, e...)

// Add existing env vars from c.Env that are not in e to newEnvVars
for _, env := range c.Env {
if env.Name == e.Name {
return
exists := false
for _, newEnv := range newEnvVars {
if newEnv.Name == env.Name {
exists = true
break
}
}
if !exists {
newEnvVars = append(newEnvVars, env)
}
}
c.Env = append([]corev1.EnvVar{e}, c.Env...)
c.Env = newEnvVars
}

// AddLWSVariables adds LWS_LEADER_ADDRESS environment variable to every container.
// AddLWSVariables adds environment variable to every container.
func AddLWSVariables(pod *corev1.Pod) error {
lwsName, found := pod.Labels[leaderworkerset.SetNameLabelKey]
if !found {
return fmt.Errorf("Failure constructing environment variables, no name label found for pod %v", pod.Name)
return fmt.Errorf("Failure constructing environment variables, no name label found for pod %v", klog.KObj(pod))
}

groupIndex, found := pod.Labels[leaderworkerset.GroupIndexLabelKey]
if !found {
return fmt.Errorf("Failure constructing environment variables, no group index label found for pod %v", pod.Name)
return fmt.Errorf("Failure constructing environment variables, no group index label found for pod %v", klog.KObj(pod))
}

// The headless service name is assumed to be the same as the LWS name.
Expand All @@ -119,11 +138,24 @@ func AddLWSVariables(pod *corev1.Pod) error {
Value: fmt.Sprintf("%s-%s.%s.%s", lwsName, groupIndex, lwsName, pod.ObjectMeta.Namespace),
}

size, found := pod.Annotations[leaderworkerset.SizeAnnotationKey]
if !found {
return fmt.Errorf("Failure constructing environment variables, no size annotation found for pod %v", klog.KObj(pod))
}

// The group size is assumed to be the same as the number of replicas.
sizeEnvVar := corev1.EnvVar{
Name: leaderworkerset.LwsGroupSize,
Value: size,
}

// The order of injection needs attention, see
// https://github.com/kubernetes-sigs/lws/pull/152
for i := range pod.Spec.Containers {
addEnvVarIfNotExists(&pod.Spec.Containers[i], leaderAddressEnvVar)
addEnvVarsIfNotExists(&pod.Spec.Containers[i], leaderAddressEnvVar, sizeEnvVar)
}
for i := range pod.Spec.InitContainers {
addEnvVarIfNotExists(&pod.Spec.InitContainers[i], leaderAddressEnvVar)
addEnvVarsIfNotExists(&pod.Spec.InitContainers[i], leaderAddressEnvVar, sizeEnvVar)
}

return nil
Expand Down
24 changes: 18 additions & 6 deletions pkg/utils/pod/pod_utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package pod

import (
"strconv"
"testing"

"github.com/google/go-cmp/cmp"
Expand Down Expand Up @@ -104,36 +105,43 @@ func TestAddLWSVariables(t *testing.T) {
name string
pod *corev1.Pod
expectedLwsLeaderAddress string
expectedGroupSize int
}{
{
name: "Leader pod",
pod: testutils.MakePodWithLabels("test-sample", "0", "", "default"),
pod: testutils.MakePodWithLabels("test-sample", "0", "", "default", 3),
expectedLwsLeaderAddress: "test-sample-0.test-sample.default",
expectedGroupSize: 3,
},
{
name: "Worker pod",
pod: testutils.MakePodWithLabels("test-sample", "0", "1", "default"),
pod: testutils.MakePodWithLabels("test-sample", "0", "1", "default", 3),
expectedLwsLeaderAddress: "test-sample-0.test-sample.default",
expectedGroupSize: 3,
},
{
name: "Leader pod, group 1",
pod: testutils.MakePodWithLabels("test-sample", "1", "", "default"),
pod: testutils.MakePodWithLabels("test-sample", "1", "", "default", 2),
expectedLwsLeaderAddress: "test-sample-1.test-sample.default",
expectedGroupSize: 2,
},
{
name: "Worker pod, group 1",
pod: testutils.MakePodWithLabels("test-sample", "1", "3", "default"),
pod: testutils.MakePodWithLabels("test-sample", "1", "3", "default", 2),
expectedLwsLeaderAddress: "test-sample-1.test-sample.default",
expectedGroupSize: 2,
},
{
name: "Leader pod, group 1, non-default namespace",
pod: testutils.MakePodWithLabels("test-sample", "1", "3", "lws"),
pod: testutils.MakePodWithLabels("test-sample", "1", "3", "lws", 2),
expectedLwsLeaderAddress: "test-sample-1.test-sample.lws",
expectedGroupSize: 2,
},
{
name: "Worker pod, group 1, non-default namespace",
pod: testutils.MakePodWithLabels("test-sample", "1", "3", "lws"),
pod: testutils.MakePodWithLabels("test-sample", "1", "3", "lws", 2),
expectedLwsLeaderAddress: "test-sample-1.test-sample.lws",
expectedGroupSize: 2,
},
}

Expand All @@ -157,6 +165,10 @@ func TestAddLWSVariables(t *testing.T) {
if diff := cmp.Diff(envVar.Value, tc.expectedLwsLeaderAddress); diff != "" {
t.Errorf("Unexpected lws leader address %s", diff)
}
envVar = container.Env[1]
if diff := cmp.Diff(envVar.Value, strconv.Itoa(tc.expectedGroupSize)); diff != "" {
t.Errorf("Unexpected lws group size %s", diff)
}
}
})
}
Expand Down
8 changes: 6 additions & 2 deletions test/integration/webhooks/pod_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -730,7 +730,7 @@ var _ = ginkgo.Describe("leaderworkerset pod defaulting, creation and update", f
return nil
},
}),
ginkgo.Entry("Leader address env var should be populated and is the first env var", &testDefaultingCase{
ginkgo.Entry("Leader env var should be populated and leader address env should be the first env var", &testDefaultingCase{
makePod: func(ns *corev1.Namespace) corev1.Pod {
return corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Expand All @@ -750,12 +750,16 @@ var _ = ginkgo.Describe("leaderworkerset pod defaulting, creation and update", f
},
checkExpectedPod: func(expected corev1.Pod, got corev1.Pod) error {
if !testutils.HasLWSEnvVarsPopulated(got) {
return fmt.Errorf("should expect leader address env var for pod %s", got.Name)
return fmt.Errorf("should expect lws env var for pod %s", got.Name)
}
expectedLeaderAddress := fmt.Sprintf("test-sample-1.test-sample.%s", expected.ObjectMeta.Namespace)
if err := testutils.CheckContainerHasCorrectEnvVar(got, corev1.EnvVar{Name: leaderworkerset.LwsLeaderAddress, Value: expectedLeaderAddress}); err != nil {
return err
}
expectedGroupSize := fmt.Sprintf("%d", 2)
if err := testutils.CheckContainerHasCorrectEnvVar(got, corev1.EnvVar{Name: leaderworkerset.LwsGroupSize, Value: expectedGroupSize}); err != nil {
return err
}
if err := testutils.IsContainerFirstEnvVarLWSLeaderAddress(got); err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion test/testutils/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ func hasAllEnvVarPopulated(pod corev1.Pod, envVars []string) bool {
}

func HasLWSEnvVarsPopulated(pod corev1.Pod) bool {
return hasAllEnvVarPopulated(pod, []string{leaderworkerset.LwsLeaderAddress})
return hasAllEnvVarPopulated(pod, []string{leaderworkerset.LwsLeaderAddress, leaderworkerset.LwsGroupSize})
}

func CheckContainerHasCorrectEnvVar(pod corev1.Pod, expect corev1.EnvVar) error {
Expand Down
6 changes: 5 additions & 1 deletion test/testutils/wrappers.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package testutils

import (
"fmt"
"strconv"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
Expand Down Expand Up @@ -147,7 +148,7 @@ func BuildLeaderWorkerSet(nsName string) *LeaderWorkerSetWrapper {
}
}

func MakePodWithLabels(setName, groupIndex, workerIndex, namespace string) *corev1.Pod {
func MakePodWithLabels(setName, groupIndex, workerIndex, namespace string, size int) *corev1.Pod {
podName := fmt.Sprintf("%s-%s-%s", setName, groupIndex, workerIndex)
if workerIndex == "0" {
podName = fmt.Sprintf("%s-%s", setName, groupIndex)
Expand All @@ -161,6 +162,9 @@ func MakePodWithLabels(setName, groupIndex, workerIndex, namespace string) *core
leaderworkerset.GroupIndexLabelKey: groupIndex,
leaderworkerset.SetNameLabelKey: setName,
},
Annotations: map[string]string{
leaderworkerset.SizeAnnotationKey: strconv.Itoa(size),
},
},
}
}
Expand Down