Skip to content

Commit

Permalink
Merge pull request #273 from ajkannan/gql-pagination
Browse files Browse the repository at this point in the history
Query continuation implemented for GqlQuery + tests
  • Loading branch information
aozarov committed Oct 22, 2015
2 parents 9e4692d + 6ffe927 commit b54c501
Show file tree
Hide file tree
Showing 5 changed files with 141 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -379,9 +379,9 @@ protected void populatePb(com.google.datastore.v1beta3.RunQueryRequest.Builder r
}

@Override
protected GqlQuery<V> nextQuery(com.google.datastore.v1beta3.QueryResultBatch responsePb) {
// See issue #17
throw new UnsupportedOperationException("paging for this query is not implemented yet");
protected Query<V> nextQuery(com.google.datastore.v1beta3.RunQueryResponse responsePb) {
return StructuredQuery.<V>fromPb(type(), namespace(), responsePb.getQuery())
.nextQuery(responsePb);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ protected abstract Object fromPb(ResultType<V> resultType, String namespace, byt
protected abstract void populatePb(
com.google.datastore.v1beta3.RunQueryRequest.Builder requestPb);

protected abstract Query<V> nextQuery(com.google.datastore.v1beta3.QueryResultBatch responsePb);
protected abstract Query<V> nextQuery(com.google.datastore.v1beta3.RunQueryResponse responsePb);

/**
* Returns a new {@link GqlQuery} builder.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class QueryResultsImpl<T> extends AbstractIterator<T> implements QueryResults<T>
private final ResultType<T> queryResultType;
private Query<T> query;
private ResultType<?> actualResultType;
private com.google.datastore.v1beta3.QueryResultBatch queryResultBatchPb;
private com.google.datastore.v1beta3.RunQueryResponse runQueryResponsePb;
private com.google.datastore.v1beta3.Query mostRecentQueryPb;
private boolean lastBatch;
private Iterator<com.google.datastore.v1beta3.EntityResult> entityResultPbIter;
Expand All @@ -56,8 +56,8 @@ class QueryResultsImpl<T> extends AbstractIterator<T> implements QueryResults<T>
}
partitionIdPb = pbBuilder.build();
sendRequest();
if (queryResultBatchPb.getSkippedResults() > 0) {
cursor = queryResultBatchPb.getSkippedCursor();
if (runQueryResponsePb.getBatch().getSkippedResults() > 0) {
cursor = runQueryResponsePb.getBatch().getSkippedCursor();
} else {
cursor = mostRecentQueryPb.getStartCursor();
}
Expand All @@ -71,16 +71,14 @@ private void sendRequest() {
}
requestPb.setPartitionId(partitionIdPb);
query.populatePb(requestPb);
com.google.datastore.v1beta3.RunQueryResponse runQueryResponsePb =
datastore.runQuery(requestPb.build());
queryResultBatchPb = runQueryResponsePb.getBatch();
runQueryResponsePb = datastore.runQuery(requestPb.build());
mostRecentQueryPb = runQueryResponsePb.getQuery();
if (mostRecentQueryPb == null) {
mostRecentQueryPb = requestPb.getQuery();
}
lastBatch = queryResultBatchPb.getMoreResults() != MoreResultsType.NOT_FINISHED;
entityResultPbIter = queryResultBatchPb.getEntityResultsList().iterator();
actualResultType = ResultType.fromPb(queryResultBatchPb.getEntityResultType());
lastBatch = runQueryResponsePb.getBatch().getMoreResults() != MoreResultsType.NOT_FINISHED;
entityResultPbIter = runQueryResponsePb.getBatch().getEntityResultsList().iterator();
actualResultType = ResultType.fromPb(runQueryResponsePb.getBatch().getEntityResultType());
if (Objects.equals(queryResultType, ResultType.PROJECTION_ENTITY)) {
// projection entity can represent all type of results
actualResultType = ResultType.PROJECTION_ENTITY;
Expand All @@ -92,7 +90,7 @@ private void sendRequest() {
@Override
protected T computeNext() {
while (!entityResultPbIter.hasNext() && !lastBatch) {
query = query.nextQuery(queryResultBatchPb);
query = query.nextQuery(runQueryResponsePb);
sendRequest();
}
if (!entityResultPbIter.hasNext()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -844,16 +844,16 @@ protected void populatePb(com.google.datastore.v1beta3.RunQueryRequest.Builder r
}

@Override
protected StructuredQuery<V> nextQuery(com.google.datastore.v1beta3.QueryResultBatch responsePb) {
protected Query<V> nextQuery(com.google.datastore.v1beta3.RunQueryResponse responsePb) {
Builder<V> builder = new Builder<>(type());
builder.mergeFrom(toPb());
builder.startCursor(new Cursor(responsePb.getEndCursor()));
if (offset > 0 && responsePb.getSkippedResults() < offset) {
builder.offset(offset - responsePb.getSkippedResults());
builder.startCursor(new Cursor(responsePb.getBatch().getEndCursor()));
if (offset > 0 && responsePb.getBatch().getSkippedResults() < offset) {
builder.offset(offset - responsePb.getBatch().getSkippedResults());
} else {
builder.offset(0);
if (limit != null) {
builder.limit(limit - responsePb.getEntityResultsCount());
builder.limit(limit - responsePb.getBatch().getEntityResultsCount());
}
}
return builder.build();
Expand Down Expand Up @@ -904,7 +904,9 @@ protected Object fromPb(ResultType<V> resultType, String namespace, byte[] bytes
return fromPb(resultType, namespace, com.google.datastore.v1beta3.Query.parseFrom(bytesPb));
}

private static StructuredQuery<?> fromPb(ResultType<?> resultType, String namespace,
@SuppressWarnings("unchecked")
static <V> StructuredQuery<V> fromPb(
ResultType<?> resultType, String namespace,
com.google.datastore.v1beta3.Query queryPb) {
BaseBuilder<?, ?> builder;
if (resultType.equals(ResultType.ENTITY)) {
Expand All @@ -914,6 +916,6 @@ private static StructuredQuery<?> fromPb(ResultType<?> resultType, String namesp
} else {
builder = new ProjectionEntityQueryBuilder();
}
return builder.namespace(namespace).mergeFrom(queryPb).build();
return (StructuredQuery<V>) builder.namespace(namespace).mergeFrom(queryPb).build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.google.gcloud.datastore.StructuredQuery.OrderBy;
import com.google.gcloud.datastore.StructuredQuery.PropertyFilter;
import com.google.gcloud.spi.DatastoreRpc;
import com.google.gcloud.spi.DatastoreRpc.DatastoreRpcException;
import com.google.gcloud.spi.DatastoreRpc.DatastoreRpcException.Reason;
import com.google.gcloud.spi.DatastoreRpcFactory;

Expand All @@ -43,6 +44,7 @@
import org.junit.runners.JUnit4;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
Expand Down Expand Up @@ -407,6 +409,38 @@ public void testRunGqlQueryWithCasting() {
assertFalse(results3.hasNext());
}

@Test
public void testGqlQueryPagination() throws DatastoreRpcException {
DatastoreRpcFactory rpcFactoryMock = EasyMock.createStrictMock(DatastoreRpcFactory.class);
DatastoreRpc rpcMock = EasyMock.createStrictMock(DatastoreRpc.class);
EasyMock.expect(rpcFactoryMock.create(EasyMock.anyObject(DatastoreOptions.class)))
.andReturn(rpcMock);
List<com.google.datastore.v1beta3.RunQueryResponse> responses =
buildResponsesForQueryPagination();
for (int i = 0; i < responses.size(); i++) {
EasyMock
.expect(rpcMock.runQuery(
EasyMock.anyObject(com.google.datastore.v1beta3.RunQueryRequest.class)))
.andReturn(responses.get(i));
}
EasyMock.replay(rpcFactoryMock, rpcMock);
DatastoreOptions options =
this.options.toBuilder()
.retryParams(RetryParams.getDefaultInstance())
.serviceRpcFactory(rpcFactoryMock)
.build();
Datastore mockDatastore = DatastoreFactory.instance().get(options);
QueryResults<Key> results =
mockDatastore.run(Query.gqlQueryBuilder(ResultType.KEY, "select __key__ from *").build());
int count = 0;
while (results.hasNext()) {
count += 1;
results.next();
}
assertEquals(count, 5);
EasyMock.verify(rpcFactoryMock, rpcMock);
}

@Test
public void testRunStructuredQuery() {
Query<Entity> query =
Expand Down Expand Up @@ -449,7 +483,92 @@ public void testRunStructuredQuery() {
assertEquals(20, entity.getLong("age"));
assertEquals(1, entity.properties().size());
assertFalse(results4.hasNext());
// TODO(ozarov): construct a test to verify nextQuery/pagination
}

@Test
public void testStructuredQueryPagination() throws DatastoreRpcException {
DatastoreRpcFactory rpcFactoryMock = EasyMock.createStrictMock(DatastoreRpcFactory.class);
DatastoreRpc rpcMock = EasyMock.createStrictMock(DatastoreRpc.class);
EasyMock.expect(rpcFactoryMock.create(EasyMock.anyObject(DatastoreOptions.class)))
.andReturn(rpcMock);
List<com.google.datastore.v1beta3.RunQueryResponse> responses =
buildResponsesForQueryPagination();
for (int i = 0; i < responses.size(); i++) {
EasyMock
.expect(rpcMock.runQuery(
EasyMock.anyObject(com.google.datastore.v1beta3.RunQueryRequest.class)))
.andReturn(responses.get(i));
}
EasyMock.replay(rpcFactoryMock, rpcMock);
DatastoreOptions options =
this.options.toBuilder()
.retryParams(RetryParams.getDefaultInstance())
.serviceRpcFactory(rpcFactoryMock)
.build();
Datastore mockDatastore = DatastoreFactory.instance().get(options);
QueryResults<Key> results = mockDatastore.run(Query.keyQueryBuilder().build());
int count = 0;
while (results.hasNext()) {
count += 1;
results.next();
}
assertEquals(count, 5);
EasyMock.verify(rpcFactoryMock, rpcMock);
}

private List<com.google.datastore.v1beta3.RunQueryResponse> buildResponsesForQueryPagination() {
Entity entity4 = Entity.builder(KEY4).set("value", StringValue.of("value")).build();
Entity entity5 = Entity.builder(KEY5).set("value", "value").build();
datastore.add(ENTITY3, entity4, entity5);
List<com.google.datastore.v1beta3.RunQueryResponse> responses = new ArrayList<>();
Query<Key> query = Query.keyQueryBuilder().build();
com.google.datastore.v1beta3.RunQueryRequest.Builder requestPb =
com.google.datastore.v1beta3.RunQueryRequest.newBuilder();
query.populatePb(requestPb);
com.google.datastore.v1beta3.QueryResultBatch queryResultBatchPb =
com.google.datastore.v1beta3.RunQueryResponse.newBuilder()
.mergeFrom(((DatastoreImpl) datastore).runQuery(requestPb.build()))
.getBatch();
com.google.datastore.v1beta3.QueryResultBatch queryResultBatchPb1 =
com.google.datastore.v1beta3.QueryResultBatch.newBuilder()
.mergeFrom(queryResultBatchPb)
.setMoreResults(
com.google.datastore.v1beta3.QueryResultBatch.MoreResultsType.NOT_FINISHED)
.clearEntityResults()
.addAllEntityResults(queryResultBatchPb.getEntityResultsList().subList(0, 1))
.setEndCursor(queryResultBatchPb.getEntityResultsList().get(0).getCursor())
.build();
responses.add(
com.google.datastore.v1beta3.RunQueryResponse.newBuilder()
.setBatch(queryResultBatchPb1)
.build());
com.google.datastore.v1beta3.QueryResultBatch queryResultBatchPb2 =
com.google.datastore.v1beta3.QueryResultBatch.newBuilder()
.mergeFrom(queryResultBatchPb)
.setMoreResults(
com.google.datastore.v1beta3.QueryResultBatch.MoreResultsType.NOT_FINISHED)
.clearEntityResults()
.addAllEntityResults(queryResultBatchPb.getEntityResultsList().subList(1, 3))
.setEndCursor(queryResultBatchPb.getEntityResultsList().get(2).getCursor())
.build();
responses.add(
com.google.datastore.v1beta3.RunQueryResponse.newBuilder()
.setBatch(queryResultBatchPb2)
.build());
com.google.datastore.v1beta3.QueryResultBatch queryResultBatchPb3 =
com.google.datastore.v1beta3.QueryResultBatch.newBuilder()
.mergeFrom(queryResultBatchPb)
.setMoreResults(
com.google.datastore.v1beta3.QueryResultBatch.MoreResultsType.NO_MORE_RESULTS)
.clearEntityResults()
.addAllEntityResults(queryResultBatchPb.getEntityResultsList().subList(3, 5))
.setEndCursor(queryResultBatchPb.getEntityResultsList().get(4).getCursor())
.build();
responses.add(
com.google.datastore.v1beta3.RunQueryResponse.newBuilder()
.setBatch(queryResultBatchPb3)
.build());
return responses;
}

@Test
Expand Down

0 comments on commit b54c501

Please sign in to comment.