Commit 0fdb88a

remote: s3: adjust jobs number basing on file descriptors number
1 parent bdfeba8 commit 0fdb88a

File tree: 4 files changed, +40 -6 lines changed

  dvc/remote/base.py
  dvc/remote/local.py
  dvc/remote/s3.py
  dvc/remote/ssh/__init__.py

dvc/remote/base.py

14 additions & 2 deletions

@@ -13,6 +13,8 @@
 
 from shortuuid import uuid
 
+from funcy import cached_property
+
 import dvc.prompt as prompt
 from dvc.config import Config
 from dvc.exceptions import (
@@ -75,7 +77,6 @@ class RemoteBASE(object):
     scheme = "base"
     path_cls = URLInfo
     REQUIRES = {}
-    JOBS = 4 * cpu_count()
 
     PARAM_RELPATH = "relpath"
     CHECKSUM_DIR_SUFFIX = ".dir"
@@ -84,6 +85,10 @@ class RemoteBASE(object):
 
     state = StateNoop()
 
+    @cached_property
+    def jobs(self):
+        return cpu_count() * 4
+
     def __init__(self, repo, config):
         self.repo = repo
 
@@ -746,6 +751,11 @@ def changed_cache(self, checksum):
             return self._changed_dir_cache(checksum)
         return self.changed_cache_file(checksum)
 
+    def _adjust_jobs(self, jobs=None):
+        if not jobs:
+            jobs = self.jobs
+        return jobs
+
     def cache_exists(self, checksums, jobs=None, name=None):
         """Check if the given checksums are stored in the remote.
 
@@ -784,7 +794,9 @@ def exists_with_progress(path_info):
                 pbar.update_desc(str(path_info))
                 return ret
 
-            with ThreadPoolExecutor(max_workers=jobs or self.JOBS) as executor:
+            with ThreadPoolExecutor(
+                max_workers=self._adjust_jobs(jobs)
+            ) as executor:
                 path_infos = map(self.checksum_to_path_info, checksums)
                 in_remote = executor.map(exists_with_progress, path_infos)
                 ret = list(itertools.compress(checksums, in_remote))
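
The base-class change replaces the class-level JOBS constant with a per-instance
jobs property and an _adjust_jobs() hook: an explicitly passed value wins,
otherwise the default of four workers per CPU core applies, and subclasses can
clamp the result. A minimal sketch of that pattern outside DVC (the BaseRemote
and CappedRemote names and the cap of 8 are illustrative, not part of the
commit):

    from multiprocessing import cpu_count

    from funcy import cached_property


    class BaseRemote(object):
        # Default parallelism: four workers per CPU core, computed lazily
        # and cached per instance instead of once at class definition time.
        @cached_property
        def jobs(self):
            return cpu_count() * 4

        def _adjust_jobs(self, jobs=None):
            # An explicitly requested value always wins; otherwise fall back
            # to the per-remote default. Subclasses may clamp the result.
            if not jobs:
                jobs = self.jobs
            return jobs


    class CappedRemote(BaseRemote):
        def _adjust_jobs(self, jobs=None):
            # Example override: never use more than 8 workers.
            return min(super(CappedRemote, self)._adjust_jobs(jobs), 8)


    print(CappedRemote()._adjust_jobs())   # default, clamped to at most 8
    print(CappedRemote()._adjust_jobs(3))  # explicit value passes through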

dvc/remote/local.py

1 addition & 1 deletion

@@ -339,7 +339,7 @@ def _process(
             status = STATUS_NEW
 
         if jobs is None:
-            jobs = remote.JOBS
+            jobs = remote.jobs
 
         status_info = self.status(
             named_cache,

dvc/remote/s3.py

18 additions & 0 deletions

@@ -3,6 +3,7 @@
 
 import logging
 import os
+import resource
 import threading
 
 from funcy import cached_property, wrap_prop
@@ -335,3 +336,20 @@ def _append_aws_grants_to_extra_args(self, config):
                     )
 
                 self.extra_args[extra_args_key] = config.get(grant_option)
+
+    def _adjust_jobs(self, jobs=None):
+        jobs = super(RemoteS3, self)._adjust_jobs(jobs)
+
+        descriptor_limit = resource.getrlimit(resource.RLIMIT_NOFILE)[0]
+        estimated_descriptors_num = jobs * 20
+        if estimated_descriptors_num <= descriptor_limit - 10:
+            return jobs
+
+        jobs = (descriptor_limit - 10) // 20
+        logger.warning(
+            "Parallelization reduced to '{}' jobs. Increase open "
+            "file descriptors limit to more than '{}' to prevent "
+            "the "
+            "reduction.".format(jobs, estimated_descriptors_num + 10)
+        )
+        return jobs
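
For S3, the new _adjust_jobs() caps the worker count using the process's
open-file-descriptor limit: the commit estimates roughly 20 descriptors per job,
keeps 10 in reserve, and shrinks the job count until the estimate fits under the
soft RLIMIT_NOFILE. A standalone sketch of the same arithmetic (the
cap_jobs_by_fd_limit name is mine; note that the resource module is POSIX-only):

    import resource

    DESCRIPTORS_PER_JOB = 20   # rough per-job estimate used by the commit
    RESERVED_DESCRIPTORS = 10  # headroom kept for the rest of the process


    def cap_jobs_by_fd_limit(jobs):
        # Soft limit on open file descriptors for the current process.
        soft_limit, _hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE)

        estimated = jobs * DESCRIPTORS_PER_JOB
        if estimated <= soft_limit - RESERVED_DESCRIPTORS:
            return jobs

        # Largest job count whose estimated descriptor usage still fits.
        return (soft_limit - RESERVED_DESCRIPTORS) // DESCRIPTORS_PER_JOB


    # With the common default of `ulimit -n` = 1024, a default of
    # 4 * cpu_count() jobs on a 16-core machine (64 jobs, ~1280 descriptors)
    # is reduced to 50.
    print(cap_jobs_by_fd_limit(64))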

dvc/remote/ssh/__init__.py

7 additions & 3 deletions

@@ -40,7 +40,6 @@ class RemoteSSH(RemoteBASE):
     scheme = Schemes.SSH
     REQUIRES = {"paramiko": "paramiko"}
 
-    JOBS = 4
     PARAM_CHECKSUM = "md5"
     DEFAULT_PORT = 22
     TIMEOUT = 1800
@@ -51,6 +50,10 @@ class RemoteSSH(RemoteBASE):
 
     DEFAULT_CACHE_TYPES = ["copy"]
 
+    @property
+    def jobs(self):
+        return 4
+
     def __init__(self, repo, config):
         super(RemoteSSH, self).__init__(repo, config)
         url = config.get(Config.SECTION_REMOTE_URL)
@@ -325,9 +328,10 @@ def cache_exists(self, checksums, jobs=None, name=None):
             def exists_with_progress(chunks):
                 return self.batch_exists(chunks, callback=pbar.update_desc)
 
-            with ThreadPoolExecutor(max_workers=jobs or self.JOBS) as executor:
+            jobs = self._adjust_jobs(jobs)
+            with ThreadPoolExecutor(max_workers=jobs) as executor:
                 path_infos = [self.checksum_to_path_info(x) for x in checksums]
-                chunks = to_chunks(path_infos, num_chunks=self.JOBS)
+                chunks = to_chunks(path_infos, num_chunks=jobs)
                 results = executor.map(exists_with_progress, chunks)
                 in_remote = itertools.chain.from_iterable(results)
                 ret = list(itertools.compress(checksums, in_remote))
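
SSH keeps a flat default of 4 jobs, now exposed as a plain jobs property, and
cache_exists feeds the adjusted value to both the thread pool size and the
number of checksum chunks, so each worker handles one chunk. A self-contained
sketch of that chunk-per-worker pattern (to_chunks and batch_exists below are
simplified stand-ins for dvc.utils.to_chunks and RemoteSSH.batch_exists):

    import itertools
    from concurrent.futures import ThreadPoolExecutor

    JOBS = 4  # flat SSH default


    def to_chunks(items, num_chunks):
        # Stand-in for dvc.utils.to_chunks: split items into at most
        # num_chunks roughly equal contiguous slices.
        size = max(1, -(-len(items) // num_chunks))  # ceiling division
        return [items[i:i + size] for i in range(0, len(items), size)]


    def batch_exists(chunk):
        # Stand-in existence check: pretend even-numbered items exist.
        return [item % 2 == 0 for item in chunk]


    items = list(range(10))
    chunks = to_chunks(items, num_chunks=JOBS)  # one chunk per worker

    with ThreadPoolExecutor(max_workers=JOBS) as executor:
        results = executor.map(batch_exists, chunks)
        in_remote = itertools.chain.from_iterable(results)
        present = list(itertools.compress(items, in_remote))

    print(present)  # [0, 2, 4, 6, 8]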
