Skip to content

Commit

Permalink
Merge branch 'development' into skip_matching_plot_process_parents
Browse files Browse the repository at this point in the history
  • Loading branch information
altendky authored May 15, 2021
2 parents e2607ca + 8d3f132 commit fa33c9d
Show file tree
Hide file tree
Showing 9 changed files with 44 additions and 38 deletions.
2 changes: 1 addition & 1 deletion src/plotman/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@


"""Plotman module launcher.
This is a shim that allows you to run plotman via
This is a shim that allows you to run plotman via
python3 -m plotman
"""
plotman.main()
2 changes: 1 addition & 1 deletion src/plotman/_tests/manager_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ def test_dstdirs_to_furthest_phase():
{ '/plots1' : (1, 5),
'/plots2' : (3, 1),
'/plots3' : (4, 1) } )


def test_dstdirs_to_youngest_phase():
all_jobs = [ job_w_dstdir_phase('/plots1', (1, 5)),
Expand Down
22 changes: 11 additions & 11 deletions src/plotman/archive.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,16 @@
# TODO : write-protect and delete-protect archived plots

def spawn_archive_process(dir_cfg, all_jobs):
'''Spawns a new archive process using the command created
'''Spawns a new archive process using the command created
in the archive() function. Returns archiving status and a log message to print.'''

log_message = None
archiving_status = None

# Look for running archive jobs. Be robust to finding more than one
# even though the scheduler should only run one at a time.
arch_jobs = get_running_archive_jobs(dir_cfg.archive)

if not arch_jobs:
(should_start, status_or_cmd) = archive(dir_cfg, all_jobs)
if not should_start:
Expand All @@ -38,7 +38,7 @@ def spawn_archive_process(dir_cfg, all_jobs):
shell=True,
stdout=subprocess.DEVNULL,
stderr=subprocess.STDOUT,
start_new_session=True)
start_new_session=True)
log_message = 'Starting archive: ' + cmd
# At least for now it seems that even if we get a new running
# archive jobs list it doesn't contain the new rsync process.
Expand All @@ -52,7 +52,7 @@ def spawn_archive_process(dir_cfg, all_jobs):
archiving_status = 'pid: ' + ', '.join(map(str, arch_jobs))

return archiving_status, log_message

def compute_priority(phase, gb_free, n_plots):
# All these values are designed around dst buffer dirs of about
# ~2TB size and containing k32 plots. TODO: Generalize, and
Expand All @@ -72,7 +72,7 @@ def compute_priority(phase, gb_free, n_plots):
priority -= 16
elif (phase >= job.Phase(3, 7)):
priority -= 32

# If a drive is getting full, we should prioritize it
if (gb_free < 1000):
priority += 1 + int((1000 - gb_free) / 100)
Expand Down Expand Up @@ -125,7 +125,7 @@ def get_running_archive_jobs(arch_cfg):

def archive(dir_cfg, all_jobs):
'''Configure one archive job. Needs to know all jobs so it can avoid IO
contention on the plotting dstdir drives. Returns either (False, <reason>)
contention on the plotting dstdir drives. Returns either (False, <reason>)
if we should not execute an archive job or (True, <cmd>) with the archive
command if we should.'''
if dir_cfg.archive is None:
Expand All @@ -140,7 +140,7 @@ def archive(dir_cfg, all_jobs):
dir_plots = plot_util.list_k32_plots(d)
gb_free = plot_util.df_b(d) / plot_util.GB
n_plots = len(dir_plots)
priority = compute_priority(ph, gb_free, n_plots)
priority = compute_priority(ph, gb_free, n_plots)
if priority >= best_priority and dir_plots:
best_priority = priority
chosen_plot = dir_plots[0]
Expand All @@ -157,17 +157,17 @@ def archive(dir_cfg, all_jobs):
archdir_freebytes = get_archdir_freebytes(dir_cfg.archive)
if not archdir_freebytes:
return(False, 'No free archive dirs found.')

archdir = ''
available = [(d, space) for (d, space) in archdir_freebytes.items() if
available = [(d, space) for (d, space) in archdir_freebytes.items() if
space > 1.2 * plot_util.get_k32_plotsize()]
if len(available) > 0:
index = min(dir_cfg.archive.index, len(available) - 1)
(archdir, freespace) = sorted(available)[index]

if not archdir:
return(False, 'No archive directories found with enough free space')

msg = 'Found %s with ~%d GB free' % (archdir, freespace / plot_util.GB)

bwlimit = dir_cfg.archive.rsyncd_bwlimit
Expand Down
10 changes: 5 additions & 5 deletions src/plotman/interactive.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ def curses_main(stdscr):
if last_refresh is None:
do_full_refresh = True
else:
elapsed = (datetime.datetime.now() - last_refresh).total_seconds()
elapsed = (datetime.datetime.now() - last_refresh).total_seconds()
do_full_refresh = elapsed >= cfg.scheduling.polling_time_s

if not do_full_refresh:
Expand Down Expand Up @@ -189,7 +189,7 @@ def curses_main(stdscr):
#
# Layout
#

tmp_h = len(tmp_report.splitlines())
tmp_w = len(max(tmp_report.splitlines(), key=len)) + 1
dst_h = len(dst_report.splitlines())
Expand Down Expand Up @@ -236,7 +236,7 @@ def curses_main(stdscr):
header_win.addnstr(' <A>rchival: ', linecap, curses.A_BOLD)
header_win.addnstr(
archiving_status_msg(archiving_configured,
archiving_active, archiving_status), linecap)
archiving_active, archiving_status), linecap)

# Oneliner progress display
header_win.addnstr(1, 0, 'Jobs (%d): ' % len(jobs), linecap)
Expand All @@ -255,10 +255,10 @@ def curses_main(stdscr):
header_win.addnstr(' archive=', linecap, curses.A_BOLD)
header_win.addnstr(arch_prefix, linecap)
header_win.addnstr(' (remote)', linecap)


# Jobs
jobs_win.addstr(0, 0, reporting.status_report(jobs, n_cols, jobs_h,
jobs_win.addstr(0, 0, reporting.status_report(jobs, n_cols, jobs_h,
tmp_prefix, dst_prefix))
jobs_win.chgat(0, 0, curses.A_REVERSE)

Expand Down
11 changes: 6 additions & 5 deletions src/plotman/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -245,11 +245,12 @@ def __init__(self, proc, parsed_command, logroot):
if self.logfile:
# Initialize data that needs to be loaded from the logfile
self.init_from_logfile()
else:
print('Found plotting process PID {pid}, but could not find '
'logfile in its open files:'.format(pid = self.proc.pid))
for f in self.proc.open_files():
print(f.path)
# TODO: turn this into logging or somesuch
# else:
# print('Found plotting process PID {pid}, but could not find '
# 'logfile in its open files:'.format(pid = self.proc.pid))
# for f in self.proc.open_files():
# print(f.path)



Expand Down
4 changes: 2 additions & 2 deletions src/plotman/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ def maybe_start_new_plot(dir_cfg, sched_cfg, plotting_cfg):
if phases_permit_new_job(phases, d, sched_cfg, dir_cfg) ]
rankable = [ (d, phases[0]) if phases else (d, job.Phase(known=False))
for (d, phases) in eligible ]

if not eligible:
wait_reason = 'no eligible tempdirs (%ds/%ds)' % (youngest_job_age, global_stagger)
else:
Expand All @@ -101,7 +101,7 @@ def maybe_start_new_plot(dir_cfg, sched_cfg, plotting_cfg):
if d in dir_cfg.dst and ph is not None}
unused_dirs = [d for d in dir_cfg.dst if d not in dir2ph.keys()]
dstdir = ''
if unused_dirs:
if unused_dirs:
dstdir = random.choice(unused_dirs)
else:
dstdir = max(dir2ph, key=dir2ph.get)
Expand Down
2 changes: 1 addition & 1 deletion src/plotman/plot_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ def list_k32_plots(d):
plots.append(plot)
except FileNotFoundError:
continue

return plots

def column_wrap(items, n_cols, filler=None):
Expand Down
10 changes: 7 additions & 3 deletions src/plotman/plotman.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def parse_args(self):
sp.add_parser('version', help='print the version')

sp.add_parser('status', help='show current plotting status')

sp.add_parser('dirs', help='show directories info')

sp.add_parser('interactive', help='run interactive control/monitoring mode')
Expand Down Expand Up @@ -163,7 +163,11 @@ def main():

# Status report
if args.cmd == 'status':
print(reporting.status_report(jobs, get_term_width()))
result = "{0}\n\n{1}".format(
reporting.status_report(jobs, get_term_width()),
reporting.summary(jobs)
)
print(result)

# Directories report
elif args.cmd == 'dirs':
Expand Down Expand Up @@ -192,7 +196,7 @@ def main():
elif args.cmd == 'dsched':
for (d, ph) in manager.dstdirs_to_furthest_phase(jobs).items():
print(' %s : %s' % (d, str(ph)))

#
# Job control commands
#
Expand Down
19 changes: 10 additions & 9 deletions src/plotman/reporting.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ def abbr_path(path, putative_prefix):
return os.path.relpath(path, putative_prefix)
else:
return path

def phase_str(phase):
if not phase.known:
return '?:?'
Expand All @@ -38,12 +38,12 @@ def n_at_ph(jobs, ph):

def n_to_char(n):
n_to_char_map = dict(enumerate(" .:;!"))

if n < 0:
return 'X' # Should never be negative
elif n >= len(n_to_char_map):
n = len(n_to_char_map) - 1

return n_to_char_map[n]

def job_viz(jobs):
Expand Down Expand Up @@ -127,11 +127,12 @@ def status_report(jobs, width, height=None, tmp_prefix='', dst_prefix=''):
tab.set_max_width(width)
tab.set_deco(0) # No borders

result = tab.draw()
return tab.draw()

def summary(jobs, tmp_prefix=''):
"""Creates a small summary of running jobs"""

# Add some summarized info
summary = [
'\n',
'Total jobs: {0}'.format(len(jobs))
]

Expand All @@ -142,7 +143,7 @@ def status_report(jobs, width, height=None, tmp_prefix='', dst_prefix=''):
'Jobs in {0}: {1}'.format(key, len(list(group)))
)

return result + '\n'.join(summary)
return '\n'.join(summary)

def tmp_dir_report(jobs, dir_cfg, sched_cfg, width, start_row=None, end_row=None, prefix=''):
'''start_row, end_row let you split the table up if you want'''
Expand All @@ -163,7 +164,7 @@ def tmp_dir_report(jobs, dir_cfg, sched_cfg, width, start_row=None, end_row=None
tab.set_deco(tt.Texttable.BORDER | tt.Texttable.HEADER )
tab.set_deco(0) # No borders
return tab.draw()

def dst_dir_report(jobs, dstdirs, width, prefix=''):
tab = tt.Texttable()
dir2oldphase = manager.dstdirs_to_furthest_phase(jobs)
Expand All @@ -181,7 +182,7 @@ def dst_dir_report(jobs, dstdirs, width, prefix=''):
dir_plots = plot_util.list_k32_plots(d)
gb_free = int(plot_util.df_b(d) / plot_util.GB)
n_plots = len(dir_plots)
priority = archive.compute_priority(eldest_ph, gb_free, n_plots)
priority = archive.compute_priority(eldest_ph, gb_free, n_plots)
row = [abbr_path(d, prefix), n_plots, gb_free,
phases_str(phases, 5), priority]
tab.add_row(row)
Expand Down

0 comments on commit fa33c9d

Please sign in to comment.