Skip to content

Commit

Permalink
Warn if bucket cannot be accessed (#473)
Browse files Browse the repository at this point in the history
+ warn if AWS auth is being used and machine identity is disabled
+ Refactor S3 fixtures so various errors can be simulated
* Removed the pytest-server-fixtures dependency, which is somewhat dated and of which we only used one function
  • Loading branch information
qc00 committed Jul 6, 2023
1 parent a542111 commit 698bc87
Show file tree
Hide file tree
Showing 13 changed files with 451 additions and 152 deletions.
7 changes: 7 additions & 0 deletions cpp/arcticdb/storage/python_bindings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,13 @@ void register_bindings(py::module& storage, py::exception<arcticdb::ArcticExcept
return none.cast<py::object>();
});
})
// Python binding: returns None unless the backing storage implements the
// accessibility check (currently S3 only), otherwise the True/False result
// of check_creds_and_bucket().
.def("check_primary_storage_is_accessible", [](Library& library) {
    std::optional<bool> out; // remains nullopt if the visitor below never fires
    // storage_specific() invokes the callback only for storages of the matching type.
    library.storage_specific([&out](storage::s3::S3Storage& s3) {
        out = s3.check_creds_and_bucket();
    });
    return out; // nullopt maps to Python None
}, "Currently only implemented by S3 storage. Calling on other storage types does nothing.")
;

py::class_<S3CredentialsOverride>(storage, "S3CredentialsOverride")
Expand Down
1 change: 1 addition & 0 deletions cpp/arcticdb/storage/s3/s3_api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ S3ApiInstance::S3ApiInstance(Aws::Utils::Logging::LogLevel log_level) :
return;
ARCTICDB_RUNTIME_DEBUG(log::Loggers::instance()->storage(),
"Does not appear to be using AWS. Will set AWS_EC2_METADATA_DISABLED");
ec2_metadata_disabled_by_us_ = true;
#ifdef WIN32
_putenv_s("AWS_EC2_METADATA_DISABLED", "true");
#else
Expand Down
4 changes: 4 additions & 0 deletions cpp/arcticdb/storage/s3/s3_api.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,13 @@ class S3ApiInstance {
static std::shared_ptr<S3ApiInstance> instance();
static void destroy_instance();

// True iff this process (rather than the user/environment) set
// AWS_EC2_METADATA_DISABLED because the EC2 metadata endpoint was bypassed.
bool is_ec2_metadata_disabled_by_us() const {
    return ec2_metadata_disabled_by_us_;
}
private:
    Aws::Utils::Logging::LogLevel log_level_;
    Aws::SDKOptions options_;
    // In-class initializer is required: the constructor assigns this member only
    // on the code path where the metadata endpoint is bypassed (it returns early
    // otherwise), so without a default the getter could read an indeterminate
    // value, which is undefined behaviour.
    bool ec2_metadata_disabled_by_us_ = false;
};

} //namespace arcticdb::storage::s3
45 changes: 45 additions & 0 deletions cpp/arcticdb/storage/s3/s3_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

#include <arcticdb/storage/s3/s3_storage.hpp>
#include <aws/core/auth/AWSCredentialsProvider.h>
#include <aws/core/platform/Environment.h>
#include <aws/s3/model/HeadBucketRequest.h>
#include <arcticdb/storage/s3/s3_api.hpp>
#include <arcticdb/log/log.hpp>
#include <locale>
Expand All @@ -32,6 +34,48 @@ std::streamsize S3StreamBuffer::xsputn(const char_type* s, std::streamsize n) {
}
}

/* Warns when the EC2 metadata endpoint has been disabled, since machine
 * identity (IAM role) authentication depends on that endpoint. */
void check_ec2_metadata_endpoint(const S3ApiInstance& s3_api) {
    const auto env_value = Aws::Environment::GetEnv("AWS_EC2_METADATA_DISABLED");
    if (Aws::Utils::StringUtils::ToLower(env_value.c_str()) != "true") {
        return; // Endpoint not disabled; nothing to warn about.
    }
    // Distinguish whether we disabled it ourselves (endpoint timed out at
    // startup) or the user/environment disabled it explicitly.
    const char* who_disabled;
    if (s3_api.is_ec2_metadata_disabled_by_us()) {
        who_disabled = "EC2 metadata endpoint did not respond in time so was bypassed";
    } else {
        who_disabled = "AWS_EC2_METADATA_DISABLED environment variable is set to true";
    }
    log::storage().warn("{}. This means machine identity authentication will not work.", who_disabled);
}

// Probes the configured bucket with a HEAD request and logs a diagnostic if it
// cannot be reached. Returns true only when the HEAD request succeeded within
// the configured wait; false on any error or timeout.
bool S3Storage::check_creds_and_bucket() {
    using namespace Aws::S3;
    // We cannot easily change the timeouts of the s3_client_, so use async:
    auto future = s3_client_.HeadBucketCallable(Model::HeadBucketRequest().WithBucket(bucket_name_.c_str()));
    // Wait bounded by the S3Storage.CheckBucketMaxWait config (default 1000ms).
    auto wait = std::chrono::milliseconds(ConfigsMap::instance()->get_int("S3Storage.CheckBucketMaxWait", 1000));
    if (future.wait_for(wait) == std::future_status::ready) {
        auto outcome = future.get();
        if (!outcome.IsSuccess()) {
            auto& error = outcome.GetError();

// Logs at the given level, appending the HTTP status and server message.
// NOTE: the macro ends with `break`, so each switch case below terminates here
// (there is no fall-through despite the missing explicit `break`s).
#define BUCKET_LOG(level, msg, ...) log::storage().level(msg "\nHTTP Status: {}. Server response: {}", \
    ## __VA_ARGS__, int(error.GetResponseCode()), error.GetMessage().c_str()); break

            // HEAD request can't return the error details, so can't use the more detailed error codes.
            switch (error.GetResponseCode()) {
                case Aws::Http::HttpResponseCode::UNAUTHORIZED:
                case Aws::Http::HttpResponseCode::FORBIDDEN:
                    // Warn (not error): credentials may still allow the operations the user needs.
                    BUCKET_LOG(warn, "Unable to access bucket. Subsequent operations may fail.");
                case Aws::Http::HttpResponseCode::NOT_FOUND:
                    BUCKET_LOG(error, "The specified bucket {} does not exist.", bucket_name_);
                default:
                    // Inconclusive (e.g. transient/server error): log at info only.
                    BUCKET_LOG(info, "Unable to determine if the bucket is accessible.");
            }
#undef BUCKET_LOG
        }
        return outcome.IsSuccess();
    } else {
        // Timed out waiting for the HEAD response; treat as inconclusive.
        log::storage().info("Unable to determine if the bucket is accessible. Request timed out.");
    }
    return false;
}

S3Storage::S3Storage(const LibraryPath &library_path, OpenMode mode, const Config &conf) :
Parent(library_path, mode),
s3_api_(S3ApiInstance::instance()),
Expand All @@ -42,6 +86,7 @@ S3Storage::S3Storage(const LibraryPath &library_path, OpenMode mode, const Confi

if (creds.GetAWSAccessKeyId() == USE_AWS_CRED_PROVIDERS_TOKEN && creds.GetAWSSecretKey() == USE_AWS_CRED_PROVIDERS_TOKEN){
ARCTICDB_RUNTIME_DEBUG(log::storage(), "Using AWS auth mechanisms");
check_ec2_metadata_endpoint(*s3_api_);
s3_client_ = Aws::S3::S3Client(get_s3_config(conf), Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, conf.use_virtual_addressing());
} else {
ARCTICDB_RUNTIME_DEBUG(log::storage(), "Using provided auth credentials");
Expand Down
19 changes: 14 additions & 5 deletions cpp/arcticdb/storage/s3/s3_storage.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ class S3Storage final : public Storage<S3Storage> {
*/
std::string get_key_path(const VariantKey& key) const;

/* Logs warning(s) and returns false if there are likely to be access problems.*/
bool check_creds_and_bucket();

protected:
void do_write(Composite<KeySegmentPair>&& kvs);

Expand Down Expand Up @@ -232,6 +235,12 @@ inline Aws::Client::ClientConfiguration get_proxy_config(Aws::Http::Scheme endpo
}
}

// Returns `protobuf_config` unless it is 0 (protobuf's "unset" default for
// numeric fields), in which case the runtime ConfigsMap value for `label`
// (defaulting to `default_val`) is used instead.
// NOTE: declared plain `inline` rather than in an anonymous namespace —
// anonymous namespaces must not be used in headers (this gives every
// translation unit its own distinct copy and defeats the linker's
// deduplication), whereas `inline` is the ODR-safe form for header functions.
inline uint32_t fallback(uint32_t protobuf_config, const std::string& label, uint32_t default_val) {
    return protobuf_config != 0 ? protobuf_config : ConfigsMap::instance()->get_int(label, default_val);
}

template<typename ConfigType>
auto get_s3_config(const ConfigType& conf) {
auto endpoint_scheme = conf.https() ? Aws::Http::Scheme::HTTPS : Aws::Http::Scheme::HTTP;
Expand All @@ -245,14 +254,14 @@ auto get_s3_config(const ConfigType& conf) {
auto endpoint = conf.endpoint();
util::check_arg(!endpoint.empty(), "S3 Endpoint must be specified");
client_configuration.endpointOverride = endpoint;

const bool verify_ssl = ConfigsMap::instance()->get_int("S3Storage.VerifySSL", conf.ssl());
ARCTICDB_RUNTIME_DEBUG(log::storage(), "Verify ssl: {}", verify_ssl);
client_configuration.verifySSL = verify_ssl;
client_configuration.maxConnections = conf.max_connections() == 0 ?
ConfigsMap::instance()->get_int("VersionStore.NumIOThreads", 16) :
conf.max_connections();
client_configuration.connectTimeoutMs = conf.connect_timeout() == 0 ? 30000 : conf.connect_timeout();
client_configuration.requestTimeoutMs = conf.request_timeout() == 0 ? 200000 : conf.request_timeout();

client_configuration.maxConnections = fallback(conf.max_connections(), "VersionStore.NumIOThreads", 16);
client_configuration.connectTimeoutMs = fallback(conf.connect_timeout(), "S3Storage.ConnectTimeoutMs", 30000);
client_configuration.requestTimeoutMs = fallback(conf.request_timeout(), "S3Storage.RequestTimeoutMs", 200000);
return client_configuration;
}

Expand Down
9 changes: 9 additions & 0 deletions python/arcticdb/adapters/arctic_library_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,3 +69,12 @@ def delete_library(self, library: Library, library_config: LibraryConfig):

def get_storage_override(self) -> StorageOverride:
return StorageOverride()

def check_storage_is_accessible(self) -> Optional[bool]:
    """Logs message(s) to inform the user if there are obvious issues with the storage.

    Returns
    -------
    None if no check is implemented. Otherwise returns whether all checks have passed.
    """
    config_lib = self.config_library
    return config_lib.check_primary_storage_is_accessible()
2 changes: 2 additions & 0 deletions python/arcticdb/arctic.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ def __init__(self, uri: str, encoding_version: EncodingVersion = EncodingVersion
self._library_manager = LibraryManager(self._library_adapter.config_library)
self._uri = uri

self._library_adapter.check_storage_is_accessible()

def __getitem__(self, name: str) -> Library:
storage_override = self._library_adapter.get_storage_override()
lib = NativeVersionStore(
Expand Down
10 changes: 10 additions & 0 deletions python/arcticdb/util/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
As of the Change Date specified in that file, in accordance with the Business Source License, use of this software will be governed by the Apache License, version 2.0.
"""
import os
import sys
import signal
from contextlib import contextmanager
from typing import Mapping, Any, Optional, Iterable, NamedTuple, List, AnyStr
import numpy as np
Expand Down Expand Up @@ -195,6 +197,14 @@ def configure_test_logger(level="INFO"):
configure(get_test_logger_config(level=level, outputs=outputs), force=True)


def gracefully_terminate_process(p):
    """Stop a started ``multiprocessing.Process``, escalating to SIGKILL if needed.

    Parameters
    ----------
    p : multiprocessing.Process
        A started process to shut down.
    """
    p.terminate()  # SIGTERM on POSIX; forceful TerminateProcess on Windows
    if sys.platform != "win32":
        # Grace period for the process to exit cleanly before force-killing.
        p.join(timeout=2)
        if p.exitcode is None:
            os.kill(p.pid, signal.SIGKILL)  # TODO (python37): use Process.kill()
    # Always reap the child so it does not linger as a zombie and so that
    # p.exitcode is populated for callers (the original never joined after
    # SIGKILL, and never joined at all on Windows).
    p.join()


CustomThing = NamedTuple(
"CustomThing", [("custom_index", np.ndarray), ("custom_columns", List[AnyStr]), ("custom_values", List[np.ndarray])]
)
Expand Down
Loading

0 comments on commit 698bc87

Please sign in to comment.