From 5859740131fb8425a5f435cc4e685de1a9d0f277 Mon Sep 17 00:00:00 2001 From: Cedric Hombourger Date: Fri, 27 Sep 2024 16:50:22 +0200 Subject: [PATCH] fix(storage): add retry mechanism to recover from a full input queue If the client writes data too quickly, the agent may see its input queue full. Give ourselves up to 30 seconds to submit the data to be written to the shared storage before giving up. Refactor the code between local and s3 sources to make error handling consistent. Signed-off-by: Cedric Hombourger --- mtda/client.py | 51 ++++++++++++++++++++--------------------------- mtda/constants.py | 2 ++ 2 files changed, 24 insertions(+), 29 deletions(-) diff --git a/mtda/client.py b/mtda/client.py index 5d0a5eb8..b2b53fca 100644 --- a/mtda/client.py +++ b/mtda/client.py @@ -491,6 +491,24 @@ def progress(self): def size(self): return None + def _write_to_storage(self, data): + max_tries = int(CONSTS.STORAGE.TIMEOUT / CONSTS.STORAGE.RETRY_INTERVAL) + + for _ in range(max_tries): + result = self._agent.storage_write(data, self._session) + if result != 0: + break + time.sleep(CONSTS.STORAGE.RETRY_INTERVAL) + + if result > 0: + return result + elif result < 0: + exc = 'write or decompression error from shared storage' + raise IOError(exc) + else: + exc = 'timeout from shared storage' + raise IOError(exc) + class ImageLocal(ImageFile): """ An image from the local file-system to be copied over to the shared @@ -519,24 +537,10 @@ def copy(self): inputstream = image try: - data = inputstream.read(self._blksz) - while data: + while (data := inputstream.read(self._blksz)): self._totalread = image.tell() self.progress() - - # Write block to shared storage device - bytes_wanted = self._agent.storage_write(data, self._session) - - # Check what to do next - if bytes_wanted < 0: - raise IOError('write or decompression error from the ' - 'shared storage') - elif bytes_wanted > 0: - # Read next block - data = inputstream.read(bytes_wanted) - else: - # Agent may continue without further data - data = b'' + self._write_to_storage(data) finally: if comp_on_the_fly: @@ -612,19 +616,8 @@ def write(self, data): self._totalread += dataread # Write block to shared storage device - bytes_wanted = 0 - while bytes_wanted == 0: - # Report progress - self.progress() - - # Write downloaded data to storage - bytes_wanted = self._agent.storage_write(data, self._session) - if bytes_wanted == 0: - # Agent may continue without further data - data = b'' - elif bytes_wanted < 0: - # Write failure - raise IOError('write or decompression error') + self.progress() + self._write_to_storage(data) return dataread diff --git a/mtda/constants.py b/mtda/constants.py index 475d8ca2..ee74eb0c 100644 --- a/mtda/constants.py +++ b/mtda/constants.py @@ -67,6 +67,8 @@ class STORAGE: LOCKED = "LOCKED" UNLOCKED = "UNLOCKED" UNKNOWN = "???" + RETRY_INTERVAL = 0.5 + TIMEOUT = 30 class WRITER: