#473 Warn if bucket cannot be accessed
+ Warn if AWS auth is being used and machine identity is disabled
+ Refactor S3 fixtures so various errors can be simulated
qc00 committed Jul 3, 2023
1 parent 3bc665d commit ad9e003
Showing 11 changed files with 375 additions and 131 deletions.
7 changes: 7 additions & 0 deletions cpp/arcticdb/storage/python_bindings.cpp
@@ -100,6 +100,13 @@ void register_bindings(py::module& storage, py::exception<arcticdb::ArcticExcept
return none.cast<py::object>();
});
})
.def("check_primary_storage_is_accessible", [](Library& library) {
std::optional<bool> out;
library.storage_specific([&out](storage::s3::S3Storage& s3) {
out = s3.check_creds_and_bucket();
});
return out;
}, "Currently only implemented by S3 storage. Calling on other storage types does nothing.")
;

py::class_<S3CredentialsOverride>(storage, "S3CredentialsOverride")
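The new binding returns std::optional<bool>, which pybind11 surfaces in Python as Optional[bool]: None when the backend implements no check (every storage except S3 for now), and True/False otherwise. A minimal sketch of how a caller might branch on the three-valued result follows; the verify_storage helper and the lib handle are illustrative, not part of this commit.

import logging

logger = logging.getLogger("arcticdb.storage")

def verify_storage(lib) -> None:
    # `lib` is assumed to be a storage Library exposing the new binding.
    result = lib.check_primary_storage_is_accessible()
    if result is None:
        logger.debug("No accessibility check implemented for this storage type")
    elif result:
        logger.debug("Primary storage accessibility check passed")
    else:
        # check_creds_and_bucket() has already logged the details on the C++ side.
        logger.warning("Primary storage may be inaccessible; see storage warnings")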
1 change: 1 addition & 0 deletions cpp/arcticdb/storage/s3/s3_api.cpp
@@ -42,6 +42,7 @@ S3ApiInstance::S3ApiInstance(Aws::Utils::Logging::LogLevel log_level) :
return;
ARCTICDB_RUNTIME_DEBUG(log::Loggers::instance()->storage(),
"Does not appear to be using AWS. Will set AWS_EC2_METADATA_DISABLED");
ec2_metadata_disabled_by_us_ = true;
#ifdef WIN32
_putenv_s("AWS_EC2_METADATA_DISABLED", "true");
#else
4 changes: 4 additions & 0 deletions cpp/arcticdb/storage/s3/s3_api.hpp
@@ -25,9 +25,13 @@ class S3ApiInstance {
static std::shared_ptr<S3ApiInstance> instance();
static void destroy_instance();

bool is_ec2_metadata_disabled_by_us() const {
return ec2_metadata_disabled_by_us_;
}
private:
Aws::Utils::Logging::LogLevel log_level_;
Aws::SDKOptions options_;
bool ec2_metadata_disabled_by_us_ = false; // initialised: the constructor's early-return path never assigns it
};

} //namespace arcticdb::storage::s3
45 changes: 45 additions & 0 deletions cpp/arcticdb/storage/s3/s3_storage.cpp
@@ -7,6 +7,8 @@

#include <arcticdb/storage/s3/s3_storage.hpp>
#include <aws/core/auth/AWSCredentialsProvider.h>
#include <aws/core/platform/Environment.h>
#include <aws/s3/model/HeadBucketRequest.h>
#include <arcticdb/storage/s3/s3_api.hpp>
#include <arcticdb/log/log.hpp>
#include <locale>
@@ -32,6 +34,48 @@ std::streamsize S3StreamBuffer::xsputn(const char_type* s, std::streamsize n) {
}
}

void check_ec2_metadata_endpoint(const S3ApiInstance& s3_api) {
auto disabled_env = Aws::Environment::GetEnv("AWS_EC2_METADATA_DISABLED");
if (Aws::Utils::StringUtils::ToLower(disabled_env.c_str()) == "true") {
const char* who_disabled = s3_api.is_ec2_metadata_disabled_by_us() ?
"EC2 metadata endpoint did not respond in time so was bypassed":
"AWS_EC2_METADATA_DISABLED environment variable is set to true";
log::storage().warn("{}. This means machine identity authentication will not work.", who_disabled);
}
}

bool S3Storage::check_creds_and_bucket() {
using namespace Aws::S3;
// We cannot easily change the timeouts of the s3_client_, so use async:
auto future = s3_client_.HeadBucketCallable(Model::HeadBucketRequest().WithBucket(bucket_name_.c_str()));
auto wait = std::chrono::milliseconds(ConfigsMap::instance()->get_int("S3Storage.CheckBucketMaxWait", 1000));
if (future.wait_for(wait) == std::future_status::ready) {
auto outcome = future.get();
if (!outcome.IsSuccess()) {
auto& error = outcome.GetError();

#define BUCKET_LOG(level, msg, ...) log::storage().level(msg "\nHTTP Status: {}. Server response: {}", \
## __VA_ARGS__, int(error.GetResponseCode()), error.GetMessage().c_str()); break

// A HEAD response carries no body, so the more detailed S3 error codes are unavailable; branch on the HTTP status instead.
switch (error.GetResponseCode()) {
case Aws::Http::HttpResponseCode::UNAUTHORIZED:
case Aws::Http::HttpResponseCode::FORBIDDEN:
BUCKET_LOG(warn, "Unable to access bucket. Subsequent operations may fail.");
case Aws::Http::HttpResponseCode::NOT_FOUND:
BUCKET_LOG(error, "The specified bucket {} does not exist.", bucket_name_);
default:
BUCKET_LOG(info, "Unable to determine if the bucket is accessible.");
}
#undef BUCKET_LOG
}
return outcome.IsSuccess();
} else {
log::storage().info("Unable to determine if the bucket is accessible. Request timed out.");
}
return false;
}

S3Storage::S3Storage(const LibraryPath &library_path, OpenMode mode, const Config &conf) :
Parent(library_path, mode),
s3_api_(S3ApiInstance::instance()),
@@ -42,6 +86,7 @@ S3Storage::S3Storage(const LibraryPath &library_path, OpenMode mode, const Confi

if (creds.GetAWSAccessKeyId() == USE_AWS_CRED_PROVIDERS_TOKEN && creds.GetAWSSecretKey() == USE_AWS_CRED_PROVIDERS_TOKEN){
ARCTICDB_RUNTIME_DEBUG(log::storage(), "Using AWS auth mechanisms");
check_ec2_metadata_endpoint(*s3_api_);
s3_client_ = Aws::S3::S3Client(get_s3_config(conf), Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, conf.use_virtual_addressing());
} else {
ARCTICDB_RUNTIME_DEBUG(log::storage(), "Using provided auth credentials");
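check_creds_and_bucket() probes the bucket with a HeadBucket request and classifies the outcome purely by HTTP status, because a HEAD response has no body from which to read the detailed S3 error code. The same probe can be reproduced with boto3 to see the statuses the switch above handles; this is an illustrative sketch, not part of the commit.

import boto3
from botocore.exceptions import ClientError

def head_bucket_status(endpoint: str, bucket: str) -> int:
    """Return the HTTP status of a HEAD-bucket probe (200 means accessible)."""
    client = boto3.client("s3", endpoint_url=endpoint)
    try:
        client.head_bucket(Bucket=bucket)
        return 200
    except ClientError as e:
        # 401/403: credentials lack access; 404: the bucket does not exist.
        return e.response["ResponseMetadata"]["HTTPStatusCode"]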
17 changes: 12 additions & 5 deletions cpp/arcticdb/storage/s3/s3_storage.hpp
@@ -43,6 +43,9 @@ class S3Storage final : public Storage<S3Storage> {
*/
std::string get_key_path(const VariantKey& key) const;

/* Logs warning(s) and returns false if there are likely to be access problems. */
bool check_creds_and_bucket();

protected:
void do_write(Composite<KeySegmentPair>&& kvs);

@@ -232,6 +235,12 @@ inline Aws::Client::ClientConfiguration get_proxy_config(Aws::Http::Scheme endpo
}
}

namespace {
uint32_t fallback(uint32_t protobuf_config, const std::string& label, uint32_t default_val) {
return protobuf_config != 0 ? protobuf_config : ConfigsMap::instance()->get_int(label, default_val);
}
}

template<typename ConfigType>
auto get_s3_config(const ConfigType& conf) {
auto endpoint_scheme = conf.https() ? Aws::Http::Scheme::HTTPS : Aws::Http::Scheme::HTTP;
@@ -246,11 +255,9 @@ auto get_s3_config(const ConfigType& conf) {
util::check_arg(!endpoint.empty(), "S3 Endpoint must be specified");
client_configuration.endpointOverride = endpoint;
client_configuration.verifySSL = false;
client_configuration.maxConnections = conf.max_connections() == 0 ?
ConfigsMap::instance()->get_int("VersionStore.NumIOThreads", 16) :
conf.max_connections();
client_configuration.connectTimeoutMs = conf.connect_timeout() == 0 ? 30000 : conf.connect_timeout();
client_configuration.requestTimeoutMs = conf.request_timeout() == 0 ? 200000 : conf.request_timeout();
client_configuration.maxConnections = fallback(conf.max_connections(), "VersionStore.NumIOThreads", 16);
client_configuration.connectTimeoutMs = fallback(conf.connect_timeout(), "S3Storage.ConnectTimeoutMs", 30000);
client_configuration.requestTimeoutMs = fallback(conf.request_timeout(), "S3Storage.RequestTimeoutMs", 200000);
return client_configuration;
}

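The new fallback() helper treats 0 in the per-library protobuf config as "unset", so the connect and request timeouts are now tunable at runtime through ConfigsMap keys (S3Storage.ConnectTimeoutMs, S3Storage.RequestTimeoutMs) before the hard-coded defaults apply. A Python mirror of the precedence rule, using a plain dict as a stand-in for ConfigsMap (illustrative only):

RUNTIME_SETTINGS = {}  # stand-in for ConfigsMap, not the real API

def fallback(protobuf_config: int, label: str, default_val: int) -> int:
    # Zero in the protobuf config means "unset": prefer the runtime setting
    # registered under `label`, then the hard-coded default.
    return protobuf_config if protobuf_config != 0 else RUNTIME_SETTINGS.get(label, default_val)

assert fallback(5000, "S3Storage.ConnectTimeoutMs", 30000) == 5000  # explicit config wins
assert fallback(0, "S3Storage.ConnectTimeoutMs", 30000) == 30000    # falls back to default
RUNTIME_SETTINGS["S3Storage.ConnectTimeoutMs"] = 10000
assert fallback(0, "S3Storage.ConnectTimeoutMs", 30000) == 10000    # runtime override wins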
9 changes: 9 additions & 0 deletions python/arcticdb/adapters/arctic_library_adapter.py
@@ -69,3 +69,12 @@ def delete_library(self, library: Library, library_config: LibraryConfig):

def get_storage_override(self) -> StorageOverride:
return StorageOverride()

def check_storage_is_accessible(self) -> Optional[bool]:
"""Logs message(s) to inform the user if there are obvious issues with the storage.
Returns
-------
None if no check is implemented. Otherwise returns if all checks have passed
"""
return self.config_library.check_primary_storage_is_accessible()
2 changes: 2 additions & 0 deletions python/arcticdb/arctic.py
@@ -98,6 +98,8 @@ def __init__(self, uri: str, encoding_version: EncodingVersion = EncodingVersion
self._library_manager = LibraryManager(self._library_adapter.config_library)
self._uri = uri

self._library_adapter.check_storage_is_accessible()

def __getitem__(self, name: str) -> Library:
storage_override = self._library_adapter.get_storage_override()
lib = NativeVersionStore(
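With this hook, constructing an Arctic instance now runs the storage check, so bad credentials or a missing bucket surface as a log warning at connect time instead of an opaque failure on the first read or write. An illustrative example (the endpoint, bucket, and keys are made up; the URI shape follows the s3://host:bucket?access=...&secret=...&port=... convention used by the test fixtures):

from arcticdb import Arctic

# Against a bucket the credentials cannot access, construction logs e.g.
#   "Unable to access bucket. Subsequent operations may fail."
# but still returns a usable client.
ac = Arctic("s3://localhost:my_bucket?access=bad_key&secret=bad_secret&port=9000")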
10 changes: 10 additions & 0 deletions python/arcticdb/util/test.py
@@ -6,6 +6,8 @@
As of the Change Date specified in that file, in accordance with the Business Source License, use of this software will be governed by the Apache License, version 2.0.
"""
import os
import sys
import signal
from contextlib import contextmanager
from typing import Mapping, Any, Optional, Iterable, NamedTuple, List, AnyStr
import numpy as np
@@ -195,6 +197,14 @@ def configure_test_logger(level="INFO"):
configure(get_test_logger_config(level=level, outputs=outputs), force=True)


def gracefully_terminate_process(p):
p.terminate()
if sys.platform != "win32":
p.join(timeout=2)
if p.exitcode is None:
os.kill(p.pid, signal.SIGKILL) # TODO (python37): use Process.kill()


CustomThing = NamedTuple(
"CustomThing", [("custom_index", np.ndarray), ("custom_columns", List[AnyStr]), ("custom_values", List[np.ndarray])]
)
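gracefully_terminate_process() sends SIGTERM first and escalates to SIGKILL only if the process has not exited within two seconds; on Windows, where SIGKILL does not exist, terminate() is already forceful so no escalation is attempted. A usage sketch with a hypothetical worker:

import multiprocessing
import time

from arcticdb.util.test import gracefully_terminate_process

def worker():
    time.sleep(60)  # stand-in for a long-running moto server loop

if __name__ == "__main__":
    p = multiprocessing.Process(target=worker)
    p.start()
    gracefully_terminate_process(p)  # SIGTERM, then SIGKILL after 2s if needed
    assert p.exitcode is not None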
141 changes: 17 additions & 124 deletions python/tests/conftest.py
@@ -6,136 +6,60 @@
As of the Change Date specified in that file, in accordance with the Business Source License, use of this software will be governed by the Apache License, version 2.0.
"""
import functools
import multiprocessing
import shutil

import boto3
import werkzeug
from moto.server import DomainDispatcherApplication, create_backend_app

import sys
import signal
import enum

if sys.platform == "win32":
# Hack to define signal.SIGKILL, as some deps (e.g. pytest-test-fixtures) hardcode SIGKILL terminations.
signal.SIGKILL = signal.SIGINT

import time
import os
import pytest
import requests
import numpy as np
import pandas as pd
import random
from datetime import datetime
from typing import Optional, Any, Dict

import requests
from pytest_server_fixtures.base import get_ephemeral_port

from arcticdb.arctic import Arctic
from arcticdb.version_store.helper import create_test_lmdb_cfg, create_test_s3_cfg, create_test_mongo_cfg
from arcticdb.config import Defaults
from arcticdb.util.test import configure_test_logger, apply_lib_cfg
from arcticdb.version_store.helper import ArcticMemoryConfig
from arcticdb.version_store.helper import create_test_lmdb_cfg, create_test_mongo_cfg
from arcticdb.version_store import NativeVersionStore
from arcticdb.util.test import configure_test_logger
from arcticdb.version_store._normalization import MsgPackNormalizer
from tests.util.storage_fixtures import MotoS3StorageFixtureFactory, _version_store_factory_impl


configure_test_logger()

BUCKET_ID = 0
# Use a smaller memory mapped limit for all tests
MsgPackNormalizer.MMAP_DEFAULT_SIZE = 20 * (1 << 20)


def run_server(port):
werkzeug.run_simple(
"0.0.0.0", port, DomainDispatcherApplication(create_backend_app, service="s3"), threaded=True, ssl_context=None
)


@pytest.fixture(scope="module")
def _moto_s3_uri_module():
port = get_ephemeral_port()
p = multiprocessing.Process(target=run_server, args=(port,))
p.start()

time.sleep(0.5)

yield f"http://localhost:{port}"

try:
# terminate sends SIGTERM - no need to be polite here so...
os.kill(p.pid, signal.SIGKILL)

p.terminate()
p.join()
except Exception:
pass


@pytest.fixture(scope="function")
def boto_client(_moto_s3_uri_module):
endpoint = _moto_s3_uri_module
client = boto3.client(
service_name="s3", endpoint_url=endpoint, aws_access_key_id="awd", aws_secret_access_key="awd"
)

yield client


@pytest.fixture
def aws_access_key():
return "awd"
@pytest.fixture(scope="session")
def s3_storage_factory():
with MotoS3StorageFixtureFactory() as f:
yield f


@pytest.fixture
def aws_secret_key():
return "awd"


@pytest.fixture(scope="function")
def moto_s3_endpoint_and_credentials(_moto_s3_uri_module, aws_access_key, aws_secret_key):
global BUCKET_ID

endpoint = _moto_s3_uri_module
port = endpoint.rsplit(":", 1)[1]
client = boto3.client(
service_name="s3", endpoint_url=endpoint, aws_access_key_id=aws_access_key, aws_secret_access_key=aws_secret_key
)

bucket = f"test_bucket_{BUCKET_ID}"
client.create_bucket(Bucket=bucket)
BUCKET_ID = BUCKET_ID + 1
yield endpoint, port, bucket, aws_access_key, aws_secret_key


@pytest.fixture(scope="function")
def moto_s3_uri_incl_bucket(moto_s3_endpoint_and_credentials):
endpoint, port, bucket, aws_access_key, aws_secret_key = moto_s3_endpoint_and_credentials
yield endpoint.replace("http://", "s3://").rsplit(":", 1)[
0
] + ":" + bucket + "?access=" + aws_access_key + "&secret=" + aws_secret_key + "&port=" + port
def s3_bucket(s3_storage_factory):
with s3_storage_factory.create_fixture() as f:
yield f


@pytest.fixture(scope="function", params=("S3", "LMDB"))
def arctic_client(request, moto_s3_uri_incl_bucket, tmpdir, encoding_version):
def arctic_client(request, s3_bucket, tmpdir, encoding_version):
if request.param == "S3":
ac = Arctic(moto_s3_uri_incl_bucket, encoding_version)
ac = Arctic(s3_bucket.get_arctic_uri(), encoding_version)
elif request.param == "LMDB":
ac = Arctic(f"lmdb://{tmpdir}", encoding_version)
else:
raise NotImplementedError()

assert not ac.list_libraries()
yield ac
return ac


@pytest.fixture(scope="function")
def arctic_library(arctic_client):
arctic_client.create_library("pytest_test_lib")
yield arctic_client["pytest_test_lib"]
return arctic_client["pytest_test_lib"]


@pytest.fixture()
@@ -148,25 +72,6 @@ def lib_name():
return f"local.test_{random.randint(0, 999)}_{datetime.utcnow().strftime('%Y-%m-%dT%H_%M_%S_%f')}"


def _version_store_factory_impl(
used, make_cfg, default_name, *, name: str = None, reuse_name=False, **kwargs
) -> NativeVersionStore:
"""Common logic behind all the factory fixtures"""
name = name or default_name
if name == "_unique_":
name = name + str(len(used))

assert (name not in used) or reuse_name, f"{name} is already in use"
cfg = make_cfg(name)
lib = cfg.env_by_id[Defaults.ENV].lib_by_path[name]
# Use symbol list by default (can still be overridden by kwargs)
lib.version.symbol_list = True
apply_lib_cfg(lib, kwargs)
out = ArcticMemoryConfig(cfg, Defaults.ENV)[name]
used[name] = out
return out


@enum.unique
class EncodingVersion(enum.IntEnum):
V1 = 0
@@ -232,25 +137,13 @@ def create_version_store(


@pytest.fixture
def s3_store_factory(lib_name, moto_s3_endpoint_and_credentials):
def s3_store_factory(lib_name, s3_bucket):
"""Factory to create any number of S3 libs with the given WriteOptions or VersionStoreConfig.
`name` can be a magical value "_unique_" which will create libs with unique names.
This factory will clean up any libraries requested
"""
endpoint, port, bucket, aws_access_key, aws_secret_key = moto_s3_endpoint_and_credentials

# Not exposing the config factory to discourage people from creating libs that won't get cleaned up
def make_cfg(name):
# with_prefix=False to allow reuse_name to work correctly
return create_test_s3_cfg(name, aws_access_key, aws_secret_key, bucket, endpoint, with_prefix=False)

used = {}
try:
yield functools.partial(_version_store_factory_impl, used, make_cfg, lib_name)
finally:
for lib in used.values():
lib.version_store.clear()
return s3_bucket.create_version_store_factory(lib_name)


@pytest.fixture
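The net effect of the conftest refactor: a session-scoped s3_storage_factory owns the moto server, each s3_bucket gets its own bucket (with the error-simulation hooks the commit message mentions), and s3_store_factory simply delegates to the fixture. A test might now look like the following sketch, which assumes the fixture API introduced in tests/util/storage_fixtures.py:

import pandas as pd

def test_write_read_roundtrip(s3_store_factory):
    lib = s3_store_factory(name="_unique_")  # the fixture cleans the library up
    df = pd.DataFrame({"a": [1, 2, 3]})
    lib.write("sym", df)
    pd.testing.assert_frame_equal(lib.read("sym").data, df)

def test_arctic_uri(s3_bucket):
    # Each bucket fixture exposes a ready-made Arctic URI.
    assert s3_bucket.get_arctic_uri().startswith("s3://")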
(Diffs for the remaining 2 changed files are not shown.)
