Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PR #4160/b5cb1d19 backport][3.21] Taught import to be able to use less than all-available workers. #4184

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES/4068.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Taught pulp-import to be able to use a subset of available worker-threads.
4 changes: 4 additions & 0 deletions pulpcore/app/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,10 @@

TELEMETRY = True

# What percentage of available-workers will pulpimport use at a time, max
# By default, use all available workers.
IMPORT_WORKERS_PERCENT = 100

# HERE STARTS DYNACONF EXTENSION LOAD (Keep at the very bottom of settings.py)
# Read more at https://dynaconf.readthedocs.io/en/latest/guides/django.html
from dynaconf import DjangoDynaconf, Validator # noqa
Expand Down
24 changes: 20 additions & 4 deletions pulpcore/app/tasks/importer.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from gettext import gettext as _
from logging import getLogger

from django.conf import settings
from django.core.files.storage import default_storage
from django.db.models import F
from naya.json import stream_array, tokenize
Expand Down Expand Up @@ -35,6 +36,7 @@
RepositoryResource,
)
from pulpcore.constants import TASK_STATES
from pulpcore.tasking.pulpcore_worker import Worker
from pulpcore.tasking.tasks import dispatch

from pulpcore.plugin.importexport import BaseContentResource
Expand Down Expand Up @@ -489,6 +491,18 @@ def validate_and_assemble(toc_filename):
default_storage.save(base_path, f)

# Now import repositories, in parallel.

# We want to be able to limit the number of available-workers that import will consume,
# so that pulp can continue to work while doing an import. We accomplish this by creating
# a reserved-resource string for each repo-import-task based on that repo's index in
# the dispatch loop, mod number-of-workers-to-consume.
#
# By default (setting is not-set), import will continue to use 100% of the available
# workers.
import_workers_percent = int(settings.get("IMPORT_WORKERS_PERCENT", 100))
total_workers = Worker.objects.online_workers().count()
import_workers = max(1, int(total_workers * (import_workers_percent / 100.0)))

with open(os.path.join(temp_dir, REPO_FILE), "r") as repo_data_file:
data = json.load(repo_data_file)
gpr = GroupProgressReport(
Expand All @@ -500,14 +514,16 @@ def validate_and_assemble(toc_filename):
)
gpr.save()

for src_repo in data:
for index, src_repo in enumerate(data):
# Lock the repo we're importing-into
dest_repo_name = _get_destination_repo_name(importer, src_repo["name"])

# pulpcore-worker limiter
worker_rsrc = f"import-worker-{index % import_workers}"
exclusive_resources = [worker_rsrc]
try:
dest_repo = Repository.objects.get(name=dest_repo_name)
except Repository.DoesNotExist:
if create_repositories:
exclusive_resources = []
dest_repo_pk = ""
else:
log.warning(
Expand All @@ -517,7 +533,7 @@ def validate_and_assemble(toc_filename):
)
continue
else:
exclusive_resources = [dest_repo]
exclusive_resources.append(dest_repo)
dest_repo_pk = dest_repo.pk

dispatch(
Expand Down