From 1e8afb3ae2f4fe918494b5806a9500573b47085e Mon Sep 17 00:00:00 2001 From: Frost Ming Date: Fri, 6 Sep 2024 15:47:51 +0800 Subject: [PATCH] fix: retry for model and bento upload failure (#4961) Signed-off-by: Frost Ming --- src/bentoml/_internal/cloud/base.py | 5 +++ src/bentoml/_internal/cloud/bentocloud.py | 55 +++++++++++++++-------- 2 files changed, 42 insertions(+), 18 deletions(-) diff --git a/src/bentoml/_internal/cloud/base.py b/src/bentoml/_internal/cloud/base.py index 2f2b0d8d1e2..0e663228338 100644 --- a/src/bentoml/_internal/cloud/base.py +++ b/src/bentoml/_internal/cloud/base.py @@ -42,7 +42,12 @@ class CallbackIOWrapper(t.IO[bytes]): end: int | None = None def __attrs_post_init__(self) -> None: + self.reset() + + def reset(self) -> int: + read = self.tell() - (self.start or 0) self.file.seek(self.start or 0, 0) + return read def seek(self, offset: int, whence: int = 0) -> int: if whence == 2 and self.end is not None: diff --git a/src/bentoml/_internal/cloud/bentocloud.py b/src/bentoml/_internal/cloud/bentocloud.py index b9e674e5394..f65b26432e2 100644 --- a/src/bentoml/_internal/cloud/bentocloud.py +++ b/src/bentoml/_internal/cloud/bentocloud.py @@ -58,6 +58,9 @@ from .schemas.schemasv1 import ModelWithRepositoryListSchema +UPLOAD_RETRY_COUNT = 3 + + class BentoCloudClient(CloudClient): @inject def push_bento( @@ -319,16 +322,24 @@ def chunk_upload( else None, ) - resp = httpx.put( - remote_bento.presigned_upload_url, - content=chunk_io, - timeout=36000, - ) - if resp.status_code != 200: - return FinishUploadBentoSchema( - status=BentoUploadStatus.FAILED.value, - reason=resp.text, + for i in range(UPLOAD_RETRY_COUNT): + resp = httpx.put( + remote_bento.presigned_upload_url, + content=chunk_io, + timeout=36000, ) + if resp.status_code == 200: + break + if i == UPLOAD_RETRY_COUNT - 1: + return FinishUploadBentoSchema( + status=BentoUploadStatus.FAILED.value, + reason=resp.text, + ) + else: # retry and reset and update progress + read = chunk_io.reset() + self.spinner.transmission_progress.update( + upload_task_id, advance=-read + ) return resp.headers["ETag"], chunk_number futures_: list[ @@ -771,16 +782,24 @@ def chunk_upload( else None, ) - resp = httpx.put( - remote_model.presigned_upload_url, - content=chunk_io, - timeout=36000, - ) - if resp.status_code != 200: - return FinishUploadModelSchema( - status=ModelUploadStatus.FAILED.value, - reason=resp.text, + for i in range(UPLOAD_RETRY_COUNT): + resp = httpx.put( + remote_model.presigned_upload_url, + content=chunk_io, + timeout=36000, ) + if resp.status_code == 200: + break + if i == UPLOAD_RETRY_COUNT - 1: + return FinishUploadModelSchema( + status=ModelUploadStatus.FAILED.value, + reason=resp.text, + ) + else: # retry and reset and update progress + read = chunk_io.reset() + self.spinner.transmission_progress.update( + upload_task_id, advance=-read + ) return resp.headers["ETag"], chunk_number futures_: list[