diff --git a/minio/signer.py b/minio/signer.py index 783d50110..e52432b1b 100644 --- a/minio/signer.py +++ b/minio/signer.py @@ -26,32 +26,42 @@ """ +from __future__ import absolute_import, annotations + import hashlib import hmac import re from collections import OrderedDict +from datetime import datetime from urllib.parse import SplitResult from . import time +from .credentials import Credentials from .helpers import queryencode, sha256_hash SIGN_V4_ALGORITHM = 'AWS4-HMAC-SHA256' _MULTI_SPACE_REGEX = re.compile(r"( +)") -def _hmac_hash(key, data, hexdigest=False): +def _hmac_hash( + key: bytes | bytearray, + data: bytes, + hexdigest: bool = False, +) -> bytes | str: """Return HMacSHA256 digest of given key and data.""" hasher = hmac.new(key, data, hashlib.sha256) return hasher.hexdigest() if hexdigest else hasher.digest() -def _get_scope(date, region, service_name): +def _get_scope(date: datetime, region: str, service_name: str) -> str: """Get scope string.""" return f"{time.to_signer_date(date)}/{region}/{service_name}/aws4_request" -def _get_canonical_headers(headers): +def _get_canonical_headers( + headers: dict[str, str | list[str] | tuple[str]], +) -> tuple[dict[str, str], str]: """Get canonical headers.""" canonical_headers = {} @@ -74,7 +84,7 @@ def _get_canonical_headers(headers): return canonical_headers, signed_headers -def _get_canonical_query_string(query): +def _get_canonical_query_string(query: str) -> str: """Get canonical query string.""" query = query or "" @@ -87,7 +97,12 @@ def _get_canonical_query_string(query): ) -def _get_canonical_request_hash(method, url, headers, content_sha256): +def _get_canonical_request_hash( + method: str, + url: SplitResult, + headers: dict[str, str | list[str] | tuple[str]], + content_sha256: str, +) -> tuple[str, str]: """Get canonical request hash.""" canonical_headers, signed_headers = _get_canonical_headers(headers) canonical_query_string = _get_canonical_query_string(url.query) @@ -110,7 +125,11 @@ def _get_canonical_request_hash(method, url, headers, content_sha256): return sha256_hash(canonical_request), signed_headers -def _get_string_to_sign(date, scope, canonical_request_hash): +def _get_string_to_sign( + date: datetime, + scope: str, + canonical_request_hash: str, +) -> str: """Get string-to-sign.""" return ( f"AWS4-HMAC-SHA256\n{time.to_amz_date(date)}\n{scope}\n" @@ -118,27 +137,52 @@ def _get_string_to_sign(date, scope, canonical_request_hash): ) -def _get_signing_key(secret_key, date, region, service_name): +def _get_signing_key( + secret_key: str, + date: datetime, + region: str, + service_name: str, +) -> bytes: """Get signing key.""" date_key = _hmac_hash( ("AWS4" + secret_key).encode(), time.to_signer_date(date).encode(), ) + if isinstance(date_key, str): + date_key = date_key.encode() + date_region_key = _hmac_hash(date_key, region.encode()) + if isinstance(date_region_key, str): + date_region_key = date_region_key.encode() + date_region_service_key = _hmac_hash( date_region_key, service_name.encode(), ) - return _hmac_hash(date_region_service_key, b"aws4_request") + if isinstance(date_region_service_key, str): + date_region_service_key = date_region_service_key.encode() + + signing_key = _hmac_hash(date_region_service_key, b"aws4_request") + if isinstance(signing_key, str): + signing_key = signing_key.encode() + return signing_key -def _get_signature(signing_key, string_to_sign): + +def _get_signature(signing_key: bytes, string_to_sign: str) -> str: """Get signature.""" - return _hmac_hash(signing_key, string_to_sign.encode(), hexdigest=True) + return str( + _hmac_hash(signing_key, string_to_sign.encode(), hexdigest=True), + ) -def _get_authorization(access_key, scope, signed_headers, signature): +def _get_authorization( + access_key: str, + scope: str, + signed_headers: str, + signature: str, +) -> str: """Get authorization.""" return ( f"AWS4-HMAC-SHA256 Credential={access_key}/{scope}, " @@ -147,15 +191,15 @@ def _get_authorization(access_key, scope, signed_headers, signature): def _sign_v4( - service_name, - method, - url, - region, - headers, - credentials, - content_sha256, - date, -): + service_name: str, + method: str, + url: SplitResult, + region: str, + headers: dict[str, str | list[str] | tuple[str]], + credentials: Credentials, + content_sha256: str, + date: datetime, +) -> dict[str, str | list[str] | tuple[str]]: """Do signature V4 of given request for given service name.""" scope = _get_scope(date, region, service_name) @@ -175,14 +219,14 @@ def _sign_v4( def sign_v4_s3( - method, - url, - region, - headers, - credentials, - content_sha256, - date, -): + method: str, + url: SplitResult, + region: str, + headers: dict[str, str | list[str] | tuple[str]], + credentials: Credentials, + content_sha256: str, + date: datetime, +) -> dict[str, str | list[str] | tuple[str]]: """Do signature V4 of given request for S3 service.""" return _sign_v4( "s3", @@ -197,14 +241,14 @@ def sign_v4_s3( def sign_v4_sts( - method, - url, - region, - headers, - credentials, - content_sha256, - date, -): + method: str, + url: SplitResult, + region: str, + headers: dict[str, str | list[str] | tuple[str]], + credentials: Credentials, + content_sha256: str, + date: datetime, +) -> dict[str, str | list[str] | tuple[str]]: """Do signature V4 of given request for STS service.""" return _sign_v4( "sts", @@ -219,8 +263,13 @@ def sign_v4_sts( def _get_presign_canonical_request_hash( # pylint: disable=invalid-name - method, url, access_key, scope, date, expires, -): + method: str, + url: SplitResult, + access_key: str, + scope: str, + date: datetime, + expires: int, +) -> tuple[str, SplitResult]: """Get canonical request hash for presign request.""" x_amz_credential = queryencode(access_key + "/" + scope) canonical_headers, signed_headers = "host:" + url.netloc, "host" @@ -258,13 +307,13 @@ def _get_presign_canonical_request_hash( # pylint: disable=invalid-name def presign_v4( - method, - url, - region, - credentials, - date, - expires, -): + method: str, + url: SplitResult, + region: str, + credentials: Credentials, + date: datetime, + expires: int, +) -> SplitResult: """Do signature V4 of given presign request.""" scope = _get_scope(date, region, "s3") @@ -281,12 +330,17 @@ def presign_v4( return url -def get_credential_string(access_key, date, region): +def get_credential_string(access_key: str, date: datetime, region: str) -> str: """Get credential string of given access key, date and region.""" return f"{access_key}/{time.to_signer_date(date)}/{region}/s3/aws4_request" -def post_presign_v4(data, secret_key, date, region): +def post_presign_v4( + data: str, + secret_key: str, + date: datetime, + region: str, +) -> str: """Do signature V4 of given presign POST form-data.""" return _get_signature( _get_signing_key(secret_key, date, region, "s3"),