-
-
Notifications
You must be signed in to change notification settings - Fork 75
/
Copy pathqvm_start.py
215 lines (183 loc) · 7.63 KB
/
qvm_start.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
# -*- encoding: utf8 -*-
#
# The Qubes OS Project, http://www.qubes-os.org
#
# Copyright (C) 2017 Marek Marczykowski-Górecki
# <marmarek@invisiblethingslab.com>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation; either version 2.1 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with this program; if not, see <http://www.gnu.org/licenses/>.
'''qvm-start - start a domain'''
import argparse
import string
import sys
import subprocess
import time
from qubesadmin.device_protocol import DeviceAssignment, UnknownDevice
import qubesadmin.exc
import qubesadmin.tools
class DriveAction(argparse.Action):
'''Action for argument parser that stores drive image path.'''
# pylint: disable=redefined-builtin,too-few-public-methods
def __init__(self,
option_strings,
dest='drive',
*,
prefix='cdrom:',
metavar='IMAGE',
required=False,
help='Attach drive'):
super().__init__(option_strings, dest,
metavar=metavar, help=help)
self.prefix = prefix
def __call__(self, parser, namespace, values, option_string=None):
# pylint: disable=redefined-outer-name
setattr(namespace, self.dest, self.prefix + values)
parser = qubesadmin.tools.QubesArgumentParser(
description='start a domain', vmname_nargs='+')
parser.add_argument('--skip-if-running',
action='store_true', default=False,
help='Do not fail if the qube is already runnning')
parser_drive = parser.add_mutually_exclusive_group()
parser_drive.add_argument('--drive', metavar='DRIVE',
help='temporarily attach specified drive as CD/DVD or hard disk (can be'
' specified with prefix "hd:" or "cdrom:", default is cdrom)')
parser_drive.add_argument('--hddisk',
action=DriveAction, dest='drive', prefix='hd:',
help='temporarily attach specified drive as hard disk')
parser_drive.add_argument('--cdrom', metavar='IMAGE',
action=DriveAction, dest='drive', prefix='cdrom:',
help='temporarily attach specified drive as CD/DVD')
parser_drive.add_argument('--install-windows-tools',
action='store_const', dest='drive', default=False,
const='cdrom:dom0:/usr/lib/qubes/qubes-windows-tools.iso',
help='temporarily attach Windows tools CDROM to the domain')
def get_drive_assignment(app, drive_str):
"""
Prepare :py:class:`qubesadmin.device_protocol.DeviceAssignment` object for a
given drive.
If running in dom0, it will also take care about creating the appropriate
loop device (if necessary). Otherwise, only existing block devices are
supported.
:param app: Qubes() instance
:param drive_str: drive argument
:return: DeviceAssignment matching *drive_str*
"""
devtype = 'cdrom'
if drive_str.startswith('cdrom:'):
devtype = 'cdrom'
drive_str = drive_str[len('cdrom:'):]
elif drive_str.startswith('hd:'):
devtype = 'disk'
drive_str = drive_str[len('hd:'):]
try:
backend_domain_name, port_id = drive_str.split(':', 1)
except ValueError:
raise ValueError("Incorrect image name: image must be in the format "
"of VMNAME:full_path, for example "
"dom0:/home/user/test.iso")
try:
backend_domain = app.domains[backend_domain_name]
except KeyError:
raise qubesadmin.exc.QubesVMNotFoundError(
'No such VM: %s', backend_domain_name)
if port_id.startswith('/'):
# it is a path - if we're running in dom0, try to call losetup to
# export the device, otherwise reject
if app.qubesd_connection_type == 'qrexec':
raise qubesadmin.exc.QubesException(
'Existing block device identifier needed when running from '
'outside of dom0 (see qvm-block)')
try:
if backend_domain.klass == 'AdminVM':
loop_name = subprocess.check_output(
['sudo', 'losetup', '-f', '--show', port_id])
loop_name = loop_name.strip()
else:
untrusted_loop_name, _ = backend_domain.run_with_args(
'losetup', '-f', '--show', port_id,
user='root')
untrusted_loop_name = untrusted_loop_name.strip()
allowed_chars = string.ascii_lowercase + string.digits + '/'
allowed_chars = allowed_chars.encode('ascii')
if not all(c in allowed_chars for c in untrusted_loop_name):
raise qubesadmin.exc.QubesException(
'Invalid loop device name received from {}'.format(
backend_domain.name))
loop_name = untrusted_loop_name
del untrusted_loop_name
except subprocess.CalledProcessError:
raise qubesadmin.exc.QubesException(
'Failed to setup loop device for %s', port_id)
assert loop_name.startswith(b'/dev/loop')
port_id = loop_name.decode().split('/')[2]
# wait for device to appear
# FIXME: convert this to waiting for event
timeout = 10
while isinstance(
backend_domain.devices['block'][port_id], UnknownDevice
):
if timeout == 0:
raise qubesadmin.exc.QubesException(
'Timeout waiting for {}:{} device to appear'.format(
backend_domain.name, port_id))
timeout -= 1
time.sleep(1)
options = {
'devtype': devtype,
'read-only': devtype == 'cdrom'
}
assignment = DeviceAssignment.new(
backend_domain=backend_domain, port_id=port_id, devclass='block',
options=options, mode="required")
return assignment
def main(args=None, app=None):
'''Main routine of :program:`qvm-start`.
:param list args: Optional arguments to override those delivered from \
command line.
'''
args = parser.parse_args(args, app=app)
exit_code = 0
for domain in args.domains:
if domain.is_running():
if args.skip_if_running:
continue
exit_code = 1
parser.print_error(
'domain {} is already running'.format(domain.name))
return exit_code
drive_assignment = None
try:
if args.drive:
drive_assignment = get_drive_assignment(args.app, args.drive)
try:
domain.devices['block'].assign(drive_assignment)
except Exception:
drive_assignment = None
raise
domain.start()
if drive_assignment:
# don't reconnect this device after VM reboot
domain.devices['block'].unassign(drive_assignment)
except (IOError, OSError, qubesadmin.exc.QubesException,
ValueError) as e:
if drive_assignment:
try:
domain.devices['block'].detach(drive_assignment)
except qubesadmin.exc.QubesException:
pass
exit_code = 1
parser.print_error(str(e))
return exit_code
if __name__ == '__main__':
sys.exit(main())