diff --git a/CHANGES/4068.bugfix b/CHANGES/4068.bugfix new file mode 100644 index 0000000000..12f8639667 --- /dev/null +++ b/CHANGES/4068.bugfix @@ -0,0 +1 @@ +Taught pulp-import to never use more than 25% of available worker-threads. diff --git a/pulpcore/app/tasks/importer.py b/pulpcore/app/tasks/importer.py index caaf51e388..c60a51e636 100644 --- a/pulpcore/app/tasks/importer.py +++ b/pulpcore/app/tasks/importer.py @@ -8,6 +8,7 @@ from gettext import gettext as _ from logging import getLogger +from django.conf import settings from django.core.files.storage import default_storage from django.db.models import F from naya.json import stream_array, tokenize @@ -35,6 +36,7 @@ RepositoryResource, ) from pulpcore.constants import TASK_STATES +from pulpcore.tasking.pulpcore_worker import Worker from pulpcore.tasking.tasks import dispatch from pulpcore.plugin.importexport import BaseContentResource @@ -506,6 +508,15 @@ def safe_extract(tar, path=".", members=None, *, numeric_owner=False): default_storage.save(base_path, f) # Now import repositories, in parallel. + + # We want to limit the number of available-workers that import will consume, so that + # pulp can continue to work while doing an import. We accomplish this by creating a + # reserved-resource string for each repo-import-task based on that repo's index in + # the dispatch loop, mod number-of-workers-to-consume. 
+ import_workers_percent = int(settings.get("IMPORT_WORKERS_PERCENT", 25)) + total_workers = Worker.objects.online_workers().count() + import_workers = max(1, int(total_workers * (import_workers_percent / 100.0))) + with open(os.path.join(temp_dir, REPO_FILE), "r") as repo_data_file: data = json.load(repo_data_file) gpr = GroupProgressReport( @@ -517,14 +528,16 @@ def safe_extract(tar, path=".", members=None, *, numeric_owner=False): ) gpr.save() - for src_repo in data: + for index, src_repo in enumerate(data): + # Lock the repo we're importing into dest_repo_name = _get_destination_repo_name(importer, src_repo["name"]) - + # Reserved-resource string that caps imports at import_workers concurrent tasks + worker_rsrc = f"import-worker-{index % import_workers}" try: dest_repo = Repository.objects.get(name=dest_repo_name) except Repository.DoesNotExist: if create_repositories: - exclusive_resources = [] + exclusive_resources = [worker_rsrc] dest_repo_pk = "" else: log.warning( @@ -534,7 +547,7 @@ def safe_extract(tar, path=".", members=None, *, numeric_owner=False): ) continue else: - exclusive_resources = [dest_repo] + exclusive_resources = [dest_repo, worker_rsrc] dest_repo_pk = dest_repo.pk dispatch(