diff --git a/src/huggingface_hub/_upload_large_folder.py b/src/huggingface_hub/_upload_large_folder.py index fe05139c31..bb336de172 100644 --- a/src/huggingface_hub/_upload_large_folder.py +++ b/src/huggingface_hub/_upload_large_folder.py @@ -41,6 +41,8 @@ logger = logging.getLogger(__name__) WAITING_TIME_IF_NO_TASKS = 10 # seconds +MAX_NB_REGULAR_FILES_PER_COMMIT = 75 +MAX_NB_LFS_FILES_PER_COMMIT = 150 def upload_large_folder_internal( @@ -373,17 +375,18 @@ def _determine_next_job(status: LargeUploadStatus) -> Optional[Tuple[WorkerJob, if ( status.nb_workers_commit == 0 and status.queue_commit.qsize() > 0 - and (status.last_commit_attempt is None or time.time() - status.last_commit_attempt > 5 * 60) + and status.last_commit_attempt is not None + and time.time() - status.last_commit_attempt > 5 * 60 ): status.nb_workers_commit += 1 logger.debug("Job: commit (more than 5 minutes since last commit attempt)") - return (WorkerJob.COMMIT, _get_n(status.queue_commit, 25)) + return (WorkerJob.COMMIT, _get_items_to_commit(status.queue_commit)) - # 2. Commit if at least 25 files are ready to commit - elif status.nb_workers_commit == 0 and status.queue_commit.qsize() >= 25: + # 2. Commit if at least 100 files are ready to commit + elif status.nb_workers_commit == 0 and status.queue_commit.qsize() >= 150: status.nb_workers_commit += 1 - logger.debug("Job: commit (>25 files ready)") - return (WorkerJob.COMMIT, _get_n(status.queue_commit, 25)) + logger.debug("Job: commit (>100 files ready)") + return (WorkerJob.COMMIT, _get_items_to_commit(status.queue_commit)) # 3. Get upload mode if at least 10 files elif status.queue_get_upload_mode.qsize() >= 10: @@ -430,18 +433,39 @@ def _determine_next_job(status: LargeUploadStatus) -> Optional[Tuple[WorkerJob, logger.debug("Job: get upload mode") return (WorkerJob.GET_UPLOAD_MODE, _get_n(status.queue_get_upload_mode, 50)) - # 10. Commit if at least 1 file - elif status.nb_workers_commit == 0 and status.queue_commit.qsize() > 0: + # 10. Commit if at least 1 file and 1 min since last commit attempt + elif ( + status.nb_workers_commit == 0 + and status.queue_commit.qsize() > 0 + and status.last_commit_attempt is not None + and time.time() - status.last_commit_attempt > 1 * 60 + ): + status.nb_workers_commit += 1 + logger.debug("Job: commit (1 min since last commit attempt)") + return (WorkerJob.COMMIT, _get_items_to_commit(status.queue_commit)) + + # 11. Commit if at least 1 file all other queues are empty and all workers are waiting + # e.g. when it's the last commit + elif ( + status.nb_workers_commit == 0 + and status.queue_commit.qsize() > 0 + and status.queue_sha256.qsize() == 0 + and status.queue_get_upload_mode.qsize() == 0 + and status.queue_preupload_lfs.qsize() == 0 + and status.nb_workers_sha256 == 0 + and status.nb_workers_get_upload_mode == 0 + and status.nb_workers_preupload_lfs == 0 + ): status.nb_workers_commit += 1 logger.debug("Job: commit") - return (WorkerJob.COMMIT, _get_n(status.queue_commit, 25)) + return (WorkerJob.COMMIT, _get_items_to_commit(status.queue_commit)) - # 11. If all queues are empty, exit + # 12. If all queues are empty, exit elif all(metadata.is_committed or metadata.should_ignore for _, metadata in status.items): logger.info("All files have been processed! Exiting worker.") return None - # 12. If no task is available, wait + # 13. If no task is available, wait else: status.nb_workers_waiting += 1 logger.debug(f"No task available, waiting... ({WAITING_TIME_IF_NO_TASKS}s)") @@ -547,6 +571,30 @@ def _get_n(queue: "queue.Queue[JOB_ITEM_T]", n: int) -> List[JOB_ITEM_T]: return [queue.get() for _ in range(min(queue.qsize(), n))] +def _get_items_to_commit(queue: "queue.Queue[JOB_ITEM_T]") -> List[JOB_ITEM_T]: + """Special case for commit job: the number of items to commit depends on the type of files.""" + # Can take at most 50 regular files and/or 100 LFS files in a single commit + items: List[JOB_ITEM_T] = [] + nb_lfs, nb_regular = 0, 0 + while True: + # If empty queue => commit everything + if queue.qsize() == 0: + return items + + # If we have enough items => commit them + if nb_lfs >= MAX_NB_LFS_FILES_PER_COMMIT or nb_regular >= MAX_NB_REGULAR_FILES_PER_COMMIT: + return items + + # Else, get a new item and increase counter + item = queue.get() + items.append(item) + _, metadata = item + if metadata.upload_mode == "lfs": + nb_lfs += 1 + else: + nb_regular += 1 + + def _print_overwrite(report: str) -> None: """Print a report, overwriting the previous lines. diff --git a/src/huggingface_hub/hf_api.py b/src/huggingface_hub/hf_api.py index e8ee98c381..1292e4b1aa 100644 --- a/src/huggingface_hub/hf_api.py +++ b/src/huggingface_hub/hf_api.py @@ -5342,7 +5342,7 @@ def upload_large_folder( Order of priority: 1. Commit if more than 5 minutes since last commit attempt (and at least 1 file). - 2. Commit if at least 25 files are ready to commit. + 2. Commit if at least 150 files are ready to commit. 3. Get upload mode if at least 10 files have been hashed. 4. Pre-upload LFS file if at least 1 file and no worker is pre-uploading. 5. Hash file if at least 1 file and no worker is hashing. @@ -5350,7 +5350,8 @@ def upload_large_folder( 7. Pre-upload LFS file if at least 1 file (exception: if hf_transfer is enabled, only 1 worker can preupload LFS at a time). 8. Hash file if at least 1 file to hash. 9. Get upload mode if at least 1 file to get upload mode. - 10. Commit if at least 1 file to commit. + 10. Commit if at least 1 file to commit and at least 1 min since last commit attempt. + 11. Commit if at least 1 file to commit and all other queues are empty. Special rules: - If `hf_transfer` is enabled, only 1 LFS uploader at a time. Otherwise the CPU would be bloated by `hf_transfer`.