Skip to content

Commit

Permalink
fix(storage): add retry mechanism to recover from a full input queue
Browse files Browse the repository at this point in the history
If the client writes data too quickly, the agent may see its input queue
full. Give ourselves up to 30 seconds to submit the data to be written to
the shared storage before giving up. Refactor the code between local and
s3 sources to make error handling consistent.

Signed-off-by: Cedric Hombourger <cedric.hombourger@siemens.com>
  • Loading branch information
chombourger committed Sep 29, 2024
1 parent 0e5eb5d commit 4a7edda
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 29 deletions.
51 changes: 22 additions & 29 deletions mtda/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -491,6 +491,24 @@ def progress(self):
def size(self):
return None

def _write_to_storage(self, data):
max_tries = int(CONSTS.STORAGE.TIMEOUT / CONSTS.STORAGE.RETRY_INTERVAL)

for _ in range(max_tries):
result = self._agent.storage_write(data, self._session)
if result != 0:
break
time.sleep(CONSTS.STORAGE.RETRY_INTERVAL)

if result > 0:
return result
elif result < 0:
exc = 'write or decompression error from shared storage'
raise IOError(exc)
else:
exc = 'timeout from shared storage'
raise IOError(exc)


class ImageLocal(ImageFile):
""" An image from the local file-system to be copied over to the shared
Expand Down Expand Up @@ -519,24 +537,10 @@ def copy(self):
inputstream = image

try:
data = inputstream.read(self._blksz)
while data:
while (data := inputstream.read(self._blksz)):
self._totalread = image.tell()
self.progress()

# Write block to shared storage device
bytes_wanted = self._agent.storage_write(data, self._session)

# Check what to do next
if bytes_wanted < 0:
raise IOError('write or decompression error from the '
'shared storage')
elif bytes_wanted > 0:
# Read next block
data = inputstream.read(bytes_wanted)
else:
# Agent may continue without further data
data = b''
self._write_to_storage(data)

finally:
if comp_on_the_fly:
Expand Down Expand Up @@ -612,19 +616,8 @@ def write(self, data):
self._totalread += dataread

# Write block to shared storage device
bytes_wanted = 0
while bytes_wanted == 0:
# Report progress
self.progress()

# Write downloaded data to storage
bytes_wanted = self._agent.storage_write(data, self._session)
if bytes_wanted == 0:
# Agent may continue without further data
data = b''
elif bytes_wanted < 0:
# Write failure
raise IOError('write or decompression error')
self.progress()
self._write_to_storage(data)

return dataread

Expand Down
2 changes: 2 additions & 0 deletions mtda/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ class STORAGE:
LOCKED = "LOCKED"
UNLOCKED = "UNLOCKED"
UNKNOWN = "???"
RETRY_INTERVAL = 0.5
TIMEOUT = 30


class WRITER:
Expand Down

0 comments on commit 4a7edda

Please sign in to comment.