From 9f4d4b2b92a9e4bdb542c8009b74b6fc5ddf24d3 Mon Sep 17 00:00:00 2001 From: Oleksii Burdash Date: Mon, 19 Apr 2021 14:56:02 +0300 Subject: [PATCH] Improve code readability --- .github/workflows/cicd.yml | 16 +- README.rst | 3 + requirements-dev.txt | 2 + requirements.txt | 1 + setup.py | 62 +++--- tests/conftest.py | 37 ++-- tests/functional_test.py | 163 ++++++++++------ tests/unit_test.py | 250 +++++++++++++++--------- tokendito/__init__.py | 2 +- tokendito/__main__.py | 13 +- tokendito/__version__.py | 16 +- tokendito/aws_helpers.py | 106 +++++----- tokendito/duo_helpers.py | 138 ++++++++----- tokendito/helpers.py | 386 +++++++++++++++++++++++-------------- tokendito/okta_helpers.py | 153 ++++++++------- tokendito/settings.py | 25 ++- tokendito/tokendito.py | 13 +- tokendito/tool.py | 24 ++- tox.ini | 9 +- 19 files changed, 833 insertions(+), 586 deletions(-) diff --git a/.github/workflows/cicd.yml b/.github/workflows/cicd.yml index e0cb6e31..8e95aa96 100644 --- a/.github/workflows/cicd.yml +++ b/.github/workflows/cicd.yml @@ -17,7 +17,7 @@ jobs: runs-on: ubuntu-latest steps: - name: Checkout - uses: actions/checkout@master + uses: actions/checkout@v2 - name: Set up Python uses: actions/setup-python@v1 with: @@ -32,14 +32,14 @@ jobs: test: name: Tox Tests needs: lint - runs-on: ${{ matrix.operating-system }} + runs-on: ${{ matrix.os }} strategy: matrix: - operating-system: [ubuntu-latest, windows-latest, macos-latest] + os: [ubuntu-latest, windows-latest, macos-latest] python-version: [2.7, 3.5, 3.6, 3.7, 3.8] steps: - name: Checkout - uses: actions/checkout@master + uses: actions/checkout@v2 - name: Set up Python ${{ matrix.python-version }} uses: actions/setup-python@v1 with: @@ -59,9 +59,9 @@ jobs: runs-on: ubuntu-latest steps: - name: Checkout - uses: actions/checkout@v1 + uses: actions/checkout@v2 - name: Set up Python - uses: actions/setup-python@v1 + uses: actions/setup-python@v2 with: python-version: '3.x' - name: Install dependencies @@ -89,9 +89,9 @@ jobs: runs-on: ubuntu-latest steps: - name: Checkout - uses: actions/checkout@v1 + uses: actions/checkout@v2 - name: Set up Python - uses: actions/setup-python@v1 + uses: actions/setup-python@v2 with: python-version: '3.x' - name: Install dependencies diff --git a/README.rst b/README.rst index a6be136b..ede57dbc 100644 --- a/README.rst +++ b/README.rst @@ -20,6 +20,9 @@ Generate temporary AWS credentials via Okta. .. image:: https://raw.githubusercontent.com/dowjones/tokendito/master/docs/tokendito-scaled.gif + +**WARNING: Python2.7 and Python3.5 support will be dropped in a near time, pin tokendito version to current if you want to keep it working in automation.** + NOTE: Advanced users may shorten the tokendito interaction to a `single command `_. .. _STS: https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_temp.html diff --git a/requirements-dev.txt b/requirements-dev.txt index 8a90d8e8..48324a4f 100755 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -1,6 +1,8 @@ -r ./requirements.txt +black==20.8b1; python_version >= '3.6' docutils<0.16,>=0.10 flake8 +flake8-black; python_version >= '3.6' flake8-colors flake8-docstrings flake8-import-order>=0.9 diff --git a/requirements.txt b/requirements.txt index 20e3800d..4bb4d937 100755 --- a/requirements.txt +++ b/requirements.txt @@ -3,5 +3,6 @@ requests>=2.19.0 configparser>=3.5.0 future>=0.16.0 pyOpenSSL>=18.0.0 +cryptography==3.3.2; python_version == '2.7' beautifulsoup4>=4.6.0 lxml>=4.3.0 diff --git a/setup.py b/setup.py index d2d3e3c2..49e4b0ea 100755 --- a/setup.py +++ b/setup.py @@ -14,54 +14,54 @@ here = path.abspath(path.dirname(__file__)) -with open(path.join(here, 'README.rst'), encoding=sys.stdin.encoding) as f: +with open(path.join(here, "README.rst"), encoding=sys.stdin.encoding) as f: long_description = f.read() -with open('requirements.txt') as f: +with open("requirements.txt") as f: required = f.read().splitlines() about = {} -with open(os.path.join(here, 'tokendito', '__version__.py'), 'r') as f: +with open(os.path.join(here, "tokendito", "__version__.py"), "r") as f: exec(f.read(), about) -if 'DEVBUILD' in os.environ: +if "DEVBUILD" in os.environ: now = datetime.datetime.now() - about['__version__'] = about['__version__'] + '.dev' + now.strftime('%Y%m%d%H%M%S') + about["__version__"] = about["__version__"] + ".dev" + now.strftime("%Y%m%d%H%M%S") setup( - name='tokendito', - version=about['__version__'], - description=about['__description__'], + name="tokendito", + version=about["__version__"], + description=about["__description__"], long_description=long_description, - long_description_content_type=about['__long_description_content_type__'], - url=about['__url__'], - author=about['__author__'], - author_email=about['__author_email__'], + long_description_content_type=about["__long_description_content_type__"], + url=about["__url__"], + author=about["__author__"], + author_email=about["__author_email__"], classifiers=[ - 'License :: OSI Approved :: Apache Software License', - 'Development Status :: 5 - Production/Stable', - 'Operating System :: OS Independent', - 'Environment :: Console', - 'Programming Language :: Python', - 'Intended Audience :: End Users/Desktop', - 'Intended Audience :: Developers', - 'Intended Audience :: System Administrators', - 'Programming Language :: Python :: 2', - 'Programming Language :: Python :: 2.7', - 'Programming Language :: Python :: 3', - 'Programming Language :: Python :: 3.5', - 'Programming Language :: Python :: 3.6', - 'Programming Language :: Python :: 3.7', - 'Programming Language :: Python :: 3.8', + "License :: OSI Approved :: Apache Software License", + "Development Status :: 5 - Production/Stable", + "Operating System :: OS Independent", + "Environment :: Console", + "Programming Language :: Python", + "Intended Audience :: End Users/Desktop", + "Intended Audience :: Developers", + "Intended Audience :: System Administrators", + "Programming Language :: Python :: 2", + "Programming Language :: Python :: 2.7", + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3.5", + "Programming Language :: Python :: 3.6", + "Programming Language :: Python :: 3.7", + "Programming Language :: Python :: 3.8", ], - keywords=['okta', 'aws', 'sts'], - packages=find_packages(exclude=['contrib', 'docs', 'tests', '.tox']), + keywords=["okta", "aws", "sts"], + packages=find_packages(exclude=["contrib", "docs", "tests", ".tox"]), python_requires=">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*", - license=about['__license__'], + license=about["__license__"], zip_safe=False, install_requires=[required], entry_points={ - 'console_scripts': ['tokendito=tokendito.__main__:main'], + "console_scripts": ["tokendito=tokendito.__main__:main"], }, # $ pip install -e . [dev,test] ) diff --git a/tests/conftest.py b/tests/conftest.py index 16b1a286..69986342 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,8 +1,7 @@ # vim: set filetype=python ts=4 sw=4 # -*- coding: utf-8 -*- """pytest configuration, hooks, and global fixtures.""" -from __future__ import (absolute_import, division, - print_function, unicode_literals) +from __future__ import absolute_import, division, print_function, unicode_literals from future import standard_library @@ -11,20 +10,20 @@ def pytest_addoption(parser): """Add command-line option for running functional tests.""" - parser.addoption("--run-functional", action="store_true", - default=False, help="run functional tests") - parser.addoption('--username', - help='username to login to Okta') - parser.addoption('--password', - help='password to login to Okta.') - parser.addoption('--okta-aws-app-url', - help='Okta App URL to use.') - parser.addoption('--mfa-method', - help='Sets the MFA method') - parser.addoption('--mfa-response', - help='Sets the MFA response to a challenge') - parser.addoption('--role-arn', - help='Sets the IAM role') - parser.addoption('--config-file', - default='/dev/null', - help='Sets an optional config file to read from') + parser.addoption( + "--run-functional", + action="store_true", + default=False, + help="run functional tests", + ) + parser.addoption("--username", help="username to login to Okta") + parser.addoption("--password", help="password to login to Okta.") + parser.addoption("--okta-aws-app-url", help="Okta App URL to use.") + parser.addoption("--mfa-method", help="Sets the MFA method") + parser.addoption("--mfa-response", help="Sets the MFA response to a challenge") + parser.addoption("--role-arn", help="Sets the IAM role") + parser.addoption( + "--config-file", + default="/dev/null", + help="Sets an optional config file to read from", + ) diff --git a/tests/functional_test.py b/tests/functional_test.py index 261b6c91..4de84b56 100644 --- a/tests/functional_test.py +++ b/tests/functional_test.py @@ -1,12 +1,30 @@ # vim: set filetype=python ts=4 sw=4 # -*- coding: utf-8 -*- """Functional tests, and local fixtures.""" -from __future__ import (absolute_import, division, - print_function, unicode_literals) - -from builtins import (ascii, bytes, chr, dict, filter, hex, input, # noqa: F401 - int, list, map, next, object, oct, open, pow, range, - round, str, super, zip) +from __future__ import absolute_import, division, print_function, unicode_literals + +from builtins import ( # noqa: F401 + ascii, + bytes, + chr, + dict, + filter, + hex, + input, + int, + list, + map, + next, + object, + oct, + open, + pow, + range, + round, + str, + super, + zip, +) from os import path import re import subprocess @@ -31,7 +49,7 @@ def string_decode(bytestring): """ decoded_string = bytestring try: - decoded_string = bytestring.decode('utf-8') + decoded_string = bytestring.decode("utf-8") except (NameError, TypeError): pass @@ -47,10 +65,10 @@ def run_process(proc): (stdoutdata, stderrdata) = process.communicate() proc_status = { - 'stdout': string_decode(stdoutdata), - 'stderr': string_decode(stderrdata), - 'name': ' '.join(proc), - 'exit_status': process.returncode + "stdout": string_decode(stdoutdata), + "stderr": string_decode(stderrdata), + "name": " ".join(proc), + "exit_status": process.returncode, } return proc_status @@ -58,7 +76,7 @@ def run_process(proc): @pytest.fixture def package_regex(): """Get compiled package regex.""" - version_regex = re.compile(r'^\S+/(?P.*?)\s+.*$') + version_regex = re.compile(r"^\S+/(?P.*?)\s+.*$") return version_regex @@ -66,15 +84,22 @@ def package_regex(): def package_version(): """Run test with access to the Tokendito package.""" from tokendito.__version__ import __version__ as tokendito_version + return tokendito_version @pytest.fixture def custom_args(request): """Search the custom command-line options and return a list of keys and values.""" - options = ['--username', '--password', '--okta-aws-app-url', - '--mfa-method', '--mfa-response', '--role-arn', - '--config-file'] + options = [ + "--username", + "--password", + "--okta-aws-app-url", + "--mfa-method", + "--mfa-response", + "--role-arn", + "--config-file", + ] arg_list = [] # pytest does not have a method for listing options, so we have look them up. for item in options: @@ -83,92 +108,108 @@ def custom_args(request): return arg_list -@pytest.mark.run('first') +@pytest.mark.run("first") def test_package_uninstall(): """Uninstall tokendito if it is already installed.""" - proc = run_process([sys.executable, '-m', 'pip', 'uninstall', '-q', '-q', '-y', 'tokendito']) - assert not proc['stderr'] - assert proc['exit_status'] == 0 + proc = run_process( + [sys.executable, "-m", "pip", "uninstall", "-q", "-q", "-y", "tokendito"] + ) + assert not proc["stderr"] + assert proc["exit_status"] == 0 -@pytest.mark.run('second') +@pytest.mark.run("second") def test_package_install(): """Install tokendito as a python package.""" repo_root = path.dirname(path.dirname(path.abspath(__file__))) - proc = run_process([sys.executable, '-m', 'pip', 'install', '-e', repo_root]) - assert not proc['stderr'] - assert proc['exit_status'] == 0 + proc = run_process([sys.executable, "-m", "pip", "install", "-e", repo_root]) + assert not proc["stderr"] + assert proc["exit_status"] == 0 def test_package_exists(): """Check whether the package is installed.""" - proc = run_process([sys.executable, '-m', 'pip', 'show', 'tokendito']) - assert not proc['stderr'] - assert proc['exit_status'] == 0 - - -@pytest.mark.parametrize('runnable', [[sys.executable, '-m', 'tokendito', '--version'], - [sys.executable, sys.path[0] + '/tokendito/tokendito.py', - '--version'], - ['tokendito', '--version']]) + proc = run_process([sys.executable, "-m", "pip", "show", "tokendito"]) + assert not proc["stderr"] + assert proc["exit_status"] == 0 + + +@pytest.mark.parametrize( + "runnable", + [ + [sys.executable, "-m", "tokendito", "--version"], + [sys.executable, sys.path[0] + "/tokendito/tokendito.py", "--version"], + ["tokendito", "--version"], + ], +) def test_version(package_version, package_regex, runnable): """Check if the package version is the same when running in different ways.""" proc = run_process(runnable) - assert not proc['stderr'] - assert proc['exit_status'] == 0 - match = re.match(package_regex, proc['stdout']) - local_version = match.group('version') + assert not proc["stderr"] + assert proc["exit_status"] == 0 + match = re.match(package_regex, proc["stdout"]) + local_version = match.group("version") assert package_version == local_version -@pytest.mark.run('second-to-last') +@pytest.mark.run("second-to-last") def test_generate_credentials(custom_args): """Run the tool and generate credentials.""" from tokendito import helpers, settings # Emulate helpers.process_options() bypassing interactive portions. tool_args = helpers.setup(custom_args) - helpers.process_ini_file(tool_args.config_file, 'default') + helpers.process_ini_file(tool_args.config_file, "default") helpers.process_arguments(tool_args) helpers.process_environment() - if settings.role_arn is None or \ - settings.okta_aws_app_url is None or \ - settings.mfa_method is None or \ - not settings.okta_username or \ - not settings.okta_password: - pytest.skip('Not enough arguments collected to execute non-interactively.') + if ( + settings.role_arn is None + or settings.okta_aws_app_url is None + or settings.mfa_method is None + or not settings.okta_username + or not settings.okta_password + ): + pytest.skip("Not enough arguments collected to execute non-interactively.") # Rebuild argument list - args = ['--role-arn', '{}'.format(settings.role_arn), - '--okta-aws-app-url', '{}'.format(settings.okta_aws_app_url), - '--mfa-method', '{}'.format(settings.mfa_method), - '--mfa-response', '{}'.format(settings.mfa_response), - '--username', '{}'.format(settings.okta_username), - '--password', '{}'.format(settings.okta_password) - ] - executable = ['tokendito'] # Can use sys.executable -m tokendito, or parametrize + args = [ + "--role-arn", + "{}".format(settings.role_arn), + "--okta-aws-app-url", + "{}".format(settings.okta_aws_app_url), + "--mfa-method", + "{}".format(settings.mfa_method), + "--mfa-response", + "{}".format(settings.mfa_response), + "--username", + "{}".format(settings.okta_username), + "--password", + "{}".format(settings.okta_password), + ] + executable = ["tokendito"] # Can use sys.executable -m tokendito, or parametrize runnable = executable + args proc = run_process(runnable) - assert not proc['stderr'] - assert proc['exit_status'] == 0 + assert not proc["stderr"] + assert proc["exit_status"] == 0 -@pytest.mark.run('last') +@pytest.mark.run("last") def test_aws_credentials(custom_args): """Run the AWS cli to verify whether credentials work.""" from tokendito import helpers, settings + # Emulate helpers.process_options() bypassing interactive portions. tool_args = helpers.setup(custom_args) - helpers.process_ini_file(tool_args.config_file, 'default') + helpers.process_ini_file(tool_args.config_file, "default") helpers.process_arguments(tool_args) helpers.process_environment() if settings.role_arn is None: - pytest.skip('No AWS profile defined, test will be skipped.') - profile = settings.role_arn.split('/')[-1] - runnable = ['aws', '--profile', profile, 'sts', 'get-caller-identity'] + pytest.skip("No AWS profile defined, test will be skipped.") + profile = settings.role_arn.split("/")[-1] + runnable = ["aws", "--profile", profile, "sts", "get-caller-identity"] proc = run_process(runnable) - assert not proc['stderr'] - assert proc['exit_status'] == 0 + assert not proc["stderr"] + assert proc["exit_status"] == 0 diff --git a/tests/unit_test.py b/tests/unit_test.py index eb07b0bb..842b8172 100644 --- a/tests/unit_test.py +++ b/tests/unit_test.py @@ -1,12 +1,30 @@ # vim: set filetype=python ts=4 sw=4 # -*- coding: utf-8 -*- """Unit tests, and local fixtures.""" -from __future__ import (absolute_import, division, - print_function, unicode_literals) - -from builtins import (ascii, bytes, chr, dict, filter, hex, input, # noqa: F401 - int, list, map, next, object, oct, open, pow, range, - round, str, super, zip) +from __future__ import absolute_import, division, print_function, unicode_literals + +from builtins import ( # noqa: F401 + ascii, + bytes, + chr, + dict, + filter, + hex, + input, + int, + list, + map, + next, + object, + oct, + open, + pow, + range, + round, + str, + super, + zip, +) from os import path import sys @@ -23,18 +41,50 @@ def valid_settings(): """Return a dict with valid settings for the tokendito.settings module.""" from tokendito import settings + builtins_and_methods = [ - '__builtins__', '__cached__', '__doc__', '__file__', '__loader__', - '__name__', '__package__', '__spec__', 'absolute_import', 'ascii', - 'bytes', 'chr', 'dict', 'division', 'encoding', 'filter', 'hex', - 'input', 'int', 'list', 'map', 'next', 'object', 'oct', 'open', - 'pow', 'print_function', 'range', 'role_arn', 'round', - 'standard_library', 'str', 'super', 'sys', 'unicode_literals', 'zip'] + "__builtins__", + "__cached__", + "__doc__", + "__file__", + "__loader__", + "__name__", + "__package__", + "__spec__", + "absolute_import", + "ascii", + "bytes", + "chr", + "dict", + "division", + "encoding", + "filter", + "hex", + "input", + "int", + "list", + "map", + "next", + "object", + "oct", + "open", + "pow", + "print_function", + "range", + "role_arn", + "round", + "standard_library", + "str", + "super", + "sys", + "unicode_literals", + "zip", + ] settings_keys = dir(settings) unmatched_keys = list(set(settings_keys) - set(builtins_and_methods)) - valid_keys = {str(key): key + '_pytest_patched' for key in unmatched_keys} + valid_keys = {str(key): key + "_pytest_patched" for key in unmatched_keys} return valid_keys @@ -45,7 +95,7 @@ def invalid_settings(): "okta": "okta_pytest_patched", "okta_deadbeef": "okta_deadbeef_pytest_patched", "aws_deadbeef": "aws_deadbeef_pytest_patched", - "pytest_bad_value": "pytest_bad_value_pytest_patched" + "pytest_bad_value": "pytest_bad_value_pytest_patched", } return invalid_keys @@ -53,8 +103,10 @@ def invalid_settings(): def test_import_location(): """Ensure module imported is the local one.""" import tokendito - local_path = path.realpath(path.dirname(path.dirname( - path.abspath(__file__))) + '/tokendito/__init__.py') + + local_path = path.realpath( + path.dirname(path.dirname(path.abspath(__file__))) + "/tokendito/__init__.py" + ) imported_path = path.realpath(tokendito.__file__) assert imported_path.startswith(local_path) @@ -62,21 +114,24 @@ def test_import_location(): def test_semver_version(): """Ensure the package version is semver compliant.""" from tokendito.__version__ import __version__ as version - assert semver.parse_version_info(version) + + assert semver.VersionInfo.parse(version) def test__version__var_names(): """Ensure variables follow the __varname__ convention.""" from tokendito import __version__ + for item in vars(__version__): - assert item.startswith('__') - assert item.endswith('__') + assert item.startswith("__") + assert item.endswith("__") -@pytest.mark.parametrize('string', [r'raw_string', u'unicode_string', r'byte_string']) +@pytest.mark.parametrize("string", [r"raw_string", "unicode_string", r"byte_string"]) def test_to_unicode(string): """Test whether to_unicode returns unicode strings.""" from tokendito import helpers + new_str = helpers.to_unicode(string) assert isinstance(new_str, str) @@ -85,11 +140,11 @@ def test_set_okta_username(monkeypatch): """Test whether data sent is the same as data returned.""" from tokendito import helpers, settings - monkeypatch.setattr('tokendito.helpers.input', lambda _: 'pytest_patched') + monkeypatch.setattr("tokendito.helpers.input", lambda _: "pytest_patched") val = helpers.set_okta_username() - assert val == 'pytest_patched' - assert settings.okta_username == 'pytest_patched' + assert val == "pytest_patched" + assert settings.okta_username == "pytest_patched" def test_set_okta_password(monkeypatch): @@ -97,20 +152,27 @@ def test_set_okta_password(monkeypatch): from tokendito import helpers, settings import getpass - monkeypatch.setattr(getpass, 'getpass', lambda: 'pytest_patched') + monkeypatch.setattr(getpass, "getpass", lambda: "pytest_patched") val = helpers.set_okta_password() - assert val == 'pytest_patched' - assert settings.okta_password == 'pytest_patched' - - -@pytest.mark.parametrize('url,expected', [ - ('pytest_deadbeef', False), - ('http://acme.org/', False), - ('https://acme.okta.org/app/UserHome', False), - ('http://login.acme.org/home/amazon_aws/0123456789abcdef0123/456', False), - ('https://login.acme.org/home/amazon_aws/0123456789abcdef0123/456', True), - ('https://acme.okta.org/home/amazon_aws/0123456789abcdef0123/456?fromHome=true', True)]) + assert val == "pytest_patched" + assert settings.okta_password == "pytest_patched" + + +@pytest.mark.parametrize( + "url,expected", + [ + ("pytest_deadbeef", False), + ("http://acme.org/", False), + ("https://acme.okta.org/app/UserHome", False), + ("http://login.acme.org/home/amazon_aws/0123456789abcdef0123/456", False), + ("https://login.acme.org/home/amazon_aws/0123456789abcdef0123/456", True), + ( + "https://acme.okta.org/home/amazon_aws/0123456789abcdef0123/456?fromHome=true", + True, + ), + ], +) def test_validate_okta_aws_app_url(url, expected): """Test whether the Okta URL is parsed correctly.""" from tokendito import helpers @@ -118,51 +180,50 @@ def test_validate_okta_aws_app_url(url, expected): assert helpers.validate_okta_aws_app_url(input_url=url) is expected -@pytest.mark.parametrize('test,limit,expected', [ - (0, 10, True), - (5, 10, True), - (10, 10, False), - (-1, 10, False), - (1, 0, False) -]) +@pytest.mark.parametrize( + "test,limit,expected", + [(0, 10, True), (5, 10, True), (10, 10, False), (-1, 10, False), (1, 0, False)], +) def test_check_within_range(mocker, test, limit, expected): """Test whether a given number is in the range 0 >= num < limit.""" from tokendito import helpers - mocker.patch('logging.error') + mocker.patch("logging.error") assert helpers.check_within_range(test, limit) is expected -@pytest.mark.parametrize('value,expected', [ - ('-1', False), - ('0', True), - ('1', True), - (-1, False), - (0, True), - (1, True), - (3.7, False), - ('3.7', False), - ('seven', False), - ('0xff', False), - (None, False)]) +@pytest.mark.parametrize( + "value,expected", + [ + ("-1", False), + ("0", True), + ("1", True), + (-1, False), + (0, True), + (1, True), + (3.7, False), + ("3.7", False), + ("seven", False), + ("0xff", False), + (None, False), + ], +) def test_check_integer(value, expected, mocker): """Test whether the integer testing function works within boundaries.""" from tokendito import helpers - mocker.patch('logging.error') + mocker.patch("logging.error") assert helpers.check_integer(value) is expected -@pytest.mark.parametrize('test,limit,expected', [ - (1, 10, True), - (-1, 10, False), - ('pytest', 10, False) -]) +@pytest.mark.parametrize( + "test,limit,expected", [(1, 10, True), (-1, 10, False), ("pytest", 10, False)] +) def test_validate_input(mocker, test, limit, expected): """Check if a given input is within the 0 >= num < limit range.""" from tokendito import helpers - mocker.patch('logging.error') + mocker.patch("logging.error") assert helpers.validate_input(test, limit) is expected @@ -170,20 +231,16 @@ def test_get_input(monkeypatch): """Check if provided input is return unmodified.""" from tokendito import helpers - monkeypatch.setattr('tokendito.helpers.input', lambda _: 'pytest_patched') - assert helpers.get_input() == 'pytest_patched' + monkeypatch.setattr("tokendito.helpers.input", lambda _: "pytest_patched") + assert helpers.get_input() == "pytest_patched" -@pytest.mark.parametrize('value,expected', [ - ('00', 0), - ('01', 1), - ('5', 5) -]) +@pytest.mark.parametrize("value,expected", [("00", 0), ("01", 1), ("5", 5)]) def test_collect_integer(monkeypatch, value, expected): """Check if a given digit or series of digits are properly casted to int.""" from tokendito import helpers - monkeypatch.setattr('tokendito.helpers.input', lambda _: value) + monkeypatch.setattr("tokendito.helpers.input", lambda _: value) assert helpers.collect_integer(10) == expected @@ -191,18 +248,21 @@ def test_prepare_payload(): """Check if values passed return in a dictionary.""" from tokendito import helpers - assert helpers.prepare_payload(pytest_key='pytest_val') == {'pytest_key': 'pytest_val'} - assert helpers.prepare_payload(pytest_key=None) == {'pytest_key': None} - assert helpers.prepare_payload(pytest_key1='pytest_val1', pytest_key2='pytest_val2') == { - 'pytest_key1': 'pytest_val1', 'pytest_key2': 'pytest_val2'} + assert helpers.prepare_payload(pytest_key="pytest_val") == { + "pytest_key": "pytest_val" + } + assert helpers.prepare_payload(pytest_key=None) == {"pytest_key": None} + assert helpers.prepare_payload( + pytest_key1="pytest_val1", pytest_key2="pytest_val2" + ) == {"pytest_key1": "pytest_val1", "pytest_key2": "pytest_val2"} def test_set_passcode(monkeypatch): """Check if numerical passcode can handle leading zero values.""" from tokendito import duo_helpers - monkeypatch.setattr('tokendito.helpers.input', lambda _: '0123456') - assert duo_helpers.set_passcode({'factor': 'passcode'}) == '0123456' + monkeypatch.setattr("tokendito.helpers.input", lambda _: "0123456") + assert duo_helpers.set_passcode({"factor": "passcode"}) == "0123456" def test_process_environment(monkeypatch, valid_settings, invalid_settings): @@ -218,14 +278,14 @@ def test_process_environment(monkeypatch, valid_settings, invalid_settings): env_keys = valid_keys.copy() env_keys.update(invalid_keys) - monkeypatch.setattr(os, 'environ', env_keys) + monkeypatch.setattr(os, "environ", env_keys) helpers.process_environment() for key in valid_settings: assert getattr(settings, key) == valid_settings[key] for key in invalid_settings: - assert getattr(settings, key, 'not_found') == 'not_found' + assert getattr(settings, key, "not_found") == "not_found" def test_process_arguments(valid_settings, invalid_settings): @@ -243,39 +303,47 @@ def test_process_arguments(valid_settings, invalid_settings): assert getattr(settings, key_name) == valid_settings[key_name] for key_name in invalid_settings: - assert getattr(settings, key_name, 'not_found') == 'not_found' + assert getattr(settings, key_name, "not_found") == "not_found" -@pytest.mark.skipif(sys.version_info[:2] == (3, 5), - reason="ConfigParser bug, see https://bugs.python.org/issue29623") + +@pytest.mark.skipif( + sys.version_info[:2] == (3, 5), + reason="ConfigParser bug, see https://bugs.python.org/issue29623", +) def test_process_ini_file(tmpdir, valid_settings, invalid_settings, mocker): """Test whether ini config elements are correctly set in settings.*.""" from tokendito import helpers, settings + # Create a mock config file - data = '[default]\nokta_username = pytest\n\n[pytest]\n' - data += ''.join('{} = {}\n'.format(key, val) for key, val in valid_settings.items()) - data += ''.join('{} = {}\n'.format(key, val) for key, val in invalid_settings.items()) - data += '\n[pytest_end]\n' - data += ''.join('{} = {}\n'.format(key, val) for key, val in invalid_settings.items()) + data = "[default]\nokta_username = pytest\n\n[pytest]\n" + data += "".join("{} = {}\n".format(key, val) for key, val in valid_settings.items()) + data += "".join( + "{} = {}\n".format(key, val) for key, val in invalid_settings.items() + ) + data += "\n[pytest_end]\n" + data += "".join( + "{} = {}\n".format(key, val) for key, val in invalid_settings.items() + ) # Python 3.7 supports patching builtins.open(), which gives us the ability # to bypass file creation with: # mocker.patch('builtins.open', mocker.mock_open(read_data=data), create=True) # There is no (easy) way to achieve the same on earlier versions, so we create # an actual file instead. tmpdir keeps the last 3 files/dirs behind for inspection - path = tmpdir.mkdir('pytest').join('pytest_tokendito.ini') + path = tmpdir.mkdir("pytest").join("pytest_tokendito.ini") path.write(data) # Ensure we fail if the section is not found with pytest.raises(SystemExit) as err: - mocker.patch('logging.error') - helpers.process_ini_file(path, 'pytest_expected_failure') + mocker.patch("logging.error") + helpers.process_ini_file(path, "pytest_expected_failure") # assert err.type == SystemExit assert err.value.code == 2 - helpers.process_ini_file(path, 'pytest') + helpers.process_ini_file(path, "pytest") # Test that correct options are set for key_name in valid_settings: assert getattr(settings, key_name) == valid_settings[key_name] # Test that incorrect options aren't set for key_name in invalid_settings: - assert getattr(settings, key_name, 'not_found') == 'not_found' + assert getattr(settings, key_name, "not_found") == "not_found" diff --git a/tokendito/__init__.py b/tokendito/__init__.py index 551e808e..6b922eab 100644 --- a/tokendito/__init__.py +++ b/tokendito/__init__.py @@ -1,4 +1,4 @@ # vim: set filetype=python ts=4 sw=4 # -*- coding: utf-8 -*- """tokendito module initialization.""" -__all__ = [''] +__all__ = [""] diff --git a/tokendito/__main__.py b/tokendito/__main__.py index fb4d188c..65716d70 100755 --- a/tokendito/__main__.py +++ b/tokendito/__main__.py @@ -3,15 +3,12 @@ # -*- coding: utf-8 -*- """tokendito module entry point.""" -from __future__ import (absolute_import, division, - print_function, unicode_literals) +from __future__ import absolute_import, division, print_function, unicode_literals -from builtins import (ascii, bytes, chr, dict, filter, hex, input, # noqa: F401 - int, list, map, next, object, oct, open, pow, range, - round, str, super, zip) import sys from future import standard_library + standard_library.install_aliases() @@ -19,15 +16,17 @@ def main(args=None): # needed for console script """Packge entry point.""" if __package__ is None: import os.path + path = os.path.dirname(os.path.dirname(__file__)) sys.path[0:0] = [path] from tokendito.tool import cli + try: return cli(args) except KeyboardInterrupt: - print('\nInterrupted') + print("\nInterrupted") sys.exit(1) -if __name__ == '__main__': +if __name__ == "__main__": sys.exit(main(sys.argv[1:])) diff --git a/tokendito/__version__.py b/tokendito/__version__.py index fe8893dd..e0398afd 100644 --- a/tokendito/__version__.py +++ b/tokendito/__version__.py @@ -1,11 +1,11 @@ # vim: set filetype=python ts=4 sw=4 # -*- coding: utf-8 -*- """tokendito version.""" -__version__ = '1.1.0' -__title__ = 'tokendito' -__description__ = 'Get AWS STS tokens from Okta SSO' -__long_description_content_type__ = 'text/x-rst' -__url__ = 'https://github.com/dowjones/tokendito' -__author__ = 'tokendito' -__author_email__ = 'tokendito@dowjones.com' -__license__ = 'Apache 2.0' +__version__ = "1.1.0" +__title__ = "tokendito" +__description__ = "Get AWS STS tokens from Okta SSO" +__long_description_content_type__ = "text/x-rst" +__url__ = "https://github.com/dowjones/tokendito" +__author__ = "tokendito" +__author_email__ = "tokendito@dowjones.com" +__license__ = "Apache 2.0" diff --git a/tokendito/aws_helpers.py b/tokendito/aws_helpers.py index e8c8eaec..458c7574 100644 --- a/tokendito/aws_helpers.py +++ b/tokendito/aws_helpers.py @@ -8,12 +8,8 @@ 3. Updating the AWS Config """ -from __future__ import (absolute_import, division, - print_function, unicode_literals) +from __future__ import absolute_import, division, print_function, unicode_literals -from builtins import (ascii, bytes, chr, dict, filter, hex, input, # noqa: F401 - int, list, map, next, object, oct, open, pow, range, - round, str, super, zip) import codecs import logging import sys @@ -38,34 +34,34 @@ def authenticate_to_roles(secret_session_token, url): :return: response text """ - payload = {'onetimetoken': secret_session_token} - logging.debug( - "Authenticate AWS user with SAML URL [{}:{}]".format(url, payload)) + payload = {"onetimetoken": secret_session_token} + logging.debug("Authenticate AWS user with SAML URL [{}:{}]".format(url, payload)) try: response = requests.get(url, params=payload) saml_response_string = response.text if response.status_code == 400 or response.status_code == 401: - errmsg = 'Invalid Credentials.' - logging.critical("{}\nExiting with code:{}".format( - errmsg, response.status_code)) + errmsg = "Invalid Credentials." + logging.critical( + "{}\nExiting with code:{}".format(errmsg, response.status_code) + ) sys.exit(2) elif response.status_code == 404: - errmsg = 'Invalid Okta application URL. Please verify your configuration.' + errmsg = "Invalid Okta application URL. Please verify your configuration." logging.critical("{}".format(errmsg)) sys.exit(2) elif response.status_code >= 500 and response.status_code < 504: - errmsg = 'Unable to establish connection with Okta. Verify Okta Org URL and try again.' - logging.critical("{}\nExiting with code:{}".format( - errmsg, response.status_code)) + errmsg = "Unable to establish connection with Okta. Verify Okta Org URL and try again." + logging.critical( + "{}\nExiting with code:{}".format(errmsg, response.status_code) + ) sys.exit(2) elif response.status_code != 200: - logging.critical("Exiting with code:{}".format( - response.status_code)) + logging.critical("Exiting with code:{}".format(response.status_code)) logging.debug(saml_response_string) sys.exit(2) except Exception as error: - errmsg = 'Okta auth failed:\n{}'.format(error) + errmsg = "Okta auth failed:\n{}".format(error) logging.critical(errmsg) sys.exit(1) @@ -83,24 +79,33 @@ def assume_role(role_arn, provider_arn, saml): :return: AssumeRoleWithSaml API response """ - default_error = ('\nUnable to assume role with the following details:\n' - '- Role ARN: {}\n' - '- Error: {}\n') + default_error = ( + "\nUnable to assume role with the following details:\n" + "- Role ARN: {}\n" + "- Error: {}\n" + ) - encoded_xml = codecs.encode(saml.encode('utf-8'), 'base64') + encoded_xml = codecs.encode(saml.encode("utf-8"), "base64") # Attempt to assume a role with the following durations: # 12h, 8h, 6h, 4h, 2h, 1h, 30m, 15m session_times = [43200, 28800, 21600, 14400, 7200, 3600, 1800, 900, "exit"] for duration in session_times: if duration == "exit": - logging.critical(default_error.format( - role_arn, "IAM role session time is not within set: {}".format(session_times[:-1]))) + logging.critical( + default_error.format( + role_arn, + "IAM role session time is not within set: {}".format( + session_times[:-1] + ), + ) + ) sys.exit(2) assume_role_response = handle_assume_role( - role_arn, provider_arn, encoded_xml, duration, default_error) - if 'Credentials' in assume_role_response: + role_arn, provider_arn, encoded_xml, duration, default_error + ) + if "Credentials" in assume_role_response: break return assume_role_response @@ -115,21 +120,25 @@ def handle_assume_role(role_arn, provider_arn, encoded_xml, duration, default_er :return: AssumeRoleWithSaml API responses """ logging.debug("Attempting session time [{}]".format(duration)) - client = boto3.client('sts', config=Config(signature_version=UNSIGNED)) + client = boto3.client("sts", config=Config(signature_version=UNSIGNED)) try: - assume_role_response = client.assume_role_with_saml(RoleArn=role_arn, - PrincipalArn=provider_arn, - SAMLAssertion=encoded_xml.decode(), - DurationSeconds=duration) + assume_role_response = client.assume_role_with_saml( + RoleArn=role_arn, + PrincipalArn=provider_arn, + SAMLAssertion=encoded_xml.decode(), + DurationSeconds=duration, + ) # Client Exceptions except ClientError as error: - if error.response['Error']['Code'] == 'ValidationError': - logging.info("AssumeRoleWithSaml failed with {} for duration {}".format( - error.response['Error']['Code'], duration)) + if error.response["Error"]["Code"] == "ValidationError": + logging.info( + "AssumeRoleWithSaml failed with {} for duration {}".format( + error.response["Error"]["Code"], duration + ) + ) assume_role_response = "continue" - elif error.response['Error']['Code'] == 'AccessDenied': - errmsg = 'Error assuming intermediate {} SAML role'.format( - provider_arn) + elif error.response["Error"]["Code"] == "AccessDenied": + errmsg = "Error assuming intermediate {} SAML role".format(provider_arn) logging.critical(errmsg) sys.exit(2) else: @@ -154,19 +163,24 @@ def ensure_keys_work(assume_role_response): """ logging.debug("Validate the temporary AWS credentials") - aws_access_key = assume_role_response['Credentials']['AccessKeyId'] - aws_secret_key = assume_role_response['Credentials']['SecretAccessKey'] - aws_session_token = assume_role_response['Credentials']['SessionToken'] + aws_access_key = assume_role_response["Credentials"]["AccessKeyId"] + aws_secret_key = assume_role_response["Credentials"]["SecretAccessKey"] + aws_session_token = assume_role_response["Credentials"]["SessionToken"] try: client = boto3.client( - 'sts', aws_access_key_id=aws_access_key, - aws_secret_access_key=aws_secret_key, aws_session_token=aws_session_token) + "sts", + aws_access_key_id=aws_access_key, + aws_secret_access_key=aws_secret_key, + aws_session_token=aws_session_token, + ) client.get_caller_identity() except Exception as auth_error: logging.critical( "There was an error authenticating your keys with AWS: {}".format( - auth_error)) + auth_error + ) + ) sys.exit(1) @@ -179,10 +193,10 @@ def select_assumeable_role(saml_response_string, saml): """ roles_and_providers = helpers.extract_arns(saml) role_arn = helpers.select_role_arn( - list(roles_and_providers.keys()), saml, saml_response_string) + list(roles_and_providers.keys()), saml, saml_response_string + ) role_name = role_arn.split("/")[-1] - assume_role_response = assume_role( - role_arn, roles_and_providers[role_arn], saml) + assume_role_response = assume_role(role_arn, roles_and_providers[role_arn], saml) return assume_role_response, role_name diff --git a/tokendito/duo_helpers.py b/tokendito/duo_helpers.py index d14f663c..d9106328 100644 --- a/tokendito/duo_helpers.py +++ b/tokendito/duo_helpers.py @@ -1,12 +1,8 @@ # vim: set filetype=python ts=4 sw=4 # -*- coding: utf-8 -*- """This module handles Duo operations.""" -from __future__ import (absolute_import, division, - print_function, unicode_literals) +from __future__ import absolute_import, division, print_function, unicode_literals -from builtins import (ascii, bytes, chr, dict, filter, hex, input, # noqa: F401 - int, list, map, next, object, oct, open, pow, range, - round, str, super, zip) import json import logging import sys @@ -18,6 +14,7 @@ import requests from tokendito import helpers from tokendito import settings + standard_library.install_aliases() @@ -28,7 +25,9 @@ def prepare_duo_info(selected_okta_factor): :return duo_info: dict of parameters for Duo """ duo_info = {} - okta_factor = selected_okta_factor["_embedded"]["factor"]["_embedded"]["verification"] + okta_factor = selected_okta_factor["_embedded"]["factor"]["_embedded"][ + "verification" + ] duo_info["okta_factor"] = okta_factor duo_info["factor_id"] = selected_okta_factor["_embedded"]["factor"]["id"] @@ -36,8 +35,7 @@ def prepare_duo_info(selected_okta_factor): duo_info["okta_callback_url"] = okta_factor["_links"]["complete"]["href"] duo_info["tx"] = okta_factor["signature"].split(":")[0] duo_info["app_sig"] = okta_factor["signature"].split(":")[1] - duo_info["parent"] = "{}/signin/verify/duo/web".format( - settings.okta_org) + duo_info["parent"] = "{}/signin/verify/duo/web".format(settings.okta_org) duo_info["host"] = okta_factor["host"] duo_info["sid"] = "" @@ -58,10 +56,12 @@ def duo_api_post(url, params={}, headers={}, payload={}): """ try: response = requests.request( - 'POST', url, params=params, headers=headers, data=payload) + "POST", url, params=params, headers=headers, data=payload + ) except Exception as request_issue: logging.error( - "There was an error connecting to the Duo API: \n{}".format(request_issue)) + "There was an error connecting to the Duo API: \n{}".format(request_issue) + ) sys.exit(1) json_message = None @@ -71,14 +71,20 @@ def duo_api_post(url, params={}, headers={}, payload={}): logging.debug("Non-json response from Duo API: \n{}".format(response)) if response.status_code != 200: - logging.critical("Your Duo authentication has failed with status {}.".format( - response.status_code)) + logging.critical( + "Your Duo authentication has failed with status {}.".format( + response.status_code + ) + ) if json_message and json_message["stat"].lower() != "ok": - logging.critical("\n{}".format( - response.status_code, json_message["message"])) + logging.critical( + "\n{}, {}".format(response.status_code, json_message["message"]) + ) else: - logging.critical('Please re-run the program with parameter' - ' "--loglevel debug" to see more information.') + logging.critical( + "Please re-run the program with parameter" + ' "--loglevel debug" to see more information.' + ) sys.exit(2) return response @@ -94,20 +100,23 @@ def get_duo_sid(duo_info): :return: duo_auth_response, contains html content listing available factors. """ params = helpers.prepare_payload( - tx=duo_info["tx"], v=duo_info["version"], parent=duo_info["parent"]) + tx=duo_info["tx"], v=duo_info["version"], parent=duo_info["parent"] + ) url = "https://{}/frame/web/v1/auth".format(duo_info["host"]) - logging.info("Calling Duo {} with params {}".format( - urlparse(url).path, params.keys())) + logging.info( + "Calling Duo {} with params {}".format(urlparse(url).path, params.keys()) + ) duo_auth_response = duo_api_post(url, params=params) try: - duo_auth_redirect = urlparse("{}".format( - unquote(duo_auth_response.url))).query + duo_auth_redirect = urlparse("{}".format(unquote(duo_auth_response.url))).query duo_info["sid"] = duo_auth_redirect.strip("sid=") except Exception as sid_error: - logging.error("There was an error getting your SID." - "Please try again. \n{}".format(sid_error)) + logging.error( + "There was an error getting your SID." + "Please try again. \n{}".format(sid_error) + ) return duo_info, duo_auth_response @@ -133,8 +142,7 @@ def get_duo_devices(duo_auth): factor_options = [] for device in devices: - options = soup.find( - "fieldset", {"data-device-index": device.split(" - ")[0]}) + options = soup.find("fieldset", {"data-device-index": device.split(" - ")[0]}) factors = options.findAll("input", {"name": "factor"}) for factor in factors: factor_option = {} @@ -156,17 +164,22 @@ def parse_duo_mfa_challenge(mfa_challenge): txid = mfa_challenge["response"]["txid"] except ValueError as value_error: logging.error( - "The Duo API returned a non-json response: \n{}".format(value_error)) + "The Duo API returned a non-json response: \n{}".format(value_error) + ) sys.exit(1) except KeyError as key_error: logging.error( - "The Duo API response is missing a required parameter: \n{}".format(key_error)) + "The Duo API response is missing a required parameter: \n{}".format( + key_error + ) + ) print(json.dumps(mfa_challenge)) sys.exit(1) if mfa_status == "fail": - logging.error("Your Duo authentication has failed: \n{}".format( - mfa_challenge["message"])) + logging.error( + "Your Duo authentication has failed: \n{}".format(mfa_challenge["message"]) + ) sys.exit(1) return txid @@ -184,12 +197,14 @@ def duo_mfa_challenge(duo_info, mfa_option, passcode): """ url = "https://{}/frame/prompt".format(duo_info["host"]) device = mfa_option["device"].split(" - ")[0] - mfa_data = helpers.prepare_payload(factor=mfa_option["factor"], - device=device, - sid=duo_info["sid"], - out_of_date=False, - days_out_of_date=0, - days_to_block=None) + mfa_data = helpers.prepare_payload( + factor=mfa_option["factor"], + device=device, + sid=duo_info["sid"], + out_of_date=False, + days_out_of_date=0, + days_to_block=None, + ) mfa_data["async"] = True # async is a reserved keyword if passcode: mfa_data["passcode"] = passcode @@ -210,7 +225,10 @@ def get_mfa_response(mfa_result): verify_mfa = mfa_result.json()["response"] except Exception as parse_error: logging.error( - "There was an error parsing the mfa challenge result: \n{}".format(parse_error)) + "There was an error parsing the mfa challenge result: \n{}".format( + parse_error + ) + ) sys.exit(1) return verify_mfa @@ -257,7 +275,8 @@ def duo_mfa_verify(duo_info, txid): mfa_result = duo_api_post(url, payload=challenged_mfa) verify_mfa = get_mfa_response(mfa_result) challenge_result, challenge_reason = parse_challenge( - verify_mfa, challenge_result) + verify_mfa, challenge_result + ) if challenge_result is None: continue @@ -265,12 +284,16 @@ def duo_mfa_verify(duo_info, txid): logging.debug("Successful MFA challenge received") break elif challenge_result == "failure": - logging.critical("MFA challenge has failed:" - " {}. Please try again.".format(challenge_reason)) + logging.critical( + "MFA challenge has failed:" + " {}. Please try again.".format(challenge_reason) + ) sys.exit(2) else: - logging.debug("MFA challenge result: {}" - "Reason: {}\n\n".format(challenge_result, challenge_reason)) + logging.debug( + "MFA challenge result: {}" + "Reason: {}\n\n".format(challenge_result, challenge_reason) + ) time.sleep(1) return verify_mfa @@ -287,16 +310,21 @@ def duo_factor_callback(duo_info, verify_mfa): :return sig_response: required to sign final Duo callback request. """ factor_callback_url = "https://{}{}".format( - duo_info["host"], verify_mfa["result_url"]) - factor_callback = duo_api_post(factor_callback_url, payload={ - "sid": duo_info["sid"]}) + duo_info["host"], verify_mfa["result_url"] + ) + factor_callback = duo_api_post( + factor_callback_url, payload={"sid": duo_info["sid"]} + ) try: sig_response = "{}:{}".format( - factor_callback.json()["response"]["cookie"], duo_info["app_sig"]) + factor_callback.json()["response"]["cookie"], duo_info["app_sig"] + ) except Exception as sig_error: - logging.error("There was an error getting your" - " application signature from Duo: \n{}".format(json.dumps(sig_error))) + logging.error( + "There was an error getting your" + " application signature from Duo: \n{}".format(json.dumps(sig_error)) + ) logging.debug("Completed factor callback.") return sig_response @@ -312,7 +340,7 @@ def set_passcode(mfa_option): """ passcode = None if mfa_option["factor"].lower() == "passcode": - print('Type your TOTP and press Enter') + print("Type your TOTP and press Enter") passcode = helpers.get_input() return passcode @@ -333,14 +361,16 @@ def authenticate_duo(selected_okta_factor): except KeyError as missing_key: logging.error( "There was an issue parsing the Okta factor." - " Please try again. \n{}".format(missing_key)) + " Please try again. \n{}".format(missing_key) + ) sys.exit(1) # Collect devices, factors, auth params for Duo duo_info, duo_auth_response = get_duo_sid(duo_info) factor_options = get_duo_devices(duo_auth_response) mfa_index = helpers.select_preferred_mfa_index( - factor_options, factor_key="factor", subfactor_key="device") + factor_options, factor_key="factor", subfactor_key="device" + ) mfa_option = factor_options[mfa_index] logging.debug("Selected MFA is [{}]".format(mfa_option)) @@ -353,9 +383,11 @@ def authenticate_duo(selected_okta_factor): sig_response = duo_factor_callback(duo_info, verify_mfa) # Prepare for Okta callback - payload = helpers.prepare_payload(id=duo_info["factor_id"], - sig_response=sig_response, - stateToken=duo_info["state_token"]) + payload = helpers.prepare_payload( + id=duo_info["factor_id"], + sig_response=sig_response, + stateToken=duo_info["state_token"], + ) headers = {} headers["content-type"] = "application/json" headers["accept"] = "application/json" diff --git a/tokendito/helpers.py b/tokendito/helpers.py index e4ae7477..3af731d3 100644 --- a/tokendito/helpers.py +++ b/tokendito/helpers.py @@ -1,13 +1,31 @@ # vim: set filetype=python ts=4 sw=4 # -*- coding: utf-8 -*- """Helper module for AWS and Okta configuration, management and data flow.""" -from __future__ import (absolute_import, division, - print_function, unicode_literals) +from __future__ import absolute_import, division, print_function, unicode_literals import argparse -from builtins import (ascii, bytes, chr, dict, filter, hex, input, # noqa: F401 - int, list, map, next, object, oct, open, pow, range, - round, str, super, zip) +from builtins import ( # noqa: F401 + ascii, + bytes, + chr, + dict, + filter, + hex, + input, + int, + list, + map, + next, + object, + oct, + open, + pow, + range, + round, + str, + super, + zip, +) import codecs import configparser import getpass @@ -37,44 +55,90 @@ def setup(args): :return: args parse object """ parser = argparse.ArgumentParser( - prog='tokendito', - description='Gets a STS token to use with the AWS CLI') - parser.add_argument('--version', '-v', action='store_true', - help='Displays version and exit') - parser.add_argument('--configure', '-c', action='store_true', help='Prompt user for ' - 'configuration parameters') - parser.add_argument('--username', '-u', type=to_unicode, dest='okta_username', - help='username to login to Okta. You can ' - 'also use the OKTA_USERNAME environment variable.') - parser.add_argument('--password', '-p', type=to_unicode, dest='okta_password', - help='password to login to Okta. You ' - 'can also user the OKTA_PASSWORD environment variable.') - parser.add_argument('--config-file', '-C', type=to_unicode, - default=settings.config_file, - help='Use an alternative configuration file') - parser.add_argument('--okta-aws-app-url', '-ou', type=to_unicode, - help='Okta App URL to use.') - parser.add_argument('--okta-profile', '-op', type=to_unicode, - default=settings.okta_profile, - help='Okta configuration profile to use.') - parser.add_argument('--aws-region', '-r', type=to_unicode, - help='Sets the AWS region for the profile') - parser.add_argument('--aws-output', '-ao', type=to_unicode, - help='Sets the AWS output type for the profile') - parser.add_argument('--aws-profile', '-ap', type=to_unicode, - help='Override AWS profile to save as in the credentials file.') - parser.add_argument('--mfa-method', '-mm', type=to_unicode, - help='Sets the MFA method') - parser.add_argument('--mfa-response', '-mr', type=to_unicode, - help='Sets the MFA response to a challenge') - parser.add_argument('--role-arn', '-R', type=to_unicode, - help='Sets the IAM role') - parser.add_argument('--output-file', '-o', type=to_unicode, - help="Log output to filename") - parser.add_argument('--loglevel', '-l', type=lambda s: s.upper(), default='ERROR', - choices=["DEBUG", "INFO", "WARN", "ERROR"], - help='[DEBUG|INFO|WARN|ERROR], default loglevel is ERROR.' - ' Note: DEBUG level may display credentials') + prog="tokendito", description="Gets a STS token to use with the AWS CLI" + ) + parser.add_argument( + "--version", "-v", action="store_true", help="Displays version and exit" + ) + parser.add_argument( + "--configure", + "-c", + action="store_true", + help="Prompt user for " "configuration parameters", + ) + parser.add_argument( + "--username", + "-u", + type=to_unicode, + dest="okta_username", + help="username to login to Okta. You can " + "also use the OKTA_USERNAME environment variable.", + ) + parser.add_argument( + "--password", + "-p", + type=to_unicode, + dest="okta_password", + help="password to login to Okta. You " + "can also user the OKTA_PASSWORD environment variable.", + ) + parser.add_argument( + "--config-file", + "-C", + type=to_unicode, + default=settings.config_file, + help="Use an alternative configuration file", + ) + parser.add_argument( + "--okta-aws-app-url", "-ou", type=to_unicode, help="Okta App URL to use." + ) + parser.add_argument( + "--okta-profile", + "-op", + type=to_unicode, + default=settings.okta_profile, + help="Okta configuration profile to use.", + ) + parser.add_argument( + "--aws-region", + "-r", + type=to_unicode, + help="Sets the AWS region for the profile", + ) + parser.add_argument( + "--aws-output", + "-ao", + type=to_unicode, + help="Sets the AWS output type for the profile", + ) + parser.add_argument( + "--aws-profile", + "-ap", + type=to_unicode, + help="Override AWS profile to save as in the credentials file.", + ) + parser.add_argument( + "--mfa-method", "-mm", type=to_unicode, help="Sets the MFA method" + ) + parser.add_argument( + "--mfa-response", + "-mr", + type=to_unicode, + help="Sets the MFA response to a challenge", + ) + parser.add_argument("--role-arn", "-R", type=to_unicode, help="Sets the IAM role") + parser.add_argument( + "--output-file", "-o", type=to_unicode, help="Log output to filename" + ) + parser.add_argument( + "--loglevel", + "-l", + type=lambda s: s.upper(), + default="ERROR", + choices=["DEBUG", "INFO", "WARN", "ERROR"], + help="[DEBUG|INFO|WARN|ERROR], default loglevel is ERROR." + " Note: DEBUG level may display credentials", + ) parsed_args = parser.parse_args(args) set_logging(parsed_args) @@ -108,8 +172,11 @@ def create_directory(dir_name): try: os.mkdir(dir_name) except OSError as error: - logging.error("Cannot continue creating directory \'{}\': {}".format( - settings.config_dir, error.strerror)) + logging.error( + "Cannot continue creating directory '{}': {}".format( + settings.config_dir, error.strerror + ) + ) sys.exit(1) @@ -121,11 +188,10 @@ def set_okta_username(): """ logging.debug("Set okta username in a constant settings variable.") - if settings.okta_username == '': - okta_username = input('Username: ') - setattr(settings, 'okta_username', to_unicode(okta_username)) - logging.debug('username set to {} interactively'.format( - settings.okta_username)) + if settings.okta_username == "": + okta_username = input("Username: ") + setattr(settings, "okta_username", to_unicode(okta_username)) + logging.debug("username set to {} interactively".format(settings.okta_username)) return settings.okta_username @@ -139,11 +205,11 @@ def set_okta_password(): """ logging.debug("Set okta password in a constant settings variable.") - while settings.okta_password == '': + while settings.okta_password == "": okta_password = getpass.getpass() - setattr(settings, 'okta_password', to_unicode(okta_password)) + setattr(settings, "okta_password", to_unicode(okta_password)) - logging.debug('password set interactively') + logging.debug("password set interactively") return settings.okta_password @@ -159,14 +225,12 @@ def set_logging(args): log_level_int = getattr(logging, args.loglevel) # increment boto logs to not print api keys - logging.getLogger('botocore').setLevel( - log_level_int + 10) + logging.getLogger("botocore").setLevel(log_level_int + 10) log_format = ( - '%(levelname)s ' - '[%(filename)s:%(funcName)s():%(lineno)i]: %(message)s' + "%(levelname)s " "[%(filename)s:%(funcName)s():%(lineno)i]: %(message)s" ) - date_format = '%m/%d/%Y %I:%M:%S %p' + date_format = "%m/%d/%Y %I:%M:%S %p" formatter = logging.Formatter(log_format, date_format) @@ -190,13 +254,13 @@ def select_role_arn(role_arns, saml_xml, saml_response_string): """ logging.debug("Select the role user wants to pick [{}]".format(role_arns)) if settings.role_arn is None: - selected_role = prompt_role_choices( - role_arns, saml_xml, saml_response_string) + selected_role = prompt_role_choices(role_arns, saml_xml, saml_response_string) elif settings.role_arn in role_arns: selected_role = settings.role_arn else: logging.error( - "User provided rolename does not exist [{}]".format(settings.role_arn)) + "User provided rolename does not exist [{}]".format(settings.role_arn) + ) sys.exit(2) logging.debug("Selected role: [{}]".format(selected_role)) @@ -204,23 +268,31 @@ def select_role_arn(role_arns, saml_xml, saml_response_string): return selected_role -def select_preferred_mfa_index(mfa_options, factor_key="provider", subfactor_key="factorType"): +def select_preferred_mfa_index( + mfa_options, factor_key="provider", subfactor_key="factorType" +): """Show all the MFA options to the users. :param mfa_options: List of available MFA options :return: MFA option selected index by the user from the output """ logging.debug("Show all the MFA options to the users.") - print('\nSelect your preferred MFA method and press Enter:') + print("\nSelect your preferred MFA method and press Enter:") longest_index = len(str(len(mfa_options))) for (i, mfa_option) in enumerate(mfa_options): padding_index = longest_index - len(str(i)) longest_factor_name = max([len(d[factor_key]) for d in mfa_options]) - print('[{}] {}{: <{}} {}'.format( - i, padding_index*' ', mfa_option[factor_key], longest_factor_name, - mfa_option[subfactor_key])) + print( + "[{}] {}{: <{}} {}".format( + i, + padding_index * " ", + mfa_option[factor_key], + longest_factor_name, + mfa_option[subfactor_key], + ) + ) user_input = collect_integer(len(mfa_options)) @@ -249,8 +321,11 @@ def prompt_role_choices(role_arns, saml_xml, saml_response_string): for (i, arn) in enumerate(sorted_role_arns): padding_index = longest_index - len(str(i)) account_alias = alias_table[arn.split(":")[4]] - print('[{}] {}{: <{}} {}'.format( - i, padding_index*' ', account_alias, longest_alias, arn)) + print( + "[{}] {}{: <{}} {}".format( + i, padding_index * " ", account_alias, longest_alias, arn + ) + ) user_input = collect_integer(len(role_arns)) selected_role = sorted_role_arns[user_input] @@ -268,14 +343,19 @@ def print_selected_role(profile_name, expiration_time): """ msg = ( - '\nGenerated profile \'{}\' in {}.\n' - '\nUse profile to authenticate to AWS:\n\t' - 'aws --profile \'{}\' sts get-caller-identity' - '\nOR\n\t' - 'export AWS_PROFILE=\'{}\'\n\n' - 'Credentials are valid until {}.' - ).format(profile_name, settings.aws_shared_credentials_file, - profile_name, profile_name, expiration_time) + "\nGenerated profile '{}' in {}.\n" + "\nUse profile to authenticate to AWS:\n\t" + "aws --profile '{}' sts get-caller-identity" + "\nOR\n\t" + "export AWS_PROFILE='{}'\n\n" + "Credentials are valid until {}." + ).format( + profile_name, + settings.aws_shared_credentials_file, + profile_name, + profile_name, + expiration_time, + ) return print(msg) @@ -288,8 +368,8 @@ def extract_arns(saml): """ logging.debug("Decode response string as a SAML decoded value.") - soup = BeautifulSoup(saml, 'xml') - arns = soup.find_all(text=re.compile('arn:aws:iam::')) + soup = BeautifulSoup(saml, "xml") + arns = soup.find_all(text=re.compile("arn:aws:iam::")) if len(arns) == 0: logging.error("No IAM roles found in SAML response.") logging.debug(arns) @@ -307,14 +387,15 @@ def validate_saml_response(html): soup = BeautifulSoup(html, "html.parser") xml = None - for elem in soup.find_all('input', attrs={'name': 'SAMLResponse'}): - saml_base64 = elem.get('value') - xml = codecs.decode(saml_base64.encode( - 'ascii'), 'base64').decode('utf-8') + for elem in soup.find_all("input", attrs={"name": "SAMLResponse"}): + saml_base64 = elem.get("value") + xml = codecs.decode(saml_base64.encode("ascii"), "base64").decode("utf-8") if xml is None: - logging.error("Invalid data detected in SAML response." - " View the response with the DEBUG loglevel.") + logging.error( + "Invalid data detected in SAML response." + " View the response with the DEBUG loglevel." + ) logging.debug(html) sys.exit(1) @@ -327,16 +408,18 @@ def validate_okta_aws_app_url(input_url=None): :param input_url: string :return: bool. True if valid, False otherwise """ - logging.debug('Will try to match \'{}\' to a valid URL'.format(input_url)) + logging.debug("Will try to match '{}' to a valid URL".format(input_url)) url = urlparse(input_url) # Here, we could also check url.netloc against r'.*\.okta(preview)?\.com$' # but Okta allows the usage of custome URLs such as login.acme.com - if url.scheme == 'https' and \ - re.match(r'^/home/amazon_aws/\w{20}/\d{3}$', url.path) is not None: + if ( + url.scheme == "https" + and re.match(r"^/home/amazon_aws/\w{20}/\d{3}$", url.path) is not None + ): return True - logging.debug('{} does not look like a valid match.'.format(url)) + logging.debug("{} does not look like a valid match.".format(url)) return False @@ -348,29 +431,31 @@ def get_account_aliases(saml_xml, saml_response_string): :return: mapping table of account ids to their aliases """ soup = BeautifulSoup(saml_response_string, "html.parser") - url = soup.find('form').get('action') + url = soup.find("form").get("action") - encoded_xml = codecs.encode(saml_xml.encode('utf-8'), 'base64') + encoded_xml = codecs.encode(saml_xml.encode("utf-8"), "base64") aws_response = None try: - aws_response = requests.Session().post( - url, data={'SAMLResponse': encoded_xml}) + aws_response = requests.Session().post(url, data={"SAMLResponse": encoded_xml}) except Exception as request_error: logging.error( - "There was an error retrieving the AWS SAML page: \n{}".format(request_error)) + "There was an error retrieving the AWS SAML page: \n{}".format( + request_error + ) + ) logging.debug(json.dumps(aws_response)) sys.exit(1) if "Account: " not in aws_response.text: - logging.error( - "There were no accounts returned in the AWS SAML page.") + logging.error("There were no accounts returned in the AWS SAML page.") logging.debug(json.dumps(aws_response.text)) sys.exit(2) soup = BeautifulSoup(aws_response.text, "html.parser") - account_names = soup.find_all(text=re.compile('Account:')) - alias_table = {str(i.split(" ")[-1]).strip("()"): - i.split(" ")[1] for i in account_names} + account_names = soup.find_all(text=re.compile("Account:")) + alias_table = { + str(i.split(" ")[-1]).strip("()"): i.split(" ")[1] for i in account_names + } return alias_table @@ -379,9 +464,17 @@ def display_version(): """Print program version and exit.""" python_version = platform.python_version() (system, _, release, _, _, _) = platform.uname() - print('tokendito/{} Python/{} {}/{} botocore/{} bs4/{} requests/{}'.format( - __version__, python_version, system, release, - __botocore_version__, __bs4_version__, __requests_version__)) + print( + "tokendito/{} Python/{} {}/{} botocore/{} bs4/{} requests/{}".format( + __version__, + python_version, + system, + release, + __botocore_version__, + __bs4_version__, + __requests_version__, + ) + ) def process_ini_file(file, profile): @@ -398,11 +491,10 @@ def process_ini_file(file, profile): try: for (key, val) in config.items(profile): if hasattr(settings, key): - logging.debug( - 'Set option {}={} from ini file'.format(key, val)) + logging.debug("Set option {}={} from ini file".format(key, val)) setattr(settings, key, val) except configparser.NoSectionError: - logging.error('Profile \'{}\' does not exist.'.format(profile)) + logging.error("Profile '{}' does not exist.".format(profile)) sys.exit(2) @@ -414,8 +506,7 @@ def process_arguments(args): """ for (key, val) in vars(args).items(): if hasattr(settings, key) and val is not None: - logging.debug( - 'Set option {}={} from command line'.format(key, val)) + logging.debug("Set option {}={} from command line".format(key, val)) setattr(settings, key, val) @@ -427,7 +518,7 @@ def process_environment(): for (key, val) in os.environ.items(): key = key.lower() if hasattr(settings, key): - logging.debug('Set option {}={} from environment'.format(key, val)) + logging.debug("Set option {}={} from environment".format(key, val)) setattr(settings, key, os.getenv(key.upper())) @@ -438,15 +529,17 @@ def process_okta_aws_app_url(): :return: None. """ if not validate_okta_aws_app_url(settings.okta_aws_app_url): - logging.error("Okta Application URL not found, or invalid. Please check " - "your configuration and try again.") + logging.error( + "Okta Application URL not found, or invalid. Please check " + "your configuration and try again." + ) sys.exit(2) url = urlparse(settings.okta_aws_app_url) - okta_org = '{}://{}'.format(url.scheme, url.netloc) - okta_aws_app_url = '{}{}'.format(okta_org, url.path) - setattr(settings, 'okta_org', okta_org) - setattr(settings, 'okta_aws_app_url', okta_aws_app_url) + okta_org = "{}://{}".format(url.scheme, url.netloc) + okta_aws_app_url = "{}{}".format(okta_org, url.path) + setattr(settings, "okta_org", okta_org) + setattr(settings, "okta_aws_app_url", okta_aws_app_url) def user_configuration_input(): @@ -455,32 +548,31 @@ def user_configuration_input(): :return: (okta app url, organization username) """ logging.debug("Obtain user input for the user.") - url = '' - username = '' + url = "" + username = "" config_details = [] message = { - 'app_url': '\nOkta App URL. E.g https://acme.okta.com/home/' - 'amazon_aws/b07384d113edec49eaa6/123\n[none]: ', - 'username': '\nOrganization username. E.g jane.doe@acme.com' - '\n[none]: ' + "app_url": "\nOkta App URL. E.g https://acme.okta.com/home/" + "amazon_aws/b07384d113edec49eaa6/123\n[none]: ", + "username": "\nOrganization username. E.g jane.doe@acme.com" "\n[none]: ", } - while url == '': - user_data = to_unicode(input(message['app_url'])) + while url == "": + user_data = to_unicode(input(message["app_url"])) user_data = user_data.strip() if validate_okta_aws_app_url(user_data): url = user_data else: - print('Invalid input, try again.') + print("Invalid input, try again.") config_details.append(url) - while username == '': - user_data = to_unicode(input(message['username'])) + while username == "": + user_data = to_unicode(input(message["username"])) user_data = user_data.strip() - if user_data != '': + if user_data != "": username = user_data else: - print('Invalid input, try again.') + print("Invalid input, try again.") config_details.append(username) return (config_details[0], config_details[1]) @@ -511,16 +603,15 @@ def update_configuration(okta_file, profile): url = urlparse(app_url.strip()) okta_username = username.strip() - okta_aws_app_url = '{}://{}{}'.format(url.scheme, url.netloc, url.path) + okta_aws_app_url = "{}://{}{}".format(url.scheme, url.netloc, url.path) - config.set(profile, 'okta_aws_app_url', okta_aws_app_url) - config.set(profile, 'okta_username', okta_username) + config.set(profile, "okta_aws_app_url", okta_aws_app_url) + config.set(profile, "okta_username", okta_username) logging.debug("Config Okta [{}]".format(config)) - with open(okta_file, 'w+', encoding=settings.encoding) as file: + with open(okta_file, "w+", encoding=settings.encoding) as file: config.write(file) - logging.debug( - "Write new section Okta config [{} {}]".format(okta_file, config)) + logging.debug("Write new section Okta config [{} {}]".format(okta_file, config)) def set_local_credentials(assume_role_response, role_name, aws_region, aws_output): @@ -531,16 +622,15 @@ def set_local_credentials(assume_role_response, role_name, aws_region, aws_outpu :param aws_region configured region for aws credential profile: :param aws output configured datatype for aws cli output: """ - expiration_time = assume_role_response['Credentials']['Expiration'] - aws_access_key = assume_role_response['Credentials']['AccessKeyId'] - aws_secret_key = assume_role_response['Credentials']['SecretAccessKey'] - aws_session_token = assume_role_response['Credentials']['SessionToken'] + expiration_time = assume_role_response["Credentials"]["Expiration"] + aws_access_key = assume_role_response["Credentials"]["AccessKeyId"] + aws_secret_key = assume_role_response["Credentials"]["SecretAccessKey"] + aws_session_token = assume_role_response["Credentials"]["SessionToken"] if settings.aws_profile is not None: role_name = settings.aws_profile - update_aws_credentials(role_name, aws_access_key, aws_secret_key, - aws_session_token) + update_aws_credentials(role_name, aws_access_key, aws_secret_key, aws_session_token) update_aws_config(role_name, aws_output, aws_region) print_selected_role(role_name, expiration_time) @@ -565,10 +655,10 @@ def update_aws_credentials(profile, aws_access_key, aws_secret_key, aws_session_ config.read(cred_file, encoding=settings.encoding) if not config.has_section(profile): config.add_section(profile) - config.set(profile, 'aws_access_key_id', aws_access_key) - config.set(profile, 'aws_secret_access_key', aws_secret_key) - config.set(profile, 'aws_session_token', aws_session_token) - with open(cred_file, 'w+', encoding=settings.encoding) as file: + config.set(profile, "aws_access_key_id", aws_access_key) + config.set(profile, "aws_secret_access_key", aws_secret_key) + config.set(profile, "aws_session_token", aws_session_token) + with open(cred_file, "w+", encoding=settings.encoding) as file: config.write(file) @@ -588,16 +678,16 @@ def update_aws_config(profile, output, region): create_directory(config_dir) # Prepend the word profile the the profile name - profile = 'profile {}'.format(profile) + profile = "profile {}".format(profile) config = configparser.RawConfigParser() if os.path.isfile(config_file): config.read(config_file, encoding=settings.encoding) if not config.has_section(profile): config.add_section(profile) - config.set(profile, 'output', output) - config.set(profile, 'region', region) + config.set(profile, "output", output) + config.set(profile, "region", region) - with open(config_file, 'w+', encoding=settings.encoding) as file: + with open(config_file, "w+", encoding=settings.encoding) as file: config.write(file) @@ -644,7 +734,7 @@ def validate_input(value, valid_range): return integer_validation -def get_input(prompt='-> '): +def get_input(prompt="-> "): """Collect user input for TOTP. :return user_input: raw from user. @@ -687,7 +777,7 @@ def prepare_payload(**kwargs): for key, value in list(kwargs.items()): payload_dict[key] = value - if key != 'password': + if key != "password": logging.debug("Prepare payload [{} {}]".format(key, value)) return payload_dict diff --git a/tokendito/okta_helpers.py b/tokendito/okta_helpers.py index cf082adf..308d429f 100644 --- a/tokendito/okta_helpers.py +++ b/tokendito/okta_helpers.py @@ -7,12 +7,8 @@ 2. Update Okta Config File """ -from __future__ import (absolute_import, division, - print_function, unicode_literals) +from __future__ import absolute_import, division, print_function, unicode_literals -from builtins import (ascii, bytes, chr, dict, filter, hex, input, # noqa: F401 - int, list, map, next, object, oct, open, pow, range, - round, str, super, zip) import json import logging import sys @@ -37,14 +33,15 @@ def okta_verify_api_method(mfa_challenge_url, payload, headers=None): """ try: if headers: - response = requests.request('POST', mfa_challenge_url, - data=json.dumps(payload), headers=headers) - else: response = requests.request( - 'POST', mfa_challenge_url, data=payload) + "POST", mfa_challenge_url, data=json.dumps(payload), headers=headers + ) + else: + response = requests.request("POST", mfa_challenge_url, data=payload) except Exception as request_error: logging.error( - "There was an error connecting to Okta: \n{}".format(request_error)) + "There was an error connecting to Okta: \n{}".format(request_error) + ) sys.exit(1) logging.debug("Okta authentication response: \n{}".format(response)) @@ -55,11 +52,12 @@ def okta_verify_api_method(mfa_challenge_url, payload, headers=None): logging.debug("Received type of response: {}".format(type(response.text))) response = response.text - if 'errorCode' in response: + if "errorCode" in response: error_string = "Exiting due to Okta API error [{}]\n{}".format( - response['errorCode'], response['errorSummary']) - if len(response['errorCauses']) > 0: - error_string += "\n{}".format(json.dumps(response['errorCauses'])) + response["errorCode"], response["errorSummary"] + ) + if len(response["errorCauses"]) > 0: + error_string += "\n{}".format(json.dumps(response["errorCauses"])) logging.error(error_string) sys.exit(1) @@ -77,16 +75,15 @@ def authenticate_user(okta_url, okta_username, okta_password): """ logging.debug( "Authenticate user with okta credential [{} user {}]".format( - okta_url, okta_username)) - headers = { - 'content-type': 'application/json', - 'accept': 'application/json' - } - payload = helpers.prepare_payload( - username=okta_username, password=okta_password) + okta_url, okta_username + ) + ) + headers = {"content-type": "application/json", "accept": "application/json"} + payload = helpers.prepare_payload(username=okta_username, password=okta_password) primary_auth = okta_verify_api_method( - '{}/api/v1/authn'.format(okta_url), payload, headers) + "{}/api/v1/authn".format(okta_url), payload, headers + ) logging.debug("Authenticate Okta header [{}] ".format(headers)) session_token = user_mfa_challenge(headers, primary_auth) @@ -105,69 +102,74 @@ def user_mfa_challenge(headers, primary_auth): """ logging.debug("Handle user MFA challenges") try: - mfa_options = primary_auth['_embedded']['factors'] + mfa_options = primary_auth["_embedded"]["factors"] except KeyError: - logging.error("Okta auth failed: " - "Could not retrieve list of MFA methods") - logging.debug("Error parsing response: {}".format( - json.dumps(primary_auth))) + logging.error("Okta auth failed: " "Could not retrieve list of MFA methods") + logging.debug("Error parsing response: {}".format(json.dumps(primary_auth))) sys.exit(1) mfa_setup_statuses = [ - d['status'] for d in mfa_options if 'status' in d and d['status'] != "ACTIVE"] + d["status"] for d in mfa_options if "status" in d and d["status"] != "ACTIVE" + ] if len(mfa_setup_statuses) == len(mfa_options): - logging.error("MFA not configured. " - "Please enable MFA on your account and try again.") + logging.error( + "MFA not configured. " "Please enable MFA on your account and try again." + ) sys.exit(2) preset_mfa = settings.mfa_method - available_mfas = [d['factorType'] for d in mfa_options] + available_mfas = [d["factorType"] for d in mfa_options] if preset_mfa is not None and preset_mfa in available_mfas: mfa_index = available_mfas.index(settings.mfa_method) else: logging.warning( "No MFA provided or provided MFA does not exist. [{}]".format( - settings.mfa_method)) + settings.mfa_method + ) + ) mfa_index = helpers.select_preferred_mfa_index(mfa_options) # time to challenge the mfa option selected_mfa_option = mfa_options[mfa_index] logging.debug("Selected MFA is [{}]".format(selected_mfa_option)) - mfa_challenge_url = selected_mfa_option['_links']['verify']['href'] + mfa_challenge_url = selected_mfa_option["_links"]["verify"]["href"] - payload = helpers.prepare_payload(stateToken=primary_auth['stateToken'], - factorType=selected_mfa_option['factorType'], - provider=selected_mfa_option['provider'], - profile=selected_mfa_option['profile']) - selected_factor = okta_verify_api_method( - mfa_challenge_url, payload, headers) + payload = helpers.prepare_payload( + stateToken=primary_auth["stateToken"], + factorType=selected_mfa_option["factorType"], + provider=selected_mfa_option["provider"], + profile=selected_mfa_option["profile"], + ) + selected_factor = okta_verify_api_method(mfa_challenge_url, payload, headers) mfa_provider = selected_factor["_embedded"]["factor"]["provider"].lower() - logging.debug("MFA Challenge URL: [{}] headers: {}".format( - mfa_challenge_url, headers)) + logging.debug( + "MFA Challenge URL: [{}] headers: {}".format(mfa_challenge_url, headers) + ) if mfa_provider == "duo": - payload, headers, callback_url = duo_helpers.authenticate_duo( - selected_factor) + payload, headers, callback_url = duo_helpers.authenticate_duo(selected_factor) okta_verify_api_method(callback_url, payload) payload.pop("id", "sig_response") - mfa_verify = okta_verify_api_method( - mfa_challenge_url, payload, headers) + mfa_verify = okta_verify_api_method(mfa_challenge_url, payload, headers) elif mfa_provider == "okta" or mfa_provider == "google": - mfa_verify = user_mfa_options(selected_mfa_option, - headers, mfa_challenge_url, payload, primary_auth) + mfa_verify = user_mfa_options( + selected_mfa_option, headers, mfa_challenge_url, payload, primary_auth + ) else: - logging.error("Sorry, the MFA provider '{}' is not yet supported." - " Please retry with another option.".format(mfa_provider)) + logging.error( + "Sorry, the MFA provider '{}' is not yet supported." + " Please retry with another option.".format(mfa_provider) + ) exit(1) - return mfa_verify['sessionToken'] + return mfa_verify["sessionToken"] -def user_mfa_options(selected_mfa_option, - headers, mfa_challenge_url, - payload, primary_auth): +def user_mfa_options( + selected_mfa_option, headers, mfa_challenge_url, payload, primary_auth +): """Handle user mfa options. :param selected_mfa_option: Selected MFA option (SMS, push, etc) @@ -180,19 +182,21 @@ def user_mfa_options(selected_mfa_option, """ logging.debug("Handle user MFA options") - logging.debug("User MFA options selected: [{}]".format( - selected_mfa_option['factorType'])) - if selected_mfa_option['factorType'] == 'push': + logging.debug( + "User MFA options selected: [{}]".format(selected_mfa_option["factorType"]) + ) + if selected_mfa_option["factorType"] == "push": return push_approval(headers, mfa_challenge_url, payload) if settings.mfa_response is None: logging.debug("Getting verification code from user.") - print('Type verification code and press Enter') + print("Type verification code and press Enter") settings.mfa_response = helpers.get_input() # time to verify the mfa method payload = helpers.prepare_payload( - stateToken=primary_auth['stateToken'], passCode=settings.mfa_response) + stateToken=primary_auth["stateToken"], passCode=settings.mfa_response + ) mfa_verify = okta_verify_api_method(mfa_challenge_url, payload, headers) logging.debug("mfa_verify [{}]".format(json.dumps(mfa_verify))) @@ -208,36 +212,35 @@ def push_approval(headers, mfa_challenge_url, payload): :return: Session Token if succeeded or terminates if user wait goes 5 min """ - logging.debug("Handle push approval from the user [{}] [{}]".format( - headers, mfa_challenge_url)) + logging.debug( + "Handle push approval from the user [{}] [{}]".format( + headers, mfa_challenge_url + ) + ) - print('Waiting for an approval from device...') + print("Waiting for an approval from device...") mfa_status = "WAITING" while mfa_status == "WAITING": - mfa_verify = okta_verify_api_method( - mfa_challenge_url, payload, headers) + mfa_verify = okta_verify_api_method(mfa_challenge_url, payload, headers) logging.debug("MFA Response:\n{}".format(json.dumps(mfa_verify))) - if 'factorResult' in mfa_verify: - mfa_status = mfa_verify['factorResult'] - elif mfa_verify['status'] == 'SUCCESS': + if "factorResult" in mfa_verify: + mfa_status = mfa_verify["factorResult"] + elif mfa_verify["status"] == "SUCCESS": break else: - logging.error( - "There was an error getting your MFA status.") + logging.error("There was an error getting your MFA status.") logging.debug(mfa_verify) - if 'status' in mfa_verify: - logging.error("Exiting due to error: {}".format( - mfa_verify['status'])) + if "status" in mfa_verify: + logging.error("Exiting due to error: {}".format(mfa_verify["status"])) sys.exit(1) - if mfa_status == 'REJECTED': - logging.error( - "The Okta Verify push has been denied. Please retry later.") + if mfa_status == "REJECTED": + logging.error("The Okta Verify push has been denied. Please retry later.") sys.exit(2) - elif mfa_status == 'TIMEOUT': + elif mfa_status == "TIMEOUT": logging.error("Device approval window has expired.") sys.exit(2) diff --git a/tokendito/settings.py b/tokendito/settings.py index f627c147..59ca2c07 100644 --- a/tokendito/settings.py +++ b/tokendito/settings.py @@ -1,32 +1,29 @@ # vim: set filetype=python ts=4 sw=4 # -*- coding: utf-8 -*- """This module is responsible for initialisation of global variables.""" -from __future__ import (absolute_import, division, - print_function, unicode_literals) +from __future__ import absolute_import, division, print_function, unicode_literals -from builtins import (ascii, bytes, chr, dict, filter, hex, input, # noqa: F401 - int, list, map, next, object, oct, open, pow, range, - round, str, super, zip) from os.path import expanduser import sys from future import standard_library + standard_library.install_aliases() -config_dir = expanduser('~') + '/.aws' -config_file = config_dir + '/okta_auth' -aws_config_file = config_dir + '/config' -aws_shared_credentials_file = config_dir + '/credentials' -aws_output = 'json' +config_dir = expanduser("~") + "/.aws" +config_file = config_dir + "/okta_auth" +aws_config_file = config_dir + "/config" +aws_shared_credentials_file = config_dir + "/credentials" +aws_output = "json" aws_profile = None -aws_region = 'us-east-1' +aws_region = "us-east-1" encoding = sys.stdin.encoding mfa_method = None mfa_response = None okta_aws_app_url = None okta_org = None -okta_password = '' -okta_profile = 'default' -okta_username = '' +okta_password = "" +okta_profile = "default" +okta_username = "" role_arn = None diff --git a/tokendito/tokendito.py b/tokendito/tokendito.py index 506aa936..93004ff0 100755 --- a/tokendito/tokendito.py +++ b/tokendito/tokendito.py @@ -2,15 +2,12 @@ # vim: set filetype=python ts=4 sw=4 # -*- coding: utf-8 -*- """tokendito cli entry point.""" -from __future__ import (absolute_import, division, - print_function, unicode_literals) +from __future__ import absolute_import, division, print_function, unicode_literals -from builtins import (ascii, bytes, chr, dict, filter, hex, input, # noqa: F401 - int, list, map, next, object, oct, open, pow, range, - round, str, super, zip) import sys from future import standard_library + standard_library.install_aliases() @@ -18,15 +15,17 @@ def main(args=None): # needed for console script """Packge entry point.""" if __package__ is None: import os.path + path = os.path.dirname(os.path.dirname(__file__)) sys.path[0:0] = [path] from tokendito.tool import cli + return cli(args) -if __name__ == '__main__': +if __name__ == "__main__": try: sys.exit(main(sys.argv[1:])) except KeyboardInterrupt: - print('\nInterrupted') + print("\nInterrupted") sys.exit(1) diff --git a/tokendito/tool.py b/tokendito/tool.py index b9d212e2..6a9a4141 100644 --- a/tokendito/tool.py +++ b/tokendito/tool.py @@ -1,12 +1,8 @@ # vim: set filetype=python ts=4 sw=4 # -*- coding: utf-8 -*- """This module retrieves AWS credentials after authenticating with Okta.""" -from __future__ import (absolute_import, division, - print_function, unicode_literals) +from __future__ import absolute_import, division, print_function, unicode_literals -from builtins import (ascii, bytes, chr, dict, filter, hex, input, # noqa: F401 - int, list, map, next, object, oct, open, pow, range, - round, str, super, zip) import logging from future import standard_library @@ -23,9 +19,7 @@ def cli(args): # Set some required initial values args = helpers.setup(args) - logging.debug( - "tokendito retrieves AWS credentials after authenticating with Okta." - ) + logging.debug("tokendito retrieves AWS credentials after authenticating with Okta.") # Collect and organize user specific information helpers.process_options(args) @@ -34,15 +28,19 @@ def cli(args): logging.debug("Authenticate user with Okta and AWS.") secret_session_token = okta_helpers.authenticate_user( - settings.okta_org, settings.okta_username, settings.okta_password) + settings.okta_org, settings.okta_username, settings.okta_password + ) saml_response_string, saml_xml = aws_helpers.authenticate_to_roles( - secret_session_token, settings.okta_aws_app_url) + secret_session_token, settings.okta_aws_app_url + ) assume_role_response, role_name = aws_helpers.select_assumeable_role( - saml_response_string, saml_xml) + saml_response_string, saml_xml + ) aws_helpers.ensure_keys_work(assume_role_response) - helpers.set_local_credentials(assume_role_response, role_name, - settings.aws_region, settings.aws_output) + helpers.set_local_credentials( + assume_role_response, role_name, settings.aws_region, settings.aws_output + ) diff --git a/tox.ini b/tox.ini index 716f90f7..e6b316e5 100755 --- a/tox.ini +++ b/tox.ini @@ -4,9 +4,9 @@ envlist = lint, py{27,35,36,37,38} [testenv] deps = -r requirements-dev.txt -commands = - py.test -v -rA -k 'not tests/functional' -s tests/ - py.test -v -rA -k 'tests/functional' -s tests/ +commands = + py.test -v -rA -k 'not functional' -s tests/ + py.test -v -rA -k 'functional' -s tests/ [testenv:lint] skip_install = true @@ -26,6 +26,7 @@ python = max-line-length = 100 max-complexity = 8 exclude = .git, __pycache__, .tox, build/, venv/ +extend-ignore = E203, W503 import-order-style = google application-import-names = flake8 format = ${cyan}%(path)s${reset}:${yellow_bold}%(row)d${reset}:${green_bold}%(col)d${reset}: ${red_bold}%(code)s${reset} %(text)s @@ -33,4 +34,4 @@ format = ${cyan}%(path)s${reset}:${yellow_bold}%(row)d${reset}:${green_bold}%(co [pytest] env = PIP_DISABLE_PIP_VERSION_CHECK=1 - PYTHONWARNINGS=ignore:DEPRECATION + PYTHONWARNINGS=ignore:DEPRECATION, ignore::UserWarning, ignore::DeprecationWarning