Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: multiple dbs support #1102

Merged
merged 10 commits into from
Jun 8, 2023
5 changes: 5 additions & 0 deletions .kokoro/nightly/integration.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ env_vars: {
value: "java-docs-samples-testing"
}

env_vars: {
key: "DATASTORE_PROJECT_ID"
value: "java-docs-samples-testing"
jainsahab marked this conversation as resolved.
Show resolved Hide resolved
}

env_vars: {
key: "ENABLE_FLAKYBOT"
value: "true"
Expand Down
5 changes: 5 additions & 0 deletions .kokoro/nightly/java11-integration.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ env_vars: {
value: "gcloud-devel"
}

env_vars: {
key: "DATASTORE_PROJECT_ID"
value: "gcloud-devel"
}

env_vars: {
key: "ENABLE_FLAKYBOT"
value: "true"
Expand Down
5 changes: 5 additions & 0 deletions .kokoro/presubmit/graalvm-native-17.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,9 @@ env_vars: {
env_vars: {
key: "SECRET_MANAGER_KEYS"
value: "java-it-service-account"
}

env_vars: {
key: "DATASTORE_PROJECT_ID"
value: "gcloud-devel"
}
5 changes: 5 additions & 0 deletions .kokoro/presubmit/graalvm-native.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,8 @@ env_vars: {
key: "SECRET_MANAGER_KEYS"
value: "java-it-service-account"
}

env_vars: {
key: "DATASTORE_PROJECT_ID"
value: "gcloud-devel"
}
5 changes: 5 additions & 0 deletions .kokoro/presubmit/integration.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ env_vars: {
value: "gcloud-devel"
}

env_vars: {
key: "DATASTORE_PROJECT_ID"
value: "gcloud-devel"
}

env_vars: {
key: "GOOGLE_APPLICATION_CREDENTIALS"
value: "secret_manager/java-it-service-account"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ private DatastoreException invalidResponseException(String method, IOException e
}

public AllocateIdsResponse allocateIds(AllocateIdsRequest request) throws DatastoreException {
try (InputStream is = remoteRpc.call("allocateIds", request)) {
try (InputStream is =
remoteRpc.call("allocateIds", request, request.getProjectId(), request.getDatabaseId())) {
return AllocateIdsResponse.parseFrom(is);
} catch (IOException exception) {
throw invalidResponseException("allocateIds", exception);
Expand All @@ -76,47 +77,54 @@ public AllocateIdsResponse allocateIds(AllocateIdsRequest request) throws Datast

public BeginTransactionResponse beginTransaction(BeginTransactionRequest request)
throws DatastoreException {
try (InputStream is = remoteRpc.call("beginTransaction", request)) {
try (InputStream is =
remoteRpc.call(
"beginTransaction", request, request.getProjectId(), request.getDatabaseId())) {
return BeginTransactionResponse.parseFrom(is);
} catch (IOException exception) {
throw invalidResponseException("beginTransaction", exception);
}
}

public CommitResponse commit(CommitRequest request) throws DatastoreException {
try (InputStream is = remoteRpc.call("commit", request)) {
try (InputStream is =
remoteRpc.call("commit", request, request.getProjectId(), request.getDatabaseId())) {
return CommitResponse.parseFrom(is);
} catch (IOException exception) {
throw invalidResponseException("commit", exception);
}
}

public LookupResponse lookup(LookupRequest request) throws DatastoreException {
try (InputStream is = remoteRpc.call("lookup", request)) {
try (InputStream is =
remoteRpc.call("lookup", request, request.getProjectId(), request.getDatabaseId())) {
return LookupResponse.parseFrom(is);
} catch (IOException exception) {
throw invalidResponseException("lookup", exception);
}
}

public ReserveIdsResponse reserveIds(ReserveIdsRequest request) throws DatastoreException {
try (InputStream is = remoteRpc.call("reserveIds", request)) {
try (InputStream is =
remoteRpc.call("reserveIds", request, request.getProjectId(), request.getDatabaseId())) {
return ReserveIdsResponse.parseFrom(is);
} catch (IOException exception) {
throw invalidResponseException("reserveIds", exception);
}
}

public RollbackResponse rollback(RollbackRequest request) throws DatastoreException {
try (InputStream is = remoteRpc.call("rollback", request)) {
try (InputStream is =
remoteRpc.call("rollback", request, request.getProjectId(), request.getDatabaseId())) {
return RollbackResponse.parseFrom(is);
} catch (IOException exception) {
throw invalidResponseException("rollback", exception);
}
}

public RunQueryResponse runQuery(RunQueryRequest request) throws DatastoreException {
try (InputStream is = remoteRpc.call("runQuery", request)) {
try (InputStream is =
remoteRpc.call("runQuery", request, request.getProjectId(), request.getDatabaseId())) {
return RunQueryResponse.parseFrom(is);
} catch (IOException exception) {
throw invalidResponseException("runQuery", exception);
Expand All @@ -125,7 +133,9 @@ public RunQueryResponse runQuery(RunQueryRequest request) throws DatastoreExcept

public RunAggregationQueryResponse runAggregationQuery(RunAggregationQueryRequest request)
throws DatastoreException {
try (InputStream is = remoteRpc.call("runAggregationQuery", request)) {
try (InputStream is =
remoteRpc.call(
"runAggregationQuery", request, request.getProjectId(), request.getDatabaseId())) {
return RunAggregationQueryResponse.parseFrom(is);
} catch (IOException exception) {
throw invalidResponseException("runAggregationQuery", exception);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
*/
public class DatastoreOptions {
private final String projectId;
private final String databaseId;
private final String projectEndpoint;
private final String host;
private final String localHost;
Expand All @@ -56,6 +57,7 @@ public class DatastoreOptions {
b.projectId != null || b.projectEndpoint != null,
"Either project ID or project endpoint must be provided.");
this.projectId = b.projectId;
this.databaseId = b.databaseId;
this.projectEndpoint = b.projectEndpoint;
this.host = b.host;
this.localHost = b.localHost;
Expand All @@ -72,6 +74,7 @@ public static class Builder {
"Can set at most one of project endpoint, host, and local host.";

private String projectId;
private String databaseId;
private String projectEndpoint;
private String host;
private String localHost;
Expand All @@ -83,6 +86,7 @@ public Builder() {}

public Builder(DatastoreOptions options) {
this.projectId = options.projectId;
this.databaseId = options.databaseId;
this.projectEndpoint = options.projectEndpoint;
this.host = options.host;
this.localHost = options.localHost;
Expand All @@ -102,6 +106,12 @@ public Builder projectId(String projectId) {
return this;
}

/** Sets the database ID used to access Cloud Datastore. */
public Builder databaseId(String databaseId) {
this.databaseId = databaseId;
return this;
}

/**
* Sets the host used to access Cloud Datastore. To connect to the Cloud Datastore Emulator, use
* {@link #localHost} instead.
Expand Down Expand Up @@ -176,6 +186,10 @@ public String getProjectId() {
return projectId;
}

public String getDatabaseId() {
return databaseId;
}

public String getProjectEndpoint() {
return projectEndpoint;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,8 @@ private List<Key> getScatterKeys(
do {
RunQueryRequest.Builder scatterRequest =
RunQueryRequest.newBuilder().setPartitionId(partition).setQuery(scatterPointQuery);
scatterRequest.setProjectId(partition.getProjectId());
scatterRequest.setDatabaseId(partition.getDatabaseId());
if (readTime != null) {
scatterRequest.setReadOptions(ReadOptions.newBuilder().setReadTime(readTime).build());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.google.api.client.http.protobuf.ProtoHttpContent;
import com.google.api.client.util.IOUtils;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import com.google.protobuf.MessageLite;
import com.google.rpc.Code;
import com.google.rpc.Status;
Expand All @@ -46,6 +47,8 @@ class RemoteRpc {
@VisibleForTesting static final String API_FORMAT_VERSION_HEADER = "X-Goog-Api-Format-Version";
private static final String API_FORMAT_VERSION = "2";

@VisibleForTesting static final String X_GOOG_REQUEST_PARAMS_HEADER = "x-goog-request-params";

private final HttpRequestFactory client;
private final HttpRequestInitializer initializer;
private final String url;
Expand Down Expand Up @@ -74,7 +77,9 @@ class RemoteRpc {
*
* @throws DatastoreException if the RPC fails.
*/
public InputStream call(String methodName, MessageLite request) throws DatastoreException {
public InputStream call(
String methodName, MessageLite request, String projectId, String databaseId)
throws DatastoreException {
logger.fine("remote datastore call " + methodName);

long startTime = System.currentTimeMillis();
Expand All @@ -84,7 +89,7 @@ public InputStream call(String methodName, MessageLite request) throws Datastore
rpcCount.incrementAndGet();
ProtoHttpContent payload = new ProtoHttpContent(request);
HttpRequest httpRequest = client.buildPostRequest(resolveURL(methodName), payload);
setHeaders(request, httpRequest);
setHeaders(request, httpRequest, projectId, databaseId);
// Don't throw an HTTPResponseException on error. It converts the response to a String and
// throws away the original, whereas we need the raw bytes to parse it as a proto.
httpRequest.setThrowExceptionOnExecuteError(false);
Expand Down Expand Up @@ -123,8 +128,16 @@ public InputStream call(String methodName, MessageLite request) throws Datastore
}

@VisibleForTesting
void setHeaders(MessageLite request, HttpRequest httpRequest) {
void setHeaders(
MessageLite request, HttpRequest httpRequest, String projectId, String databaseId) {
httpRequest.getHeaders().put(API_FORMAT_VERSION_HEADER, API_FORMAT_VERSION);
StringBuilder builder = new StringBuilder("project_id=");
builder.append(projectId);
if (!Strings.isNullOrEmpty(databaseId)) {
builder.append("&database_id=");
builder.append(databaseId);
}
httpRequest.getHeaders().put(X_GOOG_REQUEST_PARAMS_HEADER, builder.toString());
if (enableE2EChecksum && request != null) {
String checksum = EndToEndChecksumHandler.computeChecksum(request.toByteArray());
if (checksum != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,18 @@ public void create_LocalHost() {
.isEqualTo("http://localhost:8080/v1/projects/project-id");
}

@Test
public void setDatabaseId() {
DatastoreOptions options =
new DatastoreOptions.Builder()
.projectId(PROJECT_ID)
.databaseId("test-db")
.localHost("localhost:8080")
.build();
assertThat(options.getProjectId()).isEqualTo(PROJECT_ID);
assertThat(options.getDatabaseId()).isEqualTo("test-db");
}

@Test
public void create_LocalHostIp() {
Datastore datastore =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,60 @@ public void getSplits() throws Exception {
RunQueryRequest expectedSplitQueryRequest =
RunQueryRequest.newBuilder()
.setPartitionId(PARTITION)
.setProjectId(PROJECT_ID)
.setQuery(
splitQuery.toBuilder().setLimit(Int32Value.newBuilder().setValue(2 * 32).build()))
.build();

assertArrayEquals(expectedSplitQueryRequest.toByteArray(), mockClient.getLastBody());
}

@Test
public void getSplitsWithDatabaseId() throws Exception {
Datastore datastore = factory.create(options.build());
MockDatastoreFactory mockClient = (MockDatastoreFactory) factory;

PartitionId partition =
PartitionId.newBuilder().setProjectId(PROJECT_ID).setDatabaseId("test-database").build();

RunQueryResponse splitQueryResponse =
RunQueryResponse.newBuilder()
.setQuery(splitQuery)
.setBatch(
QueryResultBatch.newBuilder()
.setEntityResultType(ResultType.KEY_ONLY)
.setMoreResults(MoreResultsType.NO_MORE_RESULTS)
.addEntityResults(makeKeyOnlyEntity(splitKey0))
.addEntityResults(makeKeyOnlyEntity(splitKey1))
.addEntityResults(makeKeyOnlyEntity(splitKey2))
.addEntityResults(makeKeyOnlyEntity(splitKey3))
.build())
.build();

mockClient.setNextResponse(splitQueryResponse);

List<Query> splitQueries = QuerySplitterImpl.INSTANCE.getSplits(query, partition, 3, datastore);

assertThat(splitQueries)
.containsExactly(
query
.toBuilder()
.setFilter(makeFilterWithKeyRange(propertyFilter, null, splitKey1))
.build(),
query
.toBuilder()
.setFilter(makeFilterWithKeyRange(propertyFilter, splitKey1, splitKey3))
.build(),
query
.toBuilder()
.setFilter(makeFilterWithKeyRange(propertyFilter, splitKey3, null))
.build());

RunQueryRequest expectedSplitQueryRequest =
RunQueryRequest.newBuilder()
.setPartitionId(partition)
.setProjectId(PROJECT_ID)
.setDatabaseId("test-database")
.setQuery(
splitQuery.toBuilder().setLimit(Int32Value.newBuilder().setValue(2 * 32).build()))
.build();
Expand Down Expand Up @@ -235,6 +289,7 @@ public void notEnoughSplits() throws Exception {
RunQueryRequest expectedSplitQueryRequest =
RunQueryRequest.newBuilder()
.setPartitionId(PARTITION)
.setProjectId(PROJECT_ID)
.setQuery(
splitQuery.toBuilder().setLimit(Int32Value.newBuilder().setValue(99 * 32).build()))
.build();
Expand Down Expand Up @@ -286,6 +341,7 @@ public void getSplits_withReadTime() throws Exception {
RunQueryRequest expectedSplitQueryRequest =
RunQueryRequest.newBuilder()
.setPartitionId(PARTITION)
.setProjectId(PROJECT_ID)
.setQuery(
splitQuery.toBuilder().setLimit(Int32Value.newBuilder().setValue(2 * 32).build()))
.setReadOptions(ReadOptions.newBuilder().setReadTime(readTime))
Expand Down
Loading