Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replacing all string formatting with f-strings. #3536

8 changes: 4 additions & 4 deletions sos/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,9 @@ def __init__(self, args):
_com_subparser = self.subparsers.add_parser(
comp,
aliases=self._components[comp][1],
prog="sos %s" % comp
prog=f"sos {comp}"
)
_com_subparser.usage = "sos %s [options]" % comp
_com_subparser.usage = f"sos {comp} [options]"
_com_subparser.register('action', 'extend', SosListOption)
self._add_common_options(_com_subparser)
self._components[comp][0].add_parser_options(parser=_com_subparser)
Expand Down Expand Up @@ -174,15 +174,15 @@ def _init_component(self):
"""
_com = self.args.component
if _com not in self._components.keys():
print("Unknown subcommand '%s' specified" % _com)
print(f"Unknown subcommand '{_com}' specified")
try:
_to_load = self._components[_com][0]
if _to_load.root_required and not os.getuid() == 0:
raise Exception("Component must be run with root privileges")
self._component = _to_load(self.parser, self.args, self.cmdline)

except Exception as err:
print("Could not initialize '%s': %s" % (_com, err))
print(f"Could not initialize '{_com}': {err}")
if self.args.debug:
raise err
sys.exit(1)
Expand Down
120 changes: 70 additions & 50 deletions sos/archive.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ def archive_type(cls):
_path_lock = Lock()

def _format_msg(self, msg):
return "[archive:%s] %s" % (self.archive_type(), msg)
return f"[archive:{self.archive_type()}] {msg}"

def set_debug(self, debug):
self._debug = debug
Expand Down Expand Up @@ -148,8 +148,9 @@ def __init__(self, name, tmpdir, policy, threads, enc_opts, sysroot,
self._archive_root = os.path.join(tmpdir, name)
with self._path_lock:
os.makedirs(self._archive_root, 0o700)
self.log_info("initialised empty FileCacheArchive at '%s'" %
(self._archive_root,))
self.log_info(
f"initialised empty FileCacheArchive at '{self._archive_root}'"
)

def dest_path(self, name):
if os.path.isabs(name):
Expand Down Expand Up @@ -184,7 +185,7 @@ def _make_leading_paths(self, src, mode=0o700):
or more symbolic links in intermediate components
of the path have altered the path destination.
"""
self.log_debug("Making leading paths for %s" % src)
self.log_debug(f"Making leading paths for {src}")
root = self._archive_root
dest = src

Expand Down Expand Up @@ -224,7 +225,7 @@ def in_archive(path):
src_path = os.path.join(src_path, comp)

if not os.path.exists(abs_path):
self.log_debug("Making path %s" % abs_path)
self.log_debug(f"Making path {abs_path}")
if os.path.islink(src_path) and os.path.isdir(src_path):
target = os.readlink(src_path)

Expand All @@ -245,11 +246,12 @@ def in_archive(path):
if os.path.isabs(target):
target = os.path.relpath(target, target_dir)

self.log_debug("Making symlink '%s' -> '%s'" %
(abs_path, target))
self.log_debug(
f"Making symlink '{abs_path}' -> '{target}'"
)
os.symlink(target, abs_path)
else:
self.log_debug("Making directory %s" % abs_path)
self.log_debug(f"Making directory {abs_path}")
os.mkdir(abs_path, mode)
dest = src_path

Expand Down Expand Up @@ -296,9 +298,10 @@ def check_path(self, src, path_type, dest=None, force=False):

# Check containing directory presence and path type
if os.path.exists(dest_dir) and not os.path.isdir(dest_dir):
raise ValueError("path '%s' exists and is not a directory" %
dest_dir)
elif not os.path.exists(dest_dir):
raise ValueError(
f"path '{dest_dir}' exists and is not a directory"
)
if not os.path.exists(dest_dir):
src_dir = src if path_type == P_DIR else os.path.split(src)[0]
self._make_leading_paths(src_dir)

Expand Down Expand Up @@ -341,8 +344,7 @@ def _copy_attributes(self, src, dest):
shutil.copystat(src, dest)
os.chown(dest, stat.st_uid, stat.st_gid)
except Exception as e:
self.log_debug("caught '%s' setting attributes of '%s'"
% (e, dest))
self.log_debug(f"caught '{e}' setting attributes of '{dest}'")

def add_file(self, src, dest=None, force=False):
with self._path_lock:
Expand All @@ -364,10 +366,10 @@ def add_file(self, src, dest=None, force=False):
if src.startswith("/sys/") or src.startswith("/proc/"):
pass
else:
self.log_info("File %s not collected: '%s'" % (src, e))
self.log_info(f"File {src} not collected: '{e}'")

self._copy_attributes(src, dest)
file_name = "'%s'" % src
file_name = f"'{src}'"
else:
# Open file case: first rewind the file to obtain
# everything written to it.
Expand All @@ -377,8 +379,9 @@ def add_file(self, src, dest=None, force=False):
f.write(line)
file_name = "open file"

self.log_debug("added %s to FileCacheArchive '%s'" %
(file_name, self._archive_root))
self.log_debug(
f"added {file_name} to FileCacheArchive '{self._archive_root}'"
)

def add_string(self, content, dest, mode='w'):
with self._path_lock:
Expand All @@ -396,8 +399,10 @@ def add_string(self, content, dest, mode='w'):
f.write(content)
if os.path.exists(src):
self._copy_attributes(src, dest)
self.log_debug("added string at '%s' to FileCacheArchive '%s'"
% (src, self._archive_root))
self.log_debug(
f"added string at '{src}' to FileCacheArchive"
f" '{self._archive_root}'"
)

def add_binary(self, content, dest):
with self._path_lock:
Expand All @@ -407,26 +412,32 @@ def add_binary(self, content, dest):

with codecs.open(dest, 'wb', encoding=None) as f:
f.write(content)
self.log_debug("added binary content at '%s' to archive '%s'"
% (dest, self._archive_root))
self.log_debug(
f"added binary content at '{dest}' to archive"
f" '{self._archive_root}'"
)

def add_link(self, source, link_name):
self.log_debug("adding symlink at '%s' -> '%s'" % (link_name, source))
self.log_debug(f"adding symlink at '{link_name}' -> '{source}'")
with self._path_lock:
dest = self.check_path(link_name, P_LINK)
if not dest:
return

if not os.path.lexists(dest):
os.symlink(source, dest)
self.log_debug("added symlink at '%s' to '%s' in archive '%s'"
% (dest, source, self._archive_root))
self.log_debug(
f"added symlink at '{dest}' to '{source}' in archive"
f" '{self._archive_root}'"
)

# Follow-up must be outside the path lock: we recurse into
# other monitor methods that will attempt to reacquire it.

self.log_debug("Link follow up: source=%s link_name=%s dest=%s" %
(source, link_name, dest))
self.log_debug(
"Link follow up:"
f" source={source} link_name={link_name} dest={dest}"
)

source_dir = os.path.dirname(link_name)
host_path_name = os.path.realpath(os.path.join(source_dir, source))
Expand Down Expand Up @@ -465,21 +476,24 @@ def is_loop(link_name, source):
source = os.path.join(dest_dir, os.readlink(host_path_name))
source = os.path.relpath(source, dest_dir)
if is_loop(link_name, source):
self.log_debug("Link '%s' - '%s' loops: skipping..." %
(link_name, source))
self.log_debug(
f"Link '{link_name}' - '{source}' loops: skipping..."
)
return
self.log_debug("Adding link %s -> %s for link follow up" %
(link_name, source))
self.log_debug(
f"Adding link {link_name} -> {source} for link follow up"
)
self.add_link(source, link_name)
elif os.path.isdir(host_path_name):
self.log_debug("Adding dir %s for link follow up" % source)
self.log_debug(f"Adding dir {source} for link follow up")
self.add_dir(host_path_name)
elif os.path.isfile(host_path_name):
self.log_debug("Adding file %s for link follow up" % source)
self.log_debug(f"Adding file {source} for link follow up")
self.add_file(host_path_name)
else:
self.log_debug("No link follow up: source=%s link_name=%s" %
(source, link_name))
self.log_debug(
f"No link follow up: source={source} link_name={link_name}"
)

def add_dir(self, path):
"""Create a directory in the archive.
Expand All @@ -501,7 +515,7 @@ def add_node(self, path, mode, device):
except OSError as e:
if e.errno == errno.EPERM:
msg = "Operation not permitted"
self.log_info("add_node: %s - mknod '%s'" % (msg, dest))
self.log_info(f"add_node: {msg} - mknod '{dest}'")
return
raise e
self._copy_attributes(path, dest)
Expand All @@ -525,8 +539,10 @@ def makedirs(self, path, mode=0o700):
Used by sos.sosreport to set up sos_* directories.
"""
os.makedirs(os.path.join(self._archive_root, path), mode=mode)
self.log_debug("created directory at '%s' in FileCacheArchive '%s'"
% (path, self._archive_root))
self.log_debug(
f"created directory at '{path}' in FileCacheArchive"
f" '{self._archive_root}'"
)

def open_file(self, path):
path = self.dest_path(path)
Expand Down Expand Up @@ -597,13 +613,14 @@ def do_file_sub(self, path, regexp, subst):
return replacements

def finalize(self, method):
self.log_info("finalizing archive '%s' using method '%s'"
% (self._archive_root, method))
self.log_info(
f"finalizing archive '{self._archive_root}' using method"
f" '{method}'"
)
try:
res = self._build_archive(method)
except Exception as err:
self.log_error("An error occurred compressing the archive: %s"
% err)
self.log_error(f"An error occurred compressing the archive: {err}")
return self.name()

self.cleanup()
Expand All @@ -615,7 +632,7 @@ def finalize(self, method):
return self._encrypt(res)
except Exception as e:
exp_msg = "An error occurred encrypting the archive:"
self.log_error("%s %s" % (exp_msg, e))
self.log_error(f"{exp_msg} {e}")
return res
else:
return res
Expand All @@ -634,12 +651,12 @@ def _encrypt(self, archive):
"""
arc_name = archive.replace("sosreport-", "secured-sosreport-")
arc_name += ".gpg"
enc_cmd = "gpg --batch -o %s " % arc_name
enc_cmd = f"gpg --batch -o {arc_name} "
env = None
if self.enc_opts["key"]:
# need to assume a trusted key here to be able to encrypt the
# archive non-interactively
enc_cmd += "--trust-model always -e -r %s " % self.enc_opts["key"]
enc_cmd += f'--trust-model always -e -r {self.enc_opts["key"]} '
enc_cmd += archive
if self.enc_opts["password"]:
# prevent change of gpg options using a long password, but also
Expand All @@ -660,7 +677,7 @@ def _encrypt(self, archive):
else:
# TODO: report the actual error from gpg. Currently, we cannot as
# sos_get_command_output() does not capture stderr
msg = "gpg exited with code %s" % r["status"]
msg = f'gpg exited with code {r["status"]}'
raise Exception(msg)


Expand Down Expand Up @@ -718,7 +735,7 @@ def get_selinux_context(self, path):
return None

def name(self):
return "%s.%s" % (self._archive_root, self._suffix)
return f"{self._archive_root}.{self._suffix}"

def name_max(self):
# GNU Tar format supports unlimited file name length. Just return
Expand All @@ -729,15 +746,18 @@ def _build_archive(self, method):
if method == 'auto':
method = 'xz' if find_spec('lzma') is not None else 'gzip'
_comp_mode = method.strip('ip')
self._archive_name = self._archive_name + ".%s" % _comp_mode
self._archive_name = f"{self._archive_name}.{_comp_mode}"
# tarfile does not currently have a consistent way to define comnpress
# level for both xz and gzip ('preset' for xz, 'compresslevel' for gz)
if method == 'gzip':
kwargs = {'compresslevel': 6}
else:
kwargs = {'preset': 3}
tar = tarfile.open(self._archive_name, mode="w:%s" % _comp_mode,
**kwargs)
tar = tarfile.open(
self._archive_name,
mode=f"w:{_comp_mode}",
**kwargs
)
# add commonly reviewed files first, so that they can be more easily
# read from memory without needing to extract the whole archive
for _content in ['version.txt', 'sos_reports', 'sos_logs']:
Expand All @@ -752,7 +772,7 @@ def _build_archive(self, method):
tar.add(self._archive_root, arcname=self._name,
filter=self.copy_permissions_filter)
tar.close()
self._suffix += ".%s" % _comp_mode
self._suffix += f".{_comp_mode}"
return self.name()


Expand Down
Loading
Loading