Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactoring sidecar #66

Merged
merged 1 commit into from
Nov 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions sidecar-go/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ func main() {

logger := log.WithFields(log.Fields{"GameServerName": gameServerName, "GameServerNamespace": crdNamespace})

h := NewHttpHandler(k8sClient, gameServerName, crdNamespace, logger)
sm := NewSidecarManager(k8sClient, gameServerName, crdNamespace, logger)

http.HandleFunc("/v1/sessionHosts/", h.heartbeatHandler)
http.HandleFunc("/v1/sessionHosts/", sm.heartbeatHandler)

http.ListenAndServe(fmt.Sprintf(":%d", SidecarPort), nil)
}
112 changes: 58 additions & 54 deletions sidecar-go/httphandler.go → sidecar-go/sidecarmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ var (

const logEveryHeartbeat = false

type httpHandler struct {
// sidecarManager is responsible for all the sidecar related operations
// mainly accepting heartbeats from the game server process and updating the GameServer CR by communicating with the Kubernetes API server
type sidecarManager struct {
k8sClient dynamic.Interface
previousGameState GameState
previousGameHealth string
Expand All @@ -53,8 +55,9 @@ type httpHandler struct {
logger *log.Entry
}

func NewHttpHandler(k8sClient dynamic.Interface, gameServerName, gameServerNamespace string, logger *log.Entry) httpHandler {
hh := httpHandler{
// NewSidecarManager creates a new sidecarManager
func NewSidecarManager(k8sClient dynamic.Interface, gameServerName, gameServerNamespace string, logger *log.Entry) sidecarManager {
sm := sidecarManager{
previousGameState: GameStateInitializing,
previousGameHealth: "N/A",
k8sClient: k8sClient,
Expand All @@ -63,28 +66,28 @@ func NewHttpHandler(k8sClient dynamic.Interface, gameServerName, gameServerNames
logger: logger,
}

hh.setupWatch()
sm.setupWatch()

return hh
return sm
}

// setupWatch sets up the informer to watch the GameServer CRD
func (h *httpHandler) setupWatch() {
func (sm *sidecarManager) setupWatch() {
// great article for reference https://firehydrant.io/blog/dynamic-kubernetes-informers/
listOptions := dynamicinformer.TweakListOptionsFunc(func(options *metav1.ListOptions) {
options.FieldSelector = fmt.Sprintf("metadata.name=%s", h.gameServerName)
options.FieldSelector = fmt.Sprintf("metadata.name=%s", sm.gameServerName)
})
dynInformer := dynamicinformer.NewFilteredDynamicSharedInformerFactory(h.k8sClient, 0, h.gameServerNamespace, listOptions)
dynInformer := dynamicinformer.NewFilteredDynamicSharedInformerFactory(sm.k8sClient, 0, sm.gameServerNamespace, listOptions)
informer := dynInformer.ForResource(gameserverGVR).Informer()
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
UpdateFunc: h.gameServerUpdated,
UpdateFunc: sm.gameServerUpdated,
})

go informer.Run(watchStopper)
}

// gameServerUpdated runs when the GameServer CRD has been updated
func (h *httpHandler) gameServerUpdated(oldObj, newObj interface{}) {
// gameServerUpdated runs when the GameServer CR has been updated
func (sm *sidecarManager) gameServerUpdated(oldObj, newObj interface{}) {
// dynamic client returns an unstructured object
old := oldObj.(*unstructured.Unstructured)
new := newObj.(*unstructured.Unstructured)
Expand All @@ -94,29 +97,29 @@ func (h *httpHandler) gameServerUpdated(oldObj, newObj interface{}) {
newState, newStateExists, newStateErr := unstructured.NestedString(new.Object, "status", "state")

if oldStateErr != nil {
h.logger.Errorf("error getting old state %s", oldStateErr.Error())
sm.logger.Errorf("error getting old state %s", oldStateErr.Error())
return
}

if newStateErr != nil {
h.logger.Errorf("error getting new state %s", newStateErr.Error())
sm.logger.Errorf("error getting new state %s", newStateErr.Error())
return
}

if !oldStateExists || !newStateExists {
h.logger.Warnf("state does not exist, oldStateExists:%t, newStateExists:%t", oldStateExists, newStateExists)
sm.logger.Warnf("state does not exist, oldStateExists:%t, newStateExists:%t", oldStateExists, newStateExists)
return
}

h.logger.Infof("GameServer CR updated %s:%s,%s,%s", old.GetName(), oldState, new.GetName(), newState)
sm.logger.Infof("GameServer CR updated %s:%s,%s,%s", old.GetName(), oldState, new.GetName(), newState)

// if the GameServer was allocated
if oldState == string(GameStateStandingBy) && newState == string(GameStateActive) {
sessionID, sessionCookie := h.parseSessionDetails(new)
h.logger.Infof("Got values from allocation, sessionID:%s, sessionCookie:%s", sessionID, sessionCookie)
sessionID, sessionCookie := sm.parseSessionDetails(new)
sm.logger.Infof("Got values from allocation, sessionID:%s, sessionCookie:%s", sessionID, sessionCookie)

initialPlayers := h.getInitialPlayers()
h.logger.Infof("Got values from allocation, initialPlayers:%#v", initialPlayers)
initialPlayers := sm.getInitialPlayers()
sm.logger.Infof("Got values from allocation, initialPlayers:%#v", initialPlayers)

sessionDetailsMutex.Lock()
sessionDetails = &SessionDetails{
Expand All @@ -133,48 +136,49 @@ func (h *httpHandler) gameServerUpdated(oldObj, newObj interface{}) {
}
}

// getInitialPlayers returns the initial players from the GameServerDetail CRD
func (h *httpHandler) getInitialPlayers() []string {
obj, err := h.k8sClient.Resource(gameserverDetailGVR).Namespace(h.gameServerNamespace).Get(context.Background(), h.gameServerName, metav1.GetOptions{})
// getInitialPlayers returns the initial players from the unstructured GameServerDetail CR
func (sm *sidecarManager) getInitialPlayers() []string {
obj, err := sm.k8sClient.Resource(gameserverDetailGVR).Namespace(sm.gameServerNamespace).Get(context.Background(), sm.gameServerName, metav1.GetOptions{})
if err != nil {
h.logger.Warnf("error getting initial players details %s", err.Error())
sm.logger.Warnf("error getting initial players details %s", err.Error())
return []string{}
}

initialPlayers, initialPlayersExist, err := unstructured.NestedStringSlice(obj.Object, "spec", "initialPlayers")
if err != nil {
h.logger.Warnf("error getting initial players %s", err.Error())
sm.logger.Warnf("error getting initial players %s", err.Error())
return []string{}
}
if !initialPlayersExist {
h.logger.Warnf("initial players does not exist")
sm.logger.Warnf("initial players does not exist")
return []string{}
}

return initialPlayers
}

// parseSessionDetails returns the sessionID and sessionCookie from the unstructured GameServer CRD
func (h *httpHandler) parseSessionDetails(u *unstructured.Unstructured) (string, string) {
// parseSessionDetails returns the sessionID and sessionCookie from the unstructured GameServer CR
func (sm *sidecarManager) parseSessionDetails(u *unstructured.Unstructured) (string, string) {
sessionID, sessionIDExists, sessionIDErr := unstructured.NestedString(u.Object, "status", "sessionID")
sessionCookie, sessionCookieExists, SessionCookieErr := unstructured.NestedString(u.Object, "status", "sessionCookie")

if !sessionIDExists || !sessionCookieExists {
h.logger.Warnf("sessionID or sessionCookie do not exist, sessionIDExists:%t, sessionCookieExists:%t", sessionIDExists, sessionCookieExists)
sm.logger.Warnf("sessionID or sessionCookie do not exist, sessionIDExists:%t, sessionCookieExists:%t", sessionIDExists, sessionCookieExists)
}

if sessionIDErr != nil {
h.logger.Warnf("error getting sessionID %s", sessionIDErr.Error())
sm.logger.Warnf("error getting sessionID %s", sessionIDErr.Error())
}

if SessionCookieErr != nil {
h.logger.Warnf("error getting sessionCookie %s", SessionCookieErr.Error())
sm.logger.Warnf("error getting sessionCookie %s", SessionCookieErr.Error())
}

return sessionID, sessionCookie
}

func (h *httpHandler) heartbeatHandler(w http.ResponseWriter, req *http.Request) {
// heartbeathandler is the http handler for the heartbeats coming from the game server process, facilitate through GSDK
func (sm *sidecarManager) heartbeatHandler(w http.ResponseWriter, req *http.Request) {
ctx := context.Background()
re := regexp.MustCompile(`.*/v1/sessionHosts\/(.*?)(/heartbeats|$)`)
match := re.FindStringSubmatch(req.RequestURI)
Expand All @@ -189,32 +193,32 @@ func (h *httpHandler) heartbeatHandler(w http.ResponseWriter, req *http.Request)
}

if logEveryHeartbeat {
h.logger.Debugf("heartbeat received from sessionHostId %s, data %#v", sessionHostId, hb)
sm.logger.Debugf("heartbeat received from sessionHostId %s, data %#v", sessionHostId, hb)
}

if err := validateHeartbeatRequestArgs(&hb); err != nil {
h.logger.Warnf("error validating heartbeat request %s", err.Error())
sm.logger.Warnf("error validating heartbeat request %s", err.Error())
badRequest(w, err, "invalid heartbeat request")
return
}

if err := h.updateHealthIfNeeded(ctx, &hb); err != nil {
h.logger.Errorf("error updating health %s", err.Error())
if err := sm.updateHealthIfNeeded(ctx, &hb); err != nil {
sm.logger.Errorf("error updating health %s", err.Error())
internalServerError(w, err, "error updating health")
return
}

// game has reached the standingBy state (GSDK ReadyForPlayers has been called)
if h.previousGameState != hb.CurrentGameState && hb.CurrentGameState == GameStateStandingBy {
if err := h.transitionStateToStandingBy(ctx, &hb); err != nil {
h.logger.Errorf("error updating state %s", err.Error())
if sm.previousGameState != hb.CurrentGameState && hb.CurrentGameState == GameStateStandingBy {
if err := sm.transitionStateToStandingBy(ctx, &hb); err != nil {
sm.logger.Errorf("error updating state %s", err.Error())
internalServerError(w, err, "error updating state")
return
}
}

if err := h.updateConnectedPlayersCountIfNeeded(ctx, &hb); err != nil {
h.logger.Errorf("error updating connected players count %s", err.Error())
if err := sm.updateConnectedPlayersCountIfNeeded(ctx, &hb); err != nil {
sm.logger.Errorf("error updating connected players count %s", err.Error())
internalServerError(w, err, "error updating connected players count")
return
}
Expand Down Expand Up @@ -253,47 +257,47 @@ func (h *httpHandler) heartbeatHandler(w http.ResponseWriter, req *http.Request)
}

// updateHealthIfNeeded updates the health of the GameServer CRD if the game health has changed
func (h *httpHandler) updateHealthIfNeeded(ctx context.Context, hb *HeartbeatRequest) error {
if h.previousGameHealth != hb.CurrentGameHealth {
h.logger.Infof("Health is different than before, updating. Old health %s, new health %s", h.previousGameHealth, hb.CurrentGameHealth)
func (sm *sidecarManager) updateHealthIfNeeded(ctx context.Context, hb *HeartbeatRequest) error {
if sm.previousGameHealth != hb.CurrentGameHealth {
sm.logger.Infof("Health is different than before, updating. Old health %s, new health %s", sm.previousGameHealth, hb.CurrentGameHealth)
payload := fmt.Sprintf("{\"status\":{\"health\":\"%s\"}}", hb.CurrentGameHealth)
payloadBytes := []byte(payload)
_, err := h.k8sClient.Resource(gameserverGVR).Namespace(h.gameServerNamespace).Patch(ctx, h.gameServerName, types.MergePatchType, payloadBytes, metav1.PatchOptions{}, "status")
_, err := sm.k8sClient.Resource(gameserverGVR).Namespace(sm.gameServerNamespace).Patch(ctx, sm.gameServerName, types.MergePatchType, payloadBytes, metav1.PatchOptions{}, "status")

if err != nil {
return err
}
h.previousGameHealth = hb.CurrentGameHealth
sm.previousGameHealth = hb.CurrentGameHealth
}
return nil
}

// updateConnectedPlayersCountIfNeeded updates the connected players count of the GameServer CRD if the connected players count has changed
func (h *httpHandler) updateConnectedPlayersCountIfNeeded(ctx context.Context, hb *HeartbeatRequest) error {
func (sm *sidecarManager) updateConnectedPlayersCountIfNeeded(ctx context.Context, hb *HeartbeatRequest) error {
// we're not interested in updating the connected players count if the game is not active
if hb.CurrentGameState == GameStateActive && h.connectedPlayersCount != len(hb.CurrentPlayers) {
h.logger.Infof("ConnectedPlayersCount is different than before, updating. Old connectedPlayersCount %d, new connectedPlayersCount %d", h.connectedPlayersCount, len(hb.CurrentPlayers))
if hb.CurrentGameState == GameStateActive && sm.connectedPlayersCount != len(hb.CurrentPlayers) {
sm.logger.Infof("ConnectedPlayersCount is different than before, updating. Old connectedPlayersCount %d, new connectedPlayersCount %d", sm.connectedPlayersCount, len(hb.CurrentPlayers))
payload := fmt.Sprintf("{\"spec\":{\"connectedPlayersCount\":%d}}", len(hb.CurrentPlayers))
payloadBytes := []byte(payload)
_, err := h.k8sClient.Resource(gameserverDetailGVR).Namespace(h.gameServerNamespace).Patch(ctx, h.gameServerName, types.MergePatchType, payloadBytes, metav1.PatchOptions{})
_, err := sm.k8sClient.Resource(gameserverDetailGVR).Namespace(sm.gameServerNamespace).Patch(ctx, sm.gameServerName, types.MergePatchType, payloadBytes, metav1.PatchOptions{})
if err != nil {
return err
}
// storing the current number in memory
h.connectedPlayersCount = len(hb.CurrentPlayers)
sm.connectedPlayersCount = len(hb.CurrentPlayers)
}
return nil
}

// transitionStateToStandingBy transitions the state of the GameServer CRD to standingBy
func (h *httpHandler) transitionStateToStandingBy(ctx context.Context, hb *HeartbeatRequest) error {
h.logger.Infof("State is different than before, updating. Old state %s, new state StandingBy", h.previousGameState)
func (sm *sidecarManager) transitionStateToStandingBy(ctx context.Context, hb *HeartbeatRequest) error {
sm.logger.Infof("State is different than before, updating. Old state %s, new state StandingBy", sm.previousGameState)
payload := fmt.Sprintf("{\"status\":{\"state\":\"%s\"}}", hb.CurrentGameState)
payloadBytes := []byte(payload)
_, err := h.k8sClient.Resource(gameserverGVR).Namespace(h.gameServerNamespace).Patch(ctx, h.gameServerName, types.MergePatchType, payloadBytes, metav1.PatchOptions{}, "status")
_, err := sm.k8sClient.Resource(gameserverGVR).Namespace(sm.gameServerNamespace).Patch(ctx, sm.gameServerName, types.MergePatchType, payloadBytes, metav1.PatchOptions{}, "status")
if err != nil {
return err
}
h.previousGameState = hb.CurrentGameState
sm.previousGameState = hb.CurrentGameState
return nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@ var _ = Describe("API server tests", func() {
It("heartbeat with empty body should return error", func() {
req := httptest.NewRequest(http.MethodPost, "/v1/sessionHosts/sessionHostID", nil)
w := httptest.NewRecorder()
h := NewHttpHandler(newDynamicInterface(), testGameServerName, testGameServerNamespace, &log.Entry{
sm := NewSidecarManager(newDynamicInterface(), testGameServerName, testGameServerNamespace, &log.Entry{
Logger: log.New(),
})
h.heartbeatHandler(w, req)
sm.heartbeatHandler(w, req)
res := w.Result()
defer res.Body.Close()
Expect(res.StatusCode).To(Equal(http.StatusBadRequest))
Expand All @@ -46,10 +46,10 @@ var _ = Describe("API server tests", func() {
b, _ := json.Marshal(hb)
req := httptest.NewRequest(http.MethodPost, "/v1/sessionHosts/sessionHostID", bytes.NewReader(b))
w := httptest.NewRecorder()
h := NewHttpHandler(newDynamicInterface(), testGameServerName, testGameServerNamespace, &log.Entry{
sm := NewSidecarManager(newDynamicInterface(), testGameServerName, testGameServerNamespace, &log.Entry{
Logger: log.New(),
})
h.heartbeatHandler(w, req)
sm.heartbeatHandler(w, req)
res := w.Result()
defer res.Body.Close()
Expect(res.StatusCode).To(Equal(http.StatusBadRequest))
Expand All @@ -64,15 +64,15 @@ var _ = Describe("API server tests", func() {
b, _ := json.Marshal(hb)
req := httptest.NewRequest(http.MethodPost, "/v1/sessionHosts/sessionHostID", bytes.NewReader(b))
w := httptest.NewRecorder()
h := NewHttpHandler(newDynamicInterface(), testGameServerName, testGameServerNamespace, &log.Entry{
sm := NewSidecarManager(newDynamicInterface(), testGameServerName, testGameServerNamespace, &log.Entry{
Logger: log.New(),
})
gs := createUnstructuredTestGameServer(testGameServerName, testGameServerNamespace)

_, err := h.k8sClient.Resource(gameserverGVR).Namespace(testGameServerNamespace).Create(context.Background(), gs, metav1.CreateOptions{})
_, err := sm.k8sClient.Resource(gameserverGVR).Namespace(testGameServerNamespace).Create(context.Background(), gs, metav1.CreateOptions{})

Expect(err).ToNot(HaveOccurred())
h.heartbeatHandler(w, req)
sm.heartbeatHandler(w, req)
res := w.Result()
defer res.Body.Close()
Expect(res.StatusCode).To(Equal(http.StatusOK))
Expand Down