From a9da59718a00e0ce840c81ffc541e683e0d474d1 Mon Sep 17 00:00:00 2001 From: Sydney Sweeney <30902737+kittysyddy@users.noreply.github.com> Date: Tue, 14 Jan 2020 16:40:45 -0500 Subject: [PATCH] Add command-line support for Duo MFA --- docs/README.rst | 7 + tests/unit_test.py | 89 +++++++++- tokendito/duo_helpers.py | 363 ++++++++++++++++++++++++++++++++++++++ tokendito/helpers.py | 161 +++++++++++++---- tokendito/okta_helpers.py | 117 ++++++------ 5 files changed, 636 insertions(+), 101 deletions(-) create mode 100644 tokendito/duo_helpers.py diff --git a/docs/README.rst b/docs/README.rst index 3a73df49..527c3929 100644 --- a/docs/README.rst +++ b/docs/README.rst @@ -111,6 +111,13 @@ Additional Usage Reference Note: DEBUG level may display credentials +Supported MFA Options: +"""""""""""""""""""""" +- Native Okta factors (push, call, sms, TOTP) *except Biometrics (FIDO webauthn)* +- Google Authenticator TOTP +- Duo (push, call, sms, TOTP) NOTE: These methods are currently *not* pre-configurable in tokendito settings and have to be selected during runtime. + + To upgrade: """"""""""" ``pip install --upgrade tokendito`` diff --git a/tests/unit_test.py b/tests/unit_test.py index 600c9f3c..eb07b0bb 100644 --- a/tests/unit_test.py +++ b/tests/unit_test.py @@ -112,12 +112,99 @@ def test_set_okta_password(monkeypatch): ('https://login.acme.org/home/amazon_aws/0123456789abcdef0123/456', True), ('https://acme.okta.org/home/amazon_aws/0123456789abcdef0123/456?fromHome=true', True)]) def test_validate_okta_aws_app_url(url, expected): - """Test whether the Okta URL functions.""" + """Test whether the Okta URL is parsed correctly.""" from tokendito import helpers assert helpers.validate_okta_aws_app_url(input_url=url) is expected +@pytest.mark.parametrize('test,limit,expected', [ + (0, 10, True), + (5, 10, True), + (10, 10, False), + (-1, 10, False), + (1, 0, False) +]) +def test_check_within_range(mocker, test, limit, expected): + """Test whether a given number is in the range 0 >= num < limit.""" + from tokendito import helpers + + mocker.patch('logging.error') + assert helpers.check_within_range(test, limit) is expected + + +@pytest.mark.parametrize('value,expected', [ + ('-1', False), + ('0', True), + ('1', True), + (-1, False), + (0, True), + (1, True), + (3.7, False), + ('3.7', False), + ('seven', False), + ('0xff', False), + (None, False)]) +def test_check_integer(value, expected, mocker): + """Test whether the integer testing function works within boundaries.""" + from tokendito import helpers + + mocker.patch('logging.error') + assert helpers.check_integer(value) is expected + + +@pytest.mark.parametrize('test,limit,expected', [ + (1, 10, True), + (-1, 10, False), + ('pytest', 10, False) +]) +def test_validate_input(mocker, test, limit, expected): + """Check if a given input is within the 0 >= num < limit range.""" + from tokendito import helpers + + mocker.patch('logging.error') + assert helpers.validate_input(test, limit) is expected + + +def test_get_input(monkeypatch): + """Check if provided input is return unmodified.""" + from tokendito import helpers + + monkeypatch.setattr('tokendito.helpers.input', lambda _: 'pytest_patched') + assert helpers.get_input() == 'pytest_patched' + + +@pytest.mark.parametrize('value,expected', [ + ('00', 0), + ('01', 1), + ('5', 5) +]) +def test_collect_integer(monkeypatch, value, expected): + """Check if a given digit or series of digits are properly casted to int.""" + from tokendito import helpers + + monkeypatch.setattr('tokendito.helpers.input', lambda _: value) + assert helpers.collect_integer(10) == expected + + +def test_prepare_payload(): + """Check if values passed return in a dictionary.""" + from tokendito import helpers + + assert helpers.prepare_payload(pytest_key='pytest_val') == {'pytest_key': 'pytest_val'} + assert helpers.prepare_payload(pytest_key=None) == {'pytest_key': None} + assert helpers.prepare_payload(pytest_key1='pytest_val1', pytest_key2='pytest_val2') == { + 'pytest_key1': 'pytest_val1', 'pytest_key2': 'pytest_val2'} + + +def test_set_passcode(monkeypatch): + """Check if numerical passcode can handle leading zero values.""" + from tokendito import duo_helpers + + monkeypatch.setattr('tokendito.helpers.input', lambda _: '0123456') + assert duo_helpers.set_passcode({'factor': 'passcode'}) == '0123456' + + def test_process_environment(monkeypatch, valid_settings, invalid_settings): """Test whether environment variables are set in settings.*.""" from tokendito import helpers, settings diff --git a/tokendito/duo_helpers.py b/tokendito/duo_helpers.py new file mode 100644 index 00000000..d14f663c --- /dev/null +++ b/tokendito/duo_helpers.py @@ -0,0 +1,363 @@ +# vim: set filetype=python ts=4 sw=4 +# -*- coding: utf-8 -*- +"""This module handles Duo operations.""" +from __future__ import (absolute_import, division, + print_function, unicode_literals) + +from builtins import (ascii, bytes, chr, dict, filter, hex, input, # noqa: F401 + int, list, map, next, object, oct, open, pow, range, + round, str, super, zip) +import json +import logging +import sys +import time +from urllib.parse import unquote, urlparse + +from bs4 import BeautifulSoup +from future import standard_library +import requests +from tokendito import helpers +from tokendito import settings +standard_library.install_aliases() + + +def prepare_duo_info(selected_okta_factor): + """Aggregate most of the parameters needed throughout the Duo authentication process. + + :param selected_okta_factor: dict response describing Duo factor in Okta. + :return duo_info: dict of parameters for Duo + """ + duo_info = {} + okta_factor = selected_okta_factor["_embedded"]["factor"]["_embedded"]["verification"] + duo_info["okta_factor"] = okta_factor + duo_info["factor_id"] = selected_okta_factor["_embedded"]["factor"]["id"] + + duo_info["state_token"] = selected_okta_factor["stateToken"] + duo_info["okta_callback_url"] = okta_factor["_links"]["complete"]["href"] + duo_info["tx"] = okta_factor["signature"].split(":")[0] + duo_info["app_sig"] = okta_factor["signature"].split(":")[1] + duo_info["parent"] = "{}/signin/verify/duo/web".format( + settings.okta_org) + duo_info["host"] = okta_factor["host"] + duo_info["sid"] = "" + + version = okta_factor["_links"]["script"]["href"].split("-v")[1] + duo_info["version"] = version.strip(".js") + + return duo_info + + +def duo_api_post(url, params={}, headers={}, payload={}): + """Error handling and response parsing wrapper for Duo POSTs. + + :param url: The URL being connected to. + :param params: URL query parameters. + :param headers: Request headers. + :param payload: Request body. + :return response: Response to the API request. + """ + try: + response = requests.request( + 'POST', url, params=params, headers=headers, data=payload) + except Exception as request_issue: + logging.error( + "There was an error connecting to the Duo API: \n{}".format(request_issue)) + sys.exit(1) + + json_message = None + try: + json_message = response.json() + except ValueError: + logging.debug("Non-json response from Duo API: \n{}".format(response)) + + if response.status_code != 200: + logging.critical("Your Duo authentication has failed with status {}.".format( + response.status_code)) + if json_message and json_message["stat"].lower() != "ok": + logging.critical("\n{}".format( + response.status_code, json_message["message"])) + else: + logging.critical('Please re-run the program with parameter' + ' "--loglevel debug" to see more information.') + sys.exit(2) + + return response + + +def get_duo_sid(duo_info): + """Perform the initial Duo authentication request to obtain the SID. + + The SID is referenced throughout the authentication process for Duo. + + :param duo_info: dict response describing Duo factor in Okta. + :return: duo_info with added SID. + :return: duo_auth_response, contains html content listing available factors. + """ + params = helpers.prepare_payload( + tx=duo_info["tx"], v=duo_info["version"], parent=duo_info["parent"]) + + url = "https://{}/frame/web/v1/auth".format(duo_info["host"]) + logging.info("Calling Duo {} with params {}".format( + urlparse(url).path, params.keys())) + duo_auth_response = duo_api_post(url, params=params) + + try: + duo_auth_redirect = urlparse("{}".format( + unquote(duo_auth_response.url))).query + duo_info["sid"] = duo_auth_redirect.strip("sid=") + except Exception as sid_error: + logging.error("There was an error getting your SID." + "Please try again. \n{}".format(sid_error)) + + return duo_info, duo_auth_response + + +def get_duo_devices(duo_auth): + """Parse Duo auth response to extract user's MFA options. + + The /frame/web/v1/auth API returns an html page that lists + devices and their mfa options for the user logging in. + The return data type (list of dicts) is intended to allow us to + do printout padding and indexing when interacting with the end user. + + :param duo_auth: contains html content listing available factors. + :return factor_options: list of dict objects describing each MFA option. + """ + soup = BeautifulSoup(duo_auth.content, "html.parser") + + device_soup = soup.find("select", {"name": "device"}).findAll("option") + devices = ["{} - {}".format(d["value"], d.text) for d in device_soup] + if not devices: + logging.error("Please configure devices for your Duo MFA and retry.") + sys.exit(2) + + factor_options = [] + for device in devices: + options = soup.find( + "fieldset", {"data-device-index": device.split(" - ")[0]}) + factors = options.findAll("input", {"name": "factor"}) + for factor in factors: + factor_option = {} + factor_option["device"] = device + factor_option["factor"] = factor["value"] + factor_options.append(factor_option) + return factor_options + + +def parse_duo_mfa_challenge(mfa_challenge): + """Gracefully parse Duo MFA challenge response. + + :param mfa_challenge: Duo API response for MFA challenge. + :return txid: Duo transaction ID. + """ + try: + mfa_challenge = mfa_challenge.json() + mfa_status = mfa_challenge["stat"] + txid = mfa_challenge["response"]["txid"] + except ValueError as value_error: + logging.error( + "The Duo API returned a non-json response: \n{}".format(value_error)) + sys.exit(1) + except KeyError as key_error: + logging.error( + "The Duo API response is missing a required parameter: \n{}".format(key_error)) + print(json.dumps(mfa_challenge)) + sys.exit(1) + + if mfa_status == "fail": + logging.error("Your Duo authentication has failed: \n{}".format( + mfa_challenge["message"])) + sys.exit(1) + return txid + + +def duo_mfa_challenge(duo_info, mfa_option, passcode): + """Poke Duo to challenge the selected factor. + + After the user has selected their device and factor of choice, + tell Duo to send a challenge. This is where the end user will receive + a phone call or push. + + :param duo_info: dict of parameters for Duo + :param mfa_option: the user's selected second factor. + :return txid: Duo transaction ID used to track this auth attempt. + """ + url = "https://{}/frame/prompt".format(duo_info["host"]) + device = mfa_option["device"].split(" - ")[0] + mfa_data = helpers.prepare_payload(factor=mfa_option["factor"], + device=device, + sid=duo_info["sid"], + out_of_date=False, + days_out_of_date=0, + days_to_block=None) + mfa_data["async"] = True # async is a reserved keyword + if passcode: + mfa_data["passcode"] = passcode + mfa_challenge = duo_api_post(url, payload=mfa_data) + txid = parse_duo_mfa_challenge(mfa_challenge) + + logging.debug("Sent MFA Challenge and obtained Duo transaction ID.") + return txid + + +def get_mfa_response(mfa_result): + """Extract json from mfa response. + + :param mfa_result: raw response from mfa api + :return verify_mfa: json response from mfa api + """ + try: + verify_mfa = mfa_result.json()["response"] + except Exception as parse_error: + logging.error( + "There was an error parsing the mfa challenge result: \n{}".format(parse_error)) + sys.exit(1) + return verify_mfa + + +def parse_challenge(verify_mfa, challenge_result): + """Parse the challenge response. + + :param mfa_result: response from MFA challenge status request + :return challenge status: status of MFA challenge + :return challenge reason: additional info about challenge status + """ + challenge_reason = None + + if "status" in verify_mfa: + print(verify_mfa["status"]) + + if "reason" in verify_mfa: + challenge_reason = verify_mfa["reason"] + + if "result" in verify_mfa: + logging.info("Result received: {}".format(verify_mfa["result"])) + challenge_result = verify_mfa["result"].lower() + + logging.debug("Challenge result is {}".format(challenge_result)) + return challenge_result, challenge_reason + + +def duo_mfa_verify(duo_info, txid): + """Verify MFA challenge completion. + + After the user has received the MFA challenge, query the Duo API + until the challenge is completed. + + :param duo_info: dict of parameters for Duo. + :param mfa_option: the user's selected second factor. + :return txid: Duo transaction ID used to track this auth attempt. + """ + url = "https://{}/frame/status".format(duo_info["host"]) + challenged_mfa = helpers.prepare_payload(txid=txid, sid=duo_info["sid"]) + challenge_result = None + + while True: + logging.debug("Waiting for MFA challenge response") + mfa_result = duo_api_post(url, payload=challenged_mfa) + verify_mfa = get_mfa_response(mfa_result) + challenge_result, challenge_reason = parse_challenge( + verify_mfa, challenge_result) + + if challenge_result is None: + continue + elif challenge_result == "success": + logging.debug("Successful MFA challenge received") + break + elif challenge_result == "failure": + logging.critical("MFA challenge has failed:" + " {}. Please try again.".format(challenge_reason)) + sys.exit(2) + else: + logging.debug("MFA challenge result: {}" + "Reason: {}\n\n".format(challenge_result, challenge_reason)) + time.sleep(1) + + return verify_mfa + + +def duo_factor_callback(duo_info, verify_mfa): + """Inform factor callback api of successful challenge. + + This request seems to inform this factor's callback url + that the challenge process has been completed. + + :param duo_info: dict of parameters for Duo. + :param verify_mfa: verified mfa challenge response from status api. + :return sig_response: required to sign final Duo callback request. + """ + factor_callback_url = "https://{}{}".format( + duo_info["host"], verify_mfa["result_url"]) + factor_callback = duo_api_post(factor_callback_url, payload={ + "sid": duo_info["sid"]}) + + try: + sig_response = "{}:{}".format( + factor_callback.json()["response"]["cookie"], duo_info["app_sig"]) + except Exception as sig_error: + logging.error("There was an error getting your" + " application signature from Duo: \n{}".format(json.dumps(sig_error))) + + logging.debug("Completed factor callback.") + return sig_response + + +def set_passcode(mfa_option): + """Set totp passcode. + + If the user has selected the passcode option, collect their TOTP. + + :param mfa_option: selected factor + :return passcode: passcode value from user + """ + passcode = None + if mfa_option["factor"].lower() == "passcode": + print('Type your TOTP and press Enter') + passcode = helpers.get_input() + return passcode + + +def authenticate_duo(selected_okta_factor): + """Accomplish MFA via Duo. + + This is the main function that coordinates the Duo + multifactor fetching, presentation, selection, challenge, + and verification until making an Okta callback. + + :param selected_okta_factor: Duo factor information retrieved from Okta. + :return payload: required payload for Okta callback + :return headers: required headers for Okta callback + """ + try: + duo_info = prepare_duo_info(selected_okta_factor) + except KeyError as missing_key: + logging.error( + "There was an issue parsing the Okta factor." + " Please try again. \n{}".format(missing_key)) + sys.exit(1) + + # Collect devices, factors, auth params for Duo + duo_info, duo_auth_response = get_duo_sid(duo_info) + factor_options = get_duo_devices(duo_auth_response) + mfa_index = helpers.select_preferred_mfa_index( + factor_options, factor_key="factor", subfactor_key="device") + + mfa_option = factor_options[mfa_index] + logging.debug("Selected MFA is [{}]".format(mfa_option)) + passcode = set_passcode(mfa_option) + + txid = duo_mfa_challenge(duo_info, mfa_option, passcode) + verify_mfa = duo_mfa_verify(duo_info, txid) + + # Make factor callback to Duo + sig_response = duo_factor_callback(duo_info, verify_mfa) + + # Prepare for Okta callback + payload = helpers.prepare_payload(id=duo_info["factor_id"], + sig_response=sig_response, + stateToken=duo_info["state_token"]) + headers = {} + headers["content-type"] = "application/json" + headers["accept"] = "application/json" + + return payload, headers, duo_info["okta_callback_url"] diff --git a/tokendito/helpers.py b/tokendito/helpers.py index 97993486..e4ae7477 100644 --- a/tokendito/helpers.py +++ b/tokendito/helpers.py @@ -204,6 +204,29 @@ def select_role_arn(role_arns, saml_xml, saml_response_string): return selected_role +def select_preferred_mfa_index(mfa_options, factor_key="provider", subfactor_key="factorType"): + """Show all the MFA options to the users. + + :param mfa_options: List of available MFA options + :return: MFA option selected index by the user from the output + """ + logging.debug("Show all the MFA options to the users.") + print('\nSelect your preferred MFA method and press Enter:') + + longest_index = len(str(len(mfa_options))) + for (i, mfa_option) in enumerate(mfa_options): + padding_index = longest_index - len(str(i)) + longest_factor_name = max([len(d[factor_key]) for d in mfa_options]) + + print('[{}] {}{: <{}} {}'.format( + i, padding_index*' ', mfa_option[factor_key], longest_factor_name, + mfa_option[subfactor_key])) + + user_input = collect_integer(len(mfa_options)) + + return user_input + + def prompt_role_choices(role_arns, saml_xml, saml_response_string): """Ask user to select role. @@ -229,22 +252,8 @@ def prompt_role_choices(role_arns, saml_xml, saml_response_string): print('[{}] {}{: <{}} {}'.format( i, padding_index*' ', account_alias, longest_alias, arn)) - while True: - user_input = to_unicode(input('-> ')) - - try: - user_input = int(user_input) - except ValueError as error: - print('Invalid input, try again.' + str(error)) - logging.warning("Invalid input [{}]".format(error)) - continue - if user_input in range(0, len(role_arns)): - logging.debug("User selected item {}.".format(user_input)) - break - continue - + user_input = collect_integer(len(role_arns)) selected_role = sorted_role_arns[user_input] - logging.debug("Selected role [{}]".format(user_input)) return selected_role @@ -265,8 +274,8 @@ def print_selected_role(profile_name, expiration_time): '\nOR\n\t' 'export AWS_PROFILE=\'{}\'\n\n' 'Credentials are valid until {}.' - ).format(profile_name, settings.aws_shared_credentials_file, - profile_name, profile_name, expiration_time) + ).format(profile_name, settings.aws_shared_credentials_file, + profile_name, profile_name, expiration_time) return print(msg) @@ -360,7 +369,8 @@ def get_account_aliases(saml_xml, saml_response_string): soup = BeautifulSoup(aws_response.text, "html.parser") account_names = soup.find_all(text=re.compile('Account:')) - alias_table = {str(i.split(" ")[-1]).strip("()"): i.split(" ")[1] for i in account_names} + alias_table = {str(i.split(" ")[-1]).strip("()"): + i.split(" ")[1] for i in account_names} return alias_table @@ -388,7 +398,8 @@ def process_ini_file(file, profile): try: for (key, val) in config.items(profile): if hasattr(settings, key): - logging.debug('Set option {}={} from ini file'.format(key, val)) + logging.debug( + 'Set option {}={} from ini file'.format(key, val)) setattr(settings, key, val) except configparser.NoSectionError: logging.error('Profile \'{}\' does not exist.'.format(profile)) @@ -403,7 +414,8 @@ def process_arguments(args): """ for (key, val) in vars(args).items(): if hasattr(settings, key) and val is not None: - logging.debug('Set option {}={} from command line'.format(key, val)) + logging.debug( + 'Set option {}={} from command line'.format(key, val)) setattr(settings, key, val) @@ -419,16 +431,6 @@ def process_environment(): setattr(settings, key, os.getenv(key.upper())) -def process_okta_credentials(): - """Set Okta credentials. - - :return: Success or error message - """ - logging.debug("Set Okta credentials.") - set_okta_username() - set_okta_password() - - def process_okta_aws_app_url(): """Process Okta app url. @@ -451,7 +453,6 @@ def user_configuration_input(): """Obtain user input for the user. :return: (okta app url, organization username) - """ logging.debug("Obtain user input for the user.") url = '' @@ -552,8 +553,6 @@ def update_aws_credentials(profile, aws_access_key, aws_secret_key, aws_session_ :param aws_access_key: AWS access key :param aws_secret_key: AWS secret access key :param aws_session_token: Session token - :return: - """ cred_file = settings.aws_shared_credentials_file cred_dir = os.path.dirname(cred_file) @@ -602,6 +601,98 @@ def update_aws_config(profile, output, region): config.write(file) +def check_within_range(user_input, valid_range): + """Validate the user input is within the range of the presented menu. + + :param user_input: integer-validated user input. + :param valid_range: the valid range presented on the user's menu. + :return range_validation: true or false + """ + range_validation = False + if int(user_input) in range(0, valid_range): + range_validation = True + else: + logging.debug("Valid range is {}".format(valid_range)) + logging.error("Value is not in within the selection range.") + return range_validation + + +def check_integer(value): + """Validate integer. + + :param value: value to be validated. + :return: True when the number is a positive integer, false otherwise. + """ + integer_validation = False + if str(value).isdigit(): + integer_validation = True + else: + logging.error("Please enter a valid integer.") + + return integer_validation + + +def validate_input(value, valid_range): + """Validate user input is an integer and within menu range. + + :param value: user input + :param valid_range: valid range based on how many menu options available to user. + """ + integer_validation = check_integer(value) + if integer_validation and valid_range: + integer_validation = check_within_range(value, valid_range) + return integer_validation + + +def get_input(prompt='-> '): + """Collect user input for TOTP. + + :return user_input: raw from user. + """ + user_input = to_unicode(input(prompt)) + logging.debug("User input [{}]".format(user_input)) + + return user_input + + +def collect_integer(valid_range): + """Collect input from user. + + Prompt the user for input. Validate it and cast to integer. + + :param valid_range: number of menu options available to user. + :return user_input: validated, casted integer from user. + """ + user_input = None + while True: + user_input = get_input() + valid_input = validate_input(user_input, valid_range) + logging.debug("User input validation status is {}".format(valid_input)) + if valid_input: + user_input = int(user_input) + break + return user_input + + +def prepare_payload(**kwargs): + """Prepare payload for the HTTP request header. + + :param kwargs: parameters to get together + :return: payload for the http header + """ + logging.debug("Prepare payload") + + payload_dict = {} + if kwargs is not None: + for key, value in list(kwargs.items()): + payload_dict[key] = value + + if key != 'password': + logging.debug("Prepare payload [{} {}]".format(key, value)) + + return payload_dict + + def process_options(args): """Collect all user-specific credentials and config params.""" if args.version: @@ -621,4 +712,6 @@ def process_options(args): process_okta_aws_app_url() # Set username and password for Okta Authentication - process_okta_credentials() + logging.debug("Set Okta credentials.") + set_okta_username() + set_okta_password() diff --git a/tokendito/okta_helpers.py b/tokendito/okta_helpers.py index f20bd98c..cf082adf 100644 --- a/tokendito/okta_helpers.py +++ b/tokendito/okta_helpers.py @@ -20,12 +20,14 @@ from future import standard_library import requests +from tokendito import duo_helpers from tokendito import helpers from tokendito import settings + standard_library.install_aliases() -def okta_verify_api_method(mfa_challenge_url, payload, headers): +def okta_verify_api_method(mfa_challenge_url, payload, headers=None): """Okta MFA authentication. :param mfa_challenge_url: MFA challenge url @@ -33,17 +35,26 @@ def okta_verify_api_method(mfa_challenge_url, payload, headers): :param headers: Headers of the request :return: Okta authentication response """ - logging.debug("Okta MFA authentication URL [{}] headers [{}]".format( - mfa_challenge_url, headers)) - try: - response = json.loads(requests.request('POST', mfa_challenge_url, - data=json.dumps(payload), headers=headers).text) + if headers: + response = requests.request('POST', mfa_challenge_url, + data=json.dumps(payload), headers=headers) + else: + response = requests.request( + 'POST', mfa_challenge_url, data=payload) except Exception as request_error: logging.error( "There was an error connecting to Okta: \n{}".format(request_error)) sys.exit(1) + logging.debug("Okta authentication response: \n{}".format(response)) + + try: + response = response.json() + except ValueError: + logging.debug("Received type of response: {}".format(type(response.text))) + response = response.text + if 'errorCode' in response: error_string = "Exiting due to Okta API error [{}]\n{}".format( response['errorCode'], response['errorSummary']) @@ -71,14 +82,17 @@ def authenticate_user(okta_url, okta_username, okta_password): 'content-type': 'application/json', 'accept': 'application/json' } - payload = prepare_payload( + payload = helpers.prepare_payload( username=okta_username, password=okta_password) primary_auth = okta_verify_api_method( '{}/api/v1/authn'.format(okta_url), payload, headers) logging.debug("Authenticate Okta header [{}] ".format(headers)) - return user_mfa_challenge(headers, primary_auth) + session_token = user_mfa_challenge(headers, primary_auth) + + logging.info("User has been succesfully authenticated.") + return session_token def user_mfa_challenge(headers, primary_auth): @@ -115,22 +129,39 @@ def user_mfa_challenge(headers, primary_auth): logging.warning( "No MFA provided or provided MFA does not exist. [{}]".format( settings.mfa_method)) - mfa_index = select_preferred_mfa_index(mfa_options) + mfa_index = helpers.select_preferred_mfa_index(mfa_options) # time to challenge the mfa option selected_mfa_option = mfa_options[mfa_index] logging.debug("Selected MFA is [{}]".format(selected_mfa_option)) mfa_challenge_url = selected_mfa_option['_links']['verify']['href'] - payload = prepare_payload(stateToken=primary_auth['stateToken'], - factorType=selected_mfa_option['factorType'], - provider=selected_mfa_option['provider'], - profile=selected_mfa_option['profile']) - okta_verify_api_method(mfa_challenge_url, payload, headers) - logging.debug("mfa_challenge_url [{}] headers [{}]".format( + + payload = helpers.prepare_payload(stateToken=primary_auth['stateToken'], + factorType=selected_mfa_option['factorType'], + provider=selected_mfa_option['provider'], + profile=selected_mfa_option['profile']) + selected_factor = okta_verify_api_method( + mfa_challenge_url, payload, headers) + + mfa_provider = selected_factor["_embedded"]["factor"]["provider"].lower() + logging.debug("MFA Challenge URL: [{}] headers: {}".format( mfa_challenge_url, headers)) - mfa_verify = user_mfa_options(selected_mfa_option, - headers, mfa_challenge_url, payload, primary_auth) + + if mfa_provider == "duo": + payload, headers, callback_url = duo_helpers.authenticate_duo( + selected_factor) + okta_verify_api_method(callback_url, payload) + payload.pop("id", "sig_response") + mfa_verify = okta_verify_api_method( + mfa_challenge_url, payload, headers) + elif mfa_provider == "okta" or mfa_provider == "google": + mfa_verify = user_mfa_options(selected_mfa_option, + headers, mfa_challenge_url, payload, primary_auth) + else: + logging.error("Sorry, the MFA provider '{}' is not yet supported." + " Please retry with another option.".format(mfa_provider)) + exit(1) return mfa_verify['sessionToken'] @@ -157,37 +188,17 @@ def user_mfa_options(selected_mfa_option, if settings.mfa_response is None: logging.debug("Getting verification code from user.") print('Type verification code and press Enter') - settings.mfa_response = helpers.to_unicode(input('-> ')) + settings.mfa_response = helpers.get_input() # time to verify the mfa method - payload = prepare_payload( + payload = helpers.prepare_payload( stateToken=primary_auth['stateToken'], passCode=settings.mfa_response) mfa_verify = okta_verify_api_method(mfa_challenge_url, payload, headers) - logging.debug("mfa_verify [{}]".format(mfa_verify)) + logging.debug("mfa_verify [{}]".format(json.dumps(mfa_verify))) return mfa_verify -def prepare_payload(**kwargs): - """Prepare payload for the HTTP request header. - - :param kwargs: parameters to get together - :return: payload for the http header - - """ - logging.debug("Prepare payload") - - payload_dict = {} - if kwargs is not None: - for key, value in list(kwargs.items()): - payload_dict[key] = value - - if key != 'password': - logging.debug("Prepare payload [{} {}]".format(key, value)) - - return payload_dict - - def push_approval(headers, mfa_challenge_url, payload): """Handle push approval from the user. @@ -233,29 +244,3 @@ def push_approval(headers, mfa_challenge_url, payload): time.sleep(2) return mfa_verify - - -def select_preferred_mfa_index(mfa_options): - """Show all the MFA options to the users. - - :param mfa_options: List of available MFA options - :return: MFA option selected index by the user from the output - """ - logging.debug("Show all the MFA options to the users.") - print('\nSelect your preferred MFA method and press Enter') - for (mfa_counter, mfa_option) in enumerate(mfa_options): - print("[{}] {}".format(mfa_counter, mfa_option['factorType'])) - while True: - user_input = helpers.to_unicode(input('-> ')) - logging.debug("User input [{}]".format(user_input)) - - try: - user_input = int(user_input) - except ValueError as error: - print('Invalid input, try again.\n{}'.format(error)) - continue - if user_input in range(0, len(mfa_options)): - break - print('Invalid choice') - continue - return user_input