Skip to content

Commit

Permalink
sync: Show elapsed time for the longest syncing project
Browse files Browse the repository at this point in the history
"Last synced: X" is printed only after a project finishes syncing.
Replace that with a message that shows the longest actively syncing
project.

Bug: https://crbug.com/gerrit/11293
Change-Id: I84c7873539d84999772cd554f426b44921521e85
Reviewed-on: https://gerrit-review.googlesource.com/c/git-repo/+/372674
Reviewed-by: Josip Sokcevic <sokcevic@google.com>
Commit-Queue: Gavin Mak <gavinmak@google.com>
Reviewed-by: Joanna Wang <jojwang@google.com>
Tested-by: Gavin Mak <gavinmak@google.com>
  • Loading branch information
Gavin Mak authored and LUCI committed May 18, 2023
1 parent 131fc96 commit 551285f
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 26 deletions.
49 changes: 30 additions & 19 deletions progress.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,18 +82,18 @@ def __init__(
title,
total=0,
units="",
print_newline=False,
delay=True,
quiet=False,
show_elapsed=False,
elide=False,
):
self._title = title
self._total = total
self._done = 0
self._start = time.time()
self._show = not delay
self._units = units
self._print_newline = print_newline
self._elide = elide
# Only show the active jobs section if we run more than one in parallel.
self._show_jobs = False
self._active = 0
Expand All @@ -118,10 +118,18 @@ def __init__(

def _update_loop(self):
while True:
if self._update_event.is_set():
self.update(inc=0)
if self._update_event.wait(timeout=1):
return
self.update(inc=0, msg=self._last_msg)
time.sleep(1)

def _write(self, s):
s = "\r" + s
if self._elide:
col = os.get_terminal_size().columns
if len(s) > col:
s = s[: col - 1] + ".."
sys.stderr.write(s)
sys.stderr.flush()

def start(self, name):
self._active += 1
Expand All @@ -133,8 +141,16 @@ def finish(self, name):
self.update(msg="finished " + name)
self._active -= 1

def update(self, inc=1, msg=""):
def update(self, inc=1, msg=None):
"""Updates the progress indicator.
Args:
inc: The number of items completed.
msg: The message to display. If None, use the last message.
"""
self._done += inc
if msg is None:
msg = self._last_msg
self._last_msg = msg

if _NOT_TTY or IsTraceToStderr():
Expand All @@ -148,10 +164,9 @@ def update(self, inc=1, msg=""):
return

if self._total <= 0:
sys.stderr.write(
"\r%s: %d,%s" % (self._title, self._done, CSI_ERASE_LINE_AFTER)
self._write(
"%s: %d,%s" % (self._title, self._done, CSI_ERASE_LINE_AFTER)
)
sys.stderr.flush()
else:
p = (100 * self._done) / self._total
if self._show_jobs:
Expand All @@ -165,8 +180,8 @@ def update(self, inc=1, msg=""):
elapsed = f" {elapsed_str(elapsed_sec)} |"
else:
elapsed = ""
sys.stderr.write(
"\r%s: %2d%% %s(%d%s/%d%s)%s %s%s%s"
self._write(
"%s: %2d%% %s(%d%s/%d%s)%s %s%s"
% (
self._title,
p,
Expand All @@ -178,10 +193,8 @@ def update(self, inc=1, msg=""):
elapsed,
msg,
CSI_ERASE_LINE_AFTER,
"\n" if self._print_newline else "",
)
)
sys.stderr.flush()

def end(self):
self._update_event.set()
Expand All @@ -190,15 +203,14 @@ def end(self):

duration = duration_str(time.time() - self._start)
if self._total <= 0:
sys.stderr.write(
"\r%s: %d, done in %s%s\n"
self._write(
"%s: %d, done in %s%s\n"
% (self._title, self._done, duration, CSI_ERASE_LINE_AFTER)
)
sys.stderr.flush()
else:
p = (100 * self._done) / self._total
sys.stderr.write(
"\r%s: %3d%% (%d%s/%d%s), done in %s%s\n"
self._write(
"%s: %3d%% (%d%s/%d%s), done in %s%s\n"
% (
self._title,
p,
Expand All @@ -210,4 +222,3 @@ def end(self):
CSI_ERASE_LINE_AFTER,
)
)
sys.stderr.flush()
2 changes: 1 addition & 1 deletion subcmds/abandon.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ def _ProcessResults(_pool, pm, states):
success[branch].append(project)
else:
err[branch].append(project)
pm.update()
pm.update(msg="")

self.ExecuteInParallel(
opt.jobs,
Expand Down
2 changes: 1 addition & 1 deletion subcmds/checkout.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ def _ProcessResults(_pool, pm, results):
success.append(project)
else:
err.append(project)
pm.update()
pm.update(msg="")

self.ExecuteInParallel(
opt.jobs,
Expand Down
4 changes: 2 additions & 2 deletions subcmds/start.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,14 +142,14 @@ def Execute(self, opt, args):
sync_buf = SyncBuffer(self.manifest.manifestProject.config)
project.Sync_LocalHalf(sync_buf)
project.revisionId = gitc_project.old_revision
pm.update()
pm.update(msg="")
pm.end()

def _ProcessResults(_pool, pm, results):
for result, project in results:
if not result:
err.append(project)
pm.update()
pm.update(msg="")

self.ExecuteInParallel(
opt.jobs,
Expand Down
39 changes: 36 additions & 3 deletions subcmds/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ def _rlimit_nofile():
from error import RepoChangedException, GitError
import platform_utils
from project import SyncBuffer
from progress import Progress
from progress import Progress, elapsed_str
from repo_trace import Trace
import ssh
from wrapper import Wrapper
Expand Down Expand Up @@ -596,7 +596,7 @@ def _FetchProjectList(self, opt, projects):
The projects we're given share the same underlying git object store, so
we have to fetch them in serial.
Delegates most of the work to _FetchHelper.
Delegates most of the work to _FetchOne.
Args:
opt: Program options returned from optparse. See _Options().
Expand All @@ -615,6 +615,8 @@ def _FetchOne(self, opt, project):
Whether the fetch was successful.
"""
start = time.time()
k = f"{project.name} @ {project.relpath}"
self._sync_dict[k] = start
success = False
remote_fetched = False
buf = io.StringIO()
Expand Down Expand Up @@ -660,15 +662,31 @@ def _FetchOne(self, opt, project):
% (project.name, type(e).__name__, str(e)),
file=sys.stderr,
)
del self._sync_dict[k]
raise

finish = time.time()
del self._sync_dict[k]
return _FetchOneResult(success, project, start, finish, remote_fetched)

@classmethod
def _FetchInitChild(cls, ssh_proxy):
cls.ssh_proxy = ssh_proxy

def _GetLongestSyncMessage(self):
if len(self._sync_dict) == 0:
return None

earliest_time = float("inf")
earliest_proj = None
for project, t in self._sync_dict.items():
if t < earliest_time:
earliest_time = t
earliest_proj = project

elapsed = time.time() - earliest_time
return f"{elapsed_str(elapsed)} {earliest_proj}"

def _Fetch(self, projects, opt, err_event, ssh_proxy):
ret = True

Expand All @@ -681,8 +699,22 @@ def _Fetch(self, projects, opt, err_event, ssh_proxy):
delay=False,
quiet=opt.quiet,
show_elapsed=True,
elide=True,
)

self._sync_dict = multiprocessing.Manager().dict()
sync_event = _threading.Event()

def _MonitorSyncLoop():
while True:
pm.update(inc=0, msg=self._GetLongestSyncMessage())
if sync_event.wait(timeout=1):
return

sync_progress_thread = _threading.Thread(target=_MonitorSyncLoop)
sync_progress_thread.daemon = True
sync_progress_thread.start()

objdir_project_map = dict()
for project in projects:
objdir_project_map.setdefault(project.objdir, []).append(project)
Expand Down Expand Up @@ -712,7 +744,7 @@ def _ProcessResults(results_sets):
ret = False
else:
fetched.add(project.gitdir)
pm.update(msg=f"Last synced: {project.name}")
pm.update()
if not ret and opt.fail_fast:
break
return ret
Expand Down Expand Up @@ -764,6 +796,7 @@ def _ProcessResults(results_sets):
# crash.
del Sync.ssh_proxy

sync_event.set()
pm.end()
self._fetch_times.Save()

Expand Down

0 comments on commit 551285f

Please sign in to comment.