Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Various refactorings #172

Merged
merged 7 commits into from
Mar 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,13 @@ builddockerlocal: build
done

installkind:
curl -Lo ./kind https://kind.sigs.k8s.io/dl/v0.11.1/kind-linux-amd64
curl -Lo ./kind https://kind.sigs.k8s.io/dl/v0.12.0/kind-linux-amd64
chmod +x ./kind
mkdir -p ./pkg/operator/testbin/bin
mv ./kind ./pkg/operator/testbin/bin/kind

createkindcluster:
./pkg/operator/testbin/bin/kind create cluster --config ./e2e/kind-config.yaml --image kindest/node:v1.22.5
./pkg/operator/testbin/bin/kind create cluster --config ./e2e/kind-config.yaml --image kindest/node:v1.23.4

deletekindcluster:
./pkg/operator/testbin/bin/kind delete cluster
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ require (
k8s.io/api v0.23.0
k8s.io/apimachinery v0.23.0
k8s.io/client-go v0.23.0
sigs.k8s.io/controller-runtime v0.11.0
sigs.k8s.io/controller-runtime v0.11.1
)

require (
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -982,8 +982,8 @@ rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8
rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0=
rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA=
sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.0.25/go.mod h1:Mlj9PNLmG9bZ6BHFwFKDo5afkpWyUISkb9Me0GnK66I=
sigs.k8s.io/controller-runtime v0.11.0 h1:DqO+c8mywcZLFJWILq4iktoECTyn30Bkj0CwgqMpZWQ=
sigs.k8s.io/controller-runtime v0.11.0/go.mod h1:KKwLiTooNGu+JmLZGn9Sl3Gjmfj66eMbCQznLP5zcqA=
sigs.k8s.io/controller-runtime v0.11.1 h1:7YIHT2QnHJArj/dk9aUkYhfqfK5cIxPOX5gPECfdZLU=
sigs.k8s.io/controller-runtime v0.11.1/go.mod h1:KKwLiTooNGu+JmLZGn9Sl3Gjmfj66eMbCQznLP5zcqA=
sigs.k8s.io/json v0.0.0-20211020170558-c049b76a60c6 h1:fD1pz4yfdADVNfFmcP2aBEtudwUQ1AlLnRBALr33v3s=
sigs.k8s.io/json v0.0.0-20211020170558-c049b76a60c6/go.mod h1:p4QtZmO4uMYipTQNzagwnNoseA6OxSUutVw05NhYDRs=
sigs.k8s.io/structured-merge-diff/v4 v4.0.2/go.mod h1:bJZC9H9iH24zzfZ/41RGcq60oK1F7G282QMXDPYydCw=
Expand Down
10 changes: 5 additions & 5 deletions pkg/operator/controllers/gameserver_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,9 +166,6 @@ func (r *GameServerReconciler) Reconcile(ctx context.Context, req ctrl.Request)
}
}

// other status updates on the GameServer state are provided by the daemonset
// which calls the K8s API server

// if a game server is active, there are players present.
// When using the cluster autoscaler, an annotation will be added
// to prevent the node from being scaled down.
Expand Down Expand Up @@ -199,7 +196,7 @@ func (r *GameServerReconciler) Reconcile(ctx context.Context, req ctrl.Request)
}

// we're adding the Label here so the DaemonSet watch can get the update information about the GameServer
// unfortunately, we can't track CRDs on a Watch via .status
// unfortunately, we can't track CRDs on a Watch via .status yet. If this was the case, we could PATCH the NodeName via the patch code above
// https://github.com/kubernetes/kubernetes/issues/53459
if _, exists := gs.Labels[LabelNodeName]; !exists {
// code from: https://sdk.operatorframework.io/docs/building-operators/golang/references/client/#patch
Expand Down Expand Up @@ -265,10 +262,12 @@ func (r *GameServerReconciler) SetupWithManager(mgr ctrl.Manager) error {
Complete(r)
}

// addSafeToEvictAnnotationIfNecessary will set the safe-to-evict attribute to true on Initalizing or StandingBy GameServers
// and will set it to false for Active (so they don't go down during a potential cluster scale down)
func (r *GameServerReconciler) addSafeToEvictAnnotationIfNecessary(ctx context.Context, gs *mpsv1alpha1.GameServer, pod *corev1.Pod) error {
// we don't need to check if pod.ObjectMeta.Annotations is nil since the check below accomodates for that
// https://go.dev/play/p/O9QmzPnKsOK
if gs.Status.State == mpsv1alpha1.GameServerStateStandingBy {
if gs.Status.State == mpsv1alpha1.GameServerStateInitializing || gs.Status.State == mpsv1alpha1.GameServerStateStandingBy {
if _, ok := pod.ObjectMeta.Annotations[safeToEvictPodAttribute]; !ok {
return r.patchPodSafeToEvictAnnotation(ctx, pod, true)
}
Expand All @@ -281,6 +280,7 @@ func (r *GameServerReconciler) addSafeToEvictAnnotationIfNecessary(ctx context.C
return nil
}

// patchPodSafeToEvictAnnotation will set the safeToEvictPodAttribute annotation on the Pod
func (r *GameServerReconciler) patchPodSafeToEvictAnnotation(ctx context.Context, pod *corev1.Pod, safeToEvict bool) error {
patch := client.MergeFrom(pod.DeepCopy())
if pod.ObjectMeta.Annotations == nil {
Expand Down
133 changes: 72 additions & 61 deletions pkg/operator/controllers/gameserverbuild_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package controllers
import (
"context"
"fmt"
"sort"
"math"
"sync"

mpsv1alpha1 "github.com/playfab/thundernetes/pkg/operator/api/v1alpha1"
Expand Down Expand Up @@ -162,68 +162,34 @@ func (r *GameServerBuildReconciler) Reconcile(ctx context.Context, req ctrl.Requ
}
}

// we are sorting GameServers from newest to oldest, since newest have more chances of being in an initializing or pending state
// prioritizing deletion of newest GameServers, if this is needed
sort.SliceStable(gameServers.Items, func(i, j int) bool {
return gameServers.Items[i].GetCreationTimestamp().After(gameServers.Items[j].GetCreationTimestamp().Time)
})
nonActiveGameServersCount := standingByCount + initializingCount + pendingCount

// user has decreased standingBy numbers
if pendingCount+initializingCount+standingByCount > gsb.Spec.StandingBy {
for i := 0; i < pendingCount+initializingCount+standingByCount-gsb.Spec.StandingBy && i < maxNumberOfGameServersToDelete; i++ {
gs := gameServers.Items[i]
// we're deleting only initializing/pending/standingBy servers, never touching active
if gs.Status.State == "" || gs.Status.State == mpsv1alpha1.GameServerStateInitializing || gs.Status.State == mpsv1alpha1.GameServerStateStandingBy {
// we're requesting the GameServer to be deleted to have the same ResourceVersion
// since it might have been updated (e.g. allocated) and the cache hasn't been updated yet
if err := r.Delete(ctx, &gs, &client.DeleteOptions{
Preconditions: &metav1.Preconditions{
ResourceVersion: &gs.ResourceVersion,
},
}); err != nil {
if apierrors.IsConflict(err) { // this GameServer has been updated, skip it
continue
}
return ctrl.Result{}, err
}
GameServersDeletedCounter.WithLabelValues(gsb.Name).Inc()
addGameServerToUnderDeletionMap(gsb.Name, gs.Name)
r.Recorder.Eventf(&gsb, corev1.EventTypeNormal, "GameServer deleted", "GameServer %s deleted", gs.Name)
}
if nonActiveGameServersCount > gsb.Spec.StandingBy {
totalNumberOfGameServersToDelete := int(math.Min(float64(nonActiveGameServersCount-gsb.Spec.StandingBy), maxNumberOfGameServersToDelete))
err := r.deleteNonActiveGameServers(ctx, &gsb, &gameServers, totalNumberOfGameServersToDelete)
if err != nil {
return ctrl.Result{}, err
}
}

// we need to check if we are above the max
// this will happen if the user modifies the spec.Max during the GameServerBuild's lifetime
if pendingCount+initializingCount+standingByCount+activeCount > gsb.Spec.Max {
for i := 0; i < pendingCount+initializingCount+standingByCount+activeCount-gsb.Spec.Max && i < maxNumberOfGameServersToDelete; i++ {
gs := gameServers.Items[i]
// we're deleting only standingBy or initializing servers
if gs.Status.State == "" || gs.Status.State == mpsv1alpha1.GameServerStateInitializing || gs.Status.State == mpsv1alpha1.GameServerStateStandingBy {
// we're requesting the GameServer to be deleted to have the same ResourceVersion
// since it might have been updated (e.g. allocated) and the cache hasn't been updated yet
if err := r.Delete(ctx, &gs, &client.DeleteOptions{
Preconditions: &metav1.Preconditions{
ResourceVersion: &gs.ResourceVersion,
},
}); err != nil {
if apierrors.IsConflict(err) { // this GameServer has been updated, skip it
continue
}
return ctrl.Result{}, err
}
GameServersDeletedCounter.WithLabelValues(gsb.Name).Inc()
addGameServerToUnderDeletionMap(gsb.Name, gs.Name)
}
// this can happen if the user modifies the spec.Max during the GameServerBuild's lifetime
if nonActiveGameServersCount+activeCount > gsb.Spec.Max {
totalNumberOfGameServersToDelete := int(math.Min(float64(nonActiveGameServersCount+activeCount-gsb.Spec.Max), maxNumberOfGameServersToDelete))
err := r.deleteNonActiveGameServers(ctx, &gsb, &gameServers, totalNumberOfGameServersToDelete)
if err != nil {
return ctrl.Result{}, err
}
}

nonActiveGameServersCount := standingByCount + initializingCount + pendingCount
// we are in need of standingBy servers, so we're creating them here
// we're also limiting the number of game servers that are created to avoid issues like this https://github.com/kubernetes-sigs/controller-runtime/issues/1782
// we attempt to create the missing number of game servers, but we don't want to create more than the max
for i := 0; i < gsb.Spec.StandingBy-nonActiveGameServersCount &&
i+nonActiveGameServersCount+activeCount < gsb.Spec.Max &&
i < maxNumberOfGameServersToAdd; i++ {

newgs, err := NewGameServerForGameServerBuild(&gsb, r.PortRegistry)
if err != nil {
return ctrl.Result{}, err
Expand All @@ -240,8 +206,9 @@ func (r *GameServerBuildReconciler) Reconcile(ctx context.Context, req ctrl.Requ
return r.updateStatus(ctx, &gsb, pendingCount, initializingCount, standingByCount, activeCount, crashesCount)
}

// updateStatus patches the GameServerBuild's status only if the status of at least one of its GameServers has changed
func (r *GameServerBuildReconciler) updateStatus(ctx context.Context, gsb *mpsv1alpha1.GameServerBuild, pendingCount, initializingCount, standingByCount, activeCount, crashesCount int) (ctrl.Result, error) {
// update GameServerBuild status only if one of the fields has changed
// patch GameServerBuild status only if one of the fields has changed
if gsb.Status.CurrentPending != pendingCount ||
gsb.Status.CurrentInitializing != initializingCount ||
gsb.Status.CurrentActive != activeCount ||
Expand All @@ -255,17 +222,8 @@ func (r *GameServerBuildReconciler) updateStatus(ctx context.Context, gsb *mpsv1
gsb.Status.CurrentActive = activeCount
gsb.Status.CurrentStandingBy = standingByCount

// try and get existing crashesCount from the map
// if it doesn't exist, create it with initial value the number of crashes we detected on this reconcile loop
key := getKeyForCrashesPerBuildMap(gsb)
val, ok := crashesPerBuild.LoadOrStore(key, crashesCount)
// if we have existing crashes, get the value
var existingCrashes int = 0
if ok {
existingCrashes = val.(int)
// and store the new one
crashesPerBuild.Store(key, crashesCount+existingCrashes)
}
existingCrashes := r.getExistingCrashes(gsb, crashesCount)

// update the crashesCount status with the new value of total crashes
gsb.Status.CrashesCount = existingCrashes + crashesCount
gsb.Status.CurrentStandingByReadyDesired = fmt.Sprintf("%d/%d", standingByCount, gsb.Spec.StandingBy)
Expand Down Expand Up @@ -390,6 +348,59 @@ func (r *GameServerBuildReconciler) gameServersUnderCreationWereCreated(ctx cont
return true, nil
}

// getKeyForCrashesPerBuildMap returns the key for the map of crashes per build
// key is namespace/name
func getKeyForCrashesPerBuildMap(gsb *mpsv1alpha1.GameServerBuild) string {
return fmt.Sprintf("%s/%s", gsb.Namespace, gsb.Name)
}

// deleteNonActiveGameServers loops through all the GameServers CRs and deletes non-Active ones
func (r *GameServerBuildReconciler) deleteNonActiveGameServers(ctx context.Context,
gsb *mpsv1alpha1.GameServerBuild,
gameServers *mpsv1alpha1.GameServerList,
totalNumberOfGameServersToDelete int) error {
deletedGameServersCount := 0
for i := 0; i < len(gameServers.Items) && deletedGameServersCount < totalNumberOfGameServersToDelete; i++ {
gs := gameServers.Items[i]
// we're deleting only initializing/pending/standingBy servers, never touching active
if gs.Status.State == "" || gs.Status.State == mpsv1alpha1.GameServerStateInitializing || gs.Status.State == mpsv1alpha1.GameServerStateStandingBy {
if err := r.deleteGameServer(ctx, &gs); err != nil {
if apierrors.IsConflict(err) { // this GameServer has been updated, skip it
continue
}
return err
}
deletedGameServersCount = deletedGameServersCount + 1
GameServersDeletedCounter.WithLabelValues(gsb.Name).Inc()
addGameServerToUnderDeletionMap(gsb.Name, gs.Name)
r.Recorder.Eventf(gsb, corev1.EventTypeNormal, "GameServer deleted", "GameServer %s deleted", gs.Name)
}
}
return nil
}

// deleteGameServer deletes the provided GameServer
func (r *GameServerBuildReconciler) deleteGameServer(ctx context.Context, gs *mpsv1alpha1.GameServer) error {
// we're requesting the GameServer to be deleted to have the same ResourceVersion
// since it might have been updated (e.g. allocated) and the cache hasn't been updated yet
return r.Client.Delete(ctx, gs, &client.DeleteOptions{
Preconditions: &metav1.Preconditions{
ResourceVersion: &gs.ResourceVersion,
}})
}

// getTotalCrashes returns the total number of crashes for this GameServerBuild
func (r *GameServerBuildReconciler) getExistingCrashes(gsb *mpsv1alpha1.GameServerBuild, newCrashesCount int) int {
// try and get existing crashesCount from the map
// if it doesn't exist, create it with initial value the number of crashes we detected on this reconcile loop
key := getKeyForCrashesPerBuildMap(gsb)
val, ok := crashesPerBuild.LoadOrStore(key, newCrashesCount)
// if we have existing crashes, get the value
var existingCrashes int = 0
if ok {
existingCrashes = val.(int)
// and store the new one
crashesPerBuild.Store(key, newCrashesCount+existingCrashes)
}
return existingCrashes
}
42 changes: 36 additions & 6 deletions pkg/operator/http/allocate.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package http

import (
"context"
"encoding/json"
"errors"
"math/rand"
Expand All @@ -18,7 +19,9 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"

"k8s.io/client-go/rest"
"k8s.io/client-go/util/retry"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
)

type allocateHandler struct {
Expand All @@ -33,6 +36,7 @@ func (h *allocateHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {

func (h *allocateHandler) handle(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
log := log.FromContext(ctx)

if r.Method != http.MethodPost && r.Method != http.MethodPatch {
badRequestError(ctx, w, errors.New("invalid method"), "Only POST and PATCH are accepted")
Expand Down Expand Up @@ -72,7 +76,7 @@ func (h *allocateHandler) handle(w http.ResponseWriter, r *http.Request) {

// check if this server is already allocated
var gameserversForSessionID mpsv1alpha1.GameServerList
err = h.client.List(r.Context(), &gameserversForSessionID, &client.ListOptions{
err = h.client.List(ctx, &gameserversForSessionID, &client.ListOptions{
FieldSelector: fields.SelectorFromSet(fields.Set{"status.sessionID": args.SessionID}),
LabelSelector: labels.SelectorFromSet(labels.Set{controllers.LabelBuildID: args.BuildID}),
})
Expand All @@ -87,6 +91,7 @@ func (h *allocateHandler) handle(w http.ResponseWriter, r *http.Request) {
return
}

// found a GameServer in this GameServerBuild with the same sessionID
if len(gameserversForSessionID.Items) == 1 {
// return it
gs := gameserversForSessionID.Items[0]
Expand All @@ -101,7 +106,7 @@ func (h *allocateHandler) handle(w http.ResponseWriter, r *http.Request) {

// get the standingBy GameServers for this BuildID
var gameserversStandingBy mpsv1alpha1.GameServerList
err = h.client.List(r.Context(), &gameserversStandingBy, &client.ListOptions{
err = h.client.List(ctx, &gameserversStandingBy, &client.ListOptions{
FieldSelector: fields.SelectorFromSet(fields.Set{"status.state": "StandingBy"}),
LabelSelector: labels.SelectorFromSet(labels.Set{controllers.LabelBuildID: args.BuildID}),
})
Expand All @@ -115,24 +120,36 @@ func (h *allocateHandler) handle(w http.ResponseWriter, r *http.Request) {
return
}

// pick a random one
// pick a random GameServer to allocate
gs := gameserversStandingBy.Items[rand.Intn(len(gameserversStandingBy.Items))]

// get a GameServerDetail object for this GameServer
gsd := createGameServerDetailForGameServer(&gs, args.InitialPlayers)

err = h.client.Create(r.Context(), &gsd)
// create it
err = h.client.Create(ctx, &gsd)
if err != nil {
internalServerError(ctx, w, err, "cannot create GameServerDetail")
return
}

// set the relevant status fields
// set the relevant status fields for the GameServer
gs.Status.State = mpsv1alpha1.GameServerStateActive
gs.Status.SessionID = args.SessionID
gs.Status.SessionCookie = args.SessionCookie

err = h.client.Status().Update(r.Context(), &gs)
// we use .Update instead of .Patch to avoid simultaneous allocations updating the same GameServer
// this is a very unlikely scenario, since the .Create on the GameServerDetail would fail
err = h.client.Status().Update(ctx, &gs)
if err != nil {
// we are in a semi-corrupt state, since a GameServerDetail object has been created but we failed to update the GameServer object
// we launch a goroutine to delete the GameServerDetail object
go func() {
err := h.deleteGameServerDetail(ctx, &gsd)
if err != nil {
log.Error(err, fmt.Sprintf("Failed to delete GameServerDetail object %s", gsd.Name))
}
}()
internalServerError(ctx, w, err, "cannot update game server")
return
}
Expand All @@ -150,6 +167,7 @@ func (h *allocateHandler) handle(w http.ResponseWriter, r *http.Request) {
controllers.AllocationsCounter.WithLabelValues(gs.Labels[controllers.LabelBuildName]).Inc()
}

// createGameServerDetailForGameServer returns a new GameServerDetail object for the given GameServer containing the initialPlayers string slice
func createGameServerDetailForGameServer(gs *mpsv1alpha1.GameServer, initialPlayers []string) mpsv1alpha1.GameServerDetail {
return mpsv1alpha1.GameServerDetail{
ObjectMeta: metav1.ObjectMeta{
Expand All @@ -170,3 +188,15 @@ func createGameServerDetailForGameServer(gs *mpsv1alpha1.GameServer, initialPlay
},
}
}

// deleteFameServerDetail deletes the GameServerDetail object for the given GameServer with backoff retries
func (h *allocateHandler) deleteGameServerDetail(ctx context.Context, gsd *mpsv1alpha1.GameServerDetail) error {
err := retry.OnError(retry.DefaultBackoff,
func(_ error) bool {
return true // TODO: check if we can do something better here, like check for timeouts?
}, func() error {
err := h.client.Delete(ctx, gsd)
return err
})
return err
}