Skip to content

Commit

Permalink
sync: Always use WORKER_BATCH_SIZE
Browse files Browse the repository at this point in the history
With 551285f, the comment about the number
of workers no longer stands - the dict is shared among processes and
real-time information is available.

Using 2.7k projects as the baseline, using chunk size of 4 takes close
to 5 minutes. A chunk size of 32 takes this down to 40s - a reduction of
roughly 8 times, which matches the increase.

R=gavinmak@google.com

Bug: b/371638995
Change-Id: Ida5fd8f7abc44b3b82c02aa0f7f7ae01dff5eb07
Reviewed-on: https://gerrit-review.googlesource.com/c/git-repo/+/438523
Commit-Queue: Josip Sokcevic <sokcevic@google.com>
Tested-by: Josip Sokcevic <sokcevic@google.com>
Reviewed-by: Gavin Mak <gavinmak@google.com>
  • Loading branch information
sokcevicG authored and LUCI committed Oct 7, 2024
1 parent f7f9dd4 commit 454fdaf
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 27 deletions.
19 changes: 9 additions & 10 deletions project.py
Original file line number Diff line number Diff line change
Expand Up @@ -2396,26 +2396,25 @@ def _CheckForImmutableRevision(self):
try:
# if revision (sha or tag) is not present then following function
# throws an error.
revs = [f"{self.revisionExpr}^0"]
upstream_rev = None
if self.upstream:
upstream_rev = self.GetRemote().ToLocal(self.upstream)
revs.append(upstream_rev)

self.bare_git.rev_list(
"-1",
"--missing=allow-any",
"%s^0" % self.revisionExpr,
*revs,
"--",
log_as_error=False,
)

if self.upstream:
rev = self.GetRemote().ToLocal(self.upstream)
self.bare_git.rev_list(
"-1",
"--missing=allow-any",
"%s^0" % rev,
"--",
log_as_error=False,
)
self.bare_git.merge_base(
"--is-ancestor",
self.revisionExpr,
rev,
upstream_rev,
log_as_error=False,
)
return True
Expand Down
27 changes: 10 additions & 17 deletions subcmds/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,11 @@ def _SafeCheckoutOrder(checkouts: List[Project]) -> List[List[Project]]:
return res


def _chunksize(projects: int, jobs: int) -> int:
    """Calculate chunk size for the given number of projects and jobs.

    Args:
        projects: Number of work items to distribute.
        jobs: Number of workers sharing the work. A value below 1 is
            treated as "no even share", yielding the minimum chunk size.

    Returns:
        The per-worker share (projects // jobs), raised to at least 1 and
        capped at WORKER_BATCH_SIZE.
    """
    # A caller that caps the worker count at the number of projects can end
    # up passing jobs == 0 when there is nothing to do; avoid dividing by
    # zero in that case instead of raising ZeroDivisionError.
    per_worker = projects // jobs if jobs > 0 else 0
    return min(max(1, per_worker), WORKER_BATCH_SIZE)


class _FetchOneResult(NamedTuple):
"""_FetchOne return value.
Expand Down Expand Up @@ -819,7 +824,6 @@ def _GetSyncProgressMessage(self):
def _Fetch(self, projects, opt, err_event, ssh_proxy, errors):
ret = True

jobs = opt.jobs_network
fetched = set()
remote_fetched = set()
pm = Progress(
Expand Down Expand Up @@ -849,6 +853,8 @@ def _MonitorSyncLoop():
objdir_project_map.setdefault(project.objdir, []).append(project)
projects_list = list(objdir_project_map.values())

jobs = min(opt.jobs_network, len(projects_list))

def _ProcessResults(results_sets):
ret = True
for results in results_sets:
Expand Down Expand Up @@ -888,35 +894,22 @@ def _ProcessResults(results_sets):
Sync.ssh_proxy = None

# NB: Multiprocessing is heavy, so don't spin it up for one job.
if len(projects_list) == 1 or jobs == 1:
if jobs == 1:
self._FetchInitChild(ssh_proxy)
if not _ProcessResults(
self._FetchProjectList(opt, x) for x in projects_list
):
ret = False
else:
# Favor throughput over responsiveness when quiet. It seems that
# imap() will yield results in batches relative to chunksize, so
# even as the children finish a sync, we won't see the result until
# one child finishes ~chunksize jobs. When using a large --jobs
# with large chunksize, this can be jarring as there will be a large
# initial delay where repo looks like it isn't doing anything and
# sits at 0%, but then suddenly completes a lot of jobs all at once.
# Since this code is more network bound, we can accept a bit more
# CPU overhead with a smaller chunksize so that the user sees more
# immediate & continuous feedback.
if opt.quiet:
chunksize = WORKER_BATCH_SIZE
else:
if not opt.quiet:
pm.update(inc=0, msg="warming up")
chunksize = 4
with multiprocessing.Pool(
jobs, initializer=self._FetchInitChild, initargs=(ssh_proxy,)
) as pool:
results = pool.imap_unordered(
functools.partial(self._FetchProjectList, opt),
projects_list,
chunksize=chunksize,
chunksize=_chunksize(len(projects_list), jobs),
)
if not _ProcessResults(results):
ret = False
Expand Down
24 changes: 24 additions & 0 deletions tests/test_subcmds_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,30 @@ def test_complex_nested(self):
)


class Chunksize(unittest.TestCase):
    """Tests for _chunksize."""

    def test_single_project(self):
        """Single project."""
        observed = sync._chunksize(1, 1)
        self.assertEqual(observed, 1)

    def test_low_project_count(self):
        """Multiple projects, low number of projects to sync."""
        # (jobs, expected chunk size) for 10 projects; checked in order.
        expectations = ((1, 10), (2, 5), (4, 2), (8, 1), (16, 1))
        for jobs, expected in expectations:
            self.assertEqual(sync._chunksize(10, jobs), expected)

    def test_high_project_count(self):
        """Multiple projects, high number of projects to sync."""
        # (jobs, expected chunk size) for 2800 projects; the expected values
        # level off at 32 until the per-job share drops below it.
        expectations = ((1, 32), (16, 32), (32, 32), (64, 32), (128, 21))
        for jobs, expected in expectations:
            self.assertEqual(sync._chunksize(2800, jobs), expected)


class GetPreciousObjectsState(unittest.TestCase):
"""Tests for _GetPreciousObjectsState."""

Expand Down

0 comments on commit 454fdaf

Please sign in to comment.