Skip to content

Commit 127b48e

Browse files
jbmscopybara-github
authored andcommitted
Add KvStore URL pipeline support
This is in line with zarr-developers/zeps#48 and the syntax supported by Neuroglancer. Currently, zip is supported. OCDBT support will be added in a subsequent commit. PiperOrigin-RevId: 755691199 Change-Id: Ia6cb84c12a986a7dd0ba65e41454fbe6d415aed0
1 parent 115d273 commit 127b48e

28 files changed

+467
-110
lines changed

tensorstore/internal/BUILD

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1612,6 +1612,7 @@ tensorstore_cc_library(
16121612
hdrs = ["uri_utils.h"],
16131613
deps = [
16141614
":ascii_set",
1615+
"@abseil-cpp//absl/status",
16151616
"@abseil-cpp//absl/strings",
16161617
],
16171618
)

tensorstore/internal/driver_kind_registry.cc

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ struct DriverKindRegistry {
3535
absl::Mutex mutex;
3636
absl::flat_hash_map<std::string, DriverKind> driver_kinds
3737
ABSL_GUARDED_BY(mutex);
38+
absl::flat_hash_map<std::string, UrlSchemeKind> scheme_kinds
39+
ABSL_GUARDED_BY(mutex);
3840
};
3941

4042
DriverKindRegistry& GetDriverKindRegistry() {
@@ -84,5 +86,44 @@ std::ostream& operator<<(std::ostream& os, DriverKind x) {
8486
return os << DriverKindToStringView(x);
8587
}
8688

89+
void RegisterUrlSchemeKind(std::string_view scheme, UrlSchemeKind scheme_kind) {
90+
auto& registry = GetDriverKindRegistry();
91+
absl::MutexLock lock(&registry.mutex);
92+
if (auto result = registry.scheme_kinds.emplace(scheme, scheme_kind);
93+
!result.second) {
94+
ABSL_LOG(FATAL) << scheme << " already registered as "
95+
<< result.first->second;
96+
}
97+
}
98+
99+
std::optional<UrlSchemeKind> GetUrlSchemeKind(std::string_view scheme) {
100+
auto& registry = GetDriverKindRegistry();
101+
absl::MutexLock lock(&registry.mutex);
102+
auto it = registry.scheme_kinds.find(scheme);
103+
if (it == registry.scheme_kinds.end()) return std::nullopt;
104+
return it->second;
105+
}
106+
107+
std::string_view UrlSchemeKindToStringView(UrlSchemeKind x) {
108+
switch (x) {
109+
case UrlSchemeKind::kKvStoreRoot:
110+
return "root kvstore";
111+
case UrlSchemeKind::kKvStoreAdapter:
112+
return "kvstore adapter";
113+
case UrlSchemeKind::kTensorStoreRoot:
114+
return "root TensorStore";
115+
case UrlSchemeKind::kTensorStoreKvStoreAdapter:
116+
return "kvstore-based TensorStore";
117+
case UrlSchemeKind::kTensorStoreAdapter:
118+
return "TensorStore adapter";
119+
default:
120+
ABSL_UNREACHABLE();
121+
}
122+
}
123+
124+
std::ostream& operator<<(std::ostream& os, UrlSchemeKind x) {
125+
return os << UrlSchemeKindToStringView(x);
126+
}
127+
87128
} // namespace internal
88129
} // namespace tensorstore

tensorstore/internal/driver_kind_registry.h

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,53 @@ void RegisterDriverKind(std::string_view id, DriverKind kind,
5454
// registered.
5555
std::optional<DriverKind> GetDriverKind(std::string_view id);
5656

57+
// Indicates the kind of a registered URL scheme in the tensorstore library.
58+
//
59+
// The kvstore URL scheme handlers are registered in
60+
// tensorstore/kvstore/registry.h and the TensorStore URL scheme
61+
// handlers are registered in tensorstore/driver/registry.h.
62+
enum class UrlSchemeKind {
63+
// KvStore driver that directly accesses storage, e.g. "file"
64+
//
65+
// This must be first in a URL pipeline.
66+
kKvStoreRoot,
67+
// KvStore driver that adapts a base kvstore, e.g. "zip" or "ocdbt".
68+
//
69+
// This must immediately follow a `kKvStoreBase` component in a URL pipeline.
70+
kKvStoreAdapter,
71+
// TensorStore driver that directly accesses storage.
72+
//
73+
// This must be first in a URL pipeline.
74+
kTensorStoreRoot,
75+
// TensorStore driver that adapts a base kvstore, e.g. "zarr3"
76+
//
77+
// This must immediately follow a `kKvStoreBase` component in a URL pipeline.
78+
kTensorStoreKvStoreAdapter,
79+
// TensorStore driver that adapts a base TensorStore, e.g. "cast"
80+
//
81+
// This must immediately follow a `kTensorStoreBase` or
82+
// `kTensorStoreKvStoreAdapter` component in a URL pipeline.
83+
kTensorStoreAdapter,
84+
};
85+
86+
// Returns the name of a URL scheme kind for use in error messages.
87+
std::string_view UrlSchemeKindToStringView(UrlSchemeKind x);
88+
89+
template <typename Sink>
90+
void AbslStringify(Sink& sink, UrlSchemeKind x) {
91+
return sink.Append(UrlSchemeKindToStringView(x));
92+
}
93+
std::ostream& operator<<(std::ostream& os, UrlSchemeKind x);
94+
95+
// Registers a URL scheme. This is called automatically by
96+
// `internal_kvstore::UrlSchemeRegistration` and
97+
// `internal::UrlSchemeRegistration`.
98+
void RegisterUrlSchemeKind(std::string_view scheme, UrlSchemeKind scheme_kind);
99+
100+
// Returns the kind of a registered URL scheme, or `std::nullopt` if not
101+
// registered.
102+
std::optional<UrlSchemeKind> GetUrlSchemeKind(std::string_view scheme);
103+
57104
} // namespace internal
58105
} // namespace tensorstore
59106

tensorstore/internal/uri_utils.cc

Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
#include <string>
2424
#include <string_view>
2525

26+
#include "absl/status/status.h"
2627
#include "absl/strings/ascii.h"
2728
#include "absl/strings/match.h"
2829
#include "tensorstore/internal/ascii_set.h"
@@ -87,17 +88,18 @@ void PercentDecodeAppend(std::string_view src, std::string& dest) {
8788
}
8889
}
8990

90-
ParsedGenericUri ParseGenericUri(std::string_view uri) {
91-
static constexpr std::string_view kSchemeSep("://");
91+
namespace {
92+
ParsedGenericUri ParseGenericUriImpl(std::string_view uri,
93+
std::string_view scheme_delimiter) {
9294
ParsedGenericUri result;
93-
const auto scheme_start = uri.find(kSchemeSep);
95+
const auto scheme_start = uri.find(scheme_delimiter);
9496
std::string_view uri_suffix;
9597
if (scheme_start == std::string_view::npos) {
9698
// No scheme
9799
uri_suffix = uri;
98100
} else {
99101
result.scheme = uri.substr(0, scheme_start);
100-
uri_suffix = uri.substr(scheme_start + kSchemeSep.size());
102+
uri_suffix = uri.substr(scheme_start + scheme_delimiter.size());
101103
}
102104
const auto fragment_start = uri_suffix.find('#');
103105
const auto query_start = uri_suffix.substr(0, fragment_start).find('?');
@@ -127,6 +129,32 @@ ParsedGenericUri ParseGenericUri(std::string_view uri) {
127129
}
128130
return result;
129131
}
132+
} // namespace
133+
134+
ParsedGenericUri ParseGenericUri(std::string_view uri) {
135+
return ParseGenericUriImpl(uri, "://");
136+
}
137+
138+
ParsedGenericUri ParseGenericUriWithoutSlashSlash(std::string_view uri) {
139+
return ParseGenericUriImpl(uri, ":");
140+
}
141+
142+
absl::Status EnsureNoQueryOrFragment(const ParsedGenericUri& parsed_uri) {
143+
if (!parsed_uri.query.empty()) {
144+
return absl::InvalidArgumentError("Query string not supported");
145+
}
146+
if (!parsed_uri.fragment.empty()) {
147+
return absl::InvalidArgumentError("Fragment identifier not supported");
148+
}
149+
return absl::OkStatus();
150+
}
151+
152+
absl::Status EnsureNoPathOrQueryOrFragment(const ParsedGenericUri& parsed_uri) {
153+
if (!parsed_uri.authority_and_path.empty()) {
154+
return absl::InvalidArgumentError("Path not supported");
155+
}
156+
return EnsureNoQueryOrFragment(parsed_uri);
157+
}
130158

131159
std::optional<HostPort> SplitHostPort(std::string_view host_port) {
132160
if (host_port.empty()) return std::nullopt;

tensorstore/internal/uri_utils.h

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
#include <string>
2323
#include <string_view>
2424

25+
#include "absl/status/status.h"
2526
#include "tensorstore/internal/ascii_set.h"
2627

2728
namespace tensorstore {
@@ -120,9 +121,20 @@ struct ParsedGenericUri {
120121
};
121122

122123
/// Parses a "generic" URI of the form
123-
/// `<scheme>://<authority-and-path>?<query>#<fragment>` where the `?<query>`
124-
/// and `#<fragment>` portions are optional.
124+
/// `<scheme><scheme_delimiter>//<authority-and-path>?<query>#<fragment>`
125+
/// where the `?<query>` and `#<fragment>` portions are optional.
126+
///
127+
/// `<scheme_delimiter>` is:
128+
/// - "://" for `ParseGenericUri`
129+
/// - ":" for `ParseGenericUriWithoutSlashSlash`
125130
ParsedGenericUri ParseGenericUri(std::string_view uri);
131+
ParsedGenericUri ParseGenericUriWithoutSlashSlash(std::string_view uri);
132+
133+
// Returns an error if there is a query or fragment.
134+
absl::Status EnsureNoQueryOrFragment(const ParsedGenericUri& parsed_uri);
135+
136+
// Returns an error if there is a path, query or fragment.
137+
absl::Status EnsureNoPathOrQueryOrFragment(const ParsedGenericUri& parsed_uri);
126138

127139
struct HostPort {
128140
std::string_view host;

tensorstore/kvstore/file/file_key_value_store.cc

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -910,12 +910,7 @@ Future<kvstore::DriverPtr> FileKeyValueStoreSpec::DoOpen() const {
910910
Result<kvstore::Spec> ParseFileUrl(std::string_view url) {
911911
auto parsed = internal::ParseGenericUri(url);
912912
assert(parsed.scheme == internal_file_kvstore::FileKeyValueStoreSpec::id);
913-
if (!parsed.query.empty()) {
914-
return absl::InvalidArgumentError("Query string not supported");
915-
}
916-
if (!parsed.fragment.empty()) {
917-
return absl::InvalidArgumentError("Fragment identifier not supported");
918-
}
913+
TENSORSTORE_RETURN_IF_ERROR(internal::EnsureNoQueryOrFragment(parsed));
919914
std::string path = internal::PercentDecode(parsed.authority_and_path);
920915
auto driver_spec = internal::MakeIntrusivePtr<FileKeyValueStoreSpec>();
921916
driver_spec->data_.file_io_concurrency =

tensorstore/kvstore/file/file_key_value_store_test.cc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -401,6 +401,7 @@ TEST(FileKeyValueStoreTest, SpecRoundtrip) {
401401
std::string root = tempdir.path() + "/root";
402402
tensorstore::internal::KeyValueStoreSpecRoundtripOptions options;
403403
options.full_spec = {{"driver", "file"}, {"path", root}};
404+
options.url = "file://" + root;
404405
tensorstore::internal::TestKeyValueStoreSpecRoundtrip(options);
405406
}
406407

@@ -419,6 +420,7 @@ TEST(FileKeyValueStoreTest, SpecRoundtripSync) {
419420
{"file_io_locking", {{"mode", "lockfile"}}},
420421
}},
421422
};
423+
options.url = "file://" + root;
422424
options.spec_request_options.Set(tensorstore::retain_context).IgnoreError();
423425
tensorstore::internal::TestKeyValueStoreSpecRoundtrip(options);
424426
}

tensorstore/kvstore/gcs_grpc/gcs_grpc.cc

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1026,12 +1026,7 @@ Future<kvstore::DriverPtr> GcsGrpcKeyValueStoreSpec::DoOpen() const {
10261026
Result<kvstore::Spec> ParseGcsGrpcUrl(std::string_view url) {
10271027
auto parsed = internal::ParseGenericUri(url);
10281028
assert(parsed.scheme == kUriScheme);
1029-
if (!parsed.query.empty()) {
1030-
return absl::InvalidArgumentError("Query string not supported");
1031-
}
1032-
if (!parsed.fragment.empty()) {
1033-
return absl::InvalidArgumentError("Fragment identifier not supported");
1034-
}
1029+
TENSORSTORE_RETURN_IF_ERROR(internal::EnsureNoQueryOrFragment(parsed));
10351030
if (!IsValidBucketName(parsed.authority)) {
10361031
return absl::InvalidArgumentError(tensorstore::StrCat(
10371032
"Invalid GCS bucket name: ", QuoteString(parsed.authority)));

tensorstore/kvstore/gcs_http/gcs_key_value_store.cc

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1272,12 +1272,7 @@ Future<const void> GcsKeyValueStore::DeleteRange(KeyRange range) {
12721272
Result<kvstore::Spec> ParseGcsUrl(std::string_view url) {
12731273
auto parsed = internal::ParseGenericUri(url);
12741274
assert(parsed.scheme == kUriScheme);
1275-
if (!parsed.query.empty()) {
1276-
return absl::InvalidArgumentError("Query string not supported");
1277-
}
1278-
if (!parsed.fragment.empty()) {
1279-
return absl::InvalidArgumentError("Fragment identifier not supported");
1280-
}
1275+
TENSORSTORE_RETURN_IF_ERROR(internal::EnsureNoQueryOrFragment(parsed));
12811276
if (!IsValidBucketName(parsed.authority)) {
12821277
return absl::InvalidArgumentError(absl::StrCat(
12831278
"Invalid GCS bucket name: ", QuoteString(parsed.authority)));

tensorstore/kvstore/gcs_http/gcs_key_value_store_test.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -426,7 +426,7 @@ TEST(GcsKeyValueStoreTest, SpecRoundtrip) {
426426

427427
tensorstore::internal::KeyValueStoreSpecRoundtripOptions options;
428428
options.full_spec = {{"driver", kDriver}, {"bucket", "my-bucket"}};
429-
429+
options.url = "gs://my-bucket/";
430430
tensorstore::internal::TestKeyValueStoreSpecRoundtrip(options);
431431
}
432432

0 commit comments

Comments
 (0)