# main.py - Tkinter "Token Manager" GUI for working with a PKCS#11 token.
import tkinter as tk
from tkinter import messagebox, filedialog, simpledialog
import pkcs11
from pkcs11 import KeyType, Mechanism
import pkcs11.util.rsa
from cryptography import x509
from asn1crypto import csr, keys, x509 as ac_x509
from asn1crypto.keys import RSAPublicKey
from asn1crypto import pem
from pkcs11.util.x509 import decode_x509_certificate
class TokenManagerApp:
    """Tkinter front end for a handful of PKCS#11 token operations.

    The window offers buttons to load a PKCS#11 module and detect a token,
    log in with a PIN, generate an RSA key pair, build and sign a CSR with
    the on-token private key, import an X.509 certificate, and list the
    objects stored on the token.
    """

    def __init__(self, root):
        """Create the widgets and initialise the PKCS#11 state to "nothing loaded".

        Args:
            root: The Tk window that hosts all widgets.
        """
        self.root = root
        self.root.title("Token Manager")

        # Label to display token status
        self.token_label = tk.Label(root, text="Token Not Detected")
        self.token_label.grid(row=0, column=0, columnspan=3, pady=10)

        def add_button(text, command, row, column, state=tk.DISABLED):
            # All action buttons share size and padding; only "Detect Token"
            # starts enabled, the rest are unlocked step by step.
            button = tk.Button(root, text=text, command=command, state=state, width=20)
            button.grid(row=row, column=column, padx=5, pady=5)
            return button

        # Buttons for various operations
        self.detect_token_button = add_button("Detect Token", self.detect_token, 1, 0, state=tk.NORMAL)
        self.login_button = add_button("Login", self.login_token, 2, 0)
        self.generate_key_button = add_button("Generate Key", self.generate_key, 1, 1)
        self.generate_csr_button = add_button("Generate CSR", self.generate_csr, 2, 1)
        self.import_cert_button = add_button("Import Certificate", self.import_cert, 1, 2)
        self.list_objects_button = add_button("List Objects", self.list_objects, 2, 2)

        # Text widget to display objects on the token
        self.objects_text = tk.Text(root, width=50, height=10)
        self.objects_text.grid(row=3, column=0, columnspan=3, pady=10)

        # PKCS#11 session variables
        self.pkcs11 = None
        self.token = None
        self.session = None
        self.slot = None
        self.pkcs11_module_path = None

    def detect_token(self):
        """Load the PKCS#11 module (asking for its path once) and pick the first token.

        On success the token of the first slot that reports a token is stored
        in ``self.token`` and the Login button is enabled. Failures are
        reported in an error dialog.
        """
        try:
            if self.pkcs11 is None:
                module_path = self.pkcs11_module_path or filedialog.askopenfilename(
                    title="Select PKCS#11 Module",
                    filetypes=(("SO Files", "*.so"), ("DLL Files", "*.dll")),
                )
                if not module_path:
                    # Dialog cancelled: nothing to load, and nothing to query.
                    return
                self.pkcs11 = pkcs11.lib(module_path)
                # Remember the path only after a successful load so a bad
                # choice can be corrected on the next click.
                self.pkcs11_module_path = module_path

            # List available slots and select the first one
            slots = self.pkcs11.get_slots(token_present=True)
            if not slots:
                messagebox.showerror("Error", "No token detected")
                return

            self.slot = slots[0]
            self.token = self.slot.get_token()  # Retrieve the token from the slot
            self.token_label.config(text="Token Detected!")
            self.login_button.config(state=tk.NORMAL)
        except (pkcs11.PKCS11Error, RuntimeError) as e:
            # NOTE(review): RuntimeError is presumably what python-pkcs11
            # raises when the shared library itself cannot be loaded - confirm.
            messagebox.showerror("Error", f"Failed to detect token: {str(e)}")

    def login_token(self):
        """Ask for the user PIN, open a read/write session and unlock the action buttons."""
        try:
            pin = simpledialog.askstring("Login", "Enter your PIN:", show='*')
            if not pin:
                messagebox.showerror("Error", "PIN not entered")
                return

            # Open the token session in read/write mode and pass the user pin
            self.session = self.token.open(rw=True, user_pin=pin)
            messagebox.showinfo("Success", "Login Successful!")
            for button in (
                self.generate_key_button,
                self.generate_csr_button,
                self.import_cert_button,
                self.list_objects_button,
            ):
                button.config(state=tk.NORMAL)
        except pkcs11.PKCS11Error as e:
            messagebox.showerror("Error", f"Failed to login: {str(e)}")

    def generate_key(self):
        """Generate a persistent 2048-bit RSA key pair under a user-supplied label."""
        try:
            # Get key label from the user
            key_label = simpledialog.askstring("Key Generation", "Enter key label:", initialvalue="MyPersistentRSAKey")
            if not key_label:
                messagebox.showerror("Error", "Key label not provided")
                return

            # Generate RSA key pair (public and private); the returned key
            # objects are not needed here because the keys live on the token.
            self.session.generate_keypair(
                KeyType.RSA,
                2048,  # Key size in bits
                mechanism=Mechanism.RSA_PKCS_KEY_PAIR_GEN,
                public_template={
                    pkcs11.Attribute.LABEL: key_label,
                    pkcs11.Attribute.TOKEN: True,
                    pkcs11.Attribute.PUBLIC_EXPONENT: (0x01, 0x00, 0x01),  # e = 65537
                },
                private_template={
                    pkcs11.Attribute.LABEL: key_label,
                    pkcs11.Attribute.TOKEN: True,
                    pkcs11.Attribute.SENSITIVE: True,
                    pkcs11.Attribute.PRIVATE: True,
                },
            )
            messagebox.showinfo("Success", f"RSA Key Pair Generated!\nLabel: {key_label}")
        except pkcs11.PKCS11Error as e:
            messagebox.showerror("Error", f"Failed to generate key: {str(e)}")

    def _find_key(self, object_class, missing_message):
        """Return the single key of ``object_class``, or None after reporting that none exists.

        ``Session.get_key`` signals "not found" by raising, not by returning
        a falsy value, so that case is translated into ``missing_message``
        here. Other PKCS#11 errors propagate to the caller.
        """
        try:
            return self.session.get_key(object_class)
        except pkcs11.exceptions.NoSuchKey:
            messagebox.showerror("Error", missing_message)
            return None

    @staticmethod
    def _prompt_csr_subject():
        """Prompt for the five subject fields; return them as a dict, or None if any is empty."""
        prompts = (
            ("country_name", "Enter Country Name (e.g., US):", "US"),
            ("state_or_province_name", "Enter State or Province Name (e.g., California):", "California"),
            ("locality_name", "Enter Locality Name (e.g., San Francisco):", "San Francisco"),
            ("organization_name", "Enter Organization Name (e.g., My Company):", "My Company"),
            ("common_name", "Enter Common Name (e.g., example.com):", "example.com"),
        )
        answers = {
            field: simpledialog.askstring("CSR Subject", prompt, initialvalue=initial)
            for field, prompt, initial in prompts
        }
        # A cancelled dialog yields None and an emptied field yields "".
        if not all(answers.values()):
            return None
        return answers

    def generate_csr(self):
        """Build a PKCS#10 request for the token's RSA key, sign it on the token and save it as PEM."""
        try:
            # Locate the private key on the token
            priv_key = self._find_key(pkcs11.ObjectClass.PRIVATE_KEY, "No private key found on token.")
            if priv_key is None:
                return

            # Retrieve the public key from the token
            pub_key = self._find_key(pkcs11.ObjectClass.PUBLIC_KEY, "No public key found on token.")
            if pub_key is None:
                return

            # Encode the public key into DER format and wrap it in a
            # SubjectPublicKeyInfo structure.
            pub_key_der = pkcs11.util.rsa.encode_rsa_public_key(pub_key)
            public_key_info = keys.PublicKeyInfo({
                'algorithm': {
                    'algorithm': 'rsa',
                    'parameters': None
                },
                'public_key': RSAPublicKey.load(pub_key_der)
            })

            # Get subject details from the user
            subject_fields = self._prompt_csr_subject()
            if subject_fields is None:
                messagebox.showerror("Error", "Incomplete subject details")
                return

            # Build the CSR using asn1crypto
            cert_request = csr.CertificationRequest({
                'certification_request_info': {
                    'version': 'v1',
                    'subject': ac_x509.Name.build(subject_fields),
                    'subject_pk_info': public_key_info,
                    'attributes': [],
                },
                'signature_algorithm': {
                    'algorithm': 'sha256_rsa',
                },
                'signature': b'\x00'  # Placeholder for the signature
            })

            # Sign the DER of CertificationRequestInfo with the on-token key
            # and replace the placeholder with the real signature.
            to_be_signed = cert_request['certification_request_info'].dump()
            cert_request['signature'] = priv_key.sign(
                to_be_signed,
                mechanism=Mechanism.SHA256_RSA_PKCS
            )

            # Encode the CSR in PEM format
            csr_pem = pem.armor("CERTIFICATE REQUEST", cert_request.dump())

            # Save the CSR to a file in PEM format
            file_path = filedialog.asksaveasfilename(defaultextension=".pem", filetypes=(("PEM Files", "*.pem"),))
            if file_path:
                with open(file_path, 'w') as f:
                    f.write(csr_pem.decode('utf-8'))
                messagebox.showinfo("Success", "CSR Generated and saved!")
        except pkcs11.PKCS11Error as e:
            messagebox.showerror("Error", f"Failed to generate CSR: {str(e)}")
        except OSError as e:
            messagebox.showerror("Error", f"Failed to save CSR: {e}")

    def import_cert(self):
        """Read a certificate file (PEM or DER) and store it on the token under a chosen label."""
        cert_file = filedialog.askopenfilename(title="Select Certificate", filetypes=(("PEM Files", "*.pem"),))
        if not cert_file:
            return

        try:
            with open(cert_file, 'rb') as f:
                cert_data = f.read()

            # Check if the certificate is PEM-encoded and convert it to DER
            if pem.detect(cert_data):
                _, _, cert_der = pem.unarmor(cert_data)
            else:
                cert_der = cert_data  # Already in DER format

            # Prompt user for the label
            cert_label = simpledialog.askstring("Certificate Label", "Enter a label for the certificate:")
            if not cert_label:
                messagebox.showerror("Error", "No label entered. Import canceled.")
                return

            # Decode the certificate using python-pkcs11 utility function
            cert_obj = decode_x509_certificate(cert_der)
            cert_obj.update({
                pkcs11.Attribute.LABEL: cert_label,  # Use the user-provided label
                pkcs11.Attribute.CLASS: pkcs11.ObjectClass.CERTIFICATE,
                pkcs11.Attribute.CERTIFICATE_TYPE: pkcs11.CertificateType.X_509,
                pkcs11.Attribute.TOKEN: True
            })

            # Create certificate object on the token
            created_cert = self.session.create_object(cert_obj)
        except pkcs11.PKCS11Error as e:
            messagebox.showerror("Error", f"Failed to import certificate: {str(e)}")
            return
        except (OSError, ValueError) as e:
            # Unreadable file or content that is not a valid certificate.
            messagebox.showerror("Error", f"Failed to read certificate: {e}")
            return

        if created_cert:
            messagebox.showinfo("Success", f"Certificate Imported to Token with label '{cert_label}'!")
        else:
            messagebox.showerror("Error", "Failed to store certificate in token.")

    @staticmethod
    def _usage_summary(obj, flags):
        """Return the display names of the usage flags that are set on ``obj``.

        Args:
            obj: Anything indexable by the attribute keys in ``flags``.
            flags: Iterable of ``(attribute, display_name)`` pairs.

        Returns:
            The display names of all truthy flags joined with ", ". Flags
            that cannot be read are skipped.
        """
        enabled = []
        for attribute, display_name in flags:
            try:
                if obj[attribute]:
                    enabled.append(display_name)
            except (KeyError, pkcs11.PKCS11Error):
                # Tokens report unsupported/unreadable attributes through
                # PKCS#11 errors; treat those like an absent flag.
                continue
        return ', '.join(enabled)

    @staticmethod
    def _format_subject_dn(subject_fields):
        """Render a mapping of subject field names to values as ``DN: k=v, k=v``."""
        return "DN: " + ', '.join(f"{k}={v}" for k, v in subject_fields.items())

    def format_object_details(self, obj):
        """Format details of a PKCS#11 object for display.

        Args:
            obj: A token object supporting ``obj[pkcs11.Attribute.X]`` lookups.

        Returns:
            A multi-line description for public keys, private keys and
            certificates; a one-line note for other classes or when a
            required attribute cannot be read.
        """
        try:
            obj_class = obj[pkcs11.Attribute.CLASS]
            label = obj[pkcs11.Attribute.LABEL]

            if obj_class == pkcs11.ObjectClass.PUBLIC_KEY:
                key_type = obj[pkcs11.Attribute.KEY_TYPE]
                bits = obj[pkcs11.Attribute.MODULUS_BITS] if key_type == KeyType.RSA else 'N/A'
                usage = self._usage_summary(obj, (
                    (pkcs11.Attribute.ENCRYPT, "encrypt"),
                    (pkcs11.Attribute.WRAP, "wrap"),
                    (pkcs11.Attribute.VERIFY, "verify"),
                ))
                access = 'local' if obj[pkcs11.Attribute.TOKEN] else 'session'
                return (f"Public Key Object; Algorithm: {key_type.name}, Bits: {bits}\n"
                        f" label: {label}\n"
                        f" Usage: {usage}\n"
                        f" Access: {access}")

            if obj_class == pkcs11.ObjectClass.PRIVATE_KEY:
                key_type = obj[pkcs11.Attribute.KEY_TYPE]
                usage = self._usage_summary(obj, (
                    (pkcs11.Attribute.DECRYPT, "decrypt"),
                    (pkcs11.Attribute.SIGN, "sign"),
                    (pkcs11.Attribute.UNWRAP, "unwrap"),
                ))
                access = ('sensitive, always sensitive, never extractable'
                          if obj[pkcs11.Attribute.SENSITIVE] else 'local')
                return (f"Private Key Object; Algorithm: {key_type.name}\n"
                        f" label: {label}\n"
                        f" Usage: {usage}\n"
                        f" Access: {access}")

            if obj_class == pkcs11.ObjectClass.CERTIFICATE:
                cert_data = obj[pkcs11.Attribute.VALUE]
                # Decoding is display-only, so any parsing problem is shown
                # inline instead of aborting the listing.
                try:
                    cert = ac_x509.Certificate.load(cert_data)
                    subject = cert['tbs_certificate']['subject']
                    subject_dn_formatted = self._format_subject_dn(subject.native)
                except Exception as e:
                    subject_dn_formatted = f"Error decoding DN: {str(e)}"
                return (f"Certificate Object; type = X.509 cert\n"
                        f" label: {label}\n"
                        f" subject: {subject_dn_formatted}")

            return f"Unknown Object Class: {obj_class}"
        except KeyError as e:
            return f"Missing attribute: {e}"
        except pkcs11.PKCS11Error as e:
            # The exception class name identifies the PKCS#11 failure even
            # when the exception carries no message text.
            return f"Missing attribute: {type(e).__name__} {e}"

    def list_objects(self):
        """Replace the text area contents with a description of every object in the session."""
        try:
            self.objects_text.delete("1.0", tk.END)
            for obj in self.session.get_objects():
                details = self.format_object_details(obj)
                self.objects_text.insert(tk.END, details + "\n\n")
        except pkcs11.PKCS11Error as e:
            messagebox.showerror("Error", f"Failed to list objects: {str(e)}")
def _launch():
    """Create the main window, attach the application and run the Tk event loop."""
    window = tk.Tk()
    # Hold a reference to the application object while the event loop runs.
    app = TokenManagerApp(window)
    window.mainloop()
    return app


if __name__ == "__main__":
    _launch()