From 89e5d3554ca60e45ea73a4192781049cc093c28f Mon Sep 17 00:00:00 2001 From: pcmxgti <16561338+pcmxgti@users.noreply.github.com> Date: Mon, 23 Jan 2023 10:49:28 -0500 Subject: [PATCH] Support Push + Number Challenge --- AUTHORS.md | 4 +++ README.md | 11 +++--- tests/unit_test.py | 58 ++++++++++++++++++++++++++------ tokendito/__init__.py | 2 +- tokendito/okta.py | 78 +++++++++++++++++++++++++++---------------- 5 files changed, 108 insertions(+), 45 deletions(-) diff --git a/AUTHORS.md b/AUTHORS.md index ee3b7b80..9fd3c274 100644 --- a/AUTHORS.md +++ b/AUTHORS.md @@ -2,9 +2,13 @@ Tokendito is written and maintained by Cloud Security and Engineering at Dow Jones, and various contributors: ## Dow Jones Cloud Security and Engineering + * Sydney Sweeney +* Jean-Pierre Sevigny * Nico Halpern + ## Patches and more + * Kuber Kaul * Steve Stevenson * Roman Sluzhynskyy diff --git a/README.md b/README.md index 8c025459..55b2487f 100644 --- a/README.md +++ b/README.md @@ -33,10 +33,10 @@ The following changes are part of this release: - Automatically discover AWS URLs. - Fix authentication with DUO. - Add support for setting the logging level via both the INI file and ENV vars. -- Add support for Python 3.9 and 3.10. +- Add support for Python 3.9, 3.10, and 3.11. - And many fixes. -Consult [additional notes](docs/README.md) for how to use tokendito. +Consult [additional notes](https://github.com/dowjones/tokendito/blob/main/docs/README.md) for how to use tokendito. ## Requirements @@ -53,9 +53,10 @@ pip or pip3. 3. Run `tokendito`. **NOTE**: Advanced users may shorten the `tokendito` interaction to a [single -command](docs/README.md#single-command-usage). +command](https://github.com/dowjones/tokendito/blob/main/docs/README.md#single-command-usage). Have multiple Okta tiles to switch between? View our [multi-tile -guide](docs/README.md#multi-tile-guide). +guide](https://github.com/dowjones/tokendito/blob/main/docs/README.md#multi-tile-guide). -### Tips, tricks, troubleshooting, examples, and more docs are [here](docs/README.md)! Also, [contributions are welcome](docs/CONTRIBUTING.md)! +### Tips, tricks, troubleshooting, examples, and more docs are [here]()https://github.com/dowjones/tokendito/blob/main/docs/README.md! +[Contributions are welcome](https://github.com/dowjones/tokendito/blob/main/docs/CONTRIBUTING.md)! diff --git a/tests/unit_test.py b/tests/unit_test.py index 9e199d22..a1c087b1 100644 --- a/tests/unit_test.py +++ b/tests/unit_test.py @@ -839,26 +839,64 @@ def test_user_mfa_challenge_with_no_mfas(sample_headers, sample_json_response): @pytest.mark.parametrize( - "wrapper_return,expected", + "return_value,side_effect,expected", [ - ({"factorResult": "REJECTED", "status": "FAILED"}, 2), - ({"factorResult": "TIMEOUT", "status": "FAILED"}, 2), - ({"status": "FAILED"}, 1), - ({}, 1), - ({"factorResult": "SUCCESS", "status": "SUCCESS"}, 0), + ({"status": "SUCCESS", "sessionToken": "pytest"}, None, 0), + ({"status": "SUCCESS", "sessionToken": "pytest", "factorResult": "SUCCESS"}, None, 0), + ({"status": "MFA_CHALLENGE", "factorResult": "REJECTED"}, None, 2), + ({"status": "MFA_CHALLENGE", "factorResult": "TIMEOUT"}, None, 2), + ({"status": "UNKNOWN", "factorResult": "UNKNOWN"}, None, 2), + ( + { + "status": "MFA_CHALLENGE", + "factorResult": "WAITING", + "_links": {"next": {"href": None}}, + }, + [ + { + "status": "MFA_CHALLENGE", + "factorResult": "WAITING", + "_links": {"next": {"href": None}}, + }, + {"status": "SUCCESS", "sessionToken": "pytest", "factorResult": "SUCCESS"}, + ], + 0, + ), + ( + { + "status": "MFA_CHALLENGE", + "factorResult": "WAITING", + "_embedded": {"factor": {"_embedded": {"challenge": {"correctAnswer": 100}}}}, + "_links": {"next": {"href": None}}, + }, + [ + { + "status": "MFA_CHALLENGE", + "factorResult": "WAITING", + "_embedded": {"factor": {"_embedded": {"challenge": {"correctAnswer": 100}}}}, + "_links": {"next": {"href": None}}, + }, + {"status": "SUCCESS", "sessionToken": "pytest", "factorResult": "SUCCESS"}, + ], + 0, + ), ], ) -def test_push_approval(mocker, sample_headers, wrapper_return, expected): +def test_push_approval(mocker, sample_headers, return_value, side_effect, expected): """Test push approval.""" from tokendito import okta challenge_url = "https://pytest/api/v1/authn/factors/factorid/verify" - mocker.patch("tokendito.okta.api_wrapper", return_value=wrapper_return) + mocker.patch("tokendito.okta.api_wrapper", return_value=return_value, side_effect=side_effect) + mocker.patch("time.sleep", return_value=0) - if "status" in wrapper_return and wrapper_return["status"] == "SUCCESS": + if "status" in return_value and return_value["status"] == "SUCCESS": + ret = okta.push_approval(sample_headers, challenge_url, None) + assert ret["status"] == "SUCCESS" + elif "factorResult" in return_value and return_value["factorResult"] == "WAITING": ret = okta.push_approval(sample_headers, challenge_url, None) - assert ret["status"] == "SUCCESS" and ret["factorResult"] == "SUCCESS" + assert ret["status"] == "SUCCESS" else: with pytest.raises(SystemExit) as err: okta.push_approval(sample_headers, challenge_url, None) diff --git a/tokendito/__init__.py b/tokendito/__init__.py index 6ba5bfa3..d3524c4b 100644 --- a/tokendito/__init__.py +++ b/tokendito/__init__.py @@ -8,7 +8,7 @@ from platformdirs import user_config_dir -__version__ = "2.0.0" +__version__ = "2.1.0" __title__ = "tokendito" __description__ = "Get AWS STS tokens from Okta SSO" __long_description_content_type__ = "text/markdown" diff --git a/tokendito/okta.py b/tokendito/okta.py index 3c8a82a5..428c68d3 100644 --- a/tokendito/okta.py +++ b/tokendito/okta.py @@ -283,36 +283,56 @@ def push_approval(headers, mfa_challenge_url, payload): :return: Session Token if succeeded or terminates if user wait goes 5 min """ - logger.debug( - f"Handle push approval from the user headers:{headers} challenge_url:{mfa_challenge_url}" - ) + logger.debug(f"Push approval with headers:{headers} challenge_url:{mfa_challenge_url}") user.print("Waiting for an approval from the device...") - mfa_status = "WAITING" - mfa_verify = {} - while mfa_status == "WAITING": - mfa_verify = api_wrapper(mfa_challenge_url, payload, headers) - - logger.debug(f"MFA Response:\n{json.dumps(mfa_verify)}") - - if "factorResult" in mfa_verify: - mfa_status = mfa_verify["factorResult"] - elif "status" in mfa_verify and mfa_verify["status"] == "SUCCESS": - break - else: - logger.error("There was an error getting your MFA status.") - logger.debug(f"{mfa_verify}") - if "status" in mfa_verify: - logger.error(f"Exiting due to error: {mfa_verify['status']}") - sys.exit(1) - - if mfa_status == "REJECTED": - logger.error("The Okta Verify push has been denied. Please retry later.") - sys.exit(2) - elif mfa_status == "TIMEOUT": - logger.error("Device approval window has expired.") - sys.exit(2) - + status = "MFA_CHALLENGE" + result = "WAITING" + response = {} + challenge_displayed = False + + while status == "MFA_CHALLENGE" and result == "WAITING": + response = api_wrapper(mfa_challenge_url, payload, headers) + if "sessionToken" in response: + user.add_sensitive_value_to_be_masked(response["sessionToken"]) + + logger.debug(f"MFA Response:\n{json.dumps(response)}") + # Retrieve these values from the object, and set a sensible default if they do not + # exist. + status = response.get("status", "UNKNOWN") + result = response.get("factorResult", "UNKNOWN") + + # The docs at https://developer.okta.com/docs/reference/api/authn/#verify-push-factor + # state that the call will return a factorResult in [ SUCCESS, REJECTED, TIMEOUT, + # WAITING]. However, on success, SUCCESS is not set and we have to rely on the + # response["status"] instead + answer = ( + response.get("_embedded", {}) + .get("factor", {}) + .get("_embedded", {}) + .get("challenge", {}) + .get("correctAnswer", None) + ) + if answer and not challenge_displayed: + # If a Number Challenge response exists, retrieve it from this deeply nested path, + # otherwise set to None. + user.print(f"Number Challenge response is {answer}") + challenge_displayed = True time.sleep(1) - return mfa_verify + if status == "SUCCESS" and "sessionToken" in response: + # noop, we will return the variable later + pass + # Everything else should have a status of "MFA_CHALLENGE", and the result provides a + # hint on why the challenge failed. + elif result == "REJECTED": + logger.error("The Okta Verify push has been denied.") + sys.exit(2) + elif result == "TIMEOUT": + logger.error("Device approval window has expired.") + sys.exit(2) + else: + logger.error(f"Push response type {result} for {status} not implemented.") + sys.exit(2) + + return response