Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add typing in signer.py #1340

Merged
merged 1 commit into from
Nov 27, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
148 changes: 101 additions & 47 deletions minio/signer.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,32 +26,42 @@

"""

from __future__ import absolute_import, annotations

import hashlib
import hmac
import re
from collections import OrderedDict
from datetime import datetime
from urllib.parse import SplitResult

from . import time
from .credentials import Credentials
from .helpers import queryencode, sha256_hash

SIGN_V4_ALGORITHM = 'AWS4-HMAC-SHA256'
_MULTI_SPACE_REGEX = re.compile(r"( +)")


def _hmac_hash(key, data, hexdigest=False):
def _hmac_hash(
key: bytes | bytearray,
data: bytes,
hexdigest: bool = False,
) -> bytes | str:
"""Return HMacSHA256 digest of given key and data."""

hasher = hmac.new(key, data, hashlib.sha256)
return hasher.hexdigest() if hexdigest else hasher.digest()


def _get_scope(date, region, service_name):
def _get_scope(date: datetime, region: str, service_name: str) -> str:
"""Get scope string."""
return f"{time.to_signer_date(date)}/{region}/{service_name}/aws4_request"


def _get_canonical_headers(headers):
def _get_canonical_headers(
headers: dict[str, str | list[str] | tuple[str]],
) -> tuple[dict[str, str], str]:
"""Get canonical headers."""

canonical_headers = {}
Expand All @@ -74,7 +84,7 @@ def _get_canonical_headers(headers):
return canonical_headers, signed_headers


def _get_canonical_query_string(query):
def _get_canonical_query_string(query: str) -> str:
"""Get canonical query string."""

query = query or ""
Expand All @@ -87,7 +97,12 @@ def _get_canonical_query_string(query):
)


def _get_canonical_request_hash(method, url, headers, content_sha256):
def _get_canonical_request_hash(
method: str,
url: SplitResult,
headers: dict[str, str | list[str] | tuple[str]],
content_sha256: str,
) -> tuple[str, str]:
"""Get canonical request hash."""
canonical_headers, signed_headers = _get_canonical_headers(headers)
canonical_query_string = _get_canonical_query_string(url.query)
Expand All @@ -110,35 +125,64 @@ def _get_canonical_request_hash(method, url, headers, content_sha256):
return sha256_hash(canonical_request), signed_headers


def _get_string_to_sign(date, scope, canonical_request_hash):
def _get_string_to_sign(
date: datetime,
scope: str,
canonical_request_hash: str,
) -> str:
"""Get string-to-sign."""
return (
f"AWS4-HMAC-SHA256\n{time.to_amz_date(date)}\n{scope}\n"
f"{canonical_request_hash}"
)


def _get_signing_key(secret_key, date, region, service_name):
def _get_signing_key(
secret_key: str,
date: datetime,
region: str,
service_name: str,
) -> bytes:
"""Get signing key."""

date_key = _hmac_hash(
("AWS4" + secret_key).encode(),
time.to_signer_date(date).encode(),
)
if isinstance(date_key, str):
date_key = date_key.encode()

date_region_key = _hmac_hash(date_key, region.encode())
if isinstance(date_region_key, str):
date_region_key = date_region_key.encode()

date_region_service_key = _hmac_hash(
date_region_key, service_name.encode(),
)
return _hmac_hash(date_region_service_key, b"aws4_request")
if isinstance(date_region_service_key, str):
date_region_service_key = date_region_service_key.encode()

signing_key = _hmac_hash(date_region_service_key, b"aws4_request")
if isinstance(signing_key, str):
signing_key = signing_key.encode()

return signing_key

def _get_signature(signing_key, string_to_sign):

def _get_signature(signing_key: bytes, string_to_sign: str) -> str:
"""Get signature."""

return _hmac_hash(signing_key, string_to_sign.encode(), hexdigest=True)
return str(
_hmac_hash(signing_key, string_to_sign.encode(), hexdigest=True),
)


def _get_authorization(access_key, scope, signed_headers, signature):
def _get_authorization(
access_key: str,
scope: str,
signed_headers: str,
signature: str,
) -> str:
"""Get authorization."""
return (
f"AWS4-HMAC-SHA256 Credential={access_key}/{scope}, "
Expand All @@ -147,15 +191,15 @@ def _get_authorization(access_key, scope, signed_headers, signature):


def _sign_v4(
service_name,
method,
url,
region,
headers,
credentials,
content_sha256,
date,
):
service_name: str,
method: str,
url: SplitResult,
region: str,
headers: dict[str, str | list[str] | tuple[str]],
credentials: Credentials,
content_sha256: str,
date: datetime,
) -> dict[str, str | list[str] | tuple[str]]:
"""Do signature V4 of given request for given service name."""

scope = _get_scope(date, region, service_name)
Expand All @@ -175,14 +219,14 @@ def _sign_v4(


def sign_v4_s3(
method,
url,
region,
headers,
credentials,
content_sha256,
date,
):
method: str,
url: SplitResult,
region: str,
headers: dict[str, str | list[str] | tuple[str]],
credentials: Credentials,
content_sha256: str,
date: datetime,
) -> dict[str, str | list[str] | tuple[str]]:
"""Do signature V4 of given request for S3 service."""
return _sign_v4(
"s3",
Expand All @@ -197,14 +241,14 @@ def sign_v4_s3(


def sign_v4_sts(
method,
url,
region,
headers,
credentials,
content_sha256,
date,
):
method: str,
url: SplitResult,
region: str,
headers: dict[str, str | list[str] | tuple[str]],
credentials: Credentials,
content_sha256: str,
date: datetime,
) -> dict[str, str | list[str] | tuple[str]]:
"""Do signature V4 of given request for STS service."""
return _sign_v4(
"sts",
Expand All @@ -219,8 +263,13 @@ def sign_v4_sts(


def _get_presign_canonical_request_hash( # pylint: disable=invalid-name
method, url, access_key, scope, date, expires,
):
method: str,
url: SplitResult,
access_key: str,
scope: str,
date: datetime,
expires: int,
) -> tuple[str, SplitResult]:
"""Get canonical request hash for presign request."""
x_amz_credential = queryencode(access_key + "/" + scope)
canonical_headers, signed_headers = "host:" + url.netloc, "host"
Expand Down Expand Up @@ -258,13 +307,13 @@ def _get_presign_canonical_request_hash( # pylint: disable=invalid-name


def presign_v4(
method,
url,
region,
credentials,
date,
expires,
):
method: str,
url: SplitResult,
region: str,
credentials: Credentials,
date: datetime,
expires: int,
) -> SplitResult:
"""Do signature V4 of given presign request."""

scope = _get_scope(date, region, "s3")
Expand All @@ -281,12 +330,17 @@ def presign_v4(
return url


def get_credential_string(access_key, date, region):
def get_credential_string(access_key: str, date: datetime, region: str) -> str:
"""Get credential string of given access key, date and region."""
return f"{access_key}/{time.to_signer_date(date)}/{region}/s3/aws4_request"


def post_presign_v4(data, secret_key, date, region):
def post_presign_v4(
data: str,
secret_key: str,
date: datetime,
region: str,
) -> str:
"""Do signature V4 of given presign POST form-data."""
return _get_signature(
_get_signing_key(secret_key, date, region, "s3"),
Expand Down