Exit disruption watcher on non-recoverable errors (#609)
* fix(watcher): exit disruption worker on error
* refactor(watcher): log watch lifecycle
* docs(scheduler): termination grace period as string
hspedro authored Mar 17, 2024
1 parent c5bd870 commit 62a1b40
Showing 5 changed files with 31 additions and 22 deletions.
8 changes: 4 additions & 4 deletions docs/reference/Scheduler.md
@@ -64,7 +64,7 @@ portRange:
end: 60000
maxSurge: 30%
spec:
terminationGracePeriod: '100'
terminationGracePeriod: '100s'
containers:
- name: alpine
image: alpine
@@ -138,7 +138,7 @@ autoscaling:
},
"maxSurge": "30%",
"spec": {
"terminationGracePeriod": '100',
"terminationGracePeriod": '100s',
"containers": [
{
"name": "alpine",
@@ -301,12 +301,12 @@ It might be important to understand the basics of kubernetes before deep diving
It is represented as:
```yaml
terminationGracePeriod: Integer
terminationGracePeriod: String
containers: Containers
toleration: String
affinity: String
```
- **terminationGracePeriod**: Required integer value. Must be greater than 0. When a game room receives the signal to be deleted, it will take this value (in milliseconds) to be completely deleted;
- **terminationGracePeriod**: Required string value. Must be greater than 0 and include a time unit, e.g. "100s". When a game room receives the signal to be deleted, it has up to this duration to be completely deleted;
- **containers**: Contains the information about the game room, such as the image and environment variables. This is a list since the game room can be composed of multiple containers;
- **toleration**: Kubernetes specific. Represents the toleration value for all GRUs on the scheduler. See [more](https://kubernetes.io/docs/concepts/scheduling-eviction/taint-and-toleration/);
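With this change, anywhere a scheduler spec is written the grace period should be quoted and carry a unit. A minimal sketch of a `spec` block under the new convention (values mirror the snippets in this diff; the image and duration are placeholders):

```yaml
spec:
  # terminationGracePeriod is now a string and must carry a time unit, e.g. "100s"
  terminationGracePeriod: '100s'
  containers:
    - name: alpine
      image: alpine
```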
2 changes: 1 addition & 1 deletion docs/tutorials/Development.md
@@ -96,7 +96,7 @@ curl --request POST \
},
"maxSurge": "10%",
"spec": {
"terminationGracePeriod": "100",
"terminationGracePeriod": "100s",
"containers": [
{
"name": "alpine",
2 changes: 1 addition & 1 deletion docs/tutorials/GettingStarted.md
@@ -63,7 +63,7 @@ curl --request POST \
},
"maxSurge": "10%",
"spec": {
"terminationGracePeriod": "100",
"terminationGracePeriod": "100s",
"containers": [
{
"name": "game-container",
38 changes: 23 additions & 15 deletions internal/core/worker/runtimewatcher/runtime_watcher_worker.go
@@ -40,7 +40,13 @@ import (

var _ worker.Worker = (*runtimeWatcherWorker)(nil)

const WorkerName = "runtime_watcher"
const (
WorkerName = "runtime_watcher"
// Apply a disruption mitigation quota only if the total number of rooms is at
// least MinRoomsToApplyDisruption. This prevents a 0% disruption budget that
// could block node drains entirely.
MinRoomsToApplyDisruption = 2
)

// runtimeWatcherWorker is a worker that watches for changes on the Runtime
// and applies them to keep the Maestro state up-to-date. It is not expected to perform
@@ -93,6 +99,7 @@ func (w *runtimeWatcherWorker) spawnUpdateRoomWatchers(resultChan chan game_room
reportEventProcessingStatus(event, true)
}
case <-w.ctx.Done():
w.logger.Info("context closed, exiting update rooms watcher")
return
}
}
@@ -108,33 +115,28 @@ func (w *runtimeWatcherWorker) mitigateDisruptions() error {
zap.String("scheduler", w.scheduler.Name),
zap.Error(err),
)
return err
return nil
}
mitigateForRoomsAmount := 0
if totalRoomsAmount >= 2 {
mitigateForRoomsAmount, err = w.roomStorage.GetRoomCountByStatus(w.ctx, w.scheduler.Name, game_room.GameStatusOccupied)
mitigationQuota := 0
if totalRoomsAmount >= MinRoomsToApplyDisruption {
mitigationQuota, err = w.roomStorage.GetRoomCountByStatus(w.ctx, w.scheduler.Name, game_room.GameStatusOccupied)
if err != nil {
w.logger.Error(
"failed to get occupied rooms for scheduler",
zap.String("scheduler", w.scheduler.Name),
zap.Error(err),
)
return err
return nil
}
}
err = w.runtime.MitigateDisruption(w.ctx, w.scheduler, mitigateForRoomsAmount, w.config.DisruptionSafetyPercentage)
err = w.runtime.MitigateDisruption(w.ctx, w.scheduler, mitigationQuota, w.config.DisruptionSafetyPercentage)
if err != nil {
w.logger.Error(
"failed to mitigate disruption",
zap.String("scheduler", w.scheduler.Name),
zap.Error(err),
)
return err
}
w.logger.Debug(
"mitigated disruption for occupied rooms",
zap.String("scheduler", w.scheduler.Name),
zap.Int("mitigateForRoomsAmount", mitigateForRoomsAmount),
zap.Int("mitigationQuota", mitigationQuota),
)

return nil
@@ -153,13 +155,15 @@ func (w *runtimeWatcherWorker) spawnDisruptionWatcher() {
case <-ticker.C:
err := w.mitigateDisruptions()
if err != nil {
w.logger.Warn(
"Mitigate Disruption watcher run failed",
w.logger.Error(
"unrecoverable error mitigating disruption",
zap.String("scheduler", w.scheduler.Name),
zap.Error(err),
)
return
}
case <-w.ctx.Done():
w.logger.Info("context closed, exiting disruption watcher")
return
}
}
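To make the behavioral change easier to follow outside the interleaved diff: room-count lookups are now treated as recoverable (logged and skipped until the next tick), while a failed MitigateDisruption call is propagated to spawnDisruptionWatcher, which logs it and exits instead of warning and looping forever. The standalone Go sketch below only illustrates that pattern; the names, tick counter, interval, and fake failure conditions are illustrative, not the repository's code.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// mitigate stands in for mitigateDisruptions: lookup failures are treated as
// recoverable (logged, nil returned, retried next tick), while a failure to
// apply the mitigation itself is returned as non-recoverable.
func mitigate(tick int) error {
	if tick%3 == 0 {
		// recoverable: e.g. a room-count lookup failed
		fmt.Println("warn: room count lookup failed, retrying next tick")
		return nil
	}
	if tick == 7 {
		// non-recoverable: e.g. the runtime rejected the disruption mitigation
		return errors.New("runtime rejected disruption mitigation")
	}
	return nil
}

// watch mirrors the new spawnDisruptionWatcher loop: it exits on the first
// non-recoverable error or when the context is cancelled.
func watch(ctx context.Context) {
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()
	for tick := 1; ; tick++ {
		select {
		case <-ticker.C:
			if err := mitigate(tick); err != nil {
				fmt.Println("unrecoverable error mitigating disruption, exiting:", err)
				return
			}
		case <-ctx.Done():
			fmt.Println("context closed, exiting disruption watcher")
			return
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()
	watch(ctx)
}
```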
@@ -175,6 +179,7 @@ func (w *runtimeWatcherWorker) spawnWatchers(
}

func (w *runtimeWatcherWorker) Start(ctx context.Context) error {
w.logger.Info("starting runtime watcher", zap.String("scheduler", w.scheduler.Name))
watcher, err := w.runtime.WatchGameRoomInstances(ctx, w.scheduler)
if err != nil {
return fmt.Errorf("failed to start watcher: %w", err)
@@ -184,13 +189,16 @@ func (w *runtimeWatcherWorker) Start(ctx context.Context) error {
defer w.cancelFunc()

w.spawnWatchers(watcher.ResultChan())
w.logger.Info("spawned all goroutines", zap.String("scheduler", w.scheduler.Name))

w.workerWaitGroup.Wait()
w.logger.Info("wait group ended, all goroutines stopped", zap.String("scheduler", w.scheduler.Name))
watcher.Stop()
return nil
}

func (w *runtimeWatcherWorker) Stop(_ context.Context) {
w.logger.Info("stopping runtime watcher", zap.String("scheduler", w.scheduler.Name))
if w.cancelFunc != nil {
w.cancelFunc()
}
@@ -264,7 +264,8 @@ func TestRuntimeWatcher_Start(t *testing.T) {
runtimeWatcher.EXPECT().Stop()

runtime.EXPECT().MitigateDisruption(gomock.Any(), gomock.Any(), 0, 0.0).Return(nil).MinTimes(0)
roomStorage.EXPECT().GetRoomCount(gomock.Any(), gomock.Any()).Return(2, nil).MinTimes(0)
roomStorage.EXPECT().GetRoomCount(gomock.Any(), gomock.Any()).Return(MinRoomsToApplyDisruption, nil).MinTimes(0)
// We only call GetRoomCountByStatus to apply mitigation if there is more than 1 room
roomStorage.EXPECT().GetRoomCountByStatus(gomock.Any(), gomock.Any(), gomock.Any()).Return(0, nil).MinTimes(0)

ctx, cancelFunc := context.WithCancel(context.Background())
