Skip to content

Commit

Permalink
Add typing in signer.py
Browse files Browse the repository at this point in the history
Signed-off-by: Bala.FA <bala@minio.io>
  • Loading branch information
balamurugana committed Nov 22, 2023
1 parent 3d92349 commit 742f408
Showing 1 changed file with 101 additions and 47 deletions.
148 changes: 101 additions & 47 deletions minio/signer.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,32 +26,42 @@
"""

from __future__ import absolute_import, annotations

import hashlib
import hmac
import re
from collections import OrderedDict
from datetime import datetime
from urllib.parse import SplitResult

from . import time
from .credentials import Credentials
from .helpers import queryencode, sha256_hash

SIGN_V4_ALGORITHM = 'AWS4-HMAC-SHA256'
_MULTI_SPACE_REGEX = re.compile(r"( +)")


def _hmac_hash(key, data, hexdigest=False):
def _hmac_hash(
key: bytes | bytearray,
data: bytes,
hexdigest: bool = False,
) -> bytes | str:
"""Return HMacSHA256 digest of given key and data."""

hasher = hmac.new(key, data, hashlib.sha256)
return hasher.hexdigest() if hexdigest else hasher.digest()


def _get_scope(date, region, service_name):
def _get_scope(date: datetime, region: str, service_name: str) -> str:
"""Get scope string."""
return f"{time.to_signer_date(date)}/{region}/{service_name}/aws4_request"


def _get_canonical_headers(headers):
def _get_canonical_headers(
headers: dict[str, str | list[str] | tuple[str]],
) -> tuple[dict[str, str], str]:
"""Get canonical headers."""

canonical_headers = {}
Expand All @@ -74,7 +84,7 @@ def _get_canonical_headers(headers):
return canonical_headers, signed_headers


def _get_canonical_query_string(query):
def _get_canonical_query_string(query: str) -> str:
"""Get canonical query string."""

query = query or ""
Expand All @@ -87,7 +97,12 @@ def _get_canonical_query_string(query):
)


def _get_canonical_request_hash(method, url, headers, content_sha256):
def _get_canonical_request_hash(
method: str,
url: SplitResult,
headers: dict[str, str | list[str] | tuple[str]],
content_sha256: str,
) -> tuple[str, str]:
"""Get canonical request hash."""
canonical_headers, signed_headers = _get_canonical_headers(headers)
canonical_query_string = _get_canonical_query_string(url.query)
Expand All @@ -110,35 +125,64 @@ def _get_canonical_request_hash(method, url, headers, content_sha256):
return sha256_hash(canonical_request), signed_headers


def _get_string_to_sign(date, scope, canonical_request_hash):
def _get_string_to_sign(
date: datetime,
scope: str,
canonical_request_hash: str,
) -> str:
"""Get string-to-sign."""
return (
f"AWS4-HMAC-SHA256\n{time.to_amz_date(date)}\n{scope}\n"
f"{canonical_request_hash}"
)


def _get_signing_key(secret_key, date, region, service_name):
def _get_signing_key(
secret_key: str,
date: datetime,
region: str,
service_name: str,
) -> bytes:
"""Get signing key."""

date_key = _hmac_hash(
("AWS4" + secret_key).encode(),
time.to_signer_date(date).encode(),
)
if isinstance(date_key, str):
date_key = date_key.encode()

date_region_key = _hmac_hash(date_key, region.encode())
if isinstance(date_region_key, str):
date_region_key = date_region_key.encode()

date_region_service_key = _hmac_hash(
date_region_key, service_name.encode(),
)
return _hmac_hash(date_region_service_key, b"aws4_request")
if isinstance(date_region_service_key, str):
date_region_service_key = date_region_service_key.encode()

signing_key = _hmac_hash(date_region_service_key, b"aws4_request")
if isinstance(signing_key, str):
signing_key = signing_key.encode()

return signing_key

def _get_signature(signing_key, string_to_sign):

def _get_signature(signing_key: bytes, string_to_sign: str) -> str:
"""Get signature."""

return _hmac_hash(signing_key, string_to_sign.encode(), hexdigest=True)
return str(
_hmac_hash(signing_key, string_to_sign.encode(), hexdigest=True),
)


def _get_authorization(access_key, scope, signed_headers, signature):
def _get_authorization(
access_key: str,
scope: str,
signed_headers: str,
signature: str,
) -> str:
"""Get authorization."""
return (
f"AWS4-HMAC-SHA256 Credential={access_key}/{scope}, "
Expand All @@ -147,15 +191,15 @@ def _get_authorization(access_key, scope, signed_headers, signature):


def _sign_v4(
service_name,
method,
url,
region,
headers,
credentials,
content_sha256,
date,
):
service_name: str,
method: str,
url: SplitResult,
region: str,
headers: dict[str, str | list[str] | tuple[str]],
credentials: Credentials,
content_sha256: str,
date: datetime,
) -> dict[str, str | list[str] | tuple[str]]:
"""Do signature V4 of given request for given service name."""

scope = _get_scope(date, region, service_name)
Expand All @@ -175,14 +219,14 @@ def _sign_v4(


def sign_v4_s3(
method,
url,
region,
headers,
credentials,
content_sha256,
date,
):
method: str,
url: SplitResult,
region: str,
headers: dict[str, str | list[str] | tuple[str]],
credentials: Credentials,
content_sha256: str,
date: datetime,
) -> dict[str, str | list[str] | tuple[str]]:
"""Do signature V4 of given request for S3 service."""
return _sign_v4(
"s3",
Expand All @@ -197,14 +241,14 @@ def sign_v4_s3(


def sign_v4_sts(
method,
url,
region,
headers,
credentials,
content_sha256,
date,
):
method: str,
url: SplitResult,
region: str,
headers: dict[str, str | list[str] | tuple[str]],
credentials: Credentials,
content_sha256: str,
date: datetime,
) -> dict[str, str | list[str] | tuple[str]]:
"""Do signature V4 of given request for STS service."""
return _sign_v4(
"sts",
Expand All @@ -219,8 +263,13 @@ def sign_v4_sts(


def _get_presign_canonical_request_hash( # pylint: disable=invalid-name
method, url, access_key, scope, date, expires,
):
method: str,
url: SplitResult,
access_key: str,
scope: str,
date: datetime,
expires: int,
) -> tuple[str, SplitResult]:
"""Get canonical request hash for presign request."""
x_amz_credential = queryencode(access_key + "/" + scope)
canonical_headers, signed_headers = "host:" + url.netloc, "host"
Expand Down Expand Up @@ -258,13 +307,13 @@ def _get_presign_canonical_request_hash( # pylint: disable=invalid-name


def presign_v4(
method,
url,
region,
credentials,
date,
expires,
):
method: str,
url: SplitResult,
region: str,
credentials: Credentials,
date: datetime,
expires: int,
) -> SplitResult:
"""Do signature V4 of given presign request."""

scope = _get_scope(date, region, "s3")
Expand All @@ -281,12 +330,17 @@ def presign_v4(
return url


def get_credential_string(access_key, date, region):
def get_credential_string(access_key: str, date: datetime, region: str) -> str:
"""Get credential string of given access key, date and region."""
return f"{access_key}/{time.to_signer_date(date)}/{region}/s3/aws4_request"


def post_presign_v4(data, secret_key, date, region):
def post_presign_v4(
data: str,
secret_key: str,
date: datetime,
region: str,
) -> str:
"""Do signature V4 of given presign POST form-data."""
return _get_signature(
_get_signing_key(secret_key, date, region, "s3"),
Expand Down

0 comments on commit 742f408

Please sign in to comment.