From 61758e02c30c8410dd397d0cc77c987332c4f11c Mon Sep 17 00:00:00 2001 From: yixiaoshen Date: Tue, 21 Jun 2022 07:02:12 -0700 Subject: [PATCH] feat: support readTime in Datastore query splitter. (#763) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Thank you for opening a Pull Request! Before submitting your PR, there are a few things you can do to make sure it goes smoothly: - [ ] Make sure to open an issue as a [bug/issue](https://github.com/googleapis/java-datastore/issues/new/choose) before writing your code! That way we can discuss the change, evaluate designs, and agree on the general idea - [ ] Ensure the tests and linter pass - [ ] Code coverage does not decrease (if any source code was changed) - [ ] Appropriate docs were updated (if necessary) Fixes # ☕️ If you write sample code, please follow the [samples format]( https://github.com/GoogleCloudPlatform/java-docs-samples/blob/main/SAMPLE_FORMAT.md). --- .../clirr-ignored-differences.xml | 9 + datastore-v1-proto-client/pom.xml | 5 + .../datastore/v1/client/QuerySplitter.java | 14 + .../v1/client/QuerySplitterImpl.java | 42 ++- .../v1/client/testing/MockCredential.java | 36 ++ .../client/testing/MockDatastoreFactory.java | 137 ++++++++ .../v1/client/DatastoreClientTest.java | 121 +------ .../v1/client/QuerySplitterTest.java | 326 ++++++++++++++++++ 8 files changed, 568 insertions(+), 122 deletions(-) create mode 100644 datastore-v1-proto-client/clirr-ignored-differences.xml create mode 100644 datastore-v1-proto-client/src/main/java/com/google/datastore/v1/client/testing/MockCredential.java create mode 100644 datastore-v1-proto-client/src/main/java/com/google/datastore/v1/client/testing/MockDatastoreFactory.java create mode 100644 datastore-v1-proto-client/src/test/java/com/google/datastore/v1/client/QuerySplitterTest.java diff --git a/datastore-v1-proto-client/clirr-ignored-differences.xml b/datastore-v1-proto-client/clirr-ignored-differences.xml new file mode 100644 index 000000000..e8c0b27f4 --- /dev/null +++ b/datastore-v1-proto-client/clirr-ignored-differences.xml @@ -0,0 +1,9 @@ + + + + + com/google/datastore/v1/client/QuerySplitter + java.util.List getSplits(com.google.datastore.v1.Query, com.google.datastore.v1.PartitionId, int, com.google.datastore.v1.client.Datastore, com.google.protobuf.Timestamp) + 7012 + + diff --git a/datastore-v1-proto-client/pom.xml b/datastore-v1-proto-client/pom.xml index cead29652..f1bc4067e 100644 --- a/datastore-v1-proto-client/pom.xml +++ b/datastore-v1-proto-client/pom.xml @@ -83,6 +83,11 @@ protobuf-java + + com.google.api + api-common + + junit diff --git a/datastore-v1-proto-client/src/main/java/com/google/datastore/v1/client/QuerySplitter.java b/datastore-v1-proto-client/src/main/java/com/google/datastore/v1/client/QuerySplitter.java index 5286f7842..97268d38d 100644 --- a/datastore-v1-proto-client/src/main/java/com/google/datastore/v1/client/QuerySplitter.java +++ b/datastore-v1-proto-client/src/main/java/com/google/datastore/v1/client/QuerySplitter.java @@ -15,8 +15,10 @@ */ package com.google.datastore.v1.client; +import com.google.api.core.BetaApi; import com.google.datastore.v1.PartitionId; import com.google.datastore.v1.Query; +import com.google.protobuf.Timestamp; import java.util.List; /** Provides the ability to split a query into multiple shards. */ @@ -39,4 +41,16 @@ public interface QuerySplitter { */ List getSplits(Query query, PartitionId partition, int numSplits, Datastore datastore) throws DatastoreException; + + /** + * Same as {@link #getSplits(Query, PartitionId, int, Datastore)} but the splits are based on + * {@code readTime}, and the returned sharded {@link Query}s should also be executed with {@code + * readTime}. Reading from a timestamp is currently a private preview feature in Datastore. + */ + @BetaApi + default List getSplits( + Query query, PartitionId partition, int numSplits, Datastore datastore, Timestamp readTime) + throws DatastoreException { + throw new UnsupportedOperationException("Not implemented."); + } } diff --git a/datastore-v1-proto-client/src/main/java/com/google/datastore/v1/client/QuerySplitterImpl.java b/datastore-v1-proto-client/src/main/java/com/google/datastore/v1/client/QuerySplitterImpl.java index 92fb38418..6143bdd59 100644 --- a/datastore-v1-proto-client/src/main/java/com/google/datastore/v1/client/QuerySplitterImpl.java +++ b/datastore-v1-proto-client/src/main/java/com/google/datastore/v1/client/QuerySplitterImpl.java @@ -17,6 +17,7 @@ import static com.google.datastore.v1.client.DatastoreHelper.makeAndFilter; +import com.google.api.core.BetaApi; import com.google.datastore.v1.EntityResult; import com.google.datastore.v1.Filter; import com.google.datastore.v1.Key; @@ -29,11 +30,14 @@ import com.google.datastore.v1.Query; import com.google.datastore.v1.QueryResultBatch; import com.google.datastore.v1.QueryResultBatch.MoreResultsType; +import com.google.datastore.v1.ReadOptions; import com.google.datastore.v1.RunQueryRequest; +import com.google.protobuf.Timestamp; import java.util.ArrayList; import java.util.Collections; import java.util.EnumSet; import java.util.List; +import javax.annotation.Nullable; /** * Provides the ability to split a query into multiple shards using Cloud Datastore. @@ -63,7 +67,24 @@ private QuerySplitterImpl() { public List getSplits( Query query, PartitionId partition, int numSplits, Datastore datastore) throws DatastoreException, IllegalArgumentException { + return getSplitsInternal(query, partition, numSplits, datastore, null); + } + @BetaApi + @Override + public List getSplits( + Query query, PartitionId partition, int numSplits, Datastore datastore, Timestamp readTime) + throws DatastoreException, IllegalArgumentException { + return getSplitsInternal(query, partition, numSplits, datastore, readTime); + } + + private List getSplitsInternal( + Query query, + PartitionId partition, + int numSplits, + Datastore datastore, + @Nullable Timestamp readTime) + throws DatastoreException, IllegalArgumentException { List splits = new ArrayList(numSplits); if (numSplits == 1) { splits.add(query); @@ -72,7 +93,7 @@ public List getSplits( validateQuery(query); validateSplitSize(numSplits); - List scatterKeys = getScatterKeys(numSplits, query, partition, datastore); + List scatterKeys = getScatterKeys(numSplits, query, partition, datastore, readTime); Key lastKey = null; for (Key nextKey : getSplitKey(scatterKeys, numSplits)) { splits.add(createSplit(lastKey, nextKey, query)); @@ -182,10 +203,15 @@ private Query createSplit(Key lastKey, Key nextKey, Query query) { * @param query the user query. * @param partition the partition to run the query in. * @param datastore the datastore containing the data. + * @param readTime read time at which to get the split keys from the datastore. * @throws DatastoreException if there was an error when executing the datastore query. */ private List getScatterKeys( - int numSplits, Query query, PartitionId partition, Datastore datastore) + int numSplits, + Query query, + PartitionId partition, + Datastore datastore, + @Nullable Timestamp readTime) throws DatastoreException { Query.Builder scatterPointQuery = createScatterQuery(query, numSplits); @@ -193,12 +219,12 @@ private List getScatterKeys( QueryResultBatch batch; do { - RunQueryRequest scatterRequest = - RunQueryRequest.newBuilder() - .setPartitionId(partition) - .setQuery(scatterPointQuery) - .build(); - batch = datastore.runQuery(scatterRequest).getBatch(); + RunQueryRequest.Builder scatterRequest = + RunQueryRequest.newBuilder().setPartitionId(partition).setQuery(scatterPointQuery); + if (readTime != null) { + scatterRequest.setReadOptions(ReadOptions.newBuilder().setReadTime(readTime).build()); + } + batch = datastore.runQuery(scatterRequest.build()).getBatch(); for (EntityResult result : batch.getEntityResultsList()) { keySplits.add(result.getEntity().getKey()); } diff --git a/datastore-v1-proto-client/src/main/java/com/google/datastore/v1/client/testing/MockCredential.java b/datastore-v1-proto-client/src/main/java/com/google/datastore/v1/client/testing/MockCredential.java new file mode 100644 index 000000000..7579f58b3 --- /dev/null +++ b/datastore-v1-proto-client/src/main/java/com/google/datastore/v1/client/testing/MockCredential.java @@ -0,0 +1,36 @@ +/* + * Copyright 2022 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.datastore.v1.client.testing; + +import com.google.api.client.auth.oauth2.Credential; +import com.google.api.client.http.HttpRequest; +import java.io.IOException; + +/** Fake credential used for testing purpose. */ +public class MockCredential extends Credential { + public MockCredential() { + super( + new AccessMethod() { + @Override + public void intercept(HttpRequest request, String accessToken) throws IOException {} + + @Override + public String getAccessTokenFromRequest(HttpRequest request) { + return "MockAccessToken"; + } + }); + } +} diff --git a/datastore-v1-proto-client/src/main/java/com/google/datastore/v1/client/testing/MockDatastoreFactory.java b/datastore-v1-proto-client/src/main/java/com/google/datastore/v1/client/testing/MockDatastoreFactory.java new file mode 100644 index 000000000..6942a5d79 --- /dev/null +++ b/datastore-v1-proto-client/src/main/java/com/google/datastore/v1/client/testing/MockDatastoreFactory.java @@ -0,0 +1,137 @@ +/* + * Copyright 2022 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.datastore.v1.client.testing; + +import static com.google.common.base.Preconditions.checkState; + +import com.google.api.client.auth.oauth2.Credential; +import com.google.api.client.http.GenericUrl; +import com.google.api.client.http.HttpRequestFactory; +import com.google.api.client.http.HttpStatusCodes; +import com.google.api.client.http.HttpTransport; +import com.google.api.client.http.LowLevelHttpRequest; +import com.google.api.client.http.LowLevelHttpResponse; +import com.google.api.client.testing.http.MockHttpTransport; +import com.google.api.client.testing.http.MockLowLevelHttpRequest; +import com.google.api.client.testing.http.MockLowLevelHttpResponse; +import com.google.api.client.testing.util.TestableByteArrayInputStream; +import com.google.common.collect.Iterables; +import com.google.datastore.v1.client.DatastoreFactory; +import com.google.datastore.v1.client.DatastoreOptions; +import com.google.protobuf.Message; +import com.google.rpc.Code; +import com.google.rpc.Status; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.List; + +/** Fake Datastore factory used for testing purposes when a true Datastore service is not needed. */ +public class MockDatastoreFactory extends DatastoreFactory { + private int nextStatus; + private Message nextResponse; + private Status nextError; + private IOException nextException; + + private String lastPath; + private String lastMimeType; + private byte[] lastBody; + private List lastCookies; + private String lastApiFormatHeaderValue; + + public void setNextResponse(Message response) { + nextStatus = HttpStatusCodes.STATUS_CODE_OK; + nextResponse = response; + nextError = null; + nextException = null; + } + + public void setNextError(int status, Code code, String message) { + nextStatus = status; + nextResponse = null; + nextError = makeErrorContent(message, code); + nextException = null; + } + + public void setNextException(IOException exception) { + nextStatus = 0; + nextResponse = null; + nextError = null; + nextException = exception; + } + + @Override + public HttpRequestFactory makeClient(DatastoreOptions options) { + HttpTransport transport = + new MockHttpTransport() { + @Override + public LowLevelHttpRequest buildRequest(String method, String url) { + return new MockLowLevelHttpRequest(url) { + @Override + public LowLevelHttpResponse execute() throws IOException { + lastPath = new GenericUrl(getUrl()).getRawPath(); + lastMimeType = getContentType(); + lastCookies = getHeaderValues("Cookie"); + lastApiFormatHeaderValue = + Iterables.getOnlyElement(getHeaderValues("X-Goog-Api-Format-Version")); + ByteArrayOutputStream out = new ByteArrayOutputStream(); + getStreamingContent().writeTo(out); + lastBody = out.toByteArray(); + if (nextException != null) { + throw nextException; + } + MockLowLevelHttpResponse response = + new MockLowLevelHttpResponse() + .setStatusCode(nextStatus) + .setContentType("application/x-protobuf"); + if (nextError != null) { + checkState(nextResponse == null); + response.setContent(new TestableByteArrayInputStream(nextError.toByteArray())); + } else { + response.setContent(new TestableByteArrayInputStream(nextResponse.toByteArray())); + } + return response; + } + }; + } + }; + Credential credential = options.getCredential(); + return transport.createRequestFactory(credential); + } + + public String getLastPath() { + return lastPath; + } + + public String getLastMimeType() { + return lastMimeType; + } + + public String getLastApiFormatHeaderValue() { + return lastApiFormatHeaderValue; + } + + public byte[] getLastBody() { + return lastBody; + } + + public List getLastCookies() { + return lastCookies; + } + + private static Status makeErrorContent(String message, Code code) { + return Status.newBuilder().setCode(code.getNumber()).setMessage(message).build(); + } +} diff --git a/datastore-v1-proto-client/src/test/java/com/google/datastore/v1/client/DatastoreClientTest.java b/datastore-v1-proto-client/src/test/java/com/google/datastore/v1/client/DatastoreClientTest.java index d8376dc29..2ab2c89f8 100644 --- a/datastore-v1-proto-client/src/test/java/com/google/datastore/v1/client/DatastoreClientTest.java +++ b/datastore-v1-proto-client/src/test/java/com/google/datastore/v1/client/DatastoreClientTest.java @@ -18,25 +18,12 @@ import static com.google.common.truth.Truth.assertThat; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertThrows; import static org.junit.Assert.fail; -import com.google.api.client.auth.oauth2.Credential; -import com.google.api.client.http.GenericUrl; import com.google.api.client.http.HttpRequest; -import com.google.api.client.http.HttpRequestFactory; import com.google.api.client.http.HttpRequestInitializer; -import com.google.api.client.http.HttpStatusCodes; -import com.google.api.client.http.HttpTransport; -import com.google.api.client.http.LowLevelHttpRequest; -import com.google.api.client.http.LowLevelHttpResponse; -import com.google.api.client.testing.http.MockHttpTransport; -import com.google.api.client.testing.http.MockLowLevelHttpRequest; -import com.google.api.client.testing.http.MockLowLevelHttpResponse; -import com.google.api.client.testing.util.TestableByteArrayInputStream; -import com.google.common.collect.Iterables; import com.google.datastore.v1.AllocateIdsRequest; import com.google.datastore.v1.AllocateIdsResponse; import com.google.datastore.v1.BeginTransactionRequest; @@ -53,16 +40,15 @@ import com.google.datastore.v1.RollbackResponse; import com.google.datastore.v1.RunQueryRequest; import com.google.datastore.v1.RunQueryResponse; +import com.google.datastore.v1.client.testing.MockCredential; +import com.google.datastore.v1.client.testing.MockDatastoreFactory; import com.google.protobuf.ByteString; import com.google.protobuf.Message; import com.google.rpc.Code; -import com.google.rpc.Status; -import java.io.ByteArrayOutputStream; import java.io.IOException; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.net.SocketTimeoutException; -import java.util.List; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -290,7 +276,7 @@ public void initialize(HttpRequest request) { AllocateIdsResponse response = AllocateIdsResponse.newBuilder().build(); mockClient.setNextResponse(response); assertEquals(response, datastore.allocateIds(request)); - assertEquals("magic", mockClient.lastCookies.get(0)); + assertEquals("magic", mockClient.getLastCookies().get(0)); } @Test @@ -361,10 +347,10 @@ private void expectRpc(String methodName, Message request, Message response) thr Object[] callArgs = {request}; assertEquals(response, call.invoke(datastore, callArgs)); - assertEquals("/v1/projects/project-id:" + methodName, mockClient.lastPath); - assertEquals("application/x-protobuf", mockClient.lastMimeType); - assertEquals("2", mockClient.lastApiFormatHeaderValue); - assertArrayEquals(request.toByteArray(), mockClient.lastBody); + assertEquals("/v1/projects/project-id:" + methodName, mockClient.getLastPath()); + assertEquals("application/x-protobuf", mockClient.getLastMimeType()); + assertEquals("2", mockClient.getLastApiFormatHeaderValue()); + assertArrayEquals(request.toByteArray(), mockClient.getLastBody()); assertEquals(1, datastore.getRpcCount()); datastore.resetRpcCount(); @@ -409,97 +395,4 @@ private void expectRpc(String methodName, Message request, Message response) thr assertEquals(3, datastore.getRpcCount()); } - - private static class MockCredential extends Credential { - MockCredential() { - super( - new AccessMethod() { - @Override - public void intercept(HttpRequest request, String accessToken) throws IOException {} - - @Override - public String getAccessTokenFromRequest(HttpRequest request) { - return "MockAccessToken"; - } - }); - } - } - - private static class MockDatastoreFactory extends DatastoreFactory { - int nextStatus; - Message nextResponse; - Status nextError; - IOException nextException; - - String lastPath; - String lastMimeType; - byte[] lastBody; - List lastCookies; - String lastApiFormatHeaderValue; - - void setNextResponse(Message response) { - nextStatus = HttpStatusCodes.STATUS_CODE_OK; - nextResponse = response; - nextError = null; - nextException = null; - } - - void setNextError(int status, Code code, String message) { - nextStatus = status; - nextResponse = null; - nextError = makeErrorContent(message, code); - nextException = null; - } - - void setNextException(IOException exception) { - nextStatus = 0; - nextResponse = null; - nextError = null; - nextException = exception; - } - - @Override - public HttpRequestFactory makeClient(DatastoreOptions options) { - HttpTransport transport = - new MockHttpTransport() { - @Override - public LowLevelHttpRequest buildRequest(String method, String url) { - return new MockLowLevelHttpRequest(url) { - @Override - public LowLevelHttpResponse execute() throws IOException { - lastPath = new GenericUrl(getUrl()).getRawPath(); - lastMimeType = getContentType(); - lastCookies = getHeaderValues("Cookie"); - lastApiFormatHeaderValue = - Iterables.getOnlyElement(getHeaderValues("X-Goog-Api-Format-Version")); - ByteArrayOutputStream out = new ByteArrayOutputStream(); - getStreamingContent().writeTo(out); - lastBody = out.toByteArray(); - if (nextException != null) { - throw nextException; - } - MockLowLevelHttpResponse response = - new MockLowLevelHttpResponse() - .setStatusCode(nextStatus) - .setContentType("application/x-protobuf"); - if (nextError != null) { - assertNull(nextResponse); - response.setContent(new TestableByteArrayInputStream(nextError.toByteArray())); - } else { - response.setContent( - new TestableByteArrayInputStream(nextResponse.toByteArray())); - } - return response; - } - }; - } - }; - Credential credential = options.getCredential(); - return transport.createRequestFactory(credential); - } - } - - private static Status makeErrorContent(String message, Code code) { - return Status.newBuilder().setCode(code.getNumber()).setMessage(message).build(); - } } diff --git a/datastore-v1-proto-client/src/test/java/com/google/datastore/v1/client/QuerySplitterTest.java b/datastore-v1-proto-client/src/test/java/com/google/datastore/v1/client/QuerySplitterTest.java new file mode 100644 index 000000000..e86943724 --- /dev/null +++ b/datastore-v1-proto-client/src/test/java/com/google/datastore/v1/client/QuerySplitterTest.java @@ -0,0 +1,326 @@ +/* + * Copyright 2022 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.datastore.v1.client; + +import static com.google.common.truth.Truth.assertThat; +import static com.google.datastore.v1.client.DatastoreHelper.makeAndFilter; +import static com.google.datastore.v1.client.DatastoreHelper.makeFilter; +import static com.google.datastore.v1.client.DatastoreHelper.makeKey; +import static com.google.datastore.v1.client.DatastoreHelper.makeOrder; +import static com.google.datastore.v1.client.DatastoreHelper.makePropertyReference; +import static com.google.datastore.v1.client.DatastoreHelper.makeValue; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertThrows; + +import com.google.datastore.v1.Entity; +import com.google.datastore.v1.EntityResult; +import com.google.datastore.v1.EntityResult.ResultType; +import com.google.datastore.v1.Filter; +import com.google.datastore.v1.Key; +import com.google.datastore.v1.KindExpression; +import com.google.datastore.v1.PartitionId; +import com.google.datastore.v1.Projection; +import com.google.datastore.v1.PropertyFilter.Operator; +import com.google.datastore.v1.PropertyOrder.Direction; +import com.google.datastore.v1.Query; +import com.google.datastore.v1.QueryResultBatch; +import com.google.datastore.v1.QueryResultBatch.MoreResultsType; +import com.google.datastore.v1.ReadOptions; +import com.google.datastore.v1.RunQueryRequest; +import com.google.datastore.v1.RunQueryResponse; +import com.google.datastore.v1.client.testing.MockCredential; +import com.google.datastore.v1.client.testing.MockDatastoreFactory; +import com.google.protobuf.Int32Value; +import com.google.protobuf.Timestamp; +import java.util.List; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for {@link QuerySplitterImpl}. */ +@RunWith(JUnit4.class) +public class QuerySplitterTest { + private static final String PROJECT_ID = "project-id"; + private static final PartitionId PARTITION = + PartitionId.newBuilder().setProjectId(PROJECT_ID).build(); + private static final String KIND = "test-kind"; + + private DatastoreFactory factory = new MockDatastoreFactory(); + private DatastoreOptions.Builder options = + new DatastoreOptions.Builder().projectId(PROJECT_ID).credential(new MockCredential()); + + private Filter propertyFilter = makeFilter("foo", Operator.EQUAL, makeValue("value")).build(); + + private Query query = + Query.newBuilder() + .addKind(KindExpression.newBuilder().setName(KIND).build()) + .setFilter(propertyFilter) + .build(); + + private Query splitQuery = + Query.newBuilder() + .addKind(KindExpression.newBuilder().setName(KIND).build()) + .addOrder(makeOrder("__scatter__", Direction.ASCENDING)) + .addProjection(Projection.newBuilder().setProperty(makePropertyReference("__key__"))) + .build(); + + private Key splitKey0 = makeKey(KIND, String.format("%05d", 1)).setPartitionId(PARTITION).build(); + private Key splitKey1 = + makeKey(KIND, String.format("%05d", 101)).setPartitionId(PARTITION).build(); + private Key splitKey2 = + makeKey(KIND, String.format("%05d", 201)).setPartitionId(PARTITION).build(); + private Key splitKey3 = + makeKey(KIND, String.format("%05d", 301)).setPartitionId(PARTITION).build(); + + @Test + public void disallowsSortOrder() { + Datastore datastore = factory.create(options.build()); + Query queryWithOrder = + query.toBuilder().addOrder(makeOrder("bar", Direction.ASCENDING)).build(); + IllegalArgumentException exception = + assertThrows( + IllegalArgumentException.class, + () -> QuerySplitterImpl.INSTANCE.getSplits(queryWithOrder, PARTITION, 2, datastore)); + assertThat(exception).hasMessageThat().contains("Query cannot have any sort orders."); + } + + @Test + public void disallowsMultipleKinds() { + Datastore datastore = factory.create(options.build()); + Query queryWithMultipleKinds = + query + .toBuilder() + .addKind(KindExpression.newBuilder().setName("another-kind").build()) + .build(); + IllegalArgumentException exception = + assertThrows( + IllegalArgumentException.class, + () -> + QuerySplitterImpl.INSTANCE.getSplits( + queryWithMultipleKinds, PARTITION, 2, datastore)); + assertThat(exception).hasMessageThat().contains("Query must have exactly one kind."); + } + + @Test + public void disallowsKindlessQuery() { + Datastore datastore = factory.create(options.build()); + Query kindlessQuery = query.toBuilder().clearKind().build(); + IllegalArgumentException exception = + assertThrows( + IllegalArgumentException.class, + () -> QuerySplitterImpl.INSTANCE.getSplits(kindlessQuery, PARTITION, 2, datastore)); + assertThat(exception).hasMessageThat().contains("Query must have exactly one kind."); + } + + @Test + public void disallowsInequalityFilter() { + Datastore datastore = factory.create(options.build()); + Query queryWithInequality = + query + .toBuilder() + .setFilter(makeFilter("foo", Operator.GREATER_THAN, makeValue("value"))) + .build(); + IllegalArgumentException exception = + assertThrows( + IllegalArgumentException.class, + () -> + QuerySplitterImpl.INSTANCE.getSplits(queryWithInequality, PARTITION, 2, datastore)); + assertThat(exception).hasMessageThat().contains("Query cannot have any inequality filters."); + } + + @Test + public void splitsMustBePositive() { + Datastore datastore = factory.create(options.build()); + IllegalArgumentException exception = + assertThrows( + IllegalArgumentException.class, + () -> QuerySplitterImpl.INSTANCE.getSplits(query, PARTITION, 0, datastore)); + assertThat(exception).hasMessageThat().contains("The number of splits must be greater than 0."); + } + + @Test + public void getSplits() throws Exception { + Datastore datastore = factory.create(options.build()); + MockDatastoreFactory mockClient = (MockDatastoreFactory) factory; + + RunQueryResponse splitQueryResponse = + RunQueryResponse.newBuilder() + .setQuery(splitQuery) + .setBatch( + QueryResultBatch.newBuilder() + .setEntityResultType(ResultType.KEY_ONLY) + .setMoreResults(MoreResultsType.NO_MORE_RESULTS) + .addEntityResults(makeKeyOnlyEntity(splitKey0)) + .addEntityResults(makeKeyOnlyEntity(splitKey1)) + .addEntityResults(makeKeyOnlyEntity(splitKey2)) + .addEntityResults(makeKeyOnlyEntity(splitKey3)) + .build()) + .build(); + + mockClient.setNextResponse(splitQueryResponse); + + List splittedQueries = + QuerySplitterImpl.INSTANCE.getSplits(query, PARTITION, 3, datastore); + + assertThat(splittedQueries) + .containsExactly( + query + .toBuilder() + .setFilter(makeFilterWithKeyRange(propertyFilter, null, splitKey1)) + .build(), + query + .toBuilder() + .setFilter(makeFilterWithKeyRange(propertyFilter, splitKey1, splitKey3)) + .build(), + query + .toBuilder() + .setFilter(makeFilterWithKeyRange(propertyFilter, splitKey3, null)) + .build()); + + RunQueryRequest expectedSplitQueryRequest = + RunQueryRequest.newBuilder() + .setPartitionId(PARTITION) + .setQuery( + splitQuery.toBuilder().setLimit(Int32Value.newBuilder().setValue(2 * 32).build())) + .build(); + + assertArrayEquals(expectedSplitQueryRequest.toByteArray(), mockClient.getLastBody()); + } + + @Test + public void notEnoughSplits() throws Exception { + Datastore datastore = factory.create(options.build()); + MockDatastoreFactory mockClient = (MockDatastoreFactory) factory; + + RunQueryResponse splitQueryResponse = + RunQueryResponse.newBuilder() + .setQuery(splitQuery) + .setBatch( + QueryResultBatch.newBuilder() + .setEntityResultType(ResultType.KEY_ONLY) + .setMoreResults(MoreResultsType.NO_MORE_RESULTS) + .addEntityResults(makeKeyOnlyEntity(splitKey0)) + .build()) + .build(); + + mockClient.setNextResponse(splitQueryResponse); + + List splittedQueries = + QuerySplitterImpl.INSTANCE.getSplits(query, PARTITION, 100, datastore); + + assertThat(splittedQueries) + .containsExactly( + query + .toBuilder() + .setFilter(makeFilterWithKeyRange(propertyFilter, null, splitKey0)) + .build(), + query + .toBuilder() + .setFilter(makeFilterWithKeyRange(propertyFilter, splitKey0, null)) + .build()); + + RunQueryRequest expectedSplitQueryRequest = + RunQueryRequest.newBuilder() + .setPartitionId(PARTITION) + .setQuery( + splitQuery.toBuilder().setLimit(Int32Value.newBuilder().setValue(99 * 32).build())) + .build(); + + assertArrayEquals(expectedSplitQueryRequest.toByteArray(), mockClient.getLastBody()); + } + + @Test + public void getSplits_withReadTime() throws Exception { + Datastore datastore = factory.create(options.build()); + MockDatastoreFactory mockClient = (MockDatastoreFactory) factory; + + RunQueryResponse splitQueryResponse = + RunQueryResponse.newBuilder() + .setQuery(splitQuery) + .setBatch( + QueryResultBatch.newBuilder() + .setEntityResultType(ResultType.KEY_ONLY) + .setMoreResults(MoreResultsType.NO_MORE_RESULTS) + .addEntityResults(makeKeyOnlyEntity(splitKey0)) + .addEntityResults(makeKeyOnlyEntity(splitKey1)) + .addEntityResults(makeKeyOnlyEntity(splitKey2)) + .addEntityResults(makeKeyOnlyEntity(splitKey3)) + .build()) + .build(); + + mockClient.setNextResponse(splitQueryResponse); + + Timestamp readTime = Timestamp.newBuilder().setSeconds(1654651341L).build(); + + List splittedQueries = + QuerySplitterImpl.INSTANCE.getSplits(query, PARTITION, 3, datastore, readTime); + + assertThat(splittedQueries) + .containsExactly( + query + .toBuilder() + .setFilter(makeFilterWithKeyRange(propertyFilter, null, splitKey1)) + .build(), + query + .toBuilder() + .setFilter(makeFilterWithKeyRange(propertyFilter, splitKey1, splitKey3)) + .build(), + query + .toBuilder() + .setFilter(makeFilterWithKeyRange(propertyFilter, splitKey3, null)) + .build()); + + RunQueryRequest expectedSplitQueryRequest = + RunQueryRequest.newBuilder() + .setPartitionId(PARTITION) + .setQuery( + splitQuery.toBuilder().setLimit(Int32Value.newBuilder().setValue(2 * 32).build())) + .setReadOptions(ReadOptions.newBuilder().setReadTime(readTime)) + .build(); + + assertArrayEquals(expectedSplitQueryRequest.toByteArray(), mockClient.getLastBody()); + } + + private static EntityResult makeKeyOnlyEntity(Key key) { + return EntityResult.newBuilder().setEntity(Entity.newBuilder().setKey(key).build()).build(); + } + + private static Filter makeFilterWithKeyRange(Filter originalFilter, Key startKey, Key endKey) { + Filter startKeyFilter = + startKey == null + ? null + : makeFilter("__key__", Operator.GREATER_THAN_OR_EQUAL, makeValue(startKey)).build(); + + Filter endKeyFilter = + endKey == null + ? null + : makeFilter("__key__", Operator.LESS_THAN, makeValue(endKey)).build(); + + if (startKeyFilter == null && endKeyFilter == null) { + throw new IllegalArgumentException(); + } + + if (startKeyFilter != null && endKeyFilter == null) { + return makeAndFilter(originalFilter, startKeyFilter).build(); + } + + if (startKeyFilter == null && endKeyFilter != null) { + return makeAndFilter(originalFilter, endKeyFilter).build(); + } + + return makeAndFilter(originalFilter, startKeyFilter, endKeyFilter).build(); + } +}