From b4bae5003a5870dc65066c753c26d195410915ca Mon Sep 17 00:00:00 2001 From: Grant Gainey Date: Wed, 26 Jul 2023 15:37:42 -0400 Subject: [PATCH] Taught import to be able to use less than all-available workers. IMPORT_WORKERS_PERCENT is configurable in settings. We will document/expose this in a future PR, to keep this one maximally backportable. Default behavior remains "all workers". fixes #4068. (cherry picked from commit b5cb1d19a130b67aff5c818e1a8b1ce1e8655a67) --- CHANGES/4068.bugfix | 1 + pulpcore/app/settings.py | 4 ++++ pulpcore/app/tasks/importer.py | 24 ++++++++++++++++++++---- 3 files changed, 25 insertions(+), 4 deletions(-) create mode 100644 CHANGES/4068.bugfix diff --git a/CHANGES/4068.bugfix b/CHANGES/4068.bugfix new file mode 100644 index 0000000000..0c5ef8aa0b --- /dev/null +++ b/CHANGES/4068.bugfix @@ -0,0 +1 @@ +Taught pulp-import to be able to use a subset of available worker-threads. diff --git a/pulpcore/app/settings.py b/pulpcore/app/settings.py index 318fe18bde..cce959318a 100644 --- a/pulpcore/app/settings.py +++ b/pulpcore/app/settings.py @@ -300,6 +300,10 @@ "from pulpcore.tasking.util import cancel_task", ] +# What percentage of available-workers will pulpimport use at a time, max +# By default, use all available workers. +IMPORT_WORKERS_PERCENT = 100 + # HERE STARTS DYNACONF EXTENSION LOAD (Keep at the very bottom of settings.py) # Read more at https://dynaconf.readthedocs.io/en/latest/guides/django.html from dynaconf import DjangoDynaconf, Validator # noqa diff --git a/pulpcore/app/tasks/importer.py b/pulpcore/app/tasks/importer.py index caaf51e388..9e1886f78b 100644 --- a/pulpcore/app/tasks/importer.py +++ b/pulpcore/app/tasks/importer.py @@ -8,6 +8,7 @@ from gettext import gettext as _ from logging import getLogger +from django.conf import settings from django.core.files.storage import default_storage from django.db.models import F from naya.json import stream_array, tokenize @@ -35,6 +36,7 @@ RepositoryResource, ) from pulpcore.constants import TASK_STATES +from pulpcore.tasking.pulpcore_worker import Worker from pulpcore.tasking.tasks import dispatch from pulpcore.plugin.importexport import BaseContentResource @@ -506,6 +508,18 @@ def safe_extract(tar, path=".", members=None, *, numeric_owner=False): default_storage.save(base_path, f) # Now import repositories, in parallel. + + # We want to be able to limit the number of available-workers that import will consume, + # so that pulp can continue to work while doing an import. We accomplish this by creating + # a reserved-resource string for each repo-import-task based on that repo's index in + # the dispatch loop, mod number-of-workers-to-consume. + # + # By default (setting is not-set), import will continue to use 100% of the available + # workers. + import_workers_percent = int(settings.get("IMPORT_WORKERS_PERCENT", 100)) + total_workers = Worker.objects.online_workers().count() + import_workers = max(1, int(total_workers * (import_workers_percent / 100.0))) + with open(os.path.join(temp_dir, REPO_FILE), "r") as repo_data_file: data = json.load(repo_data_file) gpr = GroupProgressReport( @@ -517,14 +531,16 @@ def safe_extract(tar, path=".", members=None, *, numeric_owner=False): ) gpr.save() - for src_repo in data: + for index, src_repo in enumerate(data): + # Lock the repo we're importing-into dest_repo_name = _get_destination_repo_name(importer, src_repo["name"]) - + # pulpcore-worker limiter + worker_rsrc = f"import-worker-{index % import_workers}" + exclusive_resources = [worker_rsrc] try: dest_repo = Repository.objects.get(name=dest_repo_name) except Repository.DoesNotExist: if create_repositories: - exclusive_resources = [] dest_repo_pk = "" else: log.warning( @@ -534,7 +550,7 @@ def safe_extract(tar, path=".", members=None, *, numeric_owner=False): ) continue else: - exclusive_resources = [dest_repo] + exclusive_resources.append(dest_repo) dest_repo_pk = dest_repo.pk dispatch(