Skip to content

Commit

Permalink
ARROW-15165: [Python] Expose function to resolve S3 bucket region
Browse files Browse the repository at this point in the history
  • Loading branch information
pitrou committed Dec 20, 2021
1 parent 238b363 commit 62c8d3b
Show file tree
Hide file tree
Showing 9 changed files with 69 additions and 15 deletions.
9 changes: 7 additions & 2 deletions cpp/src/arrow/filesystem/s3fs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ Result<S3Options> S3Options::FromUri(const Uri& uri, std::string* out_path) {

if (!region_set && !bucket.empty() && options.endpoint_override.empty()) {
// XXX Should we use a dedicated resolver with the given credentials?
ARROW_ASSIGN_OR_RAISE(options.region, ResolveBucketRegion(bucket));
ARROW_ASSIGN_OR_RAISE(options.region, ResolveS3BucketRegion(bucket));
}

return options;
Expand Down Expand Up @@ -2474,7 +2474,12 @@ Result<std::shared_ptr<io::OutputStream>> S3FileSystem::OpenAppendStream(
// Top-level utility functions
//

Result<std::string> ResolveBucketRegion(const std::string& bucket) {
Result<std::string> ResolveS3BucketRegion(const std::string& bucket) {
if (bucket.empty() || bucket.find_first_of(kSep) != bucket.npos ||
internal::IsLikelyUri(bucket)) {
return Status::Invalid("Not a valid bucket name: '", bucket, "'");
}

ARROW_ASSIGN_OR_RAISE(auto resolver, RegionResolver::DefaultInstance());
return resolver->ResolveRegion(bucket);
}
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/filesystem/s3fs.h
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ ARROW_EXPORT
Status FinalizeS3();

ARROW_EXPORT
Result<std::string> ResolveBucketRegion(const std::string& bucket);
Result<std::string> ResolveS3BucketRegion(const std::string& bucket);

} // namespace fs
} // namespace arrow
2 changes: 1 addition & 1 deletion cpp/src/arrow/filesystem/s3fs_narrative_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ void TestMain(int argc, char** argv) {
ASSERT_OK(InitializeS3(options));

if (FLAGS_region.empty()) {
ASSERT_OK_AND_ASSIGN(FLAGS_region, ResolveBucketRegion(FLAGS_bucket));
ASSERT_OK_AND_ASSIGN(FLAGS_region, ResolveS3BucketRegion(FLAGS_bucket));
}

if (FLAGS_clear) {
Expand Down
19 changes: 13 additions & 6 deletions cpp/src/arrow/filesystem/s3fs_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -303,28 +303,35 @@ TEST_F(S3OptionsTest, FromAssumeRole) {
class S3RegionResolutionTest : public AwsTestMixin {};

TEST_F(S3RegionResolutionTest, PublicBucket) {
ASSERT_OK_AND_EQ("us-east-2", ResolveBucketRegion("ursa-labs-taxi-data"));
ASSERT_OK_AND_EQ("us-east-2", ResolveS3BucketRegion("ursa-labs-taxi-data"));

// Taken from a registry of open S3-hosted datasets
// at https://github.com/awslabs/open-data-registry
ASSERT_OK_AND_EQ("eu-west-2", ResolveBucketRegion("aws-earth-mo-atmospheric-ukv-prd"));
ASSERT_OK_AND_EQ("eu-west-2",
ResolveS3BucketRegion("aws-earth-mo-atmospheric-ukv-prd"));
// Same again, cached
ASSERT_OK_AND_EQ("eu-west-2", ResolveBucketRegion("aws-earth-mo-atmospheric-ukv-prd"));
ASSERT_OK_AND_EQ("eu-west-2",
ResolveS3BucketRegion("aws-earth-mo-atmospheric-ukv-prd"));
}

TEST_F(S3RegionResolutionTest, RestrictedBucket) {
ASSERT_OK_AND_EQ("us-west-2", ResolveBucketRegion("ursa-labs-r-test"));
ASSERT_OK_AND_EQ("us-west-2", ResolveS3BucketRegion("ursa-labs-r-test"));
// Same again, cached
ASSERT_OK_AND_EQ("us-west-2", ResolveBucketRegion("ursa-labs-r-test"));
ASSERT_OK_AND_EQ("us-west-2", ResolveS3BucketRegion("ursa-labs-r-test"));
}

TEST_F(S3RegionResolutionTest, NonExistentBucket) {
auto maybe_region = ResolveBucketRegion("ursa-labs-non-existent-bucket");
auto maybe_region = ResolveS3BucketRegion("ursa-labs-non-existent-bucket");
ASSERT_RAISES(IOError, maybe_region);
ASSERT_THAT(maybe_region.status().message(),
::testing::HasSubstr("Bucket 'ursa-labs-non-existent-bucket' not found"));
}

TEST_F(S3RegionResolutionTest, InvalidBucketName) {
// Malformed bucket names must be reported as Status::Invalid:
// one that looks like a URI, and one that contains a path separator.
ASSERT_RAISES(Invalid, ResolveS3BucketRegion("s3:bucket"));
ASSERT_RAISES(Invalid, ResolveS3BucketRegion("foo/bar"));
}

////////////////////////////////////////////////////////////////////////////
// S3FileSystem region test

Expand Down
10 changes: 6 additions & 4 deletions python/pyarrow/_fs.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -336,15 +336,17 @@ cdef class FileSystem(_Weakrefable):
the FileSystem instance.
"""
cdef:
c_string path
c_string c_path
c_string c_uri
CResult[shared_ptr[CFileSystem]] result

if isinstance(uri, pathlib.Path):
# Make absolute
uri = uri.resolve().absolute()
uri = _stringify_path(uri)
result = CFileSystemFromUriOrPath(tobytes(uri), &path)
return FileSystem.wrap(GetResultValue(result)), frombytes(path)
c_uri = tobytes(_stringify_path(uri))
with nogil:
result = CFileSystemFromUriOrPath(c_uri, &c_path)
return FileSystem.wrap(GetResultValue(result)), frombytes(c_path)

cdef init(self, const shared_ptr[CFileSystem]& wrapped):
self.wrapped = wrapped
Expand Down
25 changes: 25 additions & 0 deletions python/pyarrow/_s3fs.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,31 @@ def finalize_s3():
check_status(CFinalizeS3())


def resolve_s3_region(bucket):
    """
    Resolve the S3 region of a bucket.

    Parameters
    ----------
    bucket : str
        An S3 bucket name

    Returns
    -------
    region : str
        An S3 region name
    """
    cdef:
        c_string c_bucket
        c_string c_region

    # Store the bucket name in a C++ string before entering the nogil
    # section, where Python objects must not be used.
    c_bucket = tobytes(bucket)
    # Release the GIL for the duration of the C++ call.
    # NOTE(review): the lookup presumably performs network I/O -- confirm.
    with nogil:
        c_region = GetResultValue(ResolveS3BucketRegion(c_bucket))

    return frombytes(c_region)


cdef class S3FileSystem(FileSystem):
"""
S3-backed FileSystem implementation
Expand Down
3 changes: 2 additions & 1 deletion python/pyarrow/fs.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@

try:
from pyarrow._s3fs import ( # noqa
S3FileSystem, S3LogLevel, initialize_s3, finalize_s3)
S3FileSystem, S3LogLevel, initialize_s3, finalize_s3,
resolve_s3_region)
except ImportError:
_not_imported.append("S3FileSystem")
else:
Expand Down
2 changes: 2 additions & 0 deletions python/pyarrow/includes/libarrow_fs.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,8 @@ cdef extern from "arrow/filesystem/api.h" namespace "arrow::fs" nogil:
const CS3GlobalOptions& options)
cdef CStatus CFinalizeS3 "arrow::fs::FinalizeS3"()

cdef CResult[c_string] ResolveS3BucketRegion(const c_string& bucket)

cdef cppclass CHdfsOptions "arrow::fs::HdfsOptions":
HdfsConnectionConfig connection_config
int32_t buffer_size
Expand Down
12 changes: 12 additions & 0 deletions python/pyarrow/tests/test_fs.py
Original file line number Diff line number Diff line change
Expand Up @@ -1630,6 +1630,18 @@ def test_s3_real_aws_region_selection():
assert fs.region == 'us-east-3'


@pytest.mark.s3
def test_resolve_s3_region():
    """Check resolve_s3_region on named buckets and on malformed names."""
    from pyarrow.fs import resolve_s3_region
    # Expected regions for two named buckets.
    # NOTE(review): presumably requires network access to AWS -- confirm.
    assert resolve_s3_region('ursa-labs-taxi-data') == 'us-east-2'
    assert resolve_s3_region('mf-nwp-models') == 'eu-west-1'

    # Malformed bucket names are expected to raise ValueError with a
    # "Not a valid bucket name" message.
    with pytest.raises(ValueError, match="Not a valid bucket name"):
        resolve_s3_region('foo/bar')
    with pytest.raises(ValueError, match="Not a valid bucket name"):
        resolve_s3_region('s3:bucket')


@pytest.mark.s3
def test_copy_files(s3_connection, s3fs, tempdir):
fs = s3fs["fs"]
Expand Down

0 comments on commit 62c8d3b

Please sign in to comment.