-
Notifications
You must be signed in to change notification settings - Fork 2
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
* feat: device auth * fix: check env * add: package types request
- Loading branch information
Showing
12 changed files
with
1,569 additions
and
15 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -22,7 +22,7 @@ | |
**/obj | ||
**/secrets.dev.yaml | ||
**/values.dev.yaml | ||
README.md | ||
docs/* | ||
tmp | ||
.venv | ||
cli.py | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,26 +1,37 @@ | ||
# For more information, please refer to https://aka.ms/vscode-docker-python | ||
FROM python:3.10-slim | ||
FROM python:3.10-slim as poetry | ||
|
||
RUN pip install -U pip setuptools && pip install --no-cache-dir -U poetry | ||
|
||
WORKDIR /app | ||
|
||
COPY poetry.lock pyproject.toml ./ | ||
|
||
RUN poetry export --without=dev,docs --output requirements.txt | ||
|
||
# == Main image == # | ||
FROM python:3.10-slim | ||
EXPOSE 5000 | ||
|
||
# Keeps Python from generating .pyc files in the container | ||
ENV PYTHONDONTWRITEBYTECODE=1 | ||
|
||
# Turns off buffering for easier container logging | ||
ENV PYTHONUNBUFFERED=1 | ||
|
||
RUN pip install -U pip setuptools | ||
|
||
WORKDIR /app | ||
|
||
RUN adduser -u 5678 --disabled-password --gecos "" appuser && chown -R appuser /app | ||
|
||
# Install pip requirements | ||
COPY requirements.txt . | ||
COPY --from=poetry /app/requirements.txt ./ | ||
RUN python -m pip install --no-cache-dir -r requirements.txt | ||
|
||
# # Creates a non-root user with an explicit UID and adds permission to access the /app folder | ||
USER appuser | ||
COPY broker /app | ||
# Creates a non-root user with an explicit UID and adds permission to access the /app folder | ||
# For more info, please refer to https://aka.ms/vscode-docker-python-configure-containers | ||
|
||
# During debugging, this entry point will be overridden. For more information, please refer to https://aka.ms/vscode-docker-python-debug | ||
CMD ["gunicorn", "--bind", "0.0.0.0:5000", "routes:app"] | ||
COPY broker ./ | ||
|
||
# # During debugging, this entry point will be overridden. For more information, please refer to https://aka.ms/vscode-docker-python-debug | ||
CMD ["bash", "-c", "gunicorn --bind 0.0.0.0:5000 routes:app"] |
Empty file.
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,71 @@ | ||
# ref: | ||
import time | ||
import webbrowser | ||
|
||
import typer | ||
import requests | ||
|
||
from utils import pprint | ||
|
||
ALGORITHMS = ["RS256"] | ||
|
||
# New code 👇 | ||
def login(domain: str, client_id: str, audience: str = None): | ||
""" | ||
Runs the device authorization flow and stores the user object in memory | ||
""" | ||
device_code_payload = { | ||
"client_id": client_id, | ||
"scope": "openid email profile", | ||
"audience": audience, | ||
} | ||
|
||
well_known = requests.get( | ||
f"https://{domain}/.well-known/openid-configuration" | ||
).json() | ||
|
||
device_code_response = requests.post( | ||
well_known["device_authorization_endpoint"], | ||
data=device_code_payload, | ||
) | ||
|
||
if device_code_response.status_code != 200: | ||
print("Error generating the device code") | ||
raise typer.Exit(code=1) | ||
|
||
pprint("Device code successful") | ||
device_code_data = device_code_response.json() | ||
|
||
pprint( | ||
"1. On your computer or mobile device navigate to: ", | ||
device_code_data["verification_uri_complete"], | ||
) | ||
pprint("2. Enter the following code: ", device_code_data["user_code"]) | ||
|
||
webbrowser.open_new(device_code_data["verification_uri_complete"]) | ||
|
||
token_payload = { | ||
"grant_type": "urn:ietf:params:oauth:grant-type:device_code", | ||
"device_code": device_code_data["device_code"], | ||
"client_id": client_id, | ||
} | ||
|
||
authenticated = False | ||
while not authenticated: | ||
pprint("- Checking if the user completed the flow...") | ||
token_response = requests.post( | ||
well_known["token_endpoint"], | ||
data=token_payload, | ||
) | ||
|
||
token_data = token_response.json() | ||
if token_response.status_code == 200: | ||
pprint("Authenticated!") | ||
authenticated = True | ||
elif token_data["error"] not in ("authorization_pending", "slow_down"): | ||
pprint(token_data["error_description"]) | ||
raise typer.Exit(code=1) | ||
else: | ||
time.sleep(device_code_data["interval"]) | ||
|
||
return token_data |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,109 @@ | ||
import os | ||
from typing import Optional | ||
|
||
import typer | ||
import jwt | ||
from rich.prompt import Prompt | ||
|
||
from auth import login as _login, ALGORITHMS | ||
from utils import HOME, md5hash, pprint | ||
from config import ( | ||
profiles, | ||
Profile, | ||
AwsConfig, | ||
awsconfig, | ||
_check_wellknown_openid, | ||
_check_aws_iam, | ||
) | ||
|
||
app = typer.Typer(help=f"AWS broker for different auth") | ||
prompt = Prompt() | ||
|
||
|
||
@app.command() | ||
def config( | ||
profile: str = typer.Option( | ||
None, | ||
"--profile", | ||
"-p", | ||
help="aws profile name to be created", | ||
prompt="AWS cli profile name", | ||
), | ||
role: str = typer.Option( | ||
None, | ||
"--role", | ||
help="AWS IAM role arn which has to be accessed", | ||
prompt="AWS IAM OpenID fedrated role arn", | ||
callback=_check_aws_iam, | ||
), | ||
client_wellknown: str = typer.Option( | ||
None, | ||
"--client-wellknown", | ||
help="auth oidc provider .well-known/openid-configuration url domain.", | ||
prompt=f"OpenID auth provider client wellknown url", | ||
callback=_check_wellknown_openid, | ||
), | ||
client_id: str = typer.Option( | ||
None, | ||
"--client-id", | ||
help="Auth identification value assigned to your application after registration.", | ||
prompt="OpenID auth provider client_id", | ||
), | ||
audience: Optional[str] = typer.Option( | ||
None, | ||
"--audience", | ||
help="OPTIONAL: Audience value is either the application (Client ID) for an ID Token or the API that is being called (API Identifier) for an Access Token.", | ||
), | ||
): | ||
|
||
if audience == None: | ||
prompt.ask(f"OPTIONAL: OpenID auth provider audience", default=None) | ||
|
||
profiles.set( | ||
key=profile, | ||
value=Profile( | ||
role_arn=role, | ||
client_id=client_id, | ||
client_wellknown=client_wellknown, | ||
audience=audience, | ||
), | ||
) | ||
profiles.save() | ||
|
||
|
||
@app.command(name="login") | ||
def login( | ||
profile: str = typer.Argument(None, help="auth via oidc provider for aws access") | ||
): | ||
_p: Profile = profiles.get(profile) | ||
token_data = _login(domain=_p.client_wellknown, client_id=_p.client_id) | ||
|
||
access_token = token_data["access_token"] | ||
pprint( | ||
"[yellow bold]access_token payload => ", | ||
jwt.decode( | ||
access_token, | ||
algorithms=ALGORITHMS, | ||
options={"verify_signature": False}, | ||
), | ||
) | ||
|
||
filename = md5hash(profile) | ||
filepath = f"{HOME}/.aws/cli/cache" | ||
|
||
if not os.path.exists(filepath): | ||
os.makedirs(filepath) | ||
|
||
with open(f"{filepath}/{filename}", "w") as token_file: | ||
token_file.write(access_token) | ||
|
||
awsconfig( | ||
profile=profile, | ||
aws_config=AwsConfig( | ||
web_identity_token_file=f"{filepath}/{filename}", role_arn=_p.role_arn | ||
), | ||
) | ||
|
||
|
||
if __name__ == "__main__": | ||
app() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,82 @@ | ||
import configparser | ||
from dataclasses import dataclass | ||
from typing import Optional | ||
import re | ||
|
||
import requests | ||
import typer | ||
|
||
from utils import State, HOME, Logger | ||
|
||
STATE_CONFIG_FILE = f"{HOME}/.aws/oidc-profiles.json" | ||
|
||
logger = Logger() | ||
|
||
|
||
@dataclass | ||
class Profile: | ||
role_arn: str | ||
client_wellknown: str | ||
client_id: str | ||
audience: Optional[str] = "" | ||
|
||
|
||
@dataclass | ||
class AwsConfig: | ||
"""ref: https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-files.html""" | ||
|
||
web_identity_token_file: str | ||
role_arn: str | ||
|
||
|
||
profiles = State(STATE_CONFIG_FILE, obj=Profile) | ||
|
||
|
||
def awsconfig( | ||
profile: str, aws_config: AwsConfig, awsconfig_file=f"{HOME}/.aws/config" | ||
): | ||
profile = f"profile {profile}" | ||
|
||
config = configparser.ConfigParser() | ||
config.read(awsconfig_file) | ||
write_flag = False | ||
|
||
if not config.has_section(profile): | ||
config.add_section(profile) | ||
|
||
for arg in aws_config.__annotations__: | ||
if not config.has_option(profile, arg): | ||
config.set(profile, arg, aws_config.__dict__[arg]) | ||
write_flag = True | ||
|
||
elif aws_config.__dict__[arg] != config[profile][arg]: | ||
config[profile][arg] = aws_config.__dict__[arg] | ||
write_flag = True | ||
|
||
if write_flag: | ||
with open(awsconfig_file, "w") as configfile: | ||
config.write(configfile) | ||
|
||
|
||
def _check_aws_iam(arn: str): | ||
if re.search(r"^(arn:aws:iam::)([0-9]{12}):role\/([a-zA-Z0-9\-]+)", arn): | ||
return arn | ||
else: | ||
raise typer.BadParameter("Invalid iam role arn.") | ||
|
||
|
||
def _check_wellknown_openid(url: str): | ||
|
||
domain = re.sub(r"(https?:\/\/)?(\.well-known.+)?", "", url) | ||
|
||
if domain[-1] == "/": | ||
domain = domain[:-1] | ||
|
||
try: | ||
requests.get(f"https://{domain}/.well-known/openid-configuration").json() | ||
except: | ||
raise typer.BadParameter( | ||
"Invalid openid-configuration domain.", param_hint="op" | ||
) | ||
|
||
return domain |
Oops, something went wrong.