Skip to content

Commit

Permalink
refactor(runtime): make topologySpread configurable (#643)
Browse files Browse the repository at this point in the history
Read the TopologySpreadConstraints config from the YAML config
so we can tune based on cluster/game. By default it will be enabled.
Using a low value for maxSkew leads to underutilized nodes which
impacts in cost.
  • Loading branch information
hspedro authored Oct 16, 2024
1 parent bde7835 commit 9682f8f
Show file tree
Hide file tree
Showing 9 changed files with 82 additions and 37 deletions.
5 changes: 5 additions & 0 deletions config/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ adapters:
inCluster: false
masterUrl: "https://127.0.0.1:6443"
kubeconfig: "./kubeconfig/kubeconfig.yaml"
topologySpreadConstraint:
enabled: true
maxSkew: 5
topologyKey: "topology.kubernetes.io/zone"
whenUnsatisfiableScheduleAnyway: false
grpc:
keepAlive:
time: 30s
Expand Down
2 changes: 1 addition & 1 deletion internal/adapters/runtime/kubernetes/game_room.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ const (
)

func (k *kubernetes) CreateGameRoomInstance(ctx context.Context, scheduler *entities.Scheduler, gameRoomName string, gameRoomSpec game_room.Spec) (*game_room.Instance, error) {
pod, err := convertGameRoomSpec(*scheduler, gameRoomName, gameRoomSpec)
pod, err := convertGameRoomSpec(*scheduler, gameRoomName, gameRoomSpec, k.config)
if err != nil {
return nil, errors.NewErrInvalidArgument("invalid game room spec: %s", err)
}
Expand Down
41 changes: 23 additions & 18 deletions internal/adapters/runtime/kubernetes/game_room_convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,13 @@ var invalidPodWaitingStates = []string{
"RunContainerError",
}

func convertGameRoomSpec(scheduler entities.Scheduler, gameRoomName string, gameRoomSpec game_room.Spec) (*v1.Pod, error) {
func convertGameRoomSpec(scheduler entities.Scheduler, gameRoomName string, gameRoomSpec game_room.Spec, config KubernetesConfig) (*v1.Pod, error) {
defaultAnnotations := map[string]string{safeToEvictAnnotation: safeToEvictValue}
defaultLabels := map[string]string{
maestroLabelKey: maestroLabelValue,
schedulerLabelKey: scheduler.Name,
versionLabelKey: gameRoomSpec.Version,
}

pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: gameRoomName,
Expand All @@ -87,25 +86,31 @@ func convertGameRoomSpec(scheduler entities.Scheduler, gameRoomName string, game
Containers: []v1.Container{},
Tolerations: convertSpecTolerations(gameRoomSpec),
Affinity: convertSpecAffinity(gameRoomSpec),
// TODO: make it configurable
// 1. Add to proto/API
// 2. Generate message
// 3. Read from it
// 4. Add to game_room.Spec
// 5. Add a conversion function
TopologySpreadConstraints: []v1.TopologySpreadConstraint{
{
MaxSkew: 1,
TopologyKey: "topology.kubernetes.io/zone",
WhenUnsatisfiable: v1.ScheduleAnyway,
LabelSelector: &metav1.LabelSelector{
MatchLabels: map[string]string{
schedulerLabelKey: scheduler.Name,
},
},
}
if config.TopologySpreadConstraintConfig.Enabled {
whenUnsatisfiable := v1.DoNotSchedule
if config.TopologySpreadConstraintConfig.WhenUnsatisfiableScheduleAnyway {
whenUnsatisfiable = v1.ScheduleAnyway
}
// TODO: make it configurable on scheduler
// 1. Add to proto/API
// 2. Generate message
// 3. Read from it
// 4. Add to game_room.Spec
// 5. Add a conversion function
pod.Spec.TopologySpreadConstraints = []v1.TopologySpreadConstraint{
{
MaxSkew: int32(config.TopologySpreadConstraintConfig.MaxSkew),
TopologyKey: config.TopologySpreadConstraintConfig.TopologyKey,
WhenUnsatisfiable: whenUnsatisfiable,
LabelSelector: &metav1.LabelSelector{
MatchLabels: map[string]string{
schedulerLabelKey: scheduler.Name,
},
},
},
},
}
}
for _, container := range gameRoomSpec.Containers {
podContainer, err := convertContainer(container, scheduler.Name, pod.Name)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -675,7 +675,7 @@ func TestConvertGameSpec(t *testing.T) {
// Name: test.scheduler,
// Annotations: test.expectedPod.ObjectMeta.Annotations,
//}
res, err := convertGameRoomSpec(test.scheduler, test.roomName, test.gameSpec)
res, err := convertGameRoomSpec(test.scheduler, test.roomName, test.gameSpec, KubernetesConfig{})
if test.withError {
require.Error(t, err)
return
Expand Down
6 changes: 3 additions & 3 deletions internal/adapters/runtime/kubernetes/game_room_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func TestGameRoomCreation(t *testing.T) {
ctx := context.Background()
gameRoomName := "some-game-room-name"
client := test.GetKubernetesClientSet(t, kubernetesContainer)
kubernetesRuntime := New(client)
kubernetesRuntime := New(client, KubernetesConfig{})

t.Run("successfully create a room", func(t *testing.T) {
t.Parallel()
Expand Down Expand Up @@ -98,7 +98,7 @@ func TestGameRoomDeletion(t *testing.T) {
ctx := context.Background()
client := test.GetKubernetesClientSet(t, kubernetesContainer)
gameRoomName := "some-game-room"
kubernetesRuntime := New(client)
kubernetesRuntime := New(client, KubernetesConfig{})

t.Run("successfully delete a room", func(t *testing.T) {
t.Parallel()
Expand Down Expand Up @@ -155,7 +155,7 @@ func TestCreateGameRoomName(t *testing.T) {
t.Parallel()
ctx := context.Background()
client := test.GetKubernetesClientSet(t, kubernetesContainer)
kubernetesRuntime := New(client)
kubernetesRuntime := New(client, KubernetesConfig{})

t.Run("When scheduler name is greater than max name length minus randomLength", func(t *testing.T) {
t.Run("return the name with randomLength", func(t *testing.T) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func TestGameRoomsWatch(t *testing.T) {
t.Parallel()
ctx := context.Background()
client := test.GetKubernetesClientSet(t, kubernetesContainer)
kubernetesRuntime := New(client)
kubernetesRuntime := New(client, KubernetesConfig{})

scheduler := &entities.Scheduler{Name: "watch-room-addition"}
err := kubernetesRuntime.CreateScheduler(ctx, scheduler)
Expand Down Expand Up @@ -96,7 +96,7 @@ func TestGameRoomsWatch(t *testing.T) {
t.Parallel()
ctx := context.Background()
client := test.GetKubernetesClientSet(t, kubernetesContainer)
kubernetesRuntime := New(client)
kubernetesRuntime := New(client, KubernetesConfig{})

scheduler := &entities.Scheduler{Name: "watch-room-ready"}
err := kubernetesRuntime.CreateScheduler(ctx, scheduler)
Expand Down Expand Up @@ -158,7 +158,7 @@ func TestGameRoomsWatch(t *testing.T) {
t.Parallel()
ctx := context.Background()
client := test.GetKubernetesClientSet(t, kubernetesContainer)
kubernetesRuntime := New(client)
kubernetesRuntime := New(client, KubernetesConfig{})

scheduler := &entities.Scheduler{Name: "watch-room-error"}
err := kubernetesRuntime.CreateScheduler(ctx, scheduler)
Expand Down Expand Up @@ -213,7 +213,7 @@ func TestGameRoomsWatch(t *testing.T) {
t.Parallel()
ctx := context.Background()
client := test.GetKubernetesClientSet(t, kubernetesContainer)
kubernetesRuntime := New(client)
kubernetesRuntime := New(client, KubernetesConfig{})

scheduler := &entities.Scheduler{Name: "watch-room-delete"}
err := kubernetesRuntime.CreateScheduler(ctx, scheduler)
Expand Down
26 changes: 25 additions & 1 deletion internal/adapters/runtime/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,40 @@ import (

var _ ports.Runtime = (*kubernetes)(nil)

const (
DefaultMaxSkew = 5
DefaultTopologyKey = "topology.kubernetes.io/zone"
)

type TopologySpreadConstraintConfig struct {
Enabled bool
MaxSkew int
TopologyKey string
WhenUnsatisfiableScheduleAnyway bool
}

type KubernetesConfig struct {
TopologySpreadConstraintConfig TopologySpreadConstraintConfig
}

type kubernetes struct {
clientSet kube.Interface
logger *zap.Logger
eventRecorder record.EventRecorder
config KubernetesConfig
}

func New(clientSet kube.Interface) *kubernetes {
func New(clientSet kube.Interface, config KubernetesConfig) *kubernetes {
if config.TopologySpreadConstraintConfig.MaxSkew <= 0 {
config.TopologySpreadConstraintConfig.MaxSkew = DefaultMaxSkew
}
if config.TopologySpreadConstraintConfig.TopologyKey == "" {
config.TopologySpreadConstraintConfig.TopologyKey = DefaultTopologyKey
}
k := &kubernetes{
clientSet: clientSet,
logger: zap.L().With(zap.String(logs.LogFieldRuntime, "kubernetes")),
config: config,
}

eventBroadcaster := record.NewBroadcaster()
Expand Down
6 changes: 3 additions & 3 deletions internal/adapters/runtime/kubernetes/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ import (
func TestSchedulerCreation(t *testing.T) {
ctx := context.Background()
client := test.GetKubernetesClientSet(t, kubernetesContainer)
kubernetesRuntime := New(client)
kubernetesRuntime := New(client, KubernetesConfig{})

t.Run("create single scheduler", func(t *testing.T) {
scheduler := &entities.Scheduler{Name: "single-scheduler-test", PdbMaxUnavailable: "5%"}
Expand All @@ -66,7 +66,7 @@ func TestSchedulerCreation(t *testing.T) {
func TestSchedulerDeletion(t *testing.T) {
ctx := context.Background()
client := test.GetKubernetesClientSet(t, kubernetesContainer)
kubernetesRuntime := New(client)
kubernetesRuntime := New(client, KubernetesConfig{})

t.Run("delete scheduler", func(t *testing.T) {
scheduler := &entities.Scheduler{Name: "delete-scheduler-test", PdbMaxUnavailable: "5%"}
Expand All @@ -92,7 +92,7 @@ func TestSchedulerDeletion(t *testing.T) {
func TestPDBCreationAndDeletion(t *testing.T) {
ctx := context.Background()
client := test.GetKubernetesClientSet(t, kubernetesContainer)
kubernetesRuntime := New(client)
kubernetesRuntime := New(client, KubernetesConfig{})

t.Run("create pdb from scheduler without autoscaling", func(t *testing.T) {
if !kubernetesRuntime.isPDBSupported() {
Expand Down
23 changes: 17 additions & 6 deletions internal/service/adapters.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,15 @@ const (
grpcKeepAliveTimePath = "adapters.grpc.keepalive.time"
grpcKeepAliveTimeoutPath = "adapters.grpc.keepalive.timeout"
// Kubernetes runtime
runtimeKubernetesMasterURLPath = "adapters.runtime.kubernetes.masterUrl"
runtimeKubernetesKubeconfigPath = "adapters.runtime.kubernetes.kubeconfig"
runtimeKubernetesInClusterPath = "adapters.runtime.kubernetes.inCluster"
runtimeKubernetesQPS = "adapters.runtime.kubernetes.qps"
runtimeKubernetesBurst = "adapters.runtime.kubernetes.burst"
runtimeKubernetesMasterURLPath = "adapters.runtime.kubernetes.masterUrl"
runtimeKubernetesKubeconfigPath = "adapters.runtime.kubernetes.kubeconfig"
runtimeKubernetesInClusterPath = "adapters.runtime.kubernetes.inCluster"
runtimeKubernetesQPS = "adapters.runtime.kubernetes.qps"
runtimeKubernetesBurst = "adapters.runtime.kubernetes.burst"
runtimeKubernetesTopologySpreadEnabled = "adapters.runtime.kubernetes.topologySpreadConstraint.enabled"
runtimeKubernetesTopologySpreadMaxSkew = "adapters.runtime.kubernetes.topologySpreadConstraint.maxSkew"
runtimeKubernetesTopologySpreadTopologyKey = "adapters.runtime.kubernetes.topologySpreadConstraint.topologyKey"
runtimeKubernetesTopologySpreadWhenUnsatisfiableScheduleAnyway = "adapters.runtime.kubernetes.topologySpreadConstraint.whenUnsatisfiableScheduleAnyway"
// Redis operation storage
operationStorageRedisURLPath = "adapters.operationStorage.redis.url"
operationLeaseStorageRedisURLPath = "adapters.operationLeaseStorage.redis.url"
Expand Down Expand Up @@ -146,7 +150,14 @@ func NewRuntimeKubernetes(c config.Config) (ports.Runtime, error) {
return nil, fmt.Errorf("failed to initialize Kubernetes runtime: %w", err)
}

return kubernetesRuntime.New(clientSet), nil
return kubernetesRuntime.New(clientSet, kubernetesRuntime.KubernetesConfig{
TopologySpreadConstraintConfig: kubernetesRuntime.TopologySpreadConstraintConfig{
Enabled: c.GetBool(runtimeKubernetesTopologySpreadEnabled),
MaxSkew: c.GetInt(runtimeKubernetesTopologySpreadMaxSkew),
TopologyKey: c.GetString(runtimeKubernetesTopologySpreadTopologyKey),
WhenUnsatisfiableScheduleAnyway: c.GetBool(runtimeKubernetesTopologySpreadWhenUnsatisfiableScheduleAnyway),
},
}), nil
}

// NewOperationStorageRedis instantiates redis as operation storage.
Expand Down

0 comments on commit 9682f8f

Please sign in to comment.