From ad9e00393b135264fd7f7685419f31903d03b3e2 Mon Sep 17 00:00:00 2001
From: Qi Chen
Date: Fri, 30 Jun 2023 15:53:50 +0100
Subject: [PATCH] #473 Warn if bucket cannot be accessed

+ warn if AWS auth is being used and machine identity is disabled
+ Refactor S3 fixtures so various errors can be simulated

---
 cpp/arcticdb/storage/python_bindings.cpp      |   7 +
 cpp/arcticdb/storage/s3/s3_api.cpp            |   1 +
 cpp/arcticdb/storage/s3/s3_api.hpp            |   4 +
 cpp/arcticdb/storage/s3/s3_storage.cpp        |  45 +++
 cpp/arcticdb/storage/s3/s3_storage.hpp        |  17 +-
 .../adapters/arctic_library_adapter.py        |   9 +
 python/arcticdb/arctic.py                     |   2 +
 python/arcticdb/util/test.py                  |  10 +
 python/tests/conftest.py                      | 141 ++--------
 .../tests/integration/arcticdb/test_arctic.py |  12 +-
 python/tests/util/storage_fixtures.py         | 258 ++++++++++++++++++
 11 files changed, 375 insertions(+), 131 deletions(-)
 create mode 100644 python/tests/util/storage_fixtures.py

diff --git a/cpp/arcticdb/storage/python_bindings.cpp b/cpp/arcticdb/storage/python_bindings.cpp
index 0d30e34e84b..84cd25d45b2 100644
--- a/cpp/arcticdb/storage/python_bindings.cpp
+++ b/cpp/arcticdb/storage/python_bindings.cpp
@@ -100,6 +100,13 @@ void register_bindings(py::module& storage, py::exception
             });
         })
+        .def("check_primary_storage_is_accessible", [](Library& library) {
+            std::optional<bool> out;
+            library.storage_specific([&out](storage::s3::S3Storage& s3) {
+                out = s3.check_creds_and_bucket();
+            });
+            return out;
+        }, "Currently only implemented by S3 storage. Calling on other storage types does nothing.")
         ;
 
     py::class_<S3CredentialsOverride>(storage, "S3CredentialsOverride")
diff --git a/cpp/arcticdb/storage/s3/s3_api.cpp b/cpp/arcticdb/storage/s3/s3_api.cpp
index 42375138c72..e6cf7322f05 100644
--- a/cpp/arcticdb/storage/s3/s3_api.cpp
+++ b/cpp/arcticdb/storage/s3/s3_api.cpp
@@ -42,6 +42,7 @@ S3ApiInstance::S3ApiInstance(Aws::Utils::Logging::LogLevel log_level) :
         return;
     ARCTICDB_RUNTIME_DEBUG(log::Loggers::instance()->storage(),
         "Does not appear to be using AWS. Will set AWS_EC2_METADATA_DISABLED");
+    ec2_metadata_disabled_by_us_ = true;
 #ifdef WIN32
     _putenv_s("AWS_EC2_METADATA_DISABLED", "true");
 #else
diff --git a/cpp/arcticdb/storage/s3/s3_api.hpp b/cpp/arcticdb/storage/s3/s3_api.hpp
index f8ef20a15d2..cae056be75a 100644
--- a/cpp/arcticdb/storage/s3/s3_api.hpp
+++ b/cpp/arcticdb/storage/s3/s3_api.hpp
@@ -25,9 +25,13 @@ class S3ApiInstance {
     static std::shared_ptr<S3ApiInstance> instance();
     static void destroy_instance();
 
+    bool is_ec2_metadata_disabled_by_us() const {
+        return ec2_metadata_disabled_by_us_;
+    }
 private:
     Aws::Utils::Logging::LogLevel log_level_;
     Aws::SDKOptions options_;
+    bool ec2_metadata_disabled_by_us_ = false;
 };
 
 } //namespace arcticdb::storage::s3
\ No newline at end of file
diff --git a/cpp/arcticdb/storage/s3/s3_storage.cpp b/cpp/arcticdb/storage/s3/s3_storage.cpp
index 8ab2f6da68a..125c61335b4 100644
--- a/cpp/arcticdb/storage/s3/s3_storage.cpp
+++ b/cpp/arcticdb/storage/s3/s3_storage.cpp
@@ -7,6 +7,8 @@
 #include
 #include
+#include <aws/core/Environment.h>
+#include <aws/core/utils/StringUtils.h>
 #include
 #include
 #include
@@ -32,6 +34,48 @@ std::streamsize S3StreamBuffer::xsputn(const char_type* s, std::streamsize n) {
     }
 }
 
+void check_ec2_metadata_endpoint(const S3ApiInstance& s3_api) {
+    auto disabled_env = Aws::Environment::GetEnv("AWS_EC2_METADATA_DISABLED");
+    if (Aws::Utils::StringUtils::ToLower(disabled_env.c_str()) == "true") {
+        const char* who_disabled = s3_api.is_ec2_metadata_disabled_by_us() ?
+            "EC2 metadata endpoint did not respond in time so was bypassed":
+            "AWS_EC2_METADATA_DISABLED environment variable is set to true";
+        log::storage().warn("{}. This means machine identity authentication will not work.", who_disabled);
+    }
+}
+
+bool S3Storage::check_creds_and_bucket() {
+    using namespace Aws::S3;
+    // We cannot easily change the timeouts of the s3_client_, so use async:
+    auto future = s3_client_.HeadBucketCallable(Model::HeadBucketRequest().WithBucket(bucket_name_.c_str()));
+    auto wait = std::chrono::milliseconds(ConfigsMap::instance()->get_int("S3Storage.CheckBucketMaxWait", 1000));
+    if (future.wait_for(wait) == std::future_status::ready) {
+        auto outcome = future.get();
+        if (!outcome.IsSuccess()) {
+            auto& error = outcome.GetError();
+
+#define BUCKET_LOG(level, msg, ...) log::storage().level(msg "\nHTTP Status: {}. Server response: {}", \
+    ## __VA_ARGS__, int(error.GetResponseCode()), error.GetMessage().c_str()); break
+
+            // HEAD request can't return the error details, so can't use the more detailed error codes.
+            switch (error.GetResponseCode()) {
+                case Aws::Http::HttpResponseCode::UNAUTHORIZED:
+                case Aws::Http::HttpResponseCode::FORBIDDEN:
+                    BUCKET_LOG(warn, "Unable to access bucket. Subsequent operations may fail.");
+                case Aws::Http::HttpResponseCode::NOT_FOUND:
+                    BUCKET_LOG(error, "The specified bucket {} does not exist.", bucket_name_);
+                default:
+                    BUCKET_LOG(info, "Unable to determine if the bucket is accessible.");
+            }
+#undef BUCKET_LOG
+        }
+        return outcome.IsSuccess();
+    } else {
+        log::storage().info("Unable to determine if the bucket is accessible. Request timed out.");
+    }
+    return false;
+}
+
 S3Storage::S3Storage(const LibraryPath &library_path, OpenMode mode, const Config &conf) :
     Parent(library_path, mode),
     s3_api_(S3ApiInstance::instance()),
@@ -42,6 +86,7 @@ S3Storage::S3Storage(const LibraryPath &library_path, OpenMode mode, const Confi
     if (creds.GetAWSAccessKeyId() == USE_AWS_CRED_PROVIDERS_TOKEN && creds.GetAWSSecretKey() == USE_AWS_CRED_PROVIDERS_TOKEN){
         ARCTICDB_RUNTIME_DEBUG(log::storage(), "Using AWS auth mechanisms");
+        check_ec2_metadata_endpoint(*s3_api_);
         s3_client_ = Aws::S3::S3Client(get_s3_config(conf), Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, conf.use_virtual_addressing());
     } else {
         ARCTICDB_RUNTIME_DEBUG(log::storage(), "Using provided auth credentials");
diff --git a/cpp/arcticdb/storage/s3/s3_storage.hpp b/cpp/arcticdb/storage/s3/s3_storage.hpp
index 3f6f3a287c3..7463b1f871f 100644
--- a/cpp/arcticdb/storage/s3/s3_storage.hpp
+++ b/cpp/arcticdb/storage/s3/s3_storage.hpp
@@ -43,6 +43,9 @@ class S3Storage final : public Storage {
      */
     std::string get_key_path(const VariantKey& key) const;
 
+    /* Logs warning(s) and returns false if there are likely to be access problems.*/
+    bool check_creds_and_bucket();
+
 protected:
     void do_write(Composite<KeySegmentPair>&& kvs);
@@ -232,6 +235,12 @@ inline Aws::Client::ClientConfiguration get_proxy_config(Aws::Http::Scheme endpo
     }
 }
 
+namespace {
+uint32_t fallback(uint32_t protobuf_config, const std::string& label, uint32_t default_val) {
+    return protobuf_config != 0 ? protobuf_config : ConfigsMap::instance()->get_int(label, default_val);
+}
+}
+
 template<typename ConfigType>
 auto get_s3_config(const ConfigType& conf) {
     auto endpoint_scheme = conf.https() ?
        Aws::Http::Scheme::HTTPS : Aws::Http::Scheme::HTTP;
@@ -246,11 +255,9 @@ auto get_s3_config(const ConfigType& conf) {
     util::check_arg(!endpoint.empty(), "S3 Endpoint must be specified");
     client_configuration.endpointOverride = endpoint;
     client_configuration.verifySSL = false;
-    client_configuration.maxConnections = conf.max_connections() == 0 ?
-        ConfigsMap::instance()->get_int("VersionStore.NumIOThreads", 16) :
-        conf.max_connections();
-    client_configuration.connectTimeoutMs = conf.connect_timeout() == 0 ? 30000 : conf.connect_timeout();
-    client_configuration.requestTimeoutMs = conf.request_timeout() == 0 ? 200000 : conf.request_timeout();
+    client_configuration.maxConnections = fallback(conf.max_connections(), "VersionStore.NumIOThreads", 16);
+    client_configuration.connectTimeoutMs = fallback(conf.connect_timeout(), "S3Storage.ConnectTimeoutMs", 30000);
+    client_configuration.requestTimeoutMs = fallback(conf.request_timeout(), "S3Storage.RequestTimeoutMs", 200000);
     return client_configuration;
 }
diff --git a/python/arcticdb/adapters/arctic_library_adapter.py b/python/arcticdb/adapters/arctic_library_adapter.py
index 94d1d146526..1cf3dfc4b4f 100644
--- a/python/arcticdb/adapters/arctic_library_adapter.py
+++ b/python/arcticdb/adapters/arctic_library_adapter.py
@@ -69,3 +69,12 @@ def delete_library(self, library: Library, library_config: LibraryConfig):
 
     def get_storage_override(self) -> StorageOverride:
         return StorageOverride()
+
+    def check_storage_is_accessible(self) -> Optional[bool]:
+        """Logs message(s) to inform the user if there are obvious issues with the storage.
+
+        Returns
+        -------
+        ``None`` if no check is implemented. Otherwise, whether all checks passed.
+        """
+        return self.config_library.check_primary_storage_is_accessible()
diff --git a/python/arcticdb/arctic.py b/python/arcticdb/arctic.py
index 2b45c4982b1..fd7f79f7525 100644
--- a/python/arcticdb/arctic.py
+++ b/python/arcticdb/arctic.py
@@ -98,6 +98,8 @@ def __init__(self, uri: str, encoding_version: EncodingVersion = EncodingVersion
         self._library_manager = LibraryManager(self._library_adapter.config_library)
         self._uri = uri
 
+        self._library_adapter.check_storage_is_accessible()
+
     def __getitem__(self, name: str) -> Library:
         storage_override = self._library_adapter.get_storage_override()
         lib = NativeVersionStore(
diff --git a/python/arcticdb/util/test.py b/python/arcticdb/util/test.py
index 90a121c5531..6b8101cb75f 100644
--- a/python/arcticdb/util/test.py
+++ b/python/arcticdb/util/test.py
@@ -6,6 +6,8 @@
 As of the Change Date specified in that file, in accordance with the Business Source License, use of this software
 will be governed by the Apache License, version 2.0.
""" import os +import sys +import signal from contextlib import contextmanager from typing import Mapping, Any, Optional, Iterable, NamedTuple, List, AnyStr import numpy as np @@ -195,6 +197,14 @@ def configure_test_logger(level="INFO"): configure(get_test_logger_config(level=level, outputs=outputs), force=True) +def gracefully_terminate_process(p): + p.terminate() + if sys.platform != "win32": + p.join(timeout=2) + if p.exitcode is None: + os.kill(p.pid, signal.SIGKILL) # TODO (python37): use Process.kill() + + CustomThing = NamedTuple( "CustomThing", [("custom_index", np.ndarray), ("custom_columns", List[AnyStr]), ("custom_values", List[np.ndarray])] ) diff --git a/python/tests/conftest.py b/python/tests/conftest.py index 75e17e00a6b..2636a146b9a 100644 --- a/python/tests/conftest.py +++ b/python/tests/conftest.py @@ -6,136 +6,60 @@ As of the Change Date specified in that file, in accordance with the Business Source License, use of this software will be governed by the Apache License, version 2.0. """ import functools -import multiprocessing import shutil - -import boto3 -import werkzeug -from moto.server import DomainDispatcherApplication, create_backend_app - -import sys -import signal import enum - -if sys.platform == "win32": - # Hack to define signal.SIGKILL as some deps eg pytest-test-fixtures hardcode SIGKILL terminations. - signal.SIGKILL = signal.SIGINT - -import time import os import pytest +import requests import numpy as np import pandas as pd import random from datetime import datetime from typing import Optional, Any, Dict -import requests -from pytest_server_fixtures.base import get_ephemeral_port - from arcticdb.arctic import Arctic -from arcticdb.version_store.helper import create_test_lmdb_cfg, create_test_s3_cfg, create_test_mongo_cfg -from arcticdb.config import Defaults -from arcticdb.util.test import configure_test_logger, apply_lib_cfg -from arcticdb.version_store.helper import ArcticMemoryConfig +from arcticdb.version_store.helper import create_test_lmdb_cfg, create_test_mongo_cfg from arcticdb.version_store import NativeVersionStore +from arcticdb.util.test import configure_test_logger from arcticdb.version_store._normalization import MsgPackNormalizer +from tests.util.storage_fixtures import MotoS3StorageFixtureFactory, _version_store_factory_impl configure_test_logger() -BUCKET_ID = 0 # Use a smaller memory mapped limit for all tests MsgPackNormalizer.MMAP_DEFAULT_SIZE = 20 * (1 << 20) -def run_server(port): - werkzeug.run_simple( - "0.0.0.0", port, DomainDispatcherApplication(create_backend_app, service="s3"), threaded=True, ssl_context=None - ) - - -@pytest.fixture(scope="module") -def _moto_s3_uri_module(): - port = get_ephemeral_port() - p = multiprocessing.Process(target=run_server, args=(port,)) - p.start() - - time.sleep(0.5) - - yield f"http://localhost:{port}" - - try: - # terminate sends SIGTERM - no need to be polite here so... 
- os.kill(p.pid, signal.SIGKILL) - - p.terminate() - p.join() - except Exception: - pass - - -@pytest.fixture(scope="function") -def boto_client(_moto_s3_uri_module): - endpoint = _moto_s3_uri_module - client = boto3.client( - service_name="s3", endpoint_url=endpoint, aws_access_key_id="awd", aws_secret_access_key="awd" - ) - - yield client - - -@pytest.fixture -def aws_access_key(): - return "awd" +@pytest.fixture(scope="session") +def s3_storage_factory(): + with MotoS3StorageFixtureFactory() as f: + yield f @pytest.fixture -def aws_secret_key(): - return "awd" - - -@pytest.fixture(scope="function") -def moto_s3_endpoint_and_credentials(_moto_s3_uri_module, aws_access_key, aws_secret_key): - global BUCKET_ID - - endpoint = _moto_s3_uri_module - port = endpoint.rsplit(":", 1)[1] - client = boto3.client( - service_name="s3", endpoint_url=endpoint, aws_access_key_id=aws_access_key, aws_secret_access_key=aws_secret_key - ) - - bucket = f"test_bucket_{BUCKET_ID}" - client.create_bucket(Bucket=bucket) - BUCKET_ID = BUCKET_ID + 1 - yield endpoint, port, bucket, aws_access_key, aws_secret_key - - -@pytest.fixture(scope="function") -def moto_s3_uri_incl_bucket(moto_s3_endpoint_and_credentials): - endpoint, port, bucket, aws_access_key, aws_secret_key = moto_s3_endpoint_and_credentials - yield endpoint.replace("http://", "s3://").rsplit(":", 1)[ - 0 - ] + ":" + bucket + "?access=" + aws_access_key + "&secret=" + aws_secret_key + "&port=" + port +def s3_bucket(s3_storage_factory): + with s3_storage_factory.create_fixture() as f: + yield f @pytest.fixture(scope="function", params=("S3", "LMDB")) -def arctic_client(request, moto_s3_uri_incl_bucket, tmpdir, encoding_version): +def arctic_client(request, s3_bucket, tmpdir, encoding_version): if request.param == "S3": - ac = Arctic(moto_s3_uri_incl_bucket, encoding_version) + ac = Arctic(s3_bucket.get_arctic_uri(), encoding_version) elif request.param == "LMDB": ac = Arctic(f"lmdb://{tmpdir}", encoding_version) else: raise NotImplementedError() assert not ac.list_libraries() - yield ac + return ac @pytest.fixture(scope="function") def arctic_library(arctic_client): arctic_client.create_library("pytest_test_lib") - yield arctic_client["pytest_test_lib"] + return arctic_client["pytest_test_lib"] @pytest.fixture() @@ -148,25 +72,6 @@ def lib_name(): return f"local.test_{random.randint(0, 999)}_{datetime.utcnow().strftime('%Y-%m-%dT%H_%M_%S_%f')}" -def _version_store_factory_impl( - used, make_cfg, default_name, *, name: str = None, reuse_name=False, **kwargs -) -> NativeVersionStore: - """Common logic behind all the factory fixtures""" - name = name or default_name - if name == "_unique_": - name = name + str(len(used)) - - assert (name not in used) or reuse_name, f"{name} is already in use" - cfg = make_cfg(name) - lib = cfg.env_by_id[Defaults.ENV].lib_by_path[name] - # Use symbol list by default (can still be overridden by kwargs) - lib.version.symbol_list = True - apply_lib_cfg(lib, kwargs) - out = ArcticMemoryConfig(cfg, Defaults.ENV)[name] - used[name] = out - return out - - @enum.unique class EncodingVersion(enum.IntEnum): V1 = 0 @@ -232,25 +137,13 @@ def create_version_store( @pytest.fixture -def s3_store_factory(lib_name, moto_s3_endpoint_and_credentials): +def s3_store_factory(lib_name, s3_bucket): """Factory to create any number of S3 libs with the given WriteOptions or VersionStoreConfig. `name` can be a magical value "_unique_" which will create libs with unique names. 
This factory will clean up any libraries requested """ - endpoint, port, bucket, aws_access_key, aws_secret_key = moto_s3_endpoint_and_credentials - - # Not exposing the config factory to discourage people from creating libs that won't get cleaned up - def make_cfg(name): - # with_prefix=False to allow reuse_name to work correctly - return create_test_s3_cfg(name, aws_access_key, aws_secret_key, bucket, endpoint, with_prefix=False) - - used = {} - try: - yield functools.partial(_version_store_factory_impl, used, make_cfg, lib_name) - finally: - for lib in used.values(): - lib.version_store.clear() + return s3_bucket.create_version_store_factory(lib_name) @pytest.fixture diff --git a/python/tests/integration/arcticdb/test_arctic.py b/python/tests/integration/arcticdb/test_arctic.py index 46ec8c4a304..aed70bd429f 100644 --- a/python/tests/integration/arcticdb/test_arctic.py +++ b/python/tests/integration/arcticdb/test_arctic.py @@ -6,12 +6,10 @@ As of the Change Date specified in that file, in accordance with the Business Source License, use of this software will be governed by the Apache License, version 2.0. """ import sys -import os import pytz from arcticdb_ext.exceptions import InternalException from arcticdb.exceptions import ArcticNativeNotYetImplemented -from pandas import Timestamp try: from arcticdb.version_store import VersionedItem as PythonVersionedItem @@ -59,6 +57,16 @@ ) +@pytest.fixture +def boto_client(s3_bucket): + return s3_bucket.make_boto_client() + + +@pytest.fixture +def moto_s3_uri_incl_bucket(s3_bucket): + return s3_bucket.get_arctic_uri() + + def test_library_creation_deletion(arctic_client): ac = arctic_client assert ac.list_libraries() == [] diff --git a/python/tests/util/storage_fixtures.py b/python/tests/util/storage_fixtures.py new file mode 100644 index 00000000000..13a247bfafa --- /dev/null +++ b/python/tests/util/storage_fixtures.py @@ -0,0 +1,258 @@ +""" +Copyright 2023 Man Group Operations Limited + +Use of this software is governed by the Business Source License 1.1 included in the file LICENSE.txt. + +As of the Change Date specified in that file, in accordance with the Business Source License, use of this software will be governed by the Apache License, version 2.0. 
+""" + +import multiprocessing +import os +import sys +import time +import functools +import pytest +import json +import requests +from typing import Optional, Any, Dict, Callable, NamedTuple +from contextlib import AbstractContextManager, contextmanager +from abc import abstractmethod +from tempfile import mkdtemp + +import boto3 +import werkzeug +from pytest_server_fixtures.base import get_ephemeral_port +from moto.server import DomainDispatcherApplication, create_backend_app + +from arcticdb.config import Defaults +from arcticdb.version_store import NativeVersionStore +from arcticdb.version_store.helper import ArcticMemoryConfig, add_s3_library_to_env, add_lmdb_library_to_env +from arcticdb.util.test import apply_lib_cfg, gracefully_terminate_process +from arcticc.pb2.storage_pb2 import EnvironmentConfigsMap + + +def _version_store_factory_impl( + used, make_cfg, default_name, *, name: str = None, reuse_name=False, **kwargs +) -> NativeVersionStore: + """Common logic behind all the factory fixtures""" + name = name or default_name + if name == "_unique_": + name = name + str(len(used)) + assert (name not in used) or reuse_name, f"{name} is already in use" + cfg = make_cfg(name) + lib = cfg.env_by_id[Defaults.ENV].lib_by_path[name] + # Use symbol list by default (can still be overridden by kwargs) + lib.version.symbol_list = True + apply_lib_cfg(lib, kwargs) + out = ArcticMemoryConfig(cfg, Defaults.ENV)[name] + used[name] = out + return out + + +class StorageFixture(AbstractContextManager): + """Manages the life-cycle of a piece of storage and provides test facing methods:""" + + generated_libs: Dict[str, NativeVersionStore] = {} + + def __enter__(self): + """Fixtures are typically set up in __init__, so this just returns self""" + return self + + @abstractmethod + def get_arctic_uri(self) -> str: + """Return the URI for use with the ``Arctic`` constructor""" + + @abstractmethod + def create_test_cfg(self, lib_name: str) -> EnvironmentConfigsMap: + """Creates a new storage config instance. + If ``lib_name`` is the same as a previous call on this instance, then that storage should be reused.""" + + def create_version_store_factory(self, default_lib_name: str): + """Returns a function that takes optional library options and produces ``NativeVersionStore``s""" + return functools.partial( + _version_store_factory_impl, self.generated_libs, self.create_test_cfg, default_lib_name + ) + + @abstractmethod + def set_permission(self, *, read: bool, write: bool): + """Makes the connection to the storage have the given permissions. If unsupported, call ``pytest.skip()``. + See ``set_enforcing_permissions`` below.""" + + +class StorageFixtureFactory(AbstractContextManager): + """For ``StorageFixture``s backed by shared/expensive resources, implement this class to manage those.""" + + enforcing_permissions = False + + def __exit__(self, exc_type, exc_value, traceback): + """Properly clean up the fixture. This should be safe to be called multiple times.""" + + def set_enforcing_permissions(self, enforcing: bool): + """Controls permission enforcement. + Should affect subsequent calls to ``create_fixture()`` and possibly existing fixtures.""" + pytest.skip(f"{self.__class__.__name__} does not support permissions") + + @contextmanager + def enforcing_permissions_context(self, set_to=True): + try: + saved = self.enforcing_permissions + self.set_enforcing_permissions(set_to) + yield + finally: + self.set_enforcing_permissions(saved) + + @abstractmethod + def create_fixture(self) -> StorageFixture: + ... 
+
+
+Key = NamedTuple("Key", [("id", str), ("secret", str), ("user_name", str)])
+
+
+# ========================== Concrete sub-types ==========================
+class S3Bucket(StorageFixture):
+    _bucket_id = 0
+    bucket: str
+    key: Key
+
+    def __init__(self, factory: "MotoS3StorageFixtureFactory"):
+        super().__init__()
+        self.factory = factory
+
+        bucket = self.bucket = f"test_bucket_{S3Bucket._bucket_id}"
+        factory._s3_admin.create_bucket(Bucket=bucket)
+        S3Bucket._bucket_id += 1  # Increment on the class, so each bucket gets a fresh name
+
+        if factory.enforcing_permissions:
+            self.key = factory._create_user_get_key(bucket + "_user")
+        else:
+            self.key = factory._DUMMY_KEY
+
+    def __exit__(self, exc_type, exc_value, traceback):
+        for lib in self.generated_libs.values():
+            lib.version_store.clear()
+
+        self.factory._s3_admin.delete_bucket(Bucket=self.bucket)
+
+    def get_arctic_uri(self):
+        s3 = self.factory
+        return f"s3://{s3.host}:{self.bucket}?access={self.key.id}&secret={self.key.secret}&port={s3.port}"
+
+    def make_boto_client(self):
+        return self.factory._boto("s3", self.key)
+
+    def create_test_cfg(self, lib_name: str) -> EnvironmentConfigsMap:
+        cfg = EnvironmentConfigsMap()
+        add_s3_library_to_env(
+            cfg,
+            lib_name=lib_name,
+            env_name=Defaults.ENV,
+            credential_name=self.key.id,
+            credential_key=self.key.secret,
+            bucket_name=self.bucket,
+            endpoint=self.factory.endpoint,
+            with_prefix=False,  # to allow s3_store_factory reuse_name to work correctly
+        )
+        return cfg
+
+    def set_permission(self, *, read: bool, write: bool):
+        f = self.factory
+        assert f.enforcing_permissions and self.key is not f._DUMMY_KEY
+        if read:
+            f._iam_admin.put_user_policy(
+                UserName=self.key.user_name, PolicyName="bucket", PolicyDocument=f._RW_POLICY if write else f._RO_POLICY
+            )
+        else:
+            f._iam_admin.delete_user_policy(UserName=self.key.user_name, PolicyName="bucket")
+
+
+class _HostDispatcherApplication(DomainDispatcherApplication):
+    """The stand-alone server needs a way to distinguish between S3 and IAM.
    We use the host for that."""
+
+    _MAP = {"localhost": "s3", "127.0.0.1": "iam"}
+
+    def get_backend_for_host(self, host):
+        return self._MAP.get(host, host)
+
+
+class MotoS3StorageFixtureFactory(StorageFixtureFactory):
+    _DUMMY_KEY = Key("awd", "awd", "dummy")
+    _RO_POLICY: str
+    _RW_POLICY: str
+    host = "localhost"
+    port: int
+    endpoint: str
+    _iam_endpoint: str
+    _p: multiprocessing.Process
+    _s3_admin: Any
+    _iam_admin: Any = None
+
+    @staticmethod
+    def run_server(port):
+        werkzeug.run_simple(
+            "0.0.0.0",
+            port,
+            _HostDispatcherApplication(create_backend_app),
+            threaded=True,
+            ssl_context=None,
+        )
+
+    def _boto(self, service: str, key: Key):
+        return boto3.client(
+            service_name=service,
+            endpoint_url=self.endpoint if service == "s3" else self._iam_endpoint,
+            region_name="us-east-1",
+            aws_access_key_id=key.id,
+            aws_secret_access_key=key.secret,
+        )
+
+    def __enter__(self):
+        port = self.port = get_ephemeral_port()
+        self.endpoint = f"http://{self.host}:{port}"
+        self._iam_endpoint = f"http://127.0.0.1:{port}"
+
+        p = self._p = multiprocessing.Process(target=self.run_server, args=(port,))
+        p.start()
+        time.sleep(0.5)
+
+        self._s3_admin = self._boto("s3", self._DUMMY_KEY)
+        return self
+
+    def __exit__(self, exc_type, exc_value, traceback):
+        gracefully_terminate_process(self._p)
+
+    def _create_user_get_key(self, user: str, iam=None):
+        iam = iam or self._iam_admin
+        iam.create_user(UserName=user)
+        response = iam.create_access_key(UserName=user)["AccessKey"]
+        return Key(response["AccessKeyId"], response["SecretAccessKey"], user)
+
+    def set_enforcing_permissions(self, enforcing: bool):
+        # Inspired by https://github.com/getmoto/moto/blob/master/tests/test_s3/test_s3_auth.py
+        if enforcing == self.enforcing_permissions:
+            return
+        if enforcing and not self._iam_admin:
+            iam = self._boto("iam", self._DUMMY_KEY)
+
+            def _policy(*statements):
+                return json.dumps({"Version": "2012-10-17", "Statement": statements})
+
+            policy = _policy(
+                {"Effect": "Allow", "Action": "s3:*", "Resource": "*"},
+                {"Effect": "Allow", "Action": "iam:*", "Resource": "*"},
+            )
+            policy_arn = iam.create_policy(PolicyName="admin", PolicyDocument=policy)["Policy"]["Arn"]
+
+            self._RO_POLICY = _policy({"Effect": "Allow", "Action": ["s3:List*", "s3:Get*"], "Resource": "*"})
+            self._RW_POLICY = _policy({"Effect": "Allow", "Action": "s3:*", "Resource": "*"})
+
+            key = self._create_user_get_key("admin", iam)
+            iam.attach_user_policy(UserName="admin", PolicyArn=policy_arn)
+            self._iam_admin = self._boto("iam", key)
+            self._s3_admin = self._boto("s3", key)
+
+        # The number is the remaining requests before permission checks kick in
+        requests.post(self._iam_endpoint + "/moto-api/reset-auth", "0" if enforcing else "inf")
+        self.enforcing_permissions = enforcing
+
+    def create_fixture(self) -> S3Bucket:
+        return S3Bucket(self)
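
Usage sketch (not part of the patch): the new fixtures compose as plain context managers, and constructing Arctic now runs the accessibility check up front. The library name "demo_lib" and the test body below are illustrative only.

    from arcticdb import Arctic
    from tests.util.storage_fixtures import MotoS3StorageFixtureFactory

    with MotoS3StorageFixtureFactory() as factory:   # starts a moto server in a subprocess
        with factory.create_fixture() as bucket:     # creates a fresh bucket
            # Arctic.__init__ calls check_storage_is_accessible(), which for S3 reaches
            # S3Storage::check_creds_and_bucket() and logs a warning if the bucket is
            # missing or unreadable; construction itself still succeeds.
            ac = Arctic(bucket.get_arctic_uri())
            ac.create_library("demo_lib")            # illustrative library name
            assert "demo_lib" in ac.list_libraries()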
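
A sketch of simulating access errors with the new permission hooks, assuming moto enforces the attached IAM policies as the fixture intends; the object key "k", the body b"v", and the expected botocore ClientError are illustrative assumptions, not asserted by the patch.

    import pytest
    from botocore.exceptions import ClientError
    from tests.util.storage_fixtures import MotoS3StorageFixtureFactory

    with MotoS3StorageFixtureFactory() as factory:
        with factory.enforcing_permissions_context():
            with factory.create_fixture() as bucket:
                bucket.set_permission(read=True, write=False)  # attach the read-only policy
                client = bucket.make_boto_client()
                client.list_objects_v2(Bucket=bucket.bucket)   # reads are still allowed
                with pytest.raises(ClientError):               # writes should now be denied
                    client.put_object(Bucket=bucket.bucket, Key="k", Body=b"v")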