diff --git a/docs/README.md b/docs/README.md index 9788f582..8618145e 100644 --- a/docs/README.md +++ b/docs/README.md @@ -74,7 +74,7 @@ tokendito --profile engineer usage: tokendito [-h] [--version] [--configure] [--username OKTA_USERNAME] [--password OKTA_PASSWORD] [--profile USER_CONFIG_PROFILE] [--config-file USER_CONFIG_FILE] [--loglevel {DEBUG,INFO,WARN,ERROR}] [--log-output-file USER_LOG_OUTPUT_FILE] [--aws-config-file AWS_CONFIG_FILE] [--aws-output AWS_OUTPUT] [--aws-profile AWS_PROFILE] [--aws-region AWS_REGION] [--aws-role-arn AWS_ROLE_ARN] [--aws-shared-credentials-file AWS_SHARED_CREDENTIALS_FILE] - [--okta-org OKTA_ORG | --okta-tile OKTA_TILE] [--okta-mfa OKTA_MFA] [--okta-mfa-response OKTA_MFA_RESPONSE] [--quiet] + [--okta-org OKTA_ORG | --okta-tile OKTA_TILE] [--okta-mfa OKTA_MFA] [--okta-mfa-response OKTA_MFA_RESPONSE] [--use-device-token] [--quiet] Gets an STS token to use with the AWS CLI and SDK. @@ -112,6 +112,7 @@ options: --okta-mfa OKTA_MFA Sets the MFA method --okta-mfa-response OKTA_MFA_RESPONSE Sets the MFA response to a challenge + --use-device-token Use device token across sessions --quiet Suppress output ``` @@ -153,6 +154,7 @@ The following table lists the environment variable and user configuration entry | `--okta-tile` | `TOKENDITO_OKTA_TILE` | `okta_tile` | | `--okta-mfa` | `TOKENDITO_OKTA_MFA` | `okta_mfa` | | `--okta-mfa-response` | `TOKENDITO_OKTA_MFA_RESPONSE` | `okta_mfa_response` | +| `--use-device-token` | `TOKENDITO_USER_USE_DEVICE_TOKEN` | `user_use_device_token` | | `--quiet` | `TOKENDITO_USER_QUIET` | `quiet` | # Configuration file location diff --git a/tests/unit/test_aws.py b/tests/unit/test_aws.py index 6f214c0b..25c6a426 100644 --- a/tests/unit/test_aws.py +++ b/tests/unit/test_aws.py @@ -94,6 +94,7 @@ def test_select_assumeable_role_no_tiles(): def test_authenticate_to_roles(status_code, monkeypatch): """Test if function return correct response.""" from tokendito.aws import authenticate_to_roles + from tokendito.config import Config import tokendito.http_client as http_client # Create a mock response object @@ -104,7 +105,12 @@ def test_authenticate_to_roles(status_code, monkeypatch): # Use monkeypatch to replace the HTTP_client.get method with the mock monkeypatch.setattr(http_client.HTTP_client, "get", lambda *args, **kwargs: mock_response) + pytest_config = Config( + okta={ + "org": "https://acme.okta.org/", + } + ) cookies = {"some_cookie": "some_value"} with pytest.raises(SystemExit): - authenticate_to_roles([("http://test.url.com", "")], cookies) + authenticate_to_roles(pytest_config, [("http://test.url.com", "")], cookies) diff --git a/tests/unit/test_http_client.py b/tests/unit/test_http_client.py index aa13927d..46058332 100644 --- a/tests/unit/test_http_client.py +++ b/tests/unit/test_http_client.py @@ -155,3 +155,22 @@ def test_post_logging_on_exception(client, mocker): with pytest.raises(SystemExit): client.post("http://test.com", json={"key": "value"}) mock_logger.assert_called() + + +def test_get_device_token(client): + """Test getting device token from the session.""" + device_token = "test-device-token" + cookies = {"DT": device_token} + client.set_cookies(cookies) + + # Check if the device token is set correctly in the session + assert client.get_device_token() == device_token + + +def test_set_device_token(client): + """Test setting device token in the session.""" + device_token = "test-device-token" + client.set_device_token("http://test.com", device_token) + + # Check if the device token is set correctly in the session + assert client.session.cookies.get("DT") == device_token diff --git a/tests/unit/test_okta.py b/tests/unit/test_okta.py index b2769b22..fef61c32 100644 --- a/tests/unit/test_okta.py +++ b/tests/unit/test_okta.py @@ -540,6 +540,55 @@ def test_authenticate(mocker): assert okta.authenticate(pytest_config) == error +def test_step_up_authenticate(mocker): + """Test set up authenticate method.""" + from tokendito import okta + from tokendito.config import Config + from tokendito.http_client import HTTP_client + + pytest_config = Config( + okta={ + "username": "pytest", + "org": "https://acme.okta.org/", + } + ) + + state_token = "test-state-token" + + # Test missing auth type + mocker.patch("tokendito.okta.get_auth_properties", return_value={}) + assert okta.step_up_authenticate(pytest_config, state_token) is False + + # Test unsupported auth type + mocker.patch("tokendito.okta.get_auth_properties", return_value={"type": "SAML2"}) + assert okta.step_up_authenticate(pytest_config, state_token) is False + + # Test supported auth type... + mocker.patch("tokendito.okta.get_auth_properties", return_value={"type": "OKTA"}) + + # ...with SUCCESS status + mock_response_data = {"status": "SUCCESS"} + mocker.patch.object(HTTP_client, "post", return_value=mock_response_data) + + assert okta.step_up_authenticate(pytest_config, state_token) is True + + # ...with MFA_REQUIRED status + mock_response_data = {"status": "MFA_REQUIRED"} + mocker.patch.object(HTTP_client, "post", return_value=mock_response_data) + patched_mfa_challenge = mocker.patch.object( + okta, "mfa_challenge", return_value="test-session-token" + ) + + assert okta.step_up_authenticate(pytest_config, state_token) is True + assert patched_mfa_challenge.call_count == 1 + + # ...with unknown status + mock_response_data = {"status": "unknown"} + mocker.patch.object(HTTP_client, "post", return_value=mock_response_data) + + assert okta.step_up_authenticate(pytest_config, state_token) is False + + def test_local_auth(mocker): """Test local auth method.""" from tokendito import okta diff --git a/tests/unit/test_user.py b/tests/unit/test_user.py index 1e80835f..0cb823af 100644 --- a/tests/unit/test_user.py +++ b/tests/unit/test_user.py @@ -466,6 +466,26 @@ def test_update_configuration(tmpdir): assert ret.okta["mfa"] == "pytest" +def test_update_device_token(tmpdir): + """Test writing and reading device token to a configuration file.""" + from tokendito import user + from tokendito.config import Config + + path = tmpdir.mkdir("pytest").join("pytest_tokendito.ini") + + device_token = "test-device-token" + + pytest_config = Config( + okta={"device_token": device_token}, + user={"config_file": path, "config_profile": "pytest"}, + ) + + # Write out a config file via configure() and ensure it's functional + user.update_device_token(pytest_config) + ret = user.process_ini_file(path, "pytest") + assert ret.okta["device_token"] == device_token + + def test_process_ini_file(tmpdir): """Test whether ini config elements are set correctly. diff --git a/tokendito/aws.py b/tokendito/aws.py index ef5634be..9bfc66be 100644 --- a/tokendito/aws.py +++ b/tokendito/aws.py @@ -47,7 +47,7 @@ def get_output_types(): return ["json", "text", "csv", "yaml", "yaml-stream"] -def authenticate_to_roles(urls, cookies=None): +def authenticate_to_roles(config, urls, cookies=None): """Authenticate AWS user with saml. :param urls: list of tuples or tuple, with tiles info @@ -67,11 +67,22 @@ def authenticate_to_roles(urls, cookies=None): logger.info(f"Discovering roles in {tile_count} tile{plural}.") for url, label in url_list: response = HTTP_client.get(url) # Use the HTTPClient's get method + + session_url = config.okta["org"] + "/login/sessionCookieRedirect" + params = {"token": cookies.get("sessionToken"), "redirectUrl": url} + + response = HTTP_client.get(session_url, params=params) + saml_response_string = response.text saml_xml = okta.extract_saml_response(saml_response_string) if not saml_xml: - if "Extra Verification" in saml_response_string: + state_token = okta.extract_state_token(saml_response_string) + if "Extra Verification" in saml_response_string and state_token: + logger.info(f"Step-Up authentication required for {url}.") + if okta.step_up_authenticate(config, state_token): + return authenticate_to_roles(config, urls, cookies) + logger.error("Step-Up Authentication required, but not supported.") elif "App Access Locked" in saml_response_string: logger.error( diff --git a/tokendito/config.py b/tokendito/config.py index eedb3027..c06471a6 100644 --- a/tokendito/config.py +++ b/tokendito/config.py @@ -29,6 +29,7 @@ class Config(object): encoding=_default_encoding, loglevel="INFO", log_output_file="", + use_device_token=False, mask_items=[], quiet=False, ), @@ -47,6 +48,7 @@ class Config(object): mfa_response=None, tile=None, org=None, + device_token=None, ), ) diff --git a/tokendito/http_client.py b/tokendito/http_client.py index cb67acc1..bd8f1cd2 100644 --- a/tokendito/http_client.py +++ b/tokendito/http_client.py @@ -4,6 +4,7 @@ import logging import sys +from urllib.parse import urlparse import requests from tokendito import __title__ @@ -75,5 +76,24 @@ def reset(self): self.session.headers = requests.utils.default_headers() self.session.headers.update({"User-Agent": user_agent}) + def get_device_token(self): + """Get the device token from the current session cookies. + + :return: Device token or None + """ + return self.session.cookies.get("DT", None) + + def set_device_token(self, org_url, device_token): + """Set the device token in the current session cookies. + + :param org_url: The organization URL + :param device_token: The device token + :return: None + """ + if not device_token: + return + + self.session.cookies.set("DT", device_token, domain=urlparse(org_url).netloc, path="/") + HTTP_client = HTTPClient() diff --git a/tokendito/okta.py b/tokendito/okta.py index e23e5981..10e94ad5 100644 --- a/tokendito/okta.py +++ b/tokendito/okta.py @@ -258,6 +258,35 @@ def authenticate(config): return sid +def step_up_authenticate(config, state_token): + """Try to step up authenticate the user. Only supported for local auth. + + :param config: Configuration object + :param state_token: The state token + :return: True if step up authentication was successful; False otherwise + """ + auth_properties = get_auth_properties(userid=config.okta["username"], url=config.okta["org"]) + if "type" not in auth_properties or not is_local_auth(auth_properties): + return False + + headers = {"content-type": "application/json", "accept": "application/json"} + payload = {"stateToken": state_token} + + auth = HTTP_client.post( + f"{config.okta['org']}/api/v1/authn", json=payload, headers=headers, return_json=True + ) + + status = auth.get("status", None) + if status == "SUCCESS": + return True + elif status == "MFA_REQUIRED": + mfa_challenge(config, headers, auth) + return True + + logger.error("Okta auth failed: unknown status for step up authentication.") + return False + + def is_local_auth(auth_properties): """Check whether authentication happens locally. @@ -429,7 +458,7 @@ def extract_state_token(html): state_token = None pattern = re.compile(r"var stateToken = '(?P.*)';", re.MULTILINE) - script = soup.find("script", text=pattern) + script = soup.find("script", string=pattern) if type(script) is bs4.element.Tag: match = pattern.search(script.text) if match: @@ -605,6 +634,9 @@ def totp_approval(config, selected_mfa_option, headers, mfa_challenge_url, paylo user.add_sensitive_value_to_be_masked(mfa_verify["sessionToken"]) logger.debug(f"mfa_verify [{json.dumps(mfa_verify)}]") + # Clear out any MFA response since it is no longer valid + config.okta["mfa_response"] = None + return mfa_verify diff --git a/tokendito/tool.py b/tokendito/tool.py index c3477a8a..2c3bd0fa 100644 --- a/tokendito/tool.py +++ b/tokendito/tool.py @@ -38,6 +38,16 @@ def cli(args): ) sys.exit(1) + if config.user["use_device_token"]: + device_token = config.okta["device_token"] + if device_token: + HTTP_client.set_device_token(config.okta["org"], device_token) + else: + logger.warning( + f"Device token unavailable for config profile {args.user_config_profile}. " + "May see multiple MFA requests this time." + ) + # Authenticate to okta session_cookies = okta.authenticate(config) @@ -50,7 +60,7 @@ def cli(args): config.okta["tile"] = user.discover_tiles(config.okta["org"]) # Authenticate to AWS roles - auth_tiles = aws.authenticate_to_roles(config.okta["tile"], cookies=session_cookies) + auth_tiles = aws.authenticate_to_roles(config, config.okta["tile"], cookies=session_cookies) (role_response, role_name) = aws.select_assumeable_role(auth_tiles) @@ -70,4 +80,10 @@ def cli(args): output=config.aws["output"], ) + device_token = HTTP_client.get_device_token() + if config.user["use_device_token"] and device_token: + logger.info(f"Saving device token to config profile {args.user_config_profile}") + config.okta["device_token"] = device_token + user.update_device_token(config) + user.display_selected_role(profile_name=config.aws["profile"], role_response=role_response) diff --git a/tokendito/user.py b/tokendito/user.py index 955964c6..fd1cfc9b 100644 --- a/tokendito/user.py +++ b/tokendito/user.py @@ -144,6 +144,13 @@ def parse_cli_args(args): help="Sets the MFA response to a challenge. You " "can also use the TOKENDITO_OKTA_MFA_RESPONSE environment variable.", ) + parser.add_argument( + "--use-device-token", + dest="user_use_device_token", + action="store_true", + default=False, + help="Use device token across sessions", + ) parser.add_argument( "--quiet", dest="user_quiet", @@ -911,6 +918,27 @@ def update_configuration(config): logger.info(f"Updated {ini_file} with profile {profile}") +def update_device_token(config): + """Update configuration file on local system with device token. + + :param config: the current configuration + :return: None + """ + logger.debug("Update configuration file on local system with device token.") + ini_file = config.user["config_file"] + profile = config.user["config_profile"] + + contents = {} + # Copy relevant parts of the configuration into an dictionary that + # will be written out to disk + if "device_token" in config.okta and config.okta["device_token"] is not None: + contents["okta_device_token"] = config.okta["device_token"] + + logger.debug(f"Adding {contents} to config file.") + update_ini(profile=profile, ini_file=ini_file, **contents) + logger.info(f"Updated {ini_file} with profile {profile}") + + def set_local_credentials(response={}, role="default", region="us-east-1", output="json"): """Write to local files to insert credentials. @@ -1227,8 +1255,11 @@ def request_cookies(url, session_token): add_sensitive_value_to_be_masked(sess_id) # create cookies with sid 'sid'. + domain = urlparse(url).netloc + cookies = requests.cookies.RequestsCookieJar() - cookies.set("sid", sess_id, domain=urlparse(url).netloc, path="/") + cookies.set("sid", sess_id, domain=domain, path="/") + cookies.set("sessionToken", session_token, domain=domain, path="/") # Log the session cookies. logger.debug(f"Received session cookies: {cookies}")