Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve qvm-check mechanism on invalid domains #297

Merged
merged 3 commits into from
Jul 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions qubesadmin/tests/tools/qvm_check.py
Original file line number Diff line number Diff line change
Expand Up @@ -245,3 +245,20 @@ def test_013_networked_multi(self):
self.assertEqual(logger.output,
['INFO:qvm-check:some-vm2: networked'])
self.assertAllCalled()

def test_014_does_not_exist(self):
self.app.expected_calls[
('dom0', 'admin.vm.List', None, None)] = \
b'0\x00some-vm class=AppVM state=Running\n'
with self.assertLogs() as logger:
self.assertEqual(
qubesadmin.tools.qvm_check.main(['invalid-vm'], app=self.app), 1)
self.assertEqual(logger.output,
['WARNING:qvm-check:invalid-vm: non-existent!'])

def test_15_custom_argparse_error_handling(self):
self.app.expected_calls[
('dom0', 'admin.vm.List', None, None)] = \
b'0\x00some-vm class=AppVM state=Running\n'
with self.assertRaises(SystemExit):
qubesadmin.tools.qvm_check.main(['--invalid-option'], app=self.app)
56 changes: 52 additions & 4 deletions qubesadmin/tools/qvm_check.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,56 @@
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#
""" Exits sucessfull if the provided domains exists, else returns failure """
""" Exits sucessfull if the provided domain(s) exist, else returns failure """

import sys

import qubesadmin.tools
import qubesadmin.vm

parser = qubesadmin.tools.QubesArgumentParser(description=__doc__,
vmname_nargs='+')
class QvmCheckVmNameAction(qubesadmin.tools.VmNameAction):
""" Action for parsing one or multiple valid/invalid VMNAMEs """

def __init__(self, option_strings, nargs=1, dest='vmnames', help=None,
**kwargs):
# pylint: disable=redefined-builtin
super().__init__(option_strings, dest=dest, help=help,
nargs=nargs, **kwargs)

def parse_qubes_app(self, parent_parser, namespace):
# pylint: disable=arguments-renamed
assert hasattr(namespace, 'app')
setattr(namespace, 'domains', [])
setattr(namespace, 'invalid_domains', [])
app = namespace.app
if hasattr(namespace, 'all_domains') and namespace.all_domains:
namespace.domains = [
vm
for vm in app.domains
if not vm.klass == 'AdminVM' and
vm.name not in namespace.exclude
]
else:
if hasattr(namespace, 'exclude') and namespace.exclude:
parent_parser.error('--exclude can only be used with --all')

Check warning on line 54 in qubesadmin/tools/qvm_check.py

View check run for this annotation

Codecov / codecov/patch

qubesadmin/tools/qvm_check.py#L54

Added line #L54 was not covered by tests

for vm_name in getattr(namespace, self.dest):
try:
namespace.domains += [app.domains[vm_name]]
except KeyError:
namespace.invalid_domains += [vm_name]


class QvmCheckArgumentParser(qubesadmin.tools.QubesArgumentParser):
""" Extended argument parser for qvm-check to collect invalid domains """
def __init__(self, description):
super().__init__(description=description, vmname_nargs=None)
vm_name_group = qubesadmin.tools.VmNameGroup(
self, required='+', vm_action=QvmCheckVmNameAction)
self._mutually_exclusive_groups.append(vm_name_group)


parser = QvmCheckArgumentParser(description=__doc__)
parser.add_argument("--running", action="store_true", dest="running",
default=False,
help="Determine if (any of given) VM is running")
Expand All @@ -44,7 +85,7 @@


def print_msg(log, domains, status):
"""Print message in appropriate form about given domain(s)"""
"""Print message in appropriate form about given valid domain(s)"""
if not domains:
log.info("None of qubes: {!s}".format(', '.join(status)))
else:
Expand Down Expand Up @@ -74,6 +115,7 @@
"""Main function of qvm-check tool"""
args = parser.parse_args(args, app=app)
domains = args.domains
invalid_domains = set(args.invalid_domains)
return_code = 0

log = args.app.log
Expand Down Expand Up @@ -103,6 +145,12 @@
elif args.verbose:
print_msg(log, domains, ["exists"])

if invalid_domains:
if args.verbose:
for vm in invalid_domains:
log.warning("{!s}: {!s}".format(vm, 'non-existent!'))
return_code = 1

return return_code


Expand Down