diff --git a/minio/crypto.py b/minio/crypto.py index d3ad9cc66..b0878059e 100644 --- a/minio/crypto.py +++ b/minio/crypto.py @@ -24,7 +24,7 @@ from Crypto.Cipher import AES, ChaCha20_Poly1305 from Crypto.Cipher._mode_gcm import GcmMode from Crypto.Cipher.ChaCha20_Poly1305 import ChaCha20Poly1305Cipher -from urllib3 import HTTPResponse +from urllib3.response import BaseHTTPResponse # # Encrypted Message Format: @@ -138,7 +138,7 @@ class DecryptReader: APIs. """ - def __init__(self, response: HTTPResponse, secret: bytes): + def __init__(self, response: BaseHTTPResponse, secret: bytes): self._response = response self._secret = secret self._payload = None @@ -156,6 +156,7 @@ def __init__(self, response: HTTPResponse, secret: bytes): ) self._chunk = b"" self._count = 0 + self._is_closed = False def __enter__(self): return self @@ -193,13 +194,14 @@ def _decrypt(self, payload: bytes, last_chunk: bool = False) -> bytes: def _read_chunk(self) -> bool: """Read a chunk at least one byte more than chunk size.""" - if self._response.isclosed(): + if self._is_closed: return True while len(self._chunk) != (1 + _MAX_CHUNK_SIZE): chunk = self._response.read(1 + _MAX_CHUNK_SIZE - len(self._chunk)) self._chunk += chunk if len(chunk) == 0: + self._is_closed = True return True return False @@ -235,7 +237,7 @@ def stream(self, num_bytes=32*1024): break -def decrypt(response: HTTPResponse, secret_key: str) -> bytes: +def decrypt(response: BaseHTTPResponse, secret_key: str) -> bytes: """Decrypt response data.""" result = b"" with DecryptReader(response, secret_key.encode()) as reader: diff --git a/minio/minioadmin.py b/minio/minioadmin.py index 46423a647..a7c631e86 100644 --- a/minio/minioadmin.py +++ b/minio/minioadmin.py @@ -18,22 +18,27 @@ """MinIO Admin Client to perform MinIO administration operations.""" -from __future__ import absolute_import +from __future__ import absolute_import, annotations import json import os from datetime import timedelta from enum import Enum +from typing import TextIO from urllib.parse import urlunsplit import certifi -import urllib3 +from typing_extensions import Protocol +from urllib3 import Retry from urllib3._collections import HTTPHeaderDict - -from minio.crypto import decrypt, encrypt +from urllib3.poolmanager import PoolManager +from urllib3.response import BaseHTTPResponse +from urllib3.util import Timeout from . import time -from .credentials.providers import Provider +from .credentials import Provider +from .crypto import decrypt, encrypt +from .datatypes import PeerInfo, PeerSite, SiteReplicationStatusOptions from .error import MinioAdminException from .helpers import (_DEFAULT_USER_AGENT, _REGION_REGEX, _parse_url, headers_to_strings, queryencode, sha256_hash, @@ -83,35 +88,45 @@ ) +class CommandType(Protocol): + """typing stub for enum.Command class""" + + @property + def value(self) -> str: + """Get value of the command.""" + + class MinioAdmin: """Client to perform MinIO administration operations.""" - def __init__(self, - endpoint: str, - credentials: Provider, - region: str = "", - secure: bool = True, - cert_check: bool = True, - http_client: urllib3.poolmanager.PoolManager = None): + def __init__( + self, + endpoint: str, + credentials: Provider, + region: str = "", + secure: bool = True, + cert_check: bool = True, + http_client: PoolManager | None = None, + ): url = _parse_url(("https://" if secure else "http://") + endpoint) if not isinstance(credentials, Provider): raise ValueError("valid credentials must be provided") if region and not _REGION_REGEX.match(region): raise ValueError(f"invalid region {region}") if http_client: - if not isinstance(http_client, urllib3.poolmanager.PoolManager): + if not isinstance(http_client, PoolManager): raise ValueError( "HTTP client should be instance of " "`urllib3.poolmanager.PoolManager`" ) else: timeout = timedelta(minutes=5).seconds - http_client = urllib3.PoolManager( - timeout=urllib3.util.Timeout(connect=timeout, read=timeout), + http_client = PoolManager( + timeout=Timeout(connect=timeout, read=timeout), maxsize=10, cert_reqs='CERT_REQUIRED' if cert_check else 'CERT_NONE', ca_certs=os.environ.get('SSL_CERT_FILE') or certifi.where(), - retries=urllib3.Retry( + retries=Retry( total=5, backoff_factor=0.2, status_forcelist=[500, 502, 503, 504] @@ -125,19 +140,20 @@ def __init__(self, self._cert_check = cert_check self._http = http_client self._user_agent = _DEFAULT_USER_AGENT - self._trace_stream = None + self._trace_stream: TextIO | None = None def __del__(self): self._http.clear() def _url_open( self, - method, - command, - query_params=None, - body=None, - preload_content=True, - ): + method: str, + command: CommandType, + query_params: + dict[str, str | list[str] | tuple[str]] | None = None, + body: bytes | None = None, + preload_content: bool = True, + ) -> BaseHTTPResponse: """Execute HTTP request.""" creds = self._provider.retrieve() @@ -151,12 +167,13 @@ def _url_open( ] url = url_replace(url, query="&".join(query)) + content_sha256 = sha256_hash(body) date = time.utcnow() - headers = { + headers: dict[str, str | list[str] | tuple[str]] = { "Host": url.netloc, "User-Agent": self._user_agent, "x-amz-date": time.to_amz_date(date), - "x-amz-content-sha256": sha256_hash(body), + "x-amz-content-sha256": content_sha256, "Content-Type": "application/octet-stream" } if creds.session_token: @@ -170,14 +187,16 @@ def _url_open( self._region, headers, creds, - headers.get("x-amz-content-sha256"), + content_sha256, date, ) if self._trace_stream: self._trace_stream.write("---------START-HTTP---------\n") - query = ("?" + url.query) if url.query else "" - self._trace_stream.write(f"{method} {url.path}{query} HTTP/1.1\n") + query_string = ("?" + url.query) if url.query else "" + self._trace_stream.write( + f"{method} {url.path}{query_string} HTTP/1.1\n", + ) self._trace_stream.write( headers_to_strings(headers, titled_key=True), ) @@ -193,7 +212,8 @@ def _url_open( http_headers = HTTPHeaderDict() for key, value in headers.items(): if isinstance(value, (list, tuple)): - _ = [http_headers.add(key, val) for val in value] + for val in value: + http_headers.add(key, val) else: http_headers.add(key, value) @@ -221,7 +241,7 @@ def _url_open( raise MinioAdminException(response.status, response.data.decode()) - def set_app_info(self, app_name, app_version): + def set_app_info(self, app_name: str, app_version: str): """ Set your application name and version to user agent header. @@ -235,7 +255,7 @@ def set_app_info(self, app_name, app_version): raise ValueError("Application name/version cannot be empty.") self._user_agent = f"{_DEFAULT_USER_AGENT} {app_name}/{app_version}" - def trace_on(self, stream): + def trace_on(self, stream: TextIO): """ Enable http trace. @@ -252,7 +272,7 @@ def trace_off(self): """ self._trace_stream = None - def service_restart(self): + def service_restart(self) -> str: """Restart MinIO service.""" response = self._url_open( "POST", @@ -261,7 +281,7 @@ def service_restart(self): ) return response.data.decode() - def service_stop(self): + def service_stop(self) -> str: """Stop MinIO service.""" response = self._url_open( "POST", @@ -270,7 +290,7 @@ def service_stop(self): ) return response.data.decode() - def update(self): + def update(self) -> str: """Update MinIO.""" response = self._url_open( "POST", @@ -279,7 +299,7 @@ def update(self): ) return response.data.decode() - def info(self): + def info(self) -> str: """Get MinIO server information.""" response = self._url_open( "GET", @@ -287,7 +307,7 @@ def info(self): ) return response.data.decode() - def user_add(self, access_key, secret_key): + def user_add(self, access_key: str, secret_key: str) -> str: """Create user with access and secret keys""" body = json.dumps( {"status": "enabled", "secretKey": secret_key}).encode() @@ -299,7 +319,7 @@ def user_add(self, access_key, secret_key): ) return response.data.decode() - def user_disable(self, access_key): + def user_disable(self, access_key: str) -> str: """Disable user.""" response = self._url_open( "PUT", @@ -308,7 +328,7 @@ def user_disable(self, access_key): ) return response.data.decode() - def user_enable(self, access_key): + def user_enable(self, access_key: str) -> str: """Enable user.""" response = self._url_open( "PUT", @@ -317,7 +337,7 @@ def user_enable(self, access_key): ) return response.data.decode() - def user_remove(self, access_key): + def user_remove(self, access_key: str) -> str: """Delete user""" response = self._url_open( "DELETE", @@ -326,7 +346,7 @@ def user_remove(self, access_key): ) return response.data.decode() - def user_info(self, access_key): + def user_info(self, access_key: str) -> str: """Get information about user""" response = self._url_open( "GET", @@ -335,7 +355,7 @@ def user_info(self, access_key): ) return response.data.decode() - def user_list(self): + def user_list(self) -> str: """List all users""" response = self._url_open( "GET", _COMMAND.LIST_USERS, preload_content=False, @@ -345,7 +365,7 @@ def user_list(self): ) return plain_data.decode() - def group_add(self, group_name, members): + def group_add(self, group_name: str, members: str) -> str: """Add users a new or existing group.""" body = json.dumps({ "group": group_name, @@ -359,7 +379,7 @@ def group_add(self, group_name, members): ) return response.data.decode() - def group_disable(self, group_name): + def group_disable(self, group_name: str) -> str: """Disable group.""" response = self._url_open( "PUT", @@ -368,7 +388,7 @@ def group_disable(self, group_name): ) return response.data.decode() - def group_enable(self, group_name): + def group_enable(self, group_name: str) -> str: """Enable group.""" response = self._url_open( "PUT", @@ -377,21 +397,23 @@ def group_enable(self, group_name): ) return response.data.decode() - def group_remove(self, group_name, members=None): + def group_remove(self, group_name: str, members: str | None = None) -> str: """Remove group or members from a group.""" - body = json.dumps({ + data = { "group": group_name, - "members": members, "isRemove": True - }).encode() + } + if members is not None: + data["members"] = members + response = self._url_open( "PUT", _COMMAND.ADD_UPDATE_REMOVE_GROUP, - body=body, + body=json.dumps(data).encode(), ) return response.data.decode() - def group_info(self, group_name): + def group_info(self, group_name: str) -> str: """Get group information.""" response = self._url_open( "GET", @@ -400,12 +422,12 @@ def group_info(self, group_name): ) return response.data.decode() - def group_list(self): + def group_list(self) -> str: """List groups.""" response = self._url_open("GET", _COMMAND.LIST_GROUPS) return response.data.decode() - def policy_add(self, policy_name, policy_file): + def policy_add(self, policy_name: str, policy_file: str) -> str: """Add new policy.""" with open(policy_file, encoding='utf-8') as file: response = self._url_open( @@ -416,7 +438,7 @@ def policy_add(self, policy_name, policy_file): ) return response.data.decode() - def policy_remove(self, policy_name): + def policy_remove(self, policy_name: str) -> str: """Remove policy.""" response = self._url_open( "DELETE", @@ -425,7 +447,7 @@ def policy_remove(self, policy_name): ) return response.data.decode() - def policy_info(self, policy_name): + def policy_info(self, policy_name: str) -> str: """Get policy information.""" response = self._url_open( "GET", @@ -434,12 +456,17 @@ def policy_info(self, policy_name): ) return response.data.decode() - def policy_list(self): + def policy_list(self) -> str: """List policies.""" response = self._url_open("GET", _COMMAND.LIST_CANNED_POLICIES) return response.data.decode() - def policy_set(self, policy_name, user=None, group=None): + def policy_set( + self, + policy_name: str | list[str], + user: str | None = None, + group: str | None = None, + ) -> str: """Set IAM policy on a user or group.""" if (user is not None) ^ (group is not None): response = self._url_open( @@ -452,18 +479,29 @@ def policy_set(self, policy_name, user=None, group=None): return response.data.decode() raise ValueError("either user or group must be set") - def policy_unset(self, policy_name, user=None, group=None): + def policy_unset( + self, + policy_name: str | list[str], + user: str | None = None, + group: str | None = None, + ) -> str: """Unset an IAM policy for a user or group.""" - body = json.dumps({ - "policies": [policy_name], - "group": group, - "user": user - }).encode() if (user is not None) ^ (group is not None): + policies = ( + policy_name if isinstance(policy_name, list) else [policy_name] + ) + data: dict[str, str | list[str]] = {"policies": policies} + if user: + data["user"] = user + if group: + data["group"] = group response = self._url_open( "POST", _COMMAND.UNSET_USER_OR_GROUP_POLICY, - body=encrypt(body, self._provider.retrieve().secret_key), + body=encrypt( + json.dumps(data).encode(), + self._provider.retrieve().secret_key, + ), preload_content=False, ) plain_data = decrypt( @@ -472,32 +510,35 @@ def policy_unset(self, policy_name, user=None, group=None): return plain_data.decode() raise ValueError("either user or group must be set") - def config_get(self, key=None): + def config_get(self, key: str | None = None) -> str: """Get configuration parameters.""" - if not key: + try: response = self._url_open( "GET", - _COMMAND.HELP_CONFIG, - query_params={"key": "", "subSys": ""}, + _COMMAND.GET_CONFIG, + query_params={"key": key or "", "subSys": ""}, + preload_content=False, ) - return response.data.decode() - - response = self._url_open( - "GET", - _COMMAND.GET_CONFIG, - query_params={"key": key, "subSys": ""}, - preload_content=False, - ) - plain_text = decrypt( - response, self._provider.retrieve().secret_key, - ) - return plain_text.decode() + if key is None: + return response.read().decode() + return decrypt( + response, self._provider.retrieve().secret_key, + ).decode() + finally: + if response: + response.close() + response.release_conn() - def config_set(self, key=None, config=None): + def config_set( + self, + key: str, + config: dict[str, str] | None = None, + ) -> str: """Set configuration parameters.""" - body = " ".join( - [key] + [f"{name}={value}" for name, value in config.items()] - ).encode() + data = [key] + if config: + data += [f"{name}={value}" for name, value in config.items()] + body = " ".join(data).encode() response = self._url_open( "PUT", _COMMAND.SET_CONFIG, @@ -505,7 +546,7 @@ def config_set(self, key=None, config=None): ) return response.data.decode() - def config_reset(self, key, name=None): + def config_reset(self, key: str, name: str | None = None) -> str: """Reset configuration parameters.""" if name: key += ":" + name @@ -517,20 +558,25 @@ def config_reset(self, key, name=None): ) return response.data.decode() - def config_history(self): + def config_history(self) -> str: """Get historic configuration changes.""" - response = self._url_open( - "GET", - _COMMAND.LIST_CONFIG_HISTORY, - query_params={"count": "10"}, - preload_content=False, - ) - plain_text = decrypt( - response, self._provider.retrieve().secret_key, - ) - return plain_text.decode() + try: + response = self._url_open( + "GET", + _COMMAND.LIST_CONFIG_HISTORY, + query_params={"count": "10"}, + preload_content=False, + ) + plain_text = decrypt( + response, self._provider.retrieve().secret_key, + ) + return plain_text.decode() + finally: + if response: + response.close() + response.release_conn() - def config_restore(self, restore_id): + def config_restore(self, restore_id: str) -> str: """Restore to a specific configuration history.""" response = self._url_open( "PUT", @@ -539,16 +585,16 @@ def config_restore(self, restore_id): ) return response.data.decode() - def profile_start(self, profilers=()): + def profile_start(self, profilers: tuple[str] = ()) -> str: """Runs a system profile""" response = self._url_open( "POST", _COMMAND.START_PROFILE, query_params={"profilerType;": ",".join(profilers)}, ) - return response.data + return response.data.decode() - def top_locks(self): + def top_locks(self) -> str: """Get a list of the 10 oldest locks on a MinIO cluster.""" response = self._url_open( "GET", @@ -556,7 +602,7 @@ def top_locks(self): ) return response.data.decode() - def kms_key_create(self, key=None): + def kms_key_create(self, key: str | None = None) -> str: """Create a new KMS master key.""" response = self._url_open( "POST", @@ -565,7 +611,7 @@ def kms_key_create(self, key=None): ) return response.data.decode() - def kms_key_status(self, key=None): + def kms_key_status(self, key: str | None = None) -> str: """Get status information of a KMS master key.""" response = self._url_open( "GET", @@ -574,7 +620,7 @@ def kms_key_status(self, key=None): ) return response.data.decode() - def add_site_replication(self, peer_sites): + def add_site_replication(self, peer_sites: list[PeerSite]) -> str: """Add peer sites to site replication.""" body = json.dumps( [peer_site.to_dict() for peer_site in peer_sites]).encode() @@ -586,12 +632,15 @@ def add_site_replication(self, peer_sites): ) return response.data.decode() - def get_site_replication_info(self): + def get_site_replication_info(self) -> str: """Get site replication information.""" response = self._url_open("GET", _COMMAND.SITE_REPLICATION_INFO) return response.data.decode() - def get_site_replication_status(self, options): + def get_site_replication_status( + self, + options: SiteReplicationStatusOptions, + ) -> str: """Get site replication information.""" response = self._url_open( "GET", @@ -600,7 +649,7 @@ def get_site_replication_status(self, options): ) return response.data.decode() - def edit_site_replication(self, peer_info): + def edit_site_replication(self, peer_info: PeerInfo) -> str: """Edit site replication with given peer information.""" body = json.dumps(peer_info.to_dict()).encode() response = self._url_open( @@ -611,7 +660,11 @@ def edit_site_replication(self, peer_info): ) return response.data.decode() - def remove_site_replication(self, sites=None, all_sites=False): + def remove_site_replication( + self, + sites: str | None = None, + all_sites: bool = False, + ) -> str: """Remove given sites or all sites from site replication.""" data = {} if all_sites: @@ -629,7 +682,7 @@ def remove_site_replication(self, sites=None, all_sites=False): ) return response.data.decode() - def bucket_quota_set(self, bucket, size): + def bucket_quota_set(self, bucket: str, size: int) -> str: """Set bucket quota configuration.""" body = json.dumps({"quota": size, "quotatype": "hard"}).encode() response = self._url_open( @@ -640,18 +693,11 @@ def bucket_quota_set(self, bucket, size): ) return response.data.decode() - def bucket_quota_clear(self, bucket): + def bucket_quota_clear(self, bucket: str) -> str: """Clear bucket quota configuration.""" - body = json.dumps({"quota": 0, "quotatype": "hard"}).encode() - response = self._url_open( - "PUT", - _COMMAND.SET_BUCKET_QUOTA, - query_params={"bucket": bucket}, - body=body - ) - return response.data.decode() + return self.bucket_quota_set(bucket, 0) - def bucket_quota_get(self, bucket): + def bucket_quota_get(self, bucket: str) -> str: """Get bucket quota configuration.""" response = self._url_open( "GET",