diff --git a/gp-okta.py b/gp-okta.py index dd78804..6345a81 100755 --- a/gp-okta.py +++ b/gp-okta.py @@ -9,6 +9,7 @@ Copyright (C) 2019 Taylor Dean (taylor@makeshift.dev) Copyright (C) 2020 Max Lanin (mlanin@evolutiongaming.com) Copyright (C) 2019-2020 Tino Lange (coldcoff@yahoo.com) + Copyright (C) 2022 David Keijser (keijser@gmail.com) Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal @@ -29,8 +30,9 @@ THE SOFTWARE. """ from __future__ import print_function -import argparse, base64, getpass, io, os, re, shlex, signal, subprocess, sys, tempfile, time, traceback +import argparse, base64, getpass, io, os, re, shlex, signal, subprocess, sys, ssl, tempfile, time, traceback import requests +import requests.adapters from lxml import etree if sys.version_info >= (3,): @@ -56,6 +58,7 @@ from fido2.utils import websafe_decode from fido2.hid import CtapHidDevice from fido2.client import Fido2Client + from fido2.webauthn import PublicKeyCredentialRequestOptions, PublicKeyCredentialDescriptor, PublicKeyCredentialType have_fido = True except ImportError: pass @@ -126,7 +129,7 @@ def dbg(d, h, *xs): if not d: return for x in xs: - if not isinstance(x, dict) and not isinstance(x, list): + if not isinstance(x, (dict, list, tuple)): for line in x.split('\n'): print(u'[DEBUG] {0}: {1}'.format(h, line)) else: @@ -151,8 +154,11 @@ def err(s): print('[ERROR] {0}'.format(s), file=sys.stderr) sys.exit(1) -def _remx(c, v): return re.search(r'\s*' + v + r'\s*"?[=:]\s*(?:"((?:[^"\\]|\\.)*)"|\'((?:[^\'\\]|\\.)*)\')', c) -_refx = lambda mx: to_b(mx.group(1)).decode('unicode_escape').strip() +def _remx(c, v): + return re.search(r'\s*' + v + r'\s*"?[=:]\s*(?:"((?:[^"\\]|\\.)*)"|\'((?:[^\'\\]|\\.)*)\')', c) + +def _refx(mx): + return to_b(mx.group(1) or mx.group(2)).decode('unicode_escape').strip() def parse_xml(xml): # type: (str) -> etree._Element @@ -193,6 +199,14 @@ def parse_form(html, current_url=None): data[k] = v return url, data +class InsecureHTTPAdapter(requests.adapters.HTTPAdapter): + def __init__(self, *, ssl_context, **kwargs): + self._ssl_context = ssl_context + super().__init__(**kwargs) + + def init_poolmanager(self, connections, maxsize, **kwargs): + super().init_poolmanager(connections, maxsize, **kwargs, ssl_context=self._ssl_context) + class Conf(object): def __init__(self): # type: () -> None @@ -325,6 +339,9 @@ def from_data(cls, content): setattr(conf, k, conf._store[k].strip()) conf.debug = conf._store.get('debug', '').lower() in ['1', 'true'] s = requests.Session() + ctx = ssl.create_default_context(ssl.Purpose.SERVER_AUTH) + ctx.options |= 0x4 + s.mount('https://', InsecureHTTPAdapter(ssl_context=ctx)) s.headers['User-Agent'] = 'PAN GlobalProtect' conf._session = s return conf @@ -657,27 +674,35 @@ def okta_mfa_webauthn(conf, factor, state_token): profile = rfactor['profile'] purl = parse_url(conf.okta_url) origin = '{0}://{1}'.format(purl[0], purl[1]) - challenge = rfactor['_embedded']['challenge']['challenge'] credentialId = websafe_decode(profile['credentialId']) allow_list = [{'type': 'public-key', 'id': credentialId}] + request_options = PublicKeyCredentialRequestOptions( + challenge = websafe_decode(rfactor['_embedded']['challenge']['challenge']), + rp_id = purl[1], + allow_credentials = [ + PublicKeyCredentialDescriptor( + PublicKeyCredentialType.PUBLIC_KEY, + websafe_decode(profile['credentialId'])) + ] + ) for dev in devices: client = Fido2Client(dev, origin) print('!!! Touch the flashing U2F device to authenticate... !!!') try: - result = client.get_assertion(purl[1], challenge, allow_list) - dbg(conf.debug, 'assertion.result', result) + result = client.get_assertion(request_options) + dbg(conf.debug, 'assertion.result', vars(result)) break except Exception: traceback.print_exc(file=sys.stderr) result = None if not result: return None - assertion, client_data = result[0][0], result[1] # only one cred in allowList, so only one response. + response = result.get_response(0) # only one cred in allowList, so only one response. data = { 'stateToken': state_token, - 'clientData': to_n((base64.b64encode(client_data)).decode('ascii')), - 'signatureData': to_n((base64.b64encode(assertion.signature)).decode('ascii')), - 'authenticatorData': to_n((base64.b64encode(assertion.auth_data)).decode('ascii')) + 'clientData': to_n((base64.b64encode(response.client_data)).decode('ascii')), + 'signatureData': to_n((base64.b64encode(response.signature)).decode('ascii')), + 'authenticatorData': to_n((base64.b64encode(response.authenticator_data)).decode('ascii')) } log('mfa {0} signature request [okta_url]'.format(provider)) _, _h, j = send_json_req(conf, 'okta', 'uf2 mfa signature', j['_links']['next']['href'], data, expected_url=conf.okta_url) @@ -1093,6 +1118,7 @@ def run_openconnect(conf, do_portal_auth, urls, saml_username, cookies): if conf.get_bool('execute'): ecmd = [os.path.expandvars(os.path.expanduser(x)) for x in shlex.split(cmd)] pp = subprocess.Popen(shlex.split(pcmd), stdout=subprocess.PIPE) + print(f"Command: {ecmd}") cp = subprocess.Popen(ecmd, stdin=pp.stdout, stdout=sys.stdout) if pp.stdout is not None: pp.stdout.close()