diff --git a/CHANGES/4068.bugfix b/CHANGES/4068.bugfix new file mode 100644 index 00000000000..12f86396672 --- /dev/null +++ b/CHANGES/4068.bugfix @@ -0,0 +1 @@ +Taught pulp-import to never use more than 25% of available worker-threads. diff --git a/pulpcore/app/settings.py b/pulpcore/app/settings.py index 8544cb56b81..e3d7a99c7a6 100644 --- a/pulpcore/app/settings.py +++ b/pulpcore/app/settings.py @@ -302,6 +302,9 @@ "from pulpcore.tasking.util import cancel_task", ] +# What percentage of available-workers will pulpimport use at a time, max? +IMPORT_WORKERS_PERCENT = 25 + # HERE STARTS DYNACONF EXTENSION LOAD (Keep at the very bottom of settings.py) # Read more at https://dynaconf.readthedocs.io/en/latest/guides/django.html from dynaconf import DjangoDynaconf, Validator # noqa diff --git a/pulpcore/app/tasks/importer.py b/pulpcore/app/tasks/importer.py index caaf51e388e..2d3c86ade9d 100644 --- a/pulpcore/app/tasks/importer.py +++ b/pulpcore/app/tasks/importer.py @@ -8,6 +8,7 @@ from gettext import gettext as _ from logging import getLogger +from django.conf import settings from django.core.files.storage import default_storage from django.db.models import F from naya.json import stream_array, tokenize @@ -35,6 +36,7 @@ RepositoryResource, ) from pulpcore.constants import TASK_STATES +from pulpcore.tasking.pulpcore_worker import Worker from pulpcore.tasking.tasks import dispatch from pulpcore.plugin.importexport import BaseContentResource @@ -506,6 +508,15 @@ def safe_extract(tar, path=".", members=None, *, numeric_owner=False): default_storage.save(base_path, f) # Now import repositories, in parallel. + + # We want to limit the number of available-workers that import will consume, so that + # pulp can continue to work while doing an import. We accomplish this by creating a + # reserved-resource string for each repo-import-task based on that repo's index in + # the dispatch loop, mod number-of-workers-to-consume. + import_workers_percent = int(settings.get("IMPORT_WORKERS_PERCENT", 25)) + total_workers = Worker.objects.online_workers().count() + import_workers = max(1, int(total_workers * (import_workers_percent / 100.0))) + with open(os.path.join(temp_dir, REPO_FILE), "r") as repo_data_file: data = json.load(repo_data_file) gpr = GroupProgressReport( @@ -517,14 +528,16 @@ def safe_extract(tar, path=".", members=None, *, numeric_owner=False): ) gpr.save() - for src_repo in data: + for index, src_repo in enumerate(data): + # Lock the repo we're importing-into dest_repo_name = _get_destination_repo_name(importer, src_repo["name"]) - + # pulpcore-worker limiter + worker_rsrc = f"import-worker-{index % import_workers}" + exclusive_resources = [worker_rsrc] try: dest_repo = Repository.objects.get(name=dest_repo_name) except Repository.DoesNotExist: if create_repositories: - exclusive_resources = [] dest_repo_pk = "" else: log.warning( @@ -534,7 +547,7 @@ def safe_extract(tar, path=".", members=None, *, numeric_owner=False): ) continue else: - exclusive_resources = [dest_repo] + exclusive_resources.append(dest_repo) dest_repo_pk = dest_repo.pk dispatch(