Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support Push + Number Challenge #109

Merged
merged 1 commit into from
Jan 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions AUTHORS.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,13 @@ Tokendito is written and maintained by Cloud Security and Engineering at
Dow Jones, and various contributors:

## Dow Jones Cloud Security and Engineering

* Sydney Sweeney
* Jean-Pierre Sevigny
* Nico Halpern

## Patches and more

* Kuber Kaul
* Steve Stevenson
* Roman Sluzhynskyy
Expand Down
11 changes: 6 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,10 @@ The following changes are part of this release:
- Automatically discover AWS URLs.
- Fix authentication with DUO.
- Add support for setting the logging level via both the INI file and ENV vars.
- Add support for Python 3.9 and 3.10.
- Add support for Python 3.9, 3.10, and 3.11.
- And many fixes.

Consult [additional notes](docs/README.md) for how to use tokendito.
Consult [additional notes](https://github.com/dowjones/tokendito/blob/main/docs/README.md) for how to use tokendito.

## Requirements

Expand All @@ -53,9 +53,10 @@ pip or pip3.
3. Run `tokendito`.

**NOTE**: Advanced users may shorten the `tokendito` interaction to a [single
command](docs/README.md#single-command-usage).
command](https://github.com/dowjones/tokendito/blob/main/docs/README.md#single-command-usage).

Have multiple Okta tiles to switch between? View our [multi-tile
guide](docs/README.md#multi-tile-guide).
guide](https://github.com/dowjones/tokendito/blob/main/docs/README.md#multi-tile-guide).

### Tips, tricks, troubleshooting, examples, and more docs are [here](docs/README.md)! Also, [contributions are welcome](docs/CONTRIBUTING.md)!
### Tips, tricks, troubleshooting, examples, and more docs are [here]()https://github.com/dowjones/tokendito/blob/main/docs/README.md!
[Contributions are welcome](https://github.com/dowjones/tokendito/blob/main/docs/CONTRIBUTING.md)!
58 changes: 48 additions & 10 deletions tests/unit_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -839,26 +839,64 @@ def test_user_mfa_challenge_with_no_mfas(sample_headers, sample_json_response):


@pytest.mark.parametrize(
"wrapper_return,expected",
"return_value,side_effect,expected",
[
({"factorResult": "REJECTED", "status": "FAILED"}, 2),
({"factorResult": "TIMEOUT", "status": "FAILED"}, 2),
({"status": "FAILED"}, 1),
({}, 1),
({"factorResult": "SUCCESS", "status": "SUCCESS"}, 0),
({"status": "SUCCESS", "sessionToken": "pytest"}, None, 0),
({"status": "SUCCESS", "sessionToken": "pytest", "factorResult": "SUCCESS"}, None, 0),
({"status": "MFA_CHALLENGE", "factorResult": "REJECTED"}, None, 2),
({"status": "MFA_CHALLENGE", "factorResult": "TIMEOUT"}, None, 2),
({"status": "UNKNOWN", "factorResult": "UNKNOWN"}, None, 2),
(
{
"status": "MFA_CHALLENGE",
"factorResult": "WAITING",
"_links": {"next": {"href": None}},
},
[
{
"status": "MFA_CHALLENGE",
"factorResult": "WAITING",
"_links": {"next": {"href": None}},
},
{"status": "SUCCESS", "sessionToken": "pytest", "factorResult": "SUCCESS"},
],
0,
),
(
{
"status": "MFA_CHALLENGE",
"factorResult": "WAITING",
"_embedded": {"factor": {"_embedded": {"challenge": {"correctAnswer": 100}}}},
"_links": {"next": {"href": None}},
},
[
{
"status": "MFA_CHALLENGE",
"factorResult": "WAITING",
"_embedded": {"factor": {"_embedded": {"challenge": {"correctAnswer": 100}}}},
"_links": {"next": {"href": None}},
},
{"status": "SUCCESS", "sessionToken": "pytest", "factorResult": "SUCCESS"},
],
0,
),
],
)
def test_push_approval(mocker, sample_headers, wrapper_return, expected):
def test_push_approval(mocker, sample_headers, return_value, side_effect, expected):
"""Test push approval."""
from tokendito import okta

challenge_url = "https://pytest/api/v1/authn/factors/factorid/verify"

mocker.patch("tokendito.okta.api_wrapper", return_value=wrapper_return)
mocker.patch("tokendito.okta.api_wrapper", return_value=return_value, side_effect=side_effect)
mocker.patch("time.sleep", return_value=0)

if "status" in wrapper_return and wrapper_return["status"] == "SUCCESS":
if "status" in return_value and return_value["status"] == "SUCCESS":
ret = okta.push_approval(sample_headers, challenge_url, None)
assert ret["status"] == "SUCCESS"
elif "factorResult" in return_value and return_value["factorResult"] == "WAITING":
ret = okta.push_approval(sample_headers, challenge_url, None)
assert ret["status"] == "SUCCESS" and ret["factorResult"] == "SUCCESS"
assert ret["status"] == "SUCCESS"
else:
with pytest.raises(SystemExit) as err:
okta.push_approval(sample_headers, challenge_url, None)
Expand Down
2 changes: 1 addition & 1 deletion tokendito/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

from platformdirs import user_config_dir

__version__ = "2.0.0"
__version__ = "2.1.0"
__title__ = "tokendito"
__description__ = "Get AWS STS tokens from Okta SSO"
__long_description_content_type__ = "text/markdown"
Expand Down
78 changes: 49 additions & 29 deletions tokendito/okta.py
Original file line number Diff line number Diff line change
Expand Up @@ -283,36 +283,56 @@ def push_approval(headers, mfa_challenge_url, payload):
:return: Session Token if succeeded or terminates if user wait goes 5 min

"""
logger.debug(
f"Handle push approval from the user headers:{headers} challenge_url:{mfa_challenge_url}"
)
logger.debug(f"Push approval with headers:{headers} challenge_url:{mfa_challenge_url}")

user.print("Waiting for an approval from the device...")
mfa_status = "WAITING"
mfa_verify = {}
while mfa_status == "WAITING":
mfa_verify = api_wrapper(mfa_challenge_url, payload, headers)

logger.debug(f"MFA Response:\n{json.dumps(mfa_verify)}")

if "factorResult" in mfa_verify:
mfa_status = mfa_verify["factorResult"]
elif "status" in mfa_verify and mfa_verify["status"] == "SUCCESS":
break
else:
logger.error("There was an error getting your MFA status.")
logger.debug(f"{mfa_verify}")
if "status" in mfa_verify:
logger.error(f"Exiting due to error: {mfa_verify['status']}")
sys.exit(1)

if mfa_status == "REJECTED":
logger.error("The Okta Verify push has been denied. Please retry later.")
sys.exit(2)
elif mfa_status == "TIMEOUT":
logger.error("Device approval window has expired.")
sys.exit(2)

status = "MFA_CHALLENGE"
result = "WAITING"
response = {}
challenge_displayed = False

while status == "MFA_CHALLENGE" and result == "WAITING":
response = api_wrapper(mfa_challenge_url, payload, headers)
if "sessionToken" in response:
user.add_sensitive_value_to_be_masked(response["sessionToken"])

logger.debug(f"MFA Response:\n{json.dumps(response)}")
# Retrieve these values from the object, and set a sensible default if they do not
# exist.
status = response.get("status", "UNKNOWN")
result = response.get("factorResult", "UNKNOWN")

# The docs at https://developer.okta.com/docs/reference/api/authn/#verify-push-factor
# state that the call will return a factorResult in [ SUCCESS, REJECTED, TIMEOUT,
# WAITING]. However, on success, SUCCESS is not set and we have to rely on the
# response["status"] instead
answer = (
response.get("_embedded", {})
.get("factor", {})
.get("_embedded", {})
.get("challenge", {})
.get("correctAnswer", None)
)
if answer and not challenge_displayed:
# If a Number Challenge response exists, retrieve it from this deeply nested path,
# otherwise set to None.
user.print(f"Number Challenge response is {answer}")
challenge_displayed = True
time.sleep(1)

return mfa_verify
if status == "SUCCESS" and "sessionToken" in response:
# noop, we will return the variable later
pass
# Everything else should have a status of "MFA_CHALLENGE", and the result provides a
# hint on why the challenge failed.
elif result == "REJECTED":
logger.error("The Okta Verify push has been denied.")
sys.exit(2)
elif result == "TIMEOUT":
logger.error("Device approval window has expired.")
sys.exit(2)
else:
logger.error(f"Push response type {result} for {status} not implemented.")
sys.exit(2)

return response