-
Notifications
You must be signed in to change notification settings - Fork 2
/
qubesvmtools.py
159 lines (151 loc) · 5.19 KB
/
qubesvmtools.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
# python3
import subprocess
from subprocess import Popen, DEVNULL
import re
from pathlib import Path
import os
class Vm:
def __init__(self, name):
pattern = re.compile("\\A[a-zA-Z0-9-]+\\Z")
if not pattern.match(name):
raise Exception("bad name: "+name)
self.name = name
def is_running(self):
ret = subprocess.call(["qvm-check", "--running", self.name], stdout=subprocess.DEVNULL)
if ret == 0:
return True
elif ret == 1:
return False
else:
raise Exception("unexpected return code: "+str(ret))
def private_volume_path(self):
return Path("/var/lib/qubes/appvms/"+self.name+"/private.img")
def get_name(self):
return self.name
def instance_if_running(self):
if self.is_running():
return VmInstance(self.name)
else:
return None
def private_volume(self):
path = self.private_volume_path()
if path.is_symlink():
pat = re.compile("^/dev/([^/]+)/([^/]+)$")
location = os.readlink(str(path))
parts = pat.match(location)
if parts:
vg = parts.group(1)
lv = parts.group(2)
return LvmVolume(self, vg, lv)
else:
raise Exception("Cannot parse link to LVM volume: "+location)
elif path.is_file():
return FileVolume(self, path)
else:
raise Exception("unexpected file type")
class VmInstance:
def __init__(self, name):
self.name = name
def get_name(self):
return self.name
def attach(self, name, volume):
subprocess.check_output(["qvm-block", "--attach-file", self.name, volume.xen_path(), "-f", name])
def detach_all(self):
subprocess.check_output(["qvm-block", "--detach", self.name])
def create_command(self, command):
return ["qvm-run", "-a", "-p", self.name, command]
def popen(self, command, stdin = None, stdout = None, stderr = None):
return Popen(self.create_command(command), stdin = stdin, stdout = stdout, stderr = stderr)
def try_sync(self):
try:
self.sync()
except: # TODO: Be more specific on the exception caught
pass
def sync(self):
self.check_call("sync") # TODO: Sync in more universal way. See #46
def check_call(self, command, stdin = None, input = None, stdout = DEVNULL, stderr = DEVNULL):
if stdin == None:
stdin_type = subprocess.PIPE
elif input == None:
stdin_type = stdin
else:
raise Exception("cannot handle both stdin and input")
command_native = self.create_command(command)
with Popen(command_native, stdin = stdin_type, stdout = stdout, stderr = stderr) as proc:
if input is not None:
proc.stdin.write(input)
proc.stdin.close()
ret = proc.wait()
if ret != 0:
raise subprocess.CalledProcessError(ret, command_native)
def shutdown(self):
subprocess.check_call(["qvm-shutdown", "--wait", self.name])
class DvmInstance(VmInstance):
def __init__(self, name):
super().__init__(name)
@staticmethod
def create(color = "red"):
vm_name = subprocess.check_output(["/usr/lib/qubes/qfile-daemon-dvm", "LAUNCH", "dom0", "", color]).decode("ascii").rstrip("\n")
if vm_name == '': # This theoretically should not happen, but I've seen this to happen when low on memory
raise Exception("Unable to start DVM")
# Disable network connection. I know this is way not suitable for general-purpose library.
subprocess.check_call(["qvm-prefs", "-s", vm_name, "netvm", "none"])
return DvmInstance(vm_name)
def close(self):
subprocess.check_output(["/usr/lib/qubes/qfile-daemon-dvm", "FINISH", self.name])
class Dvm: # Syntactic sugar for DvmInstance
def __enter__(self):
self.dvm = DvmInstance.create()
return self.dvm
def __exit__(self, type, value, traceback):
self.dvm.close()
class Volume:
def __init__(self, vm):
self.vm = vm
def clone(self, mark):
raise Exception("not implemented")
def remove(self):
raise Exception("not implemented")
def xen_path(self):
raise Exception("not implemented")
class LvmVolume(Volume):
def __init__(self, vm, vg, lv):
super().__init__(vm)
self.lv = lv
self.vg = vg
def __str__(self):
return "LvmVolume("+str(self.vm)+", "+str(self.lv)+", "+str(self.vg)+")"
def clone(self, mark):
clone_lv = mark+"-"+self.lv
clone_volume = LvmVolume(self.vm, self.vg, clone_lv)
if clone_volume.exists():
clone_volume.remove()
subprocess.check_output(["sudo", "lvcreate", "-L512M", "-s", "-n", clone_lv, self.lvm_path()])
return clone_volume
def remove(self):
subprocess.check_output(["sudo", "lvremove", "-f", self.file_path()])
def exists(self):
return os.path.exists(self.file_path())
def lvm_path(self):
return self.vg+"/"+self.lv
def file_path(self):
return "/dev/"+self.lvm_path()
def xen_path(self):
return "dom0:"+self.file_path()
class FileVolume(Volume):
def __init__(self, vm, path):
super().__init__(vm)
self.path = path
def __str__(self):
return "FileVolume("+str(self.vm)+", "+str(self.path)+")"
def clone(self, mark):
if self.vm.is_running():
raise Exception("VM is running, backup is not supported for this type of VM when running")
clone_path = Path(str(self.path)+"."+mark)
#shutil.copyfile(str(self.path), str(clone_path)) <— does not seem to support sparse files, so it is slooooooow
subprocess.check_output(["cp", "--sparse=always", "--", str(self.path), str(clone_path)])
return FileVolume(self.vm, clone_path)
def remove(self):
os.remove(str(self.path))
def xen_path(self):
return "dom0:"+str(self.path)