-
-
Notifications
You must be signed in to change notification settings - Fork 311
/
Copy pathidp_metadata_parser.py
274 lines (216 loc) · 11.5 KB
/
idp_metadata_parser.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
# -*- coding: utf-8 -*-
""" OneLogin_Saml2_IdPMetadataParser class
Metadata class of SAML Python Toolkit.
"""
from copy import deepcopy
from urllib.request import Request, urlopen
import ssl
from onelogin.saml2.constants import OneLogin_Saml2_Constants
from onelogin.saml2.xml_utils import OneLogin_Saml2_XML
class OneLogin_Saml2_IdPMetadataParser(object):
"""
A class that contain methods related to obtaining and parsing metadata from IdP
This class does not validate in any way the URL that is introduced,
make sure to validate it properly before use it in a get_metadata method.
"""
@classmethod
def get_metadata(
cls,
url,
validate_cert=True,
cafile=None,
capath=None,
timeout=None,
headers=None,
):
"""
Gets the metadata XML from the provided URL
:param url: Url where the XML of the Identity Provider Metadata is published.
:type url: string
:param validate_cert: If the url uses https schema, that flag enables or not the verification of the associated certificate.
:type validate_cert: bool
:param timeout: Timeout in seconds to wait for metadata response
:type timeout: int
:param headers: Extra headers to send in the request
:type headers: dict
:returns: metadata XML
:rtype: string
"""
valid = False
# Respect the no-TLS-certificate validation option
ctx = None
if not validate_cert:
if cafile or capath:
raise ValueError(
"Specifying 'cafile' or 'capath' while disabling certificate "
"validation is contradictory."
)
ctx = ssl.create_default_context()
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
request = Request(url, headers=headers or {})
response = urlopen(request, timeout=timeout, cafile=cafile, capath=capath, context=ctx)
xml = response.read()
if xml:
try:
dom = OneLogin_Saml2_XML.to_etree(xml)
idp_descriptor_nodes = OneLogin_Saml2_XML.query(dom, "//md:IDPSSODescriptor")
if idp_descriptor_nodes:
valid = True
except Exception:
pass
if not valid:
raise Exception("Not valid IdP XML found from URL: %s" % (url))
return xml
@classmethod
def parse_remote(cls, url, validate_cert=True, entity_id=None, timeout=None, **kwargs):
"""
Gets the metadata XML from the provided URL and parse it, returning a dict with extracted data
:param url: Url where the XML of the Identity Provider Metadata is published.
:type url: string
:param validate_cert: If the url uses https schema, that flag enables or not the verification of the associated certificate.
:type validate_cert: bool
:param entity_id: Specify the entity_id of the EntityDescriptor that you want to parse a XML
that contains multiple EntityDescriptor.
:type entity_id: string
:param timeout: Timeout in seconds to wait for metadata response
:type timeout: int
:returns: settings dict with extracted data
:rtype: dict
"""
idp_metadata = cls.get_metadata(url, validate_cert, timeout, headers=kwargs.pop("headers", None))
return cls.parse(idp_metadata, entity_id=entity_id, **kwargs)
@classmethod
def parse(cls, idp_metadata, required_sso_binding=OneLogin_Saml2_Constants.BINDING_HTTP_REDIRECT, required_slo_binding=OneLogin_Saml2_Constants.BINDING_HTTP_REDIRECT, entity_id=None):
"""
Parses the Identity Provider metadata and return a dict with extracted data.
If there are multiple <IDPSSODescriptor> tags, parse only the first.
Parses only those SSO endpoints with the same binding as given by
the `required_sso_binding` parameter.
Parses only those SLO endpoints with the same binding as given by
the `required_slo_binding` parameter.
If the metadata specifies multiple SSO endpoints with the required
binding, extract only the first (the same holds true for SLO
endpoints).
:param idp_metadata: XML of the Identity Provider Metadata.
:type idp_metadata: string
:param required_sso_binding: Parse only POST or REDIRECT SSO endpoints.
:type required_sso_binding: one of OneLogin_Saml2_Constants.BINDING_HTTP_REDIRECT
or OneLogin_Saml2_Constants.BINDING_HTTP_POST
:param required_slo_binding: Parse only POST or REDIRECT SLO endpoints.
:type required_slo_binding: one of OneLogin_Saml2_Constants.BINDING_HTTP_REDIRECT
or OneLogin_Saml2_Constants.BINDING_HTTP_POST
:param entity_id: Specify the entity_id of the EntityDescriptor that you want to parse a XML
that contains multiple EntityDescriptor.
:type entity_id: string
:returns: settings dict with extracted data
:rtype: dict
"""
data = {}
dom = OneLogin_Saml2_XML.to_etree(idp_metadata)
idp_entity_id = want_authn_requests_signed = idp_name_id_format = idp_sso_url = idp_slo_url = certs = None
entity_desc_path = "//md:EntityDescriptor"
if entity_id:
entity_desc_path += "[@entityID='%s']" % entity_id
entity_descriptor_nodes = OneLogin_Saml2_XML.query(dom, entity_desc_path)
if len(entity_descriptor_nodes) > 0:
entity_descriptor_node = entity_descriptor_nodes[0]
idp_descriptor_nodes = OneLogin_Saml2_XML.query(entity_descriptor_node, "./md:IDPSSODescriptor")
if len(idp_descriptor_nodes) > 0:
idp_descriptor_node = idp_descriptor_nodes[0]
idp_entity_id = entity_descriptor_node.get("entityID", None)
want_authn_requests_signed = idp_descriptor_node.get("WantAuthnRequestsSigned", None)
name_id_format_nodes = OneLogin_Saml2_XML.query(idp_descriptor_node, "./md:NameIDFormat")
if len(name_id_format_nodes) > 0:
idp_name_id_format = OneLogin_Saml2_XML.element_text(name_id_format_nodes[0])
sso_nodes = OneLogin_Saml2_XML.query(idp_descriptor_node, "./md:SingleSignOnService[@Binding='%s']" % required_sso_binding)
if len(sso_nodes) > 0:
idp_sso_url = sso_nodes[0].get("Location", None)
slo_nodes = OneLogin_Saml2_XML.query(idp_descriptor_node, "./md:SingleLogoutService[@Binding='%s']" % required_slo_binding)
if len(slo_nodes) > 0:
idp_slo_url = slo_nodes[0].get("Location", None)
signing_nodes = OneLogin_Saml2_XML.query(idp_descriptor_node, "./md:KeyDescriptor[not(contains(@use, 'encryption'))]/ds:KeyInfo/ds:X509Data/ds:X509Certificate")
encryption_nodes = OneLogin_Saml2_XML.query(idp_descriptor_node, "./md:KeyDescriptor[not(contains(@use, 'signing'))]/ds:KeyInfo/ds:X509Data/ds:X509Certificate")
if len(signing_nodes) > 0 or len(encryption_nodes) > 0:
certs = {}
if len(signing_nodes) > 0:
certs["signing"] = []
for cert_node in signing_nodes:
certs["signing"].append("".join(OneLogin_Saml2_XML.element_text(cert_node).split()))
if len(encryption_nodes) > 0:
certs["encryption"] = []
for cert_node in encryption_nodes:
certs["encryption"].append("".join(OneLogin_Saml2_XML.element_text(cert_node).split()))
data["idp"] = {}
if idp_entity_id is not None:
data["idp"]["entityId"] = idp_entity_id
if idp_sso_url is not None:
data["idp"]["singleSignOnService"] = {}
data["idp"]["singleSignOnService"]["url"] = idp_sso_url
data["idp"]["singleSignOnService"]["binding"] = required_sso_binding
if idp_slo_url is not None:
data["idp"]["singleLogoutService"] = {}
data["idp"]["singleLogoutService"]["url"] = idp_slo_url
data["idp"]["singleLogoutService"]["binding"] = required_slo_binding
if want_authn_requests_signed is not None:
data["security"] = {}
data["security"]["authnRequestsSigned"] = want_authn_requests_signed == "true"
if idp_name_id_format:
data["sp"] = {}
data["sp"]["NameIDFormat"] = idp_name_id_format
if certs is not None:
if (len(certs) == 1 and (("signing" in certs and len(certs["signing"]) == 1) or ("encryption" in certs and len(certs["encryption"]) == 1))) or (
("signing" in certs and len(certs["signing"]) == 1) and ("encryption" in certs and len(certs["encryption"]) == 1 and certs["signing"][0] == certs["encryption"][0])
):
if "signing" in certs:
data["idp"]["x509cert"] = certs["signing"][0]
else:
data["idp"]["x509cert"] = certs["encryption"][0]
else:
data["idp"]["x509certMulti"] = certs
return data
@staticmethod
def merge_settings(settings, new_metadata_settings):
"""
Will update the settings with the provided new settings data extracted from the IdP metadata
:param settings: Current settings dict data
:type settings: dict
:param new_metadata_settings: Settings to be merged (extracted from IdP metadata after parsing)
:type new_metadata_settings: dict
:returns: merged settings
:rtype: dict
"""
for d in (settings, new_metadata_settings):
if not isinstance(d, dict):
raise TypeError("Both arguments must be dictionaries.")
# Guarantee to not modify original data (`settings.copy()` would not
# be sufficient, as it's just a shallow copy).
result_settings = deepcopy(settings)
# previously I will take care of cert stuff
if "idp" in new_metadata_settings and "idp" in result_settings:
if new_metadata_settings["idp"].get("x509cert", None) and result_settings["idp"].get("x509certMulti", None):
del result_settings["idp"]["x509certMulti"]
if new_metadata_settings["idp"].get("x509certMulti", None) and result_settings["idp"].get("x509cert", None):
del result_settings["idp"]["x509cert"]
# Merge `new_metadata_settings` into `result_settings`.
dict_deep_merge(result_settings, new_metadata_settings)
return result_settings
def dict_deep_merge(a, b, path=None):
"""Deep-merge dictionary `b` into dictionary `a`.
Kudos to http://stackoverflow.com/a/7205107/145400
"""
if path is None:
path = []
for key in b:
if key in a:
if isinstance(a[key], dict) and isinstance(b[key], dict):
dict_deep_merge(a[key], b[key], path + [str(key)])
elif a[key] == b[key]:
# Key conflict, but equal value.
pass
else:
# Key/value conflict. Prioritize b over a.
a[key] = b[key]
else:
a[key] = b[key]
return a