From 525e9b9103c2cc675c4ca81d9254fbc68d6c7970 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adolfo=20G=C3=B3mez=20Garc=C3=ADa?= Date: Mon, 8 Jul 2024 20:00:18 +0200 Subject: [PATCH] Some generic deleter improvements and a fix for autoserialization of "typed" lists, adding a simple mechanism to restore the original types --- .../generics/test_dynamic_deferred_removal.py | 8 +-- server/src/uds/REST/methods/system.py | 4 +- .../services/generics/dynamic/publication.py | 25 ++++----- .../services/generics/dynamic/userservice.py | 9 ++-- server/src/uds/core/util/autoserializable.py | 53 +++++++++++++++---- server/src/uds/core/util/storage.py | 3 ++ .../src/uds/core/workers/deferred_deletion.py | 27 +++++++--- .../src/uds/services/Proxmox/proxmox/types.py | 4 +- 8 files changed, 90 insertions(+), 43 deletions(-) diff --git a/server/src/tests/core/services/generics/test_dynamic_deferred_removal.py b/server/src/tests/core/services/generics/test_dynamic_deferred_removal.py index bd9c416dd..a0283f5cb 100644 --- a/server/src/tests/core/services/generics/test_dynamic_deferred_removal.py +++ b/server/src/tests/core/services/generics/test_dynamic_deferred_removal.py @@ -397,7 +397,7 @@ def test_deletion_fails_add(self) -> None: self.assertEqual(self.count_entries_on_storage(deferred_deletion.TO_DELETE_GROUP), 1) self.assertEqual(self.count_entries_on_storage(deferred_deletion.DELETING_GROUP), 0) # Test that MAX_TOTAL_RETRIES works fine - deferred_deletion.MAX_TOTAL_RETRIES = 2 + deferred_deletion.MAX_RETRAYABLE_ERROR_RETRIES = 2 # reset last_check, or it will not retry self.set_last_check_expired() job.run() @@ -448,7 +448,7 @@ def test_deletion_fails_is_deleted(self) -> None: self.assertEqual(self.count_entries_on_storage(deferred_deletion.TO_DELETE_GROUP), 0) self.assertEqual(self.count_entries_on_storage(deferred_deletion.DELETING_GROUP), 1) # Test that MAX_TOTAL_RETRIES works fine - deferred_deletion.MAX_TOTAL_RETRIES = 2 + deferred_deletion.MAX_RETRAYABLE_ERROR_RETRIES = 2 # reset last_check, or it will not retry self.set_last_check_expired() job.run() @@ -575,7 +575,7 @@ def _running(*args: typing.Any, **kwargs: typing.Any) -> bool: def test_stop_retry_stop(self) -> None: deferred_deletion.RETRIES_TO_RETRY = 2 - deferred_deletion.MAX_TOTAL_RETRIES = 4 + deferred_deletion.MAX_RETRAYABLE_ERROR_RETRIES = 4 with self.patch_for_worker( is_running=helpers.returns_true, @@ -659,7 +659,7 @@ def test_stop_retry_stop(self) -> None: def test_delete_retry_delete(self) -> None: deferred_deletion.RETRIES_TO_RETRY = 2 - deferred_deletion.MAX_TOTAL_RETRIES = 4 + deferred_deletion.MAX_RETRAYABLE_ERROR_RETRIES = 4 with self.patch_for_worker( is_running=helpers.returns_true, diff --git a/server/src/uds/REST/methods/system.py b/server/src/uds/REST/methods/system.py index ad78107e6..b470dbde8 100644 --- a/server/src/uds/REST/methods/system.py +++ b/server/src/uds/REST/methods/system.py @@ -54,7 +54,7 @@ # Enclosed methods under /stats path POINTS = 70 -SINCE = 180 # Days, if higer values used, ensure mysql/mariadb has a bigger sort buffer +SINCE = 90 # Days; if higher values are used, ensure mysql/mariadb has a bigger sort buffer USE_MAX = True CACHE_TIME = 60 * 60 # 1 hour @@ -91,7 +91,7 @@ def get_servicepools_counters( val = [ { 'stamp': x.stamp, - 'value': (x.sum // x.count if x.count > 0 else 0) if not USE_MAX else x.max, + 'value': (x.sum / x.count if x.count > 0 else 0) if not USE_MAX else x.max, } for x in stats ] diff --git a/server/src/uds/core/services/generics/dynamic/publication.py
b/server/src/uds/core/services/generics/dynamic/publication.py index 7ffdc8f58..73a024855 100644 --- a/server/src/uds/core/services/generics/dynamic/publication.py +++ b/server/src/uds/core/services/generics/dynamic/publication.py @@ -57,7 +57,7 @@ class DynamicPublication(services.Publication, autoserializable.AutoSerializable _name = autoserializable.StringField(default='') _vmid = autoserializable.StringField(default='') - _queue = autoserializable.ListField[Operation]() + _queue = autoserializable.ListField[Operation](cast=Operation.from_int) _reason = autoserializable.StringField(default='') _is_flagged_for_destroy = autoserializable.BoolField(default=False) @@ -83,12 +83,12 @@ def _reset_checks_counter(self) -> None: data['exec_count'] = 0 @typing.final - def _inc_checks_counter(self, info: typing.Optional[str] = None) -> typing.Optional[types.states.TaskState]: + def _inc_checks_counter(self, op: Operation) -> typing.Optional[types.states.TaskState]: with self.storage.as_dict() as data: count = data.get('exec_count', 0) + 1 data['exec_count'] = count if count > self.max_state_checks: - return self._error(f'Max checks reached on {info or "unknown"}') + return self._error(f'Max checks reached on {op}') return None @typing.final @@ -132,7 +132,7 @@ def _error(self, reason: typing.Union[str, Exception]) -> types.states.TaskState Returns: State.ERROR, so we can do "return self._error(reason)" """ - self._error_debug_info = self._debug(repr(reason)) + self._error_debug_info = self._debug(f'{repr(reason)} {getattr(reason, "__traceback__", "")}') reason = str(reason) logger.error(reason) @@ -175,7 +175,7 @@ def _execute_queue(self) -> types.states.TaskState: # This is a retryable error, so we will retry later return self.retry_later() except Exception as e: - logger.exception('Unexpected FixedUserService exception: %s', e) + logger.debug('Exception on %s: %s', op, e, exc_info=True) return self._error(str(e)) @typing.final @@ -202,11 +202,11 @@ def generate_name(self) -> str: else: # Get the service pool name, and remove all {} macros name = self.servicepool_name() - + return self.service().sanitized_name(f'UDS-Pub-{name}-v{self.revision()}') - + def generate_annotation(self) -> str: - return (f'UDS publication for {self.servicepool_name()} created on {time.strftime("%Y-%m-%d %H:%M:%S")}') + return f'UDS publication for {self.servicepool_name()} created on {time.strftime("%Y-%m-%d %H:%M:%S")}' def check_space(self) -> bool: """ @@ -245,7 +245,7 @@ def check_state(self) -> types.states.TaskState: if op != Operation.WAIT: # All operations except WAIT will check against checks counter - counter_state = self._inc_checks_counter(self._op2str(op)) + counter_state = self._inc_checks_counter(op) if counter_state is not None: return counter_state # Error, Finished or None @@ -272,6 +272,7 @@ check_state # And it has not been removed from the queue return types.states.TaskState.RUNNING except Exception as e: + logger.debug('Exception on %s: %s', op, e, exc_info=True) return self._error(e) @typing.final @@ -530,12 +531,8 @@ def op_unsupported(self) -> None: def op_unsupported_checker(self) -> types.states.TaskState: raise Exception('Operation not defined') - @staticmethod - def _op2str(op: Operation) -> str: - return op.name - def _debug(self, txt: str) -> str: - msg = f'Queue at {txt} for {self._name}: {self._queue}, vmid:{self._vmid}' + msg = f'{txt} on {self._name}: {self._queue}, vmid:{self._vmid}' logger.debug( msg, )
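Note on the cast=Operation.from_int change above: the queue is serialized through json, and an IntEnum member survives a json round-trip only as a plain int, so a reloaded queue would still compare equal but no longer contain Operation members. A minimal standalone sketch of the problem and of what the new ListField cast hook does on unmarshal (the Operation enum below is a reduced, hypothetical stand-in for the real one, and its from_int fallback is an assumption, not the actual implementation):

import enum
import json


class Operation(enum.IntEnum):
    # Hypothetical, reduced stand-in for the real Operation enum
    CREATE = 1
    START = 2
    FINISH = 99

    @staticmethod
    def from_int(value: int) -> 'Operation':
        try:
            return Operation(value)
        except ValueError:
            # Assumed fallback for unknown values; the real enum may differ
            return Operation.FINISH


queue = [Operation.CREATE, Operation.START]
raw = json.loads(json.dumps(queue))  # IntEnum members are dumped as ints -> [1, 2]
assert raw == queue  # values still compare equal (IntEnum == int)...
assert not isinstance(raw[0], Operation)  # ...but the original type is lost
typed = [Operation.from_int(v) for v in raw]  # what ListField(cast=...) now applies
assert isinstance(typed[0], Operation)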
diff --git a/server/src/uds/core/services/generics/dynamic/userservice.py b/server/src/uds/core/services/generics/dynamic/userservice.py index 3b034d504..c8aca55fd 100644 --- a/server/src/uds/core/services/generics/dynamic/userservice.py +++ b/server/src/uds/core/services/generics/dynamic/userservice.py @@ -90,7 +90,8 @@ class DynamicUserService(services.UserService, autoserializable.AutoSerializable _ip = autoserializable.StringField(default='') _vmid = autoserializable.StringField(default='') _reason = autoserializable.StringField(default='') - _queue = autoserializable.ListField[types.services.Operation]() # Default is empty list + # cast is used to ensure that when data is reloaded, it's cast to the correct type + _queue = autoserializable.ListField[types.services.Operation](cast=types.services.Operation.from_int) _is_flagged_for_destroy = autoserializable.BoolField(default=False) # Extra info, not serializable, to keep information in case of exception and debug it @@ -313,7 +314,7 @@ def get_vmname(self) -> str: return consts.NO_MORE_NAMES return self.service().sanitized_name(f'UDS_{name}') # Default implementation - + # overridable, to allow receiving notifications from, for example, services def notify(self, message: str, data: typing.Any = None) -> None: pass @@ -463,7 +464,9 @@ def check_state(self) -> types.states.TaskState: if state == types.states.TaskState.FINISHED: # Remove finished operation from queue top_op = self._queue.pop(0) - if top_op != types.services.Operation.RETRY: # Inserted if a retrayable error occurs on execution queue + if ( + top_op != types.services.Operation.RETRY + ): # Inserted if a retryable error occurs on execution queue self._reset_retries_counter() return self._execute_queue() diff --git a/server/src/uds/core/util/autoserializable.py b/server/src/uds/core/util/autoserializable.py index 87d5f072d..74b4a6063 100644 --- a/server/src/uds/core/util/autoserializable.py +++ b/server/src/uds/core/util/autoserializable.py @@ -131,7 +131,7 @@ class _MarshalInfo: - 2 bytes -> name length, little endian - 2 bytes -> type name length, little endian - 4 bytes -> data length, little endian - + (Previous is defined by PACKED_LENGHS struct) - n bytes -> name - n bytes -> type name @@ -212,10 +212,10 @@ def __get__( instance {SerializableFields} -- Instance of class with field """ - if hasattr(instance, '_fields'): + if hasattr(instance, '_fields'): if self.name in getattr(instance, '_fields'): return getattr(instance, '_fields')[self.name] - + if self.default is None: raise AttributeError(f"Field {self.name} is not set") # Set default using setter @@ -310,16 +310,26 @@ def unmarshal(self, instance: 'AutoSerializable', data: bytes) -> None: class ListField(_SerializableField[list[T]], list[T]): """List field + + Args: + default: Default value for the field. Can be a list or a callable that returns a list. + cast: Optional function to cast the values of the list to the desired type. If not provided, the values will be "deserialized" as they are. (see notes) Note: All elements in the list must be serializable in JSON, but can be of different types. + In case of serialization of enumerations, they will be serialized as integers or strings. + (Take this into account when using enumerations in lists;
the values will be compatible, but not the types.) """ _cast: typing.Optional[typing.Callable[[typing.Any], T]] def __init__( self, default: typing.Union[list[T], collections.abc.Callable[[], list[T]]] = lambda: [], cast: typing.Optional[typing.Callable[[typing.Any], T]] = None, ): super().__init__(list, default) self._cast = cast def marshal(self, instance: 'AutoSerializable') -> bytes: # \x01 is the version of this field marshal format, so we can change it in the future return b'\x01' + json.dumps(self.__get__(instance)).encode() def unmarshal(self, instance: 'AutoSerializable', data: bytes) -> None: if data[0] != 1: raise ValueError('Invalid list data') - self.__set__(instance, json.loads(data[1:])) + + self.__set__( + instance, [self._cast(i) for i in json.loads(data[1:])] if self._cast else json.loads(data[1:]) + ) class DictField(_SerializableField[dict[T, V]], dict[T, V]): """Dict field + + Args: + default: Default value for the field. Can be a dict or a callable that returns a dict. + cast: Optional function to cast the values of the dict to the desired type. If not provided, the values will be "deserialized" as they are. (see notes) Note: All elements in the dict must be serializable. + Note that due to the use of json as serialization format, keys will be converted to strings. + Also, values of enumerations will be serialized as integers or strings. """ + _cast: typing.Optional[typing.Callable[[T, V], tuple[T, V]]] + def __init__( self, default: typing.Union[dict[T, V], collections.abc.Callable[[], dict[T, V]]] = lambda: {}, + cast: typing.Optional[typing.Callable[[typing.Any, typing.Any], tuple[T, V]]] = None, ): super().__init__(dict, default) + self._cast = cast @@ -351,7 +373,10 @@ def marshal(self, instance: 'AutoSerializable') -> bytes: def unmarshal(self, instance: 'AutoSerializable', data: bytes) -> None: if data[0] != 1: raise ValueError('Invalid dict data') - self.__set__(instance, json.loads(data[1:])) + self.__set__( + instance, + dict(self._cast(k, v) for k, v in json.loads(data[1:]).items()) if self._cast else json.loads(data[1:]), + ) class ObjectField(_SerializableField[T]): @@ -475,7 +500,7 @@ class AutoSerializable(Serializable, metaclass=_FieldNameSetter): """ _fields: dict[str, typing.Any] - + serialization_version: int = 0 # So autoserializable classes can keep their version if needed def _autoserializable_fields(self) -> collections.abc.Iterator[tuple[str, _SerializableField[typing.Any]]]: @@ -546,7 +571,11 @@ def marshal(self) -> bytes: # Calculate checksum checksum = zlib.crc32(data) # Compose header, that is V1_HEADER + checksum (4 bytes, big endian) - header = HEADER_BASE + self.serialization_version.to_bytes(VERSION_SIZE, 'big') + checksum.to_bytes(CRC_SIZE, 'big') + header = ( + HEADER_BASE + + self.serialization_version.to_bytes(VERSION_SIZE, 'big') + + checksum.to_bytes(CRC_SIZE, 'big') + ) # Return data processed with header return header + self.process_data(header, data) @@ -559,9 +588,13 @@ def unmarshal(self, data: bytes) -> None: header = data[: len(HEADER_BASE) + VERSION_SIZE + CRC_SIZE] # extract version - self._serialization_version = int.from_bytes(header[len(HEADER_BASE) : len(HEADER_BASE) + VERSION_SIZE], 'big') + self._serialization_version = int.from_bytes( + header[len(HEADER_BASE) : len(HEADER_BASE) + VERSION_SIZE], 'big' + ) # Extract checksum - checksum = int.from_bytes(header[len(HEADER_BASE) + VERSION_SIZE : len(HEADER_BASE) + VERSION_SIZE + CRC_SIZE], 'big') + checksum = int.from_bytes( + header[len(HEADER_BASE) + VERSION_SIZE :
len(HEADER_BASE) + VERSION_SIZE + CRC_SIZE], 'big' + ) # Unprocess data data = self.unprocess_data(header, data[len(header) :]) @@ -614,7 +647,7 @@ def __str__(self) -> str: return ', '.join( [f"{k}={v.obj_type.__name__}({v.__get__(self)})" for k, v in self._autoserializable_fields()] ) - + def as_dict(self) -> dict[str, typing.Any]: return {k: v.__get__(self) for k, v in self._autoserializable_fields()} diff --git a/server/src/uds/core/util/storage.py b/server/src/uds/core/util/storage.py index 30c12cefe..e67d1e753 100644 --- a/server/src/uds/core/util/storage.py +++ b/server/src/uds/core/util/storage.py @@ -183,6 +183,9 @@ def __len__(self) -> int: # Optimized methods, avoid re-reading from DB def items(self) -> typing.Iterator[tuple[str, typing.Any]]: # type: ignore # compatible type return iter(_decode_value(i.key, i.data) for i in self._filtered) + + def keys(self) -> typing.Iterator[str]: # type: ignore # compatible type + return iter(_decode_value(i.key, i.data)[0] for i in self._filtered) def values(self) -> typing.Iterator[typing.Any]: # type: ignore # compatible type return iter(_decode_value(i.key, i.data)[1] for i in self._filtered) diff --git a/server/src/uds/core/workers/deferred_deletion.py b/server/src/uds/core/workers/deferred_deletion.py index b0a3b0b3f..8f702a5d1 100644 --- a/server/src/uds/core/workers/deferred_deletion.py +++ b/server/src/uds/core/workers/deferred_deletion.py @@ -48,12 +48,12 @@ logger = logging.getLogger(__name__) -MAX_FATAL_ERROR_RETRIES: typing.Final[int] = 8 -MAX_TOTAL_RETRIES: typing.Final[int] = 1024 +MAX_FATAL_ERROR_RETRIES: typing.Final[int] = 16 +MAX_RETRAYABLE_ERROR_RETRIES: typing.Final[int] = 8192 # Max retries before giving up; roughly 72 hours at most RETRIES_TO_RETRY: typing.Final[int] = ( - 16 # Retries to stop again or to shutdown again in STOPPING_GROUP or DELETING_GROUP + 32 # Retries before trying to stop or shut down again in STOPPING_GROUP or DELETING_GROUP ) -MAX_DELETIONS_AT_ONCE: typing.Final[int] = 16 +MAX_DELETIONS_AT_ONCE: typing.Final[int] = 32 MAX_DELETIONS_CHECKED_AT_ONCE: typing.Final[int] = MAX_DELETIONS_AT_ONCE * 2 # This interval is how long it will take to check again for deletion, stopping, etc...
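A quick sanity check of the "72 hours" figure in the new retry budget above (the worker's per-entry check interval is defined outside this hunk, so the 32-second cadence below is an assumption chosen to make the numbers line up, not a value taken from the patch):

# Hypothetical back-of-the-envelope check, not part of the patch
MAX_RETRAYABLE_ERROR_RETRIES = 8192
ASSUMED_CHECK_INTERVAL_SECS = 32  # assumed time between checks of a single entry

budget_hours = MAX_RETRAYABLE_ERROR_RETRIES * ASSUMED_CHECK_INTERVAL_SECS / 3600
print(f'{budget_hours:.1f} hours')  # 8192 * 32 s = 262144 s -> 72.8 hours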
@@ -121,7 +121,7 @@ def get_from_storage( key=lambda x: x[1].last_check, ): # if max retries reached, remove it - if info.total_retries >= MAX_TOTAL_RETRIES: + if info.total_retries >= MAX_RETRAYABLE_ERROR_RETRIES: logger.error( 'Too many retries deleting %s from service %s, removing from deferred deletion', info.vmid, @@ -153,13 +153,14 @@ def get_from_storage( class DeferredDeletionWorker(Job): - frecuency = 19 # Frequency for this job, in seconds + frecuency = 11 # Frequency for this job, in seconds friendly_name = 'Deferred deletion runner' deferred_storage: typing.ClassVar[storage.Storage] = storage.Storage('deferdel_worker') @staticmethod def add(service: 'DynamicService', vmid: str, execute_later: bool = False) -> None: + logger.debug('Adding %s from service %s to deferred deletion', vmid, service.type_name) # If sync, execute now if not execute_later: try: @@ -221,7 +222,7 @@ def _process_exception( ) return # Do not readd it info.total_retries += 1 - if info.total_retries >= MAX_TOTAL_RETRIES: + if info.total_retries >= MAX_RETRAYABLE_ERROR_RETRIES: logger.error( 'Too many retries deleting %s from service %s, removing from deferred deletion', info.vmid, @@ -232,6 +233,7 @@ def _process_exception( def process_to_stop(self) -> None: services, to_stop = DeletionInfo.get_from_storage(TO_STOP_GROUP) + logger.debug('Processing %s to stop', to_stop) # Now process waiting stops for key, info in to_stop: @@ -260,6 +262,7 @@ def process_to_stop(self) -> None: def process_stopping(self) -> None: services, stopping = DeletionInfo.get_from_storage(STOPPING_GROUP) + logger.debug('Processing %s stopping', stopping) # Now process waiting for finishing stops for key, info in stopping: @@ -285,11 +288,18 @@ def process_stopping(self) -> None: def process_to_delete(self) -> None: services, to_delete = DeletionInfo.get_from_storage(TO_DELETE_GROUP) + logger.debug('Processing %s to delete', to_delete) # Now process waiting deletions for key, info in to_delete: + service = services[info.service_uuid] try: + # If it must be stopped before deletion and is still running, put it on TO_STOP_GROUP - services[info.service_uuid].execute_delete(info.vmid) + if service.must_stop_before_deletion and service.is_running(None, info.vmid): + info.sync_to_storage(TO_STOP_GROUP) + continue + + service.execute_delete(info.vmid) # And store it for checking later if it has been deleted, resetting counters info.last_check = sql_now() info.retries = 0 @@ -305,6 +315,7 @@ def process_deleting(self) -> None: Note: Very similar to process_to_delete, but this one is for objects that are already being deleted """ services, deleting = DeletionInfo.get_from_storage(DELETING_GROUP) + logger.debug('Processing %s deleting', deleting) # Now process waiting for finishing deletions for key, info in deleting: diff --git a/server/src/uds/services/Proxmox/proxmox/types.py b/server/src/uds/services/Proxmox/proxmox/types.py index ab4d19a39..2d39c42fc 100644 --- a/server/src/uds/services/Proxmox/proxmox/types.py +++ b/server/src/uds/services/Proxmox/proxmox/types.py @@ -201,7 +201,7 @@ def from_dict(dictionary: collections.abc.MutableMapping[str, typing.Any]) -> 'T exitstatus=data['exitstatus'], user=data['user'], upid=data['upid'], - id=dictionary['id'], + id=data['id'], ) def is_running(self) -> bool: @@ -279,7 +279,7 @@ def from_dict(dictionary: collections.abc.MutableMapping[str, typing.Any]) -> 'V return VMInfo( status=VMStatus.from_str(dictionary['status']), - id=dictionary.get('vmid', 0), + id=int(dictionary.get('vmid', 0)),
node=dictionary.get('node', ''), template=dictionary.get('template', False), agent=dictionary.get('agent', None),
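A closing note on the DictField counterpart of the cast hook: json object keys always come back as strings, which is why its cast callable receives both key and value. A small hypothetical demo of the round-trip DictField performs internally (illustration only, not code from this patch):

import json

data = {1: 'stopped', 2: 'running'}
raw = json.loads(json.dumps(data))  # keys degrade to strings: {'1': 'stopped', '2': 'running'}
cast = lambda k, v: (int(k), v)  # the kind of callable DictField's cast parameter expects
restored = dict(cast(k, v) for k, v in raw.items())
assert restored == data  # int keys restored, values untouched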