Skip to content

Commit

Permalink
leverage new type syntax
Browse files Browse the repository at this point in the history
  • Loading branch information
gruebel committed Jul 24, 2024
1 parent a186c8f commit 23afbe1
Show file tree
Hide file tree
Showing 21 changed files with 247 additions and 235 deletions.
8 changes: 4 additions & 4 deletions cloudsplaining/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
# pylint: disable=missing-module-docstring
from __future__ import annotations

import logging
import sys

Expand All @@ -8,8 +10,6 @@
# Uncomment to get the full debug logs.
# 2020-10-06 10:04:17,200 - root - DEBUG - Leveraging the bundled IAM Definition.
# Need to figure out how to get click_log to do this for me.
from typing import Optional, Union

logger = logging.getLogger(__name__)
logger.setLevel(logging.WARNING)
handler = logging.StreamHandler(sys.stdout)
Expand All @@ -22,7 +22,7 @@
name = "cloudsplaining" # pylint: disable=invalid-name


def change_log_level(log_level: Union[int, str]) -> None:
def change_log_level(log_level: int | str) -> None:
""" "Change log level of module logger"""
logger.setLevel(log_level)

Expand All @@ -31,7 +31,7 @@ def change_log_level(log_level: Union[int, str]) -> None:
def set_stream_logger(
name: str = "cloudsplaining",
level: int = logging.DEBUG,
format_string: Optional[str] = None,
format_string: str | None = None,
) -> None:
"""
Add a stream handler for the given name and level to the logging module.
Expand Down
8 changes: 4 additions & 4 deletions cloudsplaining/command/download.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import json
import logging
import os
from typing import TYPE_CHECKING, Any, Dict, List
from typing import TYPE_CHECKING, Any

import boto3
import click
Expand Down Expand Up @@ -76,14 +76,14 @@ def download(profile: str, output: str, include_non_default_policy_versions: boo


def get_account_authorization_details(
session_data: Dict[str, str], include_non_default_policy_versions: bool
) -> Dict[str, List[Any]]:
session_data: dict[str, str], include_non_default_policy_versions: bool
) -> dict[str, list[Any]]:
"""Runs aws-iam-get-account-authorization-details"""
session = boto3.Session(**session_data) # type:ignore[arg-type]
config = Config(connect_timeout=5, retries={"max_attempts": 10})
iam_client: IAMClient = session.client("iam", config=config)

results: Dict[str, List[Any]] = {
results: dict[str, list[Any]] = {
"UserDetailList": [],
"GroupDetailList": [],
"RoleDetailList": [],
Expand Down
18 changes: 9 additions & 9 deletions cloudsplaining/command/scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import os
import webbrowser
from pathlib import Path
from typing import Any, Dict, List, Literal, cast, overload
from typing import Any, Literal, cast, overload

import click
import yaml
Expand Down Expand Up @@ -104,7 +104,7 @@ def scan(
minimize: bool,
flag_all_risky_actions: bool,
verbosity: int,
severity: List[str],
severity: list[str],
) -> None: # pragma: no cover
"""
Given the path to account authorization details files and the exclusions config file, scan all inline and
Expand Down Expand Up @@ -198,7 +198,7 @@ def scan(

@overload
def scan_account_authorization_details(
account_authorization_details_cfg: Dict[str, Any],
account_authorization_details_cfg: dict[str, Any],
exclusions: Exclusions,
account_name: str,
output_directory: str,
Expand All @@ -207,13 +207,13 @@ def scan_account_authorization_details(
return_json_results: Literal[True],
flag_conditional_statements: bool = ...,
flag_resource_arn_statements: bool = ...,
severity: List[str] | None = ...,
severity: list[str] | None = ...,
) -> dict[str, Any]: ...


@overload
def scan_account_authorization_details(
account_authorization_details_cfg: Dict[str, Any],
account_authorization_details_cfg: dict[str, Any],
exclusions: Exclusions,
account_name: str = ...,
output_directory: str = ...,
Expand All @@ -222,12 +222,12 @@ def scan_account_authorization_details(
return_json_results: Literal[False] = ...,
flag_conditional_statements: bool = ...,
flag_resource_arn_statements: bool = ...,
severity: List[str] | None = ...,
severity: list[str] | None = ...,
) -> str: ...


def scan_account_authorization_details(
account_authorization_details_cfg: Dict[str, Any],
account_authorization_details_cfg: dict[str, Any],
exclusions: Exclusions,
account_name: str = "default",
output_directory: str = os.getcwd(),
Expand All @@ -236,7 +236,7 @@ def scan_account_authorization_details(
return_json_results: bool = False,
flag_conditional_statements: bool = False,
flag_resource_arn_statements: bool = False,
severity: List[str] | None = None,
severity: list[str] | None = None,
) -> str | dict[str, Any]: # pragma: no cover
"""
Given the path to account authorization details files and the exclusions config file, scan all inline and
Expand Down Expand Up @@ -294,7 +294,7 @@ def scan_account_authorization_details(

def get_authorization_files_in_directory(
directory: str,
) -> List[str]: # pragma: no cover
) -> list[str]: # pragma: no cover
"""Get a list of download-account-authorization-files in a directory"""
file_list_with_full_path = [file.absolute() for file in Path(directory).glob("*.json")]

Expand Down
30 changes: 15 additions & 15 deletions cloudsplaining/command/scan_multi_account.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import json
import logging
import os
from typing import TYPE_CHECKING, Any, Dict, List, Optional, cast
from typing import TYPE_CHECKING, Any, cast

import click
import yaml
Expand All @@ -32,14 +32,14 @@
class MultiAccountConfig:
"""Handle the YAML file that parses the Multi-account config"""

def __init__(self, config: Dict[str, Any], role_name: str) -> None:
def __init__(self, config: dict[str, Any], role_name: str) -> None:
# self.config_file = config_file
self.config = config
self.role_name = role_name
self.accounts = self._accounts()

def _accounts(self) -> Dict[str, str]:
accounts: Dict[str, str] = self.config.get("accounts", None)
def _accounts(self) -> dict[str, str]:
accounts: dict[str, str] | None = self.config.get("accounts")
if not accounts:
raise Exception("Please supply a list of accounts in the multi-account config file")
return accounts
Expand Down Expand Up @@ -130,7 +130,7 @@ def scan_multi_account(
write_data_file: bool,
flag_all_risky_actions: bool,
verbosity: int,
severity: List[str],
severity: list[str],
) -> None:
"""Scan multiple accounts via AssumeRole"""
set_log_level(verbosity)
Expand Down Expand Up @@ -168,10 +168,10 @@ def scan_accounts(
exclusions: Exclusions,
role_name: str,
write_data_file: bool,
profile: Optional[str] = None,
output_directory: Optional[str] = None,
output_bucket: Optional[str] = None,
severity: List[str] | None = None,
profile: str | None = None,
output_directory: str | None = None,
output_bucket: str | None = None,
severity: list[str] | None = None,
flag_conditional_statements: bool = False,
flag_resource_arn_statements: bool = False,
) -> None:
Expand Down Expand Up @@ -229,11 +229,11 @@ def scan_account(
target_account_id: str,
target_role_name: str,
exclusions: Exclusions,
profile: Optional[str] = None,
severity: List[str] | None = None,
profile: str | None = None,
severity: list[str] | None = None,
flag_conditional_statements: bool = False,
flag_resource_arn_statements: bool = False,
) -> Dict[str, Dict[str, Any]]:
) -> dict[str, dict[str, Any]]:
"""Scan a target account in one shot"""
account_authorization_details = download_account_authorization_details(
target_account_id=target_account_id,
Expand All @@ -253,8 +253,8 @@ def scan_account(


def download_account_authorization_details(
target_account_id: str, target_role_name: str, profile: Optional[str] = None
) -> Dict[str, List[Dict[str, Any]]]:
target_account_id: str, target_role_name: str, profile: str | None = None
) -> dict[str, list[dict[str, Any]]]:
"""Download the account authorization details from a target account"""
(
aws_access_key_id,
Expand All @@ -276,7 +276,7 @@ def download_account_authorization_details(
return authorization_details


def get_exclusions(exclusions_file: Optional[str] = None) -> Exclusions:
def get_exclusions(exclusions_file: str | None = None) -> Exclusions:
"""Get the exclusions configuration from a file"""
# Get the exclusions configuration
if exclusions_file:
Expand Down
12 changes: 6 additions & 6 deletions cloudsplaining/command/scan_policy_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import json
import logging
import sys
from typing import Any, Dict, List, cast
from typing import Any, cast

import click
import yaml
Expand Down Expand Up @@ -70,7 +70,7 @@ def scan_policy_file(
high_priority_only: bool,
flag_all_risky_actions: bool,
verbosity: int,
severity: List[str],
severity: list[str],
) -> None: # pragma: no cover
"""Scan a single policy file to identify missing resource constraints."""
set_log_level(verbosity)
Expand Down Expand Up @@ -162,12 +162,12 @@ def scan_policy_file(


def scan_policy(
policy_json: Dict[str, Any],
exclusions_config: Dict[str, List[str]] = DEFAULT_EXCLUSIONS_CONFIG,
policy_json: dict[str, Any],
exclusions_config: dict[str, list[str]] = DEFAULT_EXCLUSIONS_CONFIG,
flag_conditional_statements: bool = False,
flag_resource_arn_statements: bool = False,
severity: List[str] | None = None,
) -> Dict[str, Dict[str, Any]]:
severity: list[str] | None = None,
) -> dict[str, dict[str, Any]]:
"""
Scan a policy document for missing resource constraints.
Expand Down
20 changes: 10 additions & 10 deletions cloudsplaining/output/policy_finding.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from __future__ import annotations

import logging
from typing import Any, Dict, List
from typing import Any

from cloudsplaining.scan.policy_document import PolicyDocument
from cloudsplaining.shared.constants import (
Expand All @@ -33,7 +33,7 @@ def __init__(
self,
policy_document: PolicyDocument,
exclusions: Exclusions = DEFAULT_EXCLUSIONS,
severity: List[str] | None = None,
severity: list[str] | None = None,
) -> None:
"""
Supply a PolicyDocument object and Exclusions object to get a single policy finding
Expand All @@ -47,7 +47,7 @@ def __init__(
self.missing_resource_constraints_for_modify_actions = self._missing_resource_constraints_for_modify_actions()
self.severity = [] if severity is None else severity

def _missing_resource_constraints_for_modify_actions(self) -> List[str]:
def _missing_resource_constraints_for_modify_actions(self) -> list[str]:
"""Find modify actions that lack resource ARN constraints"""
actions_missing_resource_constraints = set()
for statement in self.policy_document.statements:
Expand All @@ -59,7 +59,7 @@ def _missing_resource_constraints_for_modify_actions(self) -> List[str]:
return sorted(actions_missing_resource_constraints)

@property
def services_affected(self) -> List[str]:
def services_affected(self) -> list[str]:
"""Return a list of AWS service prefixes affected by the policy in question."""
services_affected = set()
for action in self.missing_resource_constraints_for_modify_actions:
Expand All @@ -78,7 +78,7 @@ def services_affected(self) -> List[str]:
return sorted(services_affected)

@property
def resource_exposure(self) -> List[str]:
def resource_exposure(self) -> list[str]:
"""Return a list of actions that could cause resource exposure via actions at the 'Permissions management'
access level, if applicable."""
if self.always_exclude_actions:
Expand All @@ -92,12 +92,12 @@ def resource_exposure(self) -> List[str]:
return self.policy_document.permissions_management_without_constraints

@property
def privilege_escalation(self) -> List[Dict[str, Any]]:
def privilege_escalation(self) -> list[dict[str, Any]]:
"""Returns privilege escalation action combinations in the policy, if present"""
return self.policy_document.allows_privilege_escalation

@property
def data_exfiltration(self) -> List[str]:
def data_exfiltration(self) -> list[str]:
"""Returns data exfiltration actions in the policy, if present"""
result = [
action
Expand All @@ -109,12 +109,12 @@ def data_exfiltration(self) -> List[str]:
return result

@property
def service_wildcard(self) -> List[str]:
def service_wildcard(self) -> list[str]:
"""Determine if the policy gives access to all actions within a service - simple grepping"""
return self.policy_document.service_wildcard

@property
def credentials_exposure(self) -> List[str]:
def credentials_exposure(self) -> list[str]:
"""Determine if the action returns credentials"""
# https://gist.github.com/kmcquade/33860a617e651104d243c324ddf7992a
results = [
Expand All @@ -127,7 +127,7 @@ def credentials_exposure(self) -> List[str]:
return results

@property
def results(self) -> Dict[str, Any]:
def results(self) -> dict[str, Any]:
"""Return the results as JSON"""
findings = dict(
ServiceWildcard={
Expand Down
6 changes: 4 additions & 2 deletions cloudsplaining/output/report.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
"""Creates the HTML Reports"""

from __future__ import annotations

import datetime
import json
import os.path
from pathlib import Path
from typing import Any, Dict
from typing import Any

from jinja2 import Environment, FileSystemLoader

Expand All @@ -20,7 +22,7 @@ def __init__(
self,
account_id: str,
account_name: str,
results: Dict[str, Dict[str, Any]],
results: dict[str, dict[str, Any]],
minimize: bool = False,
) -> None:
self.account_name = account_name
Expand Down
Loading

0 comments on commit 23afbe1

Please sign in to comment.