-
Notifications
You must be signed in to change notification settings - Fork 27
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: Add support for step up authentication #140
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -258,6 +258,35 @@ def authenticate(config): | |
return sid | ||
|
||
|
||
def step_up_authenticate(config, state_token): | ||
"""Try to step up authenticate the user. Only supported for local auth. | ||
|
||
:param config: Configuration object | ||
:param state_token: The state token | ||
:return: True if step up authentication was successful; False otherwise | ||
""" | ||
auth_properties = get_auth_properties(userid=config.okta["username"], url=config.okta["org"]) | ||
if "type" not in auth_properties or not is_local_auth(auth_properties): | ||
return False | ||
|
||
headers = {"content-type": "application/json", "accept": "application/json"} | ||
payload = {"stateToken": state_token} | ||
|
||
auth = HTTP_client.post( | ||
f"{config.okta['org']}/api/v1/authn", json=payload, headers=headers, return_json=True | ||
) | ||
|
||
status = auth.get("status", None) | ||
if status == "SUCCESS": | ||
return True | ||
elif status == "MFA_REQUIRED": | ||
mfa_challenge(config, headers, auth) | ||
return True | ||
|
||
logger.error("Okta auth failed: unknown status for step up authentication.") | ||
return False | ||
|
||
|
||
def is_local_auth(auth_properties): | ||
"""Check whether authentication happens locally. | ||
|
||
|
@@ -429,7 +458,7 @@ def extract_state_token(html): | |
state_token = None | ||
pattern = re.compile(r"var stateToken = '(?P<stateToken>.*)';", re.MULTILINE) | ||
|
||
script = soup.find("script", text=pattern) | ||
script = soup.find("script", string=pattern) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Before:
After:
|
||
if type(script) is bs4.element.Tag: | ||
match = pattern.search(script.text) | ||
if match: | ||
|
@@ -605,6 +634,9 @@ def totp_approval(config, selected_mfa_option, headers, mfa_challenge_url, paylo | |
user.add_sensitive_value_to_be_masked(mfa_verify["sessionToken"]) | ||
logger.debug(f"mfa_verify [{json.dumps(mfa_verify)}]") | ||
|
||
# Clear out any MFA response since it is no longer valid | ||
config.okta["mfa_response"] = None | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ℹ️ This should address the re-use of MFA response during step up authentication. By putting it here, I'm hoping this will catch any future™ downstream re-use issues as well. |
||
|
||
return mfa_verify | ||
|
||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -144,6 +144,13 @@ def parse_cli_args(args): | |
help="Sets the MFA response to a challenge. You " | ||
"can also use the TOKENDITO_OKTA_MFA_RESPONSE environment variable.", | ||
) | ||
parser.add_argument( | ||
"--use-device-token", | ||
dest="user_use_device_token", | ||
action="store_true", | ||
default=False, | ||
help="Use device token across sessions", | ||
) | ||
parser.add_argument( | ||
"--quiet", | ||
dest="user_quiet", | ||
|
@@ -911,6 +918,27 @@ def update_configuration(config): | |
logger.info(f"Updated {ini_file} with profile {profile}") | ||
|
||
|
||
def update_device_token(config): | ||
"""Update configuration file on local system with device token. | ||
|
||
:param config: the current configuration | ||
:return: None | ||
""" | ||
logger.debug("Update configuration file on local system with device token.") | ||
ini_file = config.user["config_file"] | ||
profile = config.user["config_profile"] | ||
|
||
contents = {} | ||
# Copy relevant parts of the configuration into an dictionary that | ||
# will be written out to disk | ||
if "device_token" in config.okta and config.okta["device_token"] is not None: | ||
contents["okta_device_token"] = config.okta["device_token"] | ||
|
||
logger.debug(f"Adding {contents} to config file.") | ||
update_ini(profile=profile, ini_file=ini_file, **contents) | ||
logger.info(f"Updated {ini_file} with profile {profile}") | ||
|
||
|
||
def set_local_credentials(response={}, role="default", region="us-east-1", output="json"): | ||
"""Write to local files to insert credentials. | ||
|
||
|
@@ -1227,8 +1255,11 @@ def request_cookies(url, session_token): | |
add_sensitive_value_to_be_masked(sess_id) | ||
|
||
# create cookies with sid 'sid'. | ||
domain = urlparse(url).netloc | ||
|
||
cookies = requests.cookies.RequestsCookieJar() | ||
cookies.set("sid", sess_id, domain=urlparse(url).netloc, path="/") | ||
cookies.set("sid", sess_id, domain=domain, path="/") | ||
cookies.set("sessionToken", session_token, domain=domain, path="/") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ℹ️ This is necessary as the session token is needed for the later call to |
||
|
||
# Log the session cookies. | ||
logger.debug(f"Received session cookies: {cookies}") | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ℹ️ I only added support for
local_auth
since I do not have access to asaml2
environment to test appropriately.