-
Notifications
You must be signed in to change notification settings - Fork 28
/
Copy pathutils.py
226 lines (188 loc) · 7.26 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
import os
import base64
import pexpect
from Crypto.Cipher import AES
from Crypto.Util.Padding import pad
BLOCK_SIZE = 32 # Bytes
master_password_regex = 'Enter the password for [a-zA-Z0-9._%+-]+\@[a-zA-Z0-9-]+\.[a-zA-z]{2,4} at ' \
'[a-zA-Z0-9-.]+\.1password+\.[a-zA-z]{2,4}'
def read_bash_return(cmd, single=True):
process = os.popen(cmd)
preprocessed = process.read()
process.close()
if single:
return str(preprocessed.split("\n")[0])
else:
return str(preprocessed)
def docker_check():
f = None
user_home = os.environ.get('HOME')
for rcfile in ['.bashrc', '.bash_profile', '.zshrc', '.zprofile']:
rcpath = os.path.join(user_home, rcfile)
if os.path.exists(rcpath):
f = open(os.path.join(user_home, rcpath), "r")
break
if not f:
raise Exception("No sehll rc or profile files exist.")
bash_profile = f.read()
try:
docker_flag = bash_profile.split('DOCKER_FLAG="')[1][0]
if docker_flag == "t":
return True
else:
return False
except IndexError:
return False
def domain_from_email(address):
"""
Method to extract a domain without sld or tld from an email address
:param address: email address to extract from
:type address: str
:return: domain (str)
"""
return address.split("@")[1].split(".")[0]
def get_session_key(process_resp_before):
new_line_response = [x for x in process_resp_before.decode("utf-8").split("\n") if "\r" not in x]
if len(new_line_response) != 1:
raise IndexError("Session keys not parsed correctly from response: {}.".format(process_resp_before))
else:
return new_line_response[0]
def _spawn_signin(command, m_password) -> str | bool:
if command != "":
p = pexpect.spawn(command)
index = p.expect(master_password_regex)
if index == 0:
p.sendline(m_password)
index = p.expect([pexpect.EOF, "\(401\) Unauthorized"])
if index == 0:
sess_key = get_session_key(p.before)
p.close()
return sess_key
elif index == 1:
print("Input master password is not correct. Please try again")
return False
else:
raise IOError("Onepassword command not valid")
else:
raise IOError("Spawn command not valid")
class BashProfile:
def __init__(self):
f = None
user_home = os.environ.get('HOME')
for rcfile in ['.bashrc', '.bash_profile', '.zshrc', '.zprofile']:
rcpath = os.path.join(user_home, rcfile)
if os.path.exists(rcpath):
f = open(os.path.join(user_home, rcpath), "r")
break
if not f:
raise Exception("No shell rc or profile files exist.")
self.other_profile_flag = False
if docker_check():
f2 = None
try:
f2 = open(os.path.join(user_home, ".profile"), "r")
except IOError:
print("Profile file does not exist.")
self.other_profile = f2.read()
self.other_profile_filename = f2.name
self.other_profile_flag = True
self.other_profile_lines = f2.readlines()
f2.close()
self.profile_lines = f.readlines()
self.profile = f.read()
self.profile_filename = f.name
f.close()
def get_key_value(self, key, fuzzy=False):
key_lines = self.get_key_line(key, fuzzy=fuzzy)
if key_lines:
key_values = []
for ky in key_lines:
k = ky.split("=")[0].split(" ")[1]
v = ky.split("=")[1]
key_values.append({k: v})
return key_values
else:
raise ValueError("Environment variable does not exist.")
def get_key_line(self, key, fuzzy=False):
key_line = None
if self.other_profile_flag:
prof = [self.other_profile_lines, self.profile_lines]
else:
prof = [self.profile_lines]
key_lines = []
for prof_lines in prof:
if len(prof_lines) > 0:
for p in prof_lines:
if (~fuzzy) & (" {}=".format(key) in p):
key_line = p
elif fuzzy & (key in p):
key_line = p
key_lines.append(key_line.replace("\n", ""))
return key_lines
def write_profile(self, updated_lines):
if self.other_profile_flag:
prof_name = [self.other_profile_filename, self.profile_filename]
else:
prof_name = [self.profile_filename]
for p in prof_name:
with open(p, 'w') as writer:
writer.writelines(updated_lines)
writer.close()
def update_profile(self, key, value):
for lines in self.profile_lines:
if key in lines:
self.profile_lines.remove(lines)
if isinstance(value, str):
new_line = 'export {}="{}"\n'.format(key, value)
self.profile_lines.append(new_line)
self.write_profile(self.profile_lines)
class Encryption:
def __init__(self, secret_key):
if isinstance(secret_key, str):
self.secret_key = str.encode(secret_key)[0:BLOCK_SIZE]
else:
self.secret_key = secret_key[0:BLOCK_SIZE]
self.cipher = AES.new(self.secret_key, AES.MODE_ECB)
def decode(self, encoded):
return self.cipher.decrypt(base64.b64decode(encoded)).decode('UTF-8').replace("\x1f", "")
def encode(self, input_str):
return base64.b64encode(self.cipher.encrypt(pad(input_str, BLOCK_SIZE)))
def bump_version(version_type="patch"):
"""
Only run in the project root directory, this is for github to bump the version file only!
:return:
"""
__root__ = os.path.abspath("")
with open(os.path.join(__root__, 'VERSION')) as version_file:
version = version_file.read().strip()
all_version = version.replace('"', "").split(".")
if version_type == "patch":
new_all_version = version.split(".")[:-1]
new_all_version.append(str(int(all_version[-1]) + 1))
elif version_type == "minor":
new_all_version = [version.split(".")[0]]
new_all_version.extend([str(int(all_version[1]) + 1), '0'])
new_line = '.'.join(new_all_version) + "\n"
with open("{}/VERSION".format(__root__), "w") as fp:
fp.write(new_line)
fp.close()
def generate_uuid():
"""
Generates a random UUID to be used for op in initial set up only for more details read here
https://1password.community/discussion/114059/device-uuid
:return: (str)
"""
return read_bash_return("head -c 16 /dev/urandom | base32 | tr -d = | tr '[:upper:]' '[:lower:]'")
def get_device_uuid(bp):
"""
Attempts to get the device_uuid from the given BashProfile. If the device_uuid is not
set in the BashProfile generates a new device_uuid and sets it in the given
BashProfile.
:return: (str)
"""
try:
device_uuid = bp.get_key_value("OP_DEVICE")[0]['OP_DEVICE'].strip('"')
except (AttributeError, ValueError):
device_uuid = generate_uuid()
bp.update_profile("OP_DEVICE", device_uuid)
return device_uuid