-
Notifications
You must be signed in to change notification settings - Fork 156
/
chisel2.py
212 lines (183 loc) · 6.81 KB
/
chisel2.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
"""
A simple client that uses the Python ACME library to run a test issuance against
a local Pebble server. Unlike chisel.py this version implements the most recent
version of the ACME specification. Usage:
$ virtualenv venv
$ . venv/bin/activate
$ pip install -r requirements.txt
$ python chisel2.py foo.com bar.com
"""
from __future__ import print_function
import logging
import os
import ssl
import sys
import signal
import threading
import time
import requests
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography import x509
from cryptography.hazmat.primitives import hashes
import OpenSSL
import josepy
from acme import challenges
from acme import client as acme_client
from acme import crypto_util as acme_crypto_util
from acme import errors as acme_errors
from acme import messages
from acme import standalone
logging.basicConfig()
logger = logging.getLogger()
logger.setLevel(int(os.getenv('LOGLEVEL', 0)))
DIRECTORY = os.getenv('DIRECTORY', 'https://localhost:14000/dir')
ACCEPTABLE_TOS = os.getenv('ACCEPTABLE_TOS',"data:text/plain,Do%20what%20thou%20wilt")
PORT = os.getenv('PORT', '5002')
# URLs to control dns-test-srv
SET_TXT = "http://localhost:8055/set-txt"
CLEAR_TXT = "http://localhost:8055/clear-txt"
def wait_for_acme_server():
"""Wait for directory URL set in the DIRECTORY env variable to respond"""
while True:
try:
if requests.get(DIRECTORY).status_code == 200:
return
except requests.exceptions.ConnectionError:
pass
time.sleep(0.1)
def make_client(email=None):
"""Build an acme.Client and register a new account with a random key."""
key = josepy.JWKRSA(key=rsa.generate_private_key(65537, 2048, default_backend()))
net = acme_client.ClientNetwork(key, user_agent="Boulder integration tester")
directory = messages.Directory.from_json(net.get(DIRECTORY).json())
client = acme_client.ClientV2(directory, net)
tos = client.directory.meta.terms_of_service
if tos == ACCEPTABLE_TOS:
net.account = client.new_account(messages.NewRegistration.from_data(email=email,
terms_of_service_agreed=True))
else:
raise Exception("Unrecognized terms of service URL %s" % tos)
return client
def get_chall(authz, typ):
for chall_body in authz.body.challenges:
if isinstance(chall_body.chall, typ):
return chall_body
raise Exception("No %s challenge found" % typ)
class ValidationError(Exception):
"""An error that occurs during challenge validation."""
def __init__(self, domain, problem_type, detail, *args, **kwargs):
self.domain = domain
self.problem_type = problem_type
self.detail = detail
def __str__(self):
return "%s: %s: %s" % (self.domain, self.problem_type, self.detail)
def make_csr(domains):
key = OpenSSL.crypto.PKey()
key.generate_key(OpenSSL.crypto.TYPE_RSA, 2048)
pem = OpenSSL.crypto.dump_privatekey(OpenSSL.crypto.FILETYPE_PEM, key)
return acme_crypto_util.make_csr(pem, domains, False)
def http_01_answer(client, chall_body):
"""Return an HTTP01Resource to server in response to the given challenge."""
response, validation = chall_body.response_and_validation(client.net.key)
return standalone.HTTP01RequestHandler.HTTP01Resource(
chall=chall_body.chall, response=response,
validation=validation)
def auth_and_issue(domains, chall_type="http-01", email=None, cert_output=None, client=None):
"""Make authzs for each of the given domains, set up a server to answer the
challenges in those authzs, tell the ACME server to validate the challenges,
then poll for the authzs to be ready and issue a cert."""
if client is None:
client = make_client(email)
csr_pem = make_csr(domains)
order = client.new_order(csr_pem)
authzs = order.authorizations
if chall_type == "http-01":
cleanup = do_http_challenges(client, authzs)
elif chall_type == "dns-01":
cleanup = do_dns_challenges(client, authzs)
else:
raise Exception("invalid challenge type %s" % chall_type)
try:
order = client.poll_and_finalize(order)
finally:
cleanup()
return order
def do_dns_challenges(client, authzs):
cleanup_hosts = []
for a in authzs:
c = get_chall(a, challenges.DNS01)
name, value = (c.validation_domain_name(a.body.identifier.value),
c.validation(client.net.key))
cleanup_hosts.append(name)
requests.post(SET_TXT, json={
"host": name + ".",
"value": value
}).raise_for_status()
client.answer_challenge(c, c.response(client.net.key))
def cleanup():
for host in cleanup_hosts:
requests.post(CLEAR_TXT, json={
"host": host + "."
}).raise_for_status()
return cleanup
def do_http_challenges(client, authzs):
port = int(PORT)
challs = [get_chall(a, challenges.HTTP01) for a in authzs]
answers = set([http_01_answer(client, c) for c in challs])
server = standalone.HTTP01Server(("", port), answers)
thread = threading.Thread(target=server.serve_forever)
thread.start()
# cleanup has to be called on any exception, or when validation is done.
# Otherwise the process won't terminate.
def cleanup():
server.shutdown()
server.server_close()
thread.join()
try:
# Loop until the HTTP01Server is ready.
while True:
try:
if requests.get("http://localhost:{0}".format(port)).status_code == 200:
break
except requests.exceptions.ConnectionError:
pass
time.sleep(0.1)
for chall_body in challs:
client.answer_challenge(chall_body, chall_body.response(client.net.key))
except Exception:
cleanup()
raise
return cleanup
def expect_problem(problem_type, func):
"""Run a function. If it raises a ValidationError or messages.Error that
contains the given problem_type, return. If it raises no error or the wrong
error, raise an exception."""
ok = False
try:
func()
except ValidationError as e:
if e.problem_type == problem_type:
ok = True
else:
raise
except messages.Error as e:
if problem_type in e.__str__():
ok = True
else:
raise
if not ok:
raise Exception('Expected %s, got no error' % problem_type)
if __name__ == "__main__":
# Die on SIGINT
signal.signal(signal.SIGINT, signal.SIG_DFL)
domains = sys.argv[1:]
if len(domains) == 0:
print(__doc__)
sys.exit(0)
try:
wait_for_acme_server()
auth_and_issue(domains)
except messages.Error as e:
print(e)
sys.exit(1)