diff --git a/minio/error.py b/minio/error.py index f0fa8646..7b9bed9d 100644 --- a/minio/error.py +++ b/minio/error.py @@ -53,6 +53,22 @@ def __reduce__(self): return type(self), (self._code, self._content_type, self._body) +class AdminResponseError(Exception): + """Raised to indicate error response from server.""" + + def __init__(self, code, content_type, body): + self._code = code + self._content_type = content_type + self._body = body + super().__init__( + f"Error admin response from server; " + f"Response code: {code}, Body: {body}" + ) + + def __reduce__(self): + return type(self), (self._code, self._content_type, self._body) + + class ServerError(MinioException): """Raised to indicate that S3 service returning HTTP server error.""" diff --git a/minio/helpers.py b/minio/helpers.py index 712050a2..2448de42 100644 --- a/minio/helpers.py +++ b/minio/helpers.py @@ -650,6 +650,71 @@ def build( return url_replace(url, netloc=netloc, path=path) +class AdminURL: + """URL of Minio Admin endpoint""" + + def __init__(self, endpoint): + url = urllib.parse.urlsplit(endpoint) + host = url.hostname + + if url.scheme.lower() not in ["http", "https"]: + raise ValueError("scheme in endpoint must be http or https") + + url = url_replace(url, scheme=url.scheme.lower()) + + if url.path and url.path != "/": + raise ValueError("path in endpoint is not allowed") + + url = url_replace(url, path="") + + if url.query: + raise ValueError("query in endpoint is not allowed") + + if url.fragment: + raise ValueError("fragment in endpoint is not allowed") + + try: + url.port + except ValueError as exc: + raise ValueError("invalid port") from exc + + if url.username: + raise ValueError("username in endpoint is not allowed") + + if url.password: + raise ValueError("password in endpoint is not allowed") + + if ( + (url.scheme == "http" and url.port == 80) or + (url.scheme == "https" and url.port == 443) + ): + url = url_replace(url, netloc=host) + + self._url = url + + @property + def is_https(self): + """Check if scheme is HTTPS.""" + return self._url.scheme == "https" + + def build( + self, path, query_params=None + ): + """Build URL for given information.""" + url = url_replace(self._url, path=path) + + query = [] + for key, values in sorted((query_params or {}).items()): + values = values if isinstance(values, (list, tuple)) else [values] + query += [ + f"{queryencode(key)}={queryencode(value)}" + for value in sorted(values) + ] + url = url_replace(url, query="&".join(query)) + + return url + + class ObjectWriteResult: """Result class of any APIs doing object creation.""" diff --git a/minio/minioadmin.py b/minio/minioadmin.py index d30dd6de..dd3ee70a 100644 --- a/minio/minioadmin.py +++ b/minio/minioadmin.py @@ -20,278 +20,401 @@ from __future__ import absolute_import -import json -import subprocess +from datetime import timedelta +from urllib.parse import urlunsplit +import os + +import certifi +import urllib3 + +from . import time + +from .credentials.providers import StaticProvider +from .error import AdminResponseError +from .helpers import AdminURL, sha256_hash +from .signer import sign_v4_s3 + + +_ADMIN_PATH_PREFIX = "/minio/admin/v3" class MinioAdmin: """MinIO Admin wrapper using MinIO Client (mc) tool.""" - def __init__( - self, target, - binary_path=None, config_dir=None, ignore_cert_check=False, - timeout=None, env=None, - ): - self._target = target - self._timeout = timeout - self._env = env - self._base_args = [binary_path or "mc", "--json"] - if config_dir: - self._base_args += ["--config-dir", config_dir] - if ignore_cert_check: - self._base_args.append("--insecure") - self._base_args.append("admin") - - def _run(self, args, multiline=False): - """Execute mc command and return JSON output.""" - proc = subprocess.run( - self._base_args + args, - capture_output=True, - timeout=self._timeout, - env=self._env, - check=True, - text=True, + def __init__(self, endpoint, access_key, + secret_key, + secure=True, + cert_check=True): + self._base_url = AdminURL( + ("https://" if secure else "http://") + endpoint ) - if not proc.stdout: - return [] if multiline else {} - if multiline: - return [json.loads(line) for line in proc.stdout.splitlines()] - return json.loads(proc.stdout) - - def service_restart(self): - """Restart MinIO service.""" - return self._run(["service", "restart", self._target]) - - def service_stop(self): - """Stop MinIO service.""" - return self._run(["service", "stop", self._target]) - - def update(self): - """Update MinIO.""" - return self._run(["update", self._target]) - - def info(self): - """Get MinIO server information.""" - return self._run(["info", self._target]) - - def user_add(self, access_key, secret_key): - """Add a new user.""" - return self._run(["user", "add", self._target, access_key, secret_key]) - - def user_disable(self, access_key): - """Disable user.""" - return self._run(["user", "disable", self._target, access_key]) - - def user_enable(self, access_key): - """Enable user.""" - return self._run(["user", "enable", self._target, access_key]) - - def user_remove(self, access_key): - """Remove user.""" - return self._run(["user", "remove", self._target, access_key]) - - def user_info(self, access_key): - """Get user information.""" - return self._run(["user", "info", self._target, access_key]) - - def user_list(self): - """List users.""" - return self._run(["user", "list", self._target], multiline=True) - - def group_add(self, group_name, members): - """Add users a new or existing group.""" - return self._run(["group", "add", self._target, group_name] + members) - - def group_disable(self, group_name): - """Disable group.""" - return self._run(["group", "disable", self._target, group_name]) - - def group_enable(self, group_name): - """Enable group.""" - return self._run(["group", "enable", self._target, group_name]) - - def group_remove(self, group_name, members=None): - """Remove group or members from a group.""" - return self._run( - ["group", "remove", self._target, group_name] + (members or []), + self._credentials = StaticProvider(access_key, secret_key).retrieve() + + timeout = timedelta(minutes=5).seconds + self._http = urllib3.PoolManager( + timeout=urllib3.util.Timeout(connect=timeout, read=timeout), + maxsize=10, + cert_reqs='CERT_REQUIRED' if cert_check else 'CERT_NONE', + ca_certs=os.environ.get('SSL_CERT_FILE') or certifi.where(), + retries=urllib3.Retry( + total=5, + backoff_factor=0.2, + status_forcelist=[500, 502, 503, 504] + ) ) - def group_info(self, group_name): - """Get group information.""" - return self._run(["group", "info", self._target, group_name]) + def __del__(self): + self._http.clear() + + def _build_headers(self, host, body): + """Build headers with given parameters.""" + headers = {} + headers["Host"] = host + sha256 = None + + if self._base_url.is_https: + sha256 = "UNSIGNED-PAYLOAD" + else: + sha256 = sha256_hash(body) + if sha256: + headers["x-amz-content-sha256"] = sha256 + date = time.utcnow() + headers["x-amz-date"] = time.to_amz_date(date) + return headers, date + + def _build_signed_headers(self, url, body, method): + """Build signed headers""" + headers, date = self._build_headers(url.netloc, body) + headers = sign_v4_s3( + method, + url, + '', + headers, + self._credentials, + headers.get("x-amz-content-sha256"), + date, + ) - def group_list(self): - """List groups.""" - return self._run(["group", "list", self._target], multiline=True) + return headers - def policy_add(self, policy_name, policy_file): - """Add new policy.""" - return self._run( - ["policy", "create", self._target, policy_name, policy_file], + def _send_request(self, method, url, body): + """Send HTTP request with given parameters""" + + headers = self._build_signed_headers( + url, + body, + method ) - def policy_remove(self, policy_name): - """Remove policy.""" - return self._run(["policy", "remove", self._target, policy_name]) - - def policy_info(self, policy_name): - """Get policy information.""" - return self._run(["policy", "info", self._target, policy_name]) - - def policy_list(self): - """List policies.""" - return self._run(["policy", "list", self._target], multiline=True) - - def policy_set(self, policy_name, user=None, group=None): - """Set IAM policy on a user or group.""" - if (user is not None) ^ (group is not None): - return self._run( - [ - "policy", "attach", self._target, policy_name, - "--user" if user else "--group", user or group, - ], - ) - raise ValueError("either user or group must be set") - - def policy_unset(self, policy_name, user=None, group=None): - """Unset an IAM policy for a user or group.""" - if (user is not None) ^ (group is not None): - return self._run( - [ - "policy", "detach", self._target, policy_name, - "--user" if user else "--group", user or group, - ], - ) - raise ValueError("either user or group must be set") + return self._http.urlopen( + method=method, + url=urlunsplit(url), + body=body, + headers=_convert_to_urllib3_headers(headers) + ) - def config_get(self, key=None): - """Get configuration parameters.""" - return self._run( - ["config", "get", self._target] + [key] if key else [], - key, + def _url_open( + self, + method, + path, + body=None, + query_params=None, + ): + """Execute HTTP request.""" + url = self._base_url.build( + path=_ADMIN_PATH_PREFIX + path, + query_params=query_params, ) - def config_set(self, key, config): - """Set configuration parameters.""" - args = [name + "=" + value for name, value in config.items()] - return self._run(["config", "set", self._target, key] + args) - - def config_reset(self, key, name=None): - """Reset configuration parameters.""" - if name: - key += ":" + name - return self._run(["config", "reset", self._target, key]) - - def config_remove(self, access_key): - """Remove config.""" - return self._run(["config", "remove", self._target, access_key]) - - def config_history(self): - """Get historic configuration changes.""" - return self._run(["config", "history", self._target], multiline=True) - - def config_restore(self, restore_id): - """Restore to a specific configuration history.""" - return self._run(["config", "restore", self._target, restore_id]) - - def profile_start(self, profilers=()): - """Start recording profile data.""" - args = ["profile", "start"] - if profilers: - args += ["--type", ",".join(profilers)] - args.append(self._target) - return self._run(args) - - def profile_stop(self): - """Stop and download profile data.""" - return self._run(["profile", "stop", self._target]) - - def top_locks(self): - """Get a list of the 10 oldest locks on a MinIO cluster.""" - return self._run(["top", "locks", self._target], multiline=True) - - def prometheus_generate(self): - """Generate prometheus configuration.""" - return self._run(["prometheus", "generate", self._target]) - - def kms_key_create(self, key=None): - """Create a new KMS master key.""" - return self._run( - [ - "kms", "key", "create", self._target, key - ] + ([key] if key else []), + response = self._send_request( + method, + url, + body, ) - def kms_key_status(self, key=None): - """Get status information of a KMS master key.""" - return self._run( - [ - "kms", "key", "status", self._target, key - ] + ([key] if key else []), + if response.status in [200, 204, 206]: + return response + + raise AdminResponseError( + response.status, + response.headers.get("content-type"), + response.data.decode() if response.data else None ) - def bucket_remote_add( - self, src_bucket, dest_url, - path=None, region=None, bandwidth=None, service=None, + def _execute( + self, + method, + path, + body=None, + query_params=None, ): - """Add a new remote target.""" - args = [ - "bucket", "remote", "add", self._target + "/" + src_bucket, - dest_url, "--service", service or "replication", - ] - if path: - args += ["--path", path] - if region: - args += ["--region", region] - if bandwidth: - args += ["--bandwidth", bandwidth] - return self._run(args) - - def bucket_remote_edit(self, src_bucket, dest_url, arn): - """Edit credentials of remote target.""" - return self._run( - [ - "bucket", "remote", "edit", self._target + "/" + src_bucket, - dest_url, "--arn", arn, - ], + """Execute HTTP request.""" + url = self._base_url.build( + path=_ADMIN_PATH_PREFIX + path, + query_params=query_params, ) - def bucket_remote_list(self, src_bucket=None, service=None): - """List remote targets.""" - return self._run( - [ - "bucket", "remote", "ls", - self._target + ("/" + src_bucket if src_bucket else ""), - "--service", service or "replication", - ], + response = self._send_request( + method, + url, + body, ) - def bucket_remote_remove(self, src_bucket, arn): - """Remove configured remote target.""" - return self._run( - [ - "bucket", "remote", "rm", self._target + "/" + src_bucket, - "--arn", arn, - ], - ) + if response.status in [200, 204, 206]: + return response - def bucket_quota_set(self, bucket, fifo=None, hard=None): - """Set bucket quota configuration.""" - if fifo is None and hard is None: - raise ValueError("fifo or hard must be set") - args = ["bucket", "quota", self._target + "/" + bucket] - if fifo: - args += ["--fifo", fifo] - if hard: - args += ["--hard", hard] - return self._run(args) - - def bucket_quota_clear(self, bucket): - """Clear bucket quota configuration.""" - return self._run( - ["bucket", "quota", self._target + "/" + bucket, "--clear"], + raise AdminResponseError( + response.status, + response.headers.get("content-type"), + response.data.decode() if response.data else None ) - def bucket_quota_get(self, bucket): - """Get bucket quota configuration.""" - return self._run(["bucket", "quota", self._target + "/" + bucket]) + # def service_restart(self): + # """Restart MinIO service.""" + # return self._run(["service", "restart", self._target]) + + # def service_stop(self): + # """Stop MinIO service.""" + # return self._run(["service", "stop", self._target]) + + # def update(self): + # """Update MinIO.""" + # return self._run(["update", self._target]) + + # def info(self): + # """Get MinIO server information.""" + # return self._run(["info", self._target]) + + # def user_add(self, access_key, secret_key): + # """Add a new user.""" + # return self._run(["user", "add", self._target, + # access_key, secret_key]) + + # def user_disable(self, access_key): + # """Disable user.""" + # return self._run(["user", "disable", self._target, access_key]) + + # def user_enable(self, access_key): + # """Enable user.""" + # return self._run(["user", "enable", self._target, access_key]) + + # def user_remove(self, access_key): + # """Remove user.""" + # return self._run(["user", "remove", self._target, access_key]) + + # def user_info(self, access_key): + # """Get user information.""" + # return self._run(["user", "info", self._target, access_key]) + + # def user_list(self): + # """List users.""" + # return self._run(["user", "list", self._target], multiline=True) + + # def group_add(self, group_name, members): + # """Add users a new or existing group.""" + # return self._run(["group", "add", self._target, group_name] + members) + + # def group_disable(self, group_name): + # """Disable group.""" + # return self._run(["group", "disable", self._target, group_name]) + + # def group_enable(self, group_name): + # """Enable group.""" + # return self._run(["group", "enable", self._target, group_name]) + + # def group_remove(self, group_name, members=None): + # """Remove group or members from a group.""" + # return self._run( + # ["group", "remove", self._target, group_name] + (members or []), + # ) + + # def group_info(self, group_name): + # """Get group information.""" + # return self._run(["group", "info", self._target, group_name]) + + # def group_list(self): + # """List groups.""" + # return self._run(["group", "list", self._target], multiline=True) + + # def policy_add(self, policy_name, policy_file): + # """Add new policy.""" + # return self._run( + # ["policy", "create", self._target, policy_name, policy_file], + # ) + + # def policy_remove(self, policy_name): + # """Remove policy.""" + # return self._run(["policy", "remove", self._target, policy_name]) + + # def policy_info(self, policy_name): + # """Get policy information.""" + # return self._run(["policy", "info", self._target, policy_name]) + + # def policy_list(self): + # """List policies.""" + # return self._run(["policy", "list", self._target], multiline=True) + + # def policy_set(self, policy_name, user=None, group=None): + # """Set IAM policy on a user or group.""" + # if (user is not None) ^ (group is not None): + # return self._run( + # [ + # "policy", "attach", self._target, policy_name, + # "--user" if user else "--group", user or group, + # ], + # ) + # raise ValueError("either user or group must be set") + + # def policy_unset(self, policy_name, user=None, group=None): + # """Unset an IAM policy for a user or group.""" + # if (user is not None) ^ (group is not None): + # return self._run( + # [ + # "policy", "detach", self._target, policy_name, + # "--user" if user else "--group", user or group, + # ], + # ) + # raise ValueError("either user or group must be set") + + # def config_get(self, key=None): + # """Get configuration parameters.""" + # return self._run( + # ["config", "get", self._target] + [key] if key else [], + # key, + # ) + + # def config_set(self, key, config): + # """Set configuration parameters.""" + # args = [name + "=" + value for name, value in config.items()] + # return self._run(["config", "set", self._target, key] + args) + + # def config_reset(self, key, name=None): + # """Reset configuration parameters.""" + # if name: + # key += ":" + name + # return self._run(["config", "reset", self._target, key]) + + # def config_remove(self, access_key): + # """Remove config.""" + # return self._run(["config", "remove", self._target, access_key]) + + # def config_history(self): + # """Get historic configuration changes.""" + # return self._run(["config", "history", self._target], multiline=True) + + # def config_restore(self, restore_id): + # """Restore to a specific configuration history.""" + # return self._run(["config", "restore", self._target, restore_id]) + + # def profile_start(self, profilers=()): + # """Start recording profile data.""" + # args = ["profile", "start"] + # if profilers: + # args += ["--type", ",".join(profilers)] + # args.append(self._target) + # return self._run(args) + + # def profile_stop(self): + # """Stop and download profile data.""" + # return self._run(["profile", "stop", self._target]) + + # def top_locks(self): + # """Get a list of the 10 oldest locks on a MinIO cluster.""" + # return self._run(["top", "locks", self._target], multiline=True) + + # def prometheus_generate(self): + # """Generate prometheus configuration.""" + # return self._run(["prometheus", "generate", self._target]) + + # def kms_key_create(self, key=None): + # """Create a new KMS master key.""" + # return self._run( + # [ + # "kms", "key", "create", self._target, key + # ] + ([key] if key else []), + # ) + + # def kms_key_status(self, key=None): + # """Get status information of a KMS master key.""" + # return self._run( + # [ + # "kms", "key", "status", self._target, key + # ] + ([key] if key else []), + # ) + + # def bucket_remote_add( + # self, src_bucket, dest_url, + # path=None, region=None, bandwidth=None, service=None, + # ): + # """Add a new remote target.""" + # args = [ + # "bucket", "remote", "add", self._target + "/" + src_bucket, + # dest_url, "--service", service or "replication", + # ] + # if path: + # args += ["--path", path] + # if region: + # args += ["--region", region] + # if bandwidth: + # args += ["--bandwidth", bandwidth] + # return self._run(args) + + # def bucket_remote_edit(self, src_bucket, dest_url, arn): + # """Edit credentials of remote target.""" + # return self._run( + # [ + # "bucket", "remote", "edit", self._target + "/" + src_bucket, + # dest_url, "--arn", arn, + # ], + # ) + + # def bucket_remote_list(self, src_bucket=None, service=None): + # """List remote targets.""" + # return self._run( + # [ + # "bucket", "remote", "ls", + # self._target + ("/" + src_bucket if src_bucket else ""), + # "--service", service or "replication", + # ], + # ) + + # def bucket_remote_remove(self, src_bucket, arn): + # """Remove configured remote target.""" + # return self._run( + # [ + # "bucket", "remote", "rm", self._target + "/" + src_bucket, + # "--arn", arn, + # ], + # ) + + # def bucket_quota_set(self, bucket, fifo=None, hard=None): + # """Set bucket quota configuration.""" + # if fifo is None and hard is None: + # raise ValueError("fifo or hard must be set") + # args = ["bucket", "quota", self._target + "/" + bucket] + # if fifo: + # args += ["--fifo", fifo] + # if hard: + # args += ["--hard", hard] + # return self._run(args) + + # def bucket_quota_clear(self, bucket): + # """Clear bucket quota configuration.""" + # return self._run( + # ["bucket", "quota", self._target + "/" + bucket, "--clear"], + # ) + + # def bucket_quota_get(self, bucket): + # """Get bucket quota configuration.""" + # return self._run(["bucket", "quota", self._target + "/" + bucket]) + + +def _convert_to_urllib3_headers(headers): + """Convert headers to urllib3 format""" + http_headers = urllib3.HTTPHeaderDict() + for key, value in (headers or {}).items(): + if isinstance(value, (list, tuple)): + _ = [http_headers.add(key, val) for val in value] + else: + http_headers.add(key, value) + return http_headers