Skip to content

Commit

Permalink
Gameserver update: add processing of disappeared processes & update s…
Browse files Browse the repository at this point in the history
…tatistics (#78)

* Add handling in case the process disappears

* Update statistics collector: use it only for one epoch

* Add synchronization to the `fail` method

* Restructure & add processing of process killing by system

* Add `FailedMaps` instead of `Status` & refactor

* Add info about failed maps to stats table

* Fix logging of exception

* `GameError`: Replace error with error_name & add error's description to `GameInterruptedError`

* Fix types

* Remove statscollector from GameError

* Save 'FunctionTimedOut'ed maps

* Remove extra exception handler.

* Move `avg_by_attr` from StatisticsCollector

* Fix calculation of average coverage

---------

Co-authored-by: Anya497 <chi.vinny0702@gmail.com>
  • Loading branch information
Parzival-05 and Anya497 committed Sep 15, 2024
1 parent 03003dc commit fac6464
Show file tree
Hide file tree
Showing 11 changed files with 253 additions and 141 deletions.
18 changes: 0 additions & 18 deletions AIAgent/common/errors.py

This file was deleted.

15 changes: 15 additions & 0 deletions AIAgent/common/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
from typing import TypeVar

T = TypeVar("T")


def inheritors(cls: type[T]) -> set[type[T]]:
    """Collect every direct and indirect subclass of ``cls``.

    The class hierarchy is walked iteratively through ``__subclasses__()``;
    each subclass is reported once even when it is reachable through several
    parents. ``cls`` itself is not part of the result.

    Args:
        cls: The class whose descendants are collected.

    Returns:
        The set of all classes that inherit from ``cls``.
    """
    found: set[type[T]] = set()
    pending = [cls]
    while pending:
        parent = pending.pop()
        for child in parent.__subclasses__():
            # A class already seen (e.g. via another base) has had its own
            # subclasses queued, so it must not be queued again.
            if child not in found:
                found.add(child)
                pending.append(child)
    return found
23 changes: 16 additions & 7 deletions AIAgent/connection/broker_conn/socket_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,19 @@
import time
from contextlib import contextmanager, suppress

import psutil
import websocket
from config import GameServerConnectorConfig
from connection.broker_conn.classes import ServerInstanceInfo, SVMInfo
from connection.broker_conn.requests import acquire_instance, return_instance
from connection.errors_connection import ProcessStoppedError


@contextmanager
def process_running(pid):
    """Context manager that refuses to start when process ``pid`` is gone.

    The existence check is performed once, when the context is entered.

    Raises:
        ProcessStoppedError: if ``psutil.pid_exists(pid)`` is false on entry.
    """
    if psutil.pid_exists(pid):
        yield
    else:
        raise ProcessStoppedError


def wait_for_connection(server_instance: ServerInstanceInfo):
Expand All @@ -18,7 +27,7 @@ def wait_for_connection(server_instance: ServerInstanceInfo):
ConnectionRefusedError,
ConnectionResetError,
websocket.WebSocketTimeoutException,
):
), process_running(server_instance.pid):
ws.settimeout(GameServerConnectorConfig.CREATE_CONNECTION_TIMEOUT_SEC)
ws.connect(
server_instance.ws_url,
Expand All @@ -39,12 +48,12 @@ def wait_for_connection(server_instance: ServerInstanceInfo):
@contextmanager
def game_server_socket_manager(svm_info: SVMInfo):
    """Yield a connected websocket to a game server acquired for ``svm_info``.

    A server instance is acquired first; the websocket is then opened inside
    an outer ``try`` so that the instance is handed back through
    ``return_instance`` even when establishing the connection fails.
    The socket itself is closed as soon as the ``with`` body finishes.

    Args:
        svm_info: Description of the SVM for which an instance is acquired.

    Yields:
        The websocket returned by ``wait_for_connection``, with its timeout
        set to ``GameServerConnectorConfig.RESPONCE_TIMEOUT_SEC``.
    """
    server_instance = acquire_instance(svm_info)
    try:
        ws = wait_for_connection(server_instance)
        try:
            ws.settimeout(GameServerConnectorConfig.RESPONCE_TIMEOUT_SEC)
            yield ws
        finally:
            ws.close()
    finally:
        # Runs for connection failures too, not only after a finished game.
        return_instance(server_instance)
22 changes: 22 additions & 0 deletions AIAgent/connection/errors_connection.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
from abc import ABC, abstractmethod


class GameInterruptedError(Exception, ABC):
    """Base error: the game was unexpectedly interrupted due to external reasons.

    Subclasses are expected to supply ``desc``.
    """

    # NOTE(review): CPython does not apply the ABC instantiation check to
    # exception classes, so this base may still be instantiable — confirm
    # before relying on ``desc`` always being overridden.
    @property
    @abstractmethod
    def desc(self):
        """Description of the interruption; overridden by subclasses."""


class ProcessStoppedError(GameInterruptedError):
    """The SVM's process stopped unexpectedly."""

    # Concrete value for the ``desc`` attribute declared on the base class.
    desc = "SVM's process unexpectedly stopped"


class ConnectionLostError(GameInterruptedError):
    """The connection to the SVM was lost."""

    # Concrete value for the ``desc`` attribute declared on the base class.
    desc = "Connection to SVM was lost"
28 changes: 24 additions & 4 deletions AIAgent/connection/game_server_conn/connector.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
from functools import wraps
import logging
import logging.config
from typing import Optional

import websocket
from connection.errors_connection import ConnectionLostError
from common.game import GameMap, GameState

from .messages import (
Expand Down Expand Up @@ -54,12 +56,30 @@ def __init__(

start_message = ClientMessage(StartMessageBody(**map.to_dict()))
logging.debug(f"--> StartMessage : {start_message}")
self.ws.send(start_message.to_json())
self.send(start_message.to_json())
self._current_step = 0
self.game_is_over = False
self.map = map
self.steps = steps

def catch_losing_of_connection(func):
    """Decorator that reports a reset connection as ``ConnectionLostError``.

    The wrapped callable is invoked with the arguments unchanged and its
    result is returned as is. Only ``ConnectionResetError`` is translated;
    the original error is kept as the cause. Any other exception propagates
    untouched.
    """

    @wraps(func)
    def guarded(*args, **kwargs):
        try:
            result = func(*args, **kwargs)
        except ConnectionResetError as reset_error:
            raise ConnectionLostError from reset_error
        return result

    return guarded

@catch_losing_of_connection
def receive(self):
    """Read one message from ``self.ws`` and return it.

    Goes through ``catch_losing_of_connection``, so a connection reset while
    reading is raised as ``ConnectionLostError``.
    """
    message = self.ws.recv()
    return message

@catch_losing_of_connection
def send(self, msg):
    """Send ``msg`` over ``self.ws`` and return what ``ws.send`` returns.

    Goes through ``catch_losing_of_connection``, so a connection reset while
    sending is raised as ``ConnectionLostError``.
    """
    outcome = self.ws.send(msg)
    return outcome

def _raise_if_gameover(self, msg) -> GameOverServerMessage | str:
if self.game_is_over:
raise Connector.GameOver
Expand All @@ -83,7 +103,7 @@ def _raise_if_gameover(self, msg) -> GameOverServerMessage | str:
return msg

def recv_state_or_throw_gameover(self) -> GameState:
received = self.ws.recv()
received = self.receive()
data = GameStateServerMessage.from_json_handle(
self._raise_if_gameover(received),
expected=GameStateServerMessage,
Expand All @@ -98,11 +118,11 @@ def send_step(self, next_state_id: int, predicted_usefullness: int):
)
)
logging.debug(f"--> ClientMessage : {do_step_message}")
self.ws.send(do_step_message.to_json())
self.send(do_step_message.to_json())
self._sent_state_id = next_state_id

def recv_reward_or_throw_gameover(self) -> Reward:
received = self.ws.recv()
received = self.receive()
decoded = RewardServerMessage.from_json_handle(
self._raise_if_gameover(received),
expected=RewardServerMessage,
Expand Down
11 changes: 8 additions & 3 deletions AIAgent/launch_servers.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,10 +175,15 @@ async def run():


def kill_server(server_instance: ServerInstanceInfo):
os.kill(server_instance.pid, signal.SIGKILL)
PROCS.remove(server_instance.pid)

proc_info = psutil.Process(server_instance.pid)
try:
os.kill(server_instance.pid, signal.SIGKILL)
proc_info = psutil.Process(server_instance.pid)
except (ProcessLookupError, psutil.NoSuchProcess):
logging.warning(
f"Failed to kill the process with ID={server_instance.pid}: the process doesn't exist"
)
return
wait_for_reset_retries = FeatureConfig.ON_GAME_SERVER_RESTART.wait_for_reset_retries

while wait_for_reset_retries:
Expand Down
22 changes: 22 additions & 0 deletions AIAgent/ml/game/errors_game.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
from func_timeout import FunctionTimedOut
from connection.errors_connection import GameInterruptedError
from common.utils import inheritors
from common.game import GameMap2SVM


class GameError(Exception):
    """Error raised for a failed game; keeps the map and the failure's name.

    Attributes are stored privately: ``_map`` holds the ``GameMap2SVM`` the
    game was played on and ``_error_name`` the name of the underlying error.
    """

    def __init__(
        self,
        game_map2svm: GameMap2SVM,
        error_name: str,
    ) -> None:
        """Store the map and error name and pass both on as exception args."""
        super().__init__(game_map2svm, error_name)
        self._map = game_map2svm
        self._error_name = error_name

    def need_to_save_map(self):
        """Tell whether the stored error name marks a map that should be saved.

        Returns:
            True when the error name equals the class name of
            ``FunctionTimedOut`` or of any subclass of
            ``GameInterruptedError``; False otherwise.
        """
        # Matching is done on class names because only the name is stored.
        candidates = [*inheritors(GameInterruptedError), FunctionTimedOut]
        return any(klass.__name__ == self._error_name for klass in candidates)
29 changes: 22 additions & 7 deletions AIAgent/ml/play_game.py → AIAgent/ml/game/play_game.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,16 @@
import traceback
from typing import TypeAlias

from common.errors import GameError
from common.classes import GameResult, Map2Result
from common.game import GameState, GameMap2SVM
from config import FeatureConfig
from connection.errors_connection import GameInterruptedError
from connection.broker_conn.socket_manager import game_server_socket_manager
from connection.game_server_conn.connector import Connector
from func_timeout import FunctionTimedOut, func_set_timeout
from ml.protocols import Predictor
from ml.training.dataset import Result, TrainingDataset, convert_input_to_tensor
from ml.game.errors_game import GameError

TimeDuration: TypeAlias = float

Expand Down Expand Up @@ -165,14 +166,28 @@ def play_game(
)
map2result = Map2Result(game_map2svm, game_result)
except (FunctionTimedOut, Exception) as error:
need_to_save = True
name_of_predictor = with_predictor.name()

if isinstance(error, FunctionTimedOut):
log_message = f"<{with_predictor.name()}> timeouted on map {game_map2svm.GameMap.MapName} with {error.timedOutAfter}s"
log_message = f"<{name_of_predictor}> timeouted on map {game_map2svm.GameMap.MapName} with {error.timedOutAfter}s"
elif isinstance(error, GameInterruptedError):
log_message = f"<{name_of_predictor}> failed on map {game_map2svm.GameMap.MapName} with {error.__class__.__name__}: {error.desc}"
need_to_save = False
else:
log_message = f"<{with_predictor.name()}> failed on map {game_map2svm.GameMap.MapName}:\n{traceback.format_exc()}"
log_message = (
f"<{name_of_predictor}> failed on map {game_map2svm.GameMap.MapName}:\n"
+ "\n".join(
traceback.format_exception(
type(error), value=error, tb=error.__traceback__
)
)
)
logging.warning(log_message)
FeatureConfig.SAVE_IF_FAIL_OR_TIMEOUT.save_model(
with_predictor.model(), with_name=f"{with_predictor.name()}"
)
raise GameError(game_map2svm, error)

if need_to_save:
FeatureConfig.SAVE_IF_FAIL_OR_TIMEOUT.save_model(
with_predictor.model(), with_name=name_of_predictor
)
raise GameError(game_map2svm=game_map2svm, error_name=error.__class__.__name__)
return map2result
Loading

0 comments on commit fac6464

Please sign in to comment.