Skip to content

Commit

Permalink
BigQuery: Update resumption strategy to use format-independent row count. (#5658)
Browse files Browse the repository at this point in the history

* Update resumption strategy to use format-independent row count.

This change modifies the ReadRowsResumptionStrategy helper class in the BigQuery
storage client to use the new format-independent row count value in the
ReadRowsResponse message in order to track stream position. It also modifies
various test files to use the new row count value.

* Fix checkstyle errors
  • Loading branch information
kmjung authored and chingor13 committed Jul 10, 2019
1 parent 3288c73 commit 84c92f9
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.google.api.gax.retrying.StreamResumptionStrategy;
import com.google.cloud.bigquery.storage.v1beta1.Storage.ReadRowsRequest;
import com.google.cloud.bigquery.storage.v1beta1.Storage.ReadRowsResponse;
import javax.annotation.Nonnull;

/**
* An implementation of a {@link StreamResumptionStrategy} for the ReadRows API. This class tracks
Expand All @@ -36,13 +37,15 @@ public class ReadRowsResumptionStrategy
private long rowsProcessed = 0;

/**
 * Creates a fresh {@code ReadRowsResumptionStrategy}.
 *
 * @return a newly constructed strategy instance (never {@code this})
 */
@Override
@Nonnull
public StreamResumptionStrategy<ReadRowsRequest, ReadRowsResponse> createNew() {
return new ReadRowsResumptionStrategy();
}

/**
 * Adds the row count reported by {@code response} to {@code rowsProcessed} and hands the
 * response back unchanged.
 *
 * @param response the ReadRows response that was just received
 * @return the same {@code response} instance that was passed in
 */
@Override
@Nonnull
public ReadRowsResponse processResponse(ReadRowsResponse response) {
// NOTE(review): this listing is a rendered diff whose +/- markers were stripped. Per the commit
// message, the Avro-specific count on the next line is presumably the REMOVED statement.
rowsProcessed += response.getAvroRows().getRowCount();
// NOTE(review): presumably the ADDED replacement, reading the format-independent row count
// directly from the response. Only one of these two statements exists in either file version.
rowsProcessed += response.getRowCount();
return response;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package com.google.cloud.bigquery.storage.v1beta1.it;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import com.google.api.gax.rpc.ServerStream;
import com.google.cloud.ServiceOptions;
Expand Down Expand Up @@ -121,12 +120,12 @@ public Long call() throws Exception {
ExecutorService executor = Executors.newFixedThreadPool(tasks.size());
List<Future<Long>> results = executor.invokeAll(tasks);

long avroRowCount = 0;
long rowCount = 0;
for (Future<Long> result : results) {
avroRowCount += result.get();
rowCount += result.get();
}

assertEquals(313_797_035, avroRowCount);
assertEquals(313_797_035, rowCount);
}

private long readAllRowsFromStream(Stream stream) {
Expand All @@ -135,19 +134,13 @@ private long readAllRowsFromStream(Stream stream) {
ReadRowsRequest readRowsRequest =
ReadRowsRequest.newBuilder().setReadPosition(readPosition).build();

long avroRowCount = 0;
long rowCount = 0;
ServerStream<ReadRowsResponse> serverStream = client.readRowsCallable().call(readRowsRequest);
for (ReadRowsResponse response : serverStream) {
assertTrue(
String.format(
"Response is missing 'avro_rows'. Read %d rows so far from stream '%s'. ReadRows response:%n%s",
avroRowCount, stream.getName(), response.toString()),
response.hasAvroRows());
avroRowCount += response.getAvroRows().getRowCount();
rowCount += response.getRowCount();
}

LOG.info(
String.format("Read total of %d rows from stream '%s'.", avroRowCount, stream.getName()));
return avroRowCount;
LOG.info(String.format("Read total of %d rows from stream '%s'.", rowCount, stream.getName()));
return rowCount;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import com.google.api.gax.rpc.ServerStream;
import com.google.cloud.RetryOption;
Expand Down Expand Up @@ -149,18 +148,13 @@ public void testSimpleRead() {
ReadRowsRequest readRowsRequest =
ReadRowsRequest.newBuilder().setReadPosition(readPosition).build();

long avroRowCount = 0;
long rowCount = 0;
ServerStream<ReadRowsResponse> stream = client.readRowsCallable().call(readRowsRequest);
for (ReadRowsResponse response : stream) {
assertTrue(
String.format(
"Response is missing 'avro_rows'. Read %d rows so far. ReadRows response:%n%s",
avroRowCount, response.toString()),
response.hasAvroRows());
avroRowCount += response.getAvroRows().getRowCount();
rowCount += response.getRowCount();
}

assertEquals(164_656, avroRowCount);
assertEquals(164_656, rowCount);
}

@Test
Expand All @@ -187,31 +181,23 @@ public void testSimpleReadAndResume() {
// We have to read some number of rows in order to be able to resume. More details:
// https://cloud.google.com/bigquery/docs/reference/storage/rpc/google.cloud.bigquery.storage.v1beta1#google.cloud.bigquery.storage.v1beta1.ReadRowsRequest

long avroRowCount = ReadStreamToOffset(session.getStreams(0), /* rowOffset = */ 34_846);
long rowCount = ReadStreamToOffset(session.getStreams(0), /* rowOffset = */ 34_846);

StreamPosition readPosition =
StreamPosition.newBuilder()
.setStream(session.getStreams(0))
.setOffset(avroRowCount)
.build();
StreamPosition.newBuilder().setStream(session.getStreams(0)).setOffset(rowCount).build();

ReadRowsRequest readRowsRequest =
ReadRowsRequest.newBuilder().setReadPosition(readPosition).build();

ServerStream<ReadRowsResponse> stream = client.readRowsCallable().call(readRowsRequest);

for (ReadRowsResponse response : stream) {
assertTrue(
String.format(
"Response is missing 'avro_rows'. Read %d rows so far. ReadRows response:%n%s",
avroRowCount, response.toString()),
response.hasAvroRows());
avroRowCount += response.getAvroRows().getRowCount();
rowCount += response.getRowCount();
}

// Verifies that the number of rows skipped plus the number of rows read equals the total
// number of rows in the table.
assertEquals(164_656, avroRowCount);
assertEquals(164_656, rowCount);
}

@Test
Expand Down Expand Up @@ -252,17 +238,11 @@ public void testFilter() throws IOException {
SimpleRowReader reader =
new SimpleRowReader(new Schema.Parser().parse(session.getAvroSchema().getSchema()));

long avroRowCount = 0;
long rowCount = 0;

ServerStream<ReadRowsResponse> stream = client.readRowsCallable().call(readRowsRequest);
for (ReadRowsResponse response : stream) {
assertTrue(
String.format(
"Response is missing 'avro_rows'. Read %d rows so far. ReadRows response:%n%s",
avroRowCount, response.toString()),
response.hasAvroRows());
avroRowCount += response.getAvroRows().getRowCount();

rowCount += response.getRowCount();
reader.processRows(
response.getAvroRows(),
new SimpleRowReader.AvroRowConsumer() {
Expand All @@ -276,7 +256,7 @@ public void accept(GenericData.Record record) {
});
}

assertEquals(1_333, avroRowCount);
assertEquals(1_333, rowCount);
}

@Test
Expand Down Expand Up @@ -336,15 +316,10 @@ public void testColumnSelection() throws IOException {

SimpleRowReader reader = new SimpleRowReader(avroSchema);

long avroRowCount = 0;
long rowCount = 0;
ServerStream<ReadRowsResponse> stream = client.readRowsCallable().call(readRowsRequest);
for (ReadRowsResponse response : stream) {
assertTrue(
String.format(
"Response is missing 'avro_rows'. Read %d rows so far. ReadRows response:%n%s",
avroRowCount, response.toString()),
response.hasAvroRows());
avroRowCount += response.getAvroRows().getRowCount();
rowCount += response.getRowCount();
reader.processRows(
response.getAvroRows(),
new SimpleRowReader.AvroRowConsumer() {
Expand All @@ -362,7 +337,7 @@ public void accept(GenericData.Record record) {
});
}

assertEquals(1_333, avroRowCount);
assertEquals(1_333, rowCount);
}

@Test
Expand Down Expand Up @@ -864,19 +839,19 @@ private long ReadStreamToOffset(Stream stream, long rowOffset) {
ReadRowsRequest readRowsRequest =
ReadRowsRequest.newBuilder().setReadPosition(readPosition).build();

long avroRowCount = 0;
long rowCount = 0;
ServerStream<ReadRowsResponse> serverStream = client.readRowsCallable().call(readRowsRequest);
Iterator<ReadRowsResponse> responseIterator = serverStream.iterator();

while (responseIterator.hasNext()) {
ReadRowsResponse response = responseIterator.next();
avroRowCount += response.getAvroRows().getRowCount();
if (avroRowCount >= rowOffset) {
rowCount += response.getRowCount();
if (rowCount >= rowOffset) {
return rowOffset;
}
}

return avroRowCount;
return rowCount;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import com.google.api.gax.grpc.GrpcTransportChannel;
import com.google.api.gax.rpc.FixedTransportChannelProvider;
import com.google.api.gax.rpc.ServerStream;
import com.google.cloud.bigquery.storage.v1beta1.AvroProto.AvroRows;
import com.google.cloud.bigquery.storage.v1beta1.BigQueryStorageClient;
import com.google.cloud.bigquery.storage.v1beta1.BigQueryStorageGrpc.BigQueryStorageImplBase;
import com.google.cloud.bigquery.storage.v1beta1.BigQueryStorageSettings;
Expand Down Expand Up @@ -167,7 +166,7 @@ private int getRowCount(ReadRowsRequest request) {
ServerStream<ReadRowsResponse> serverStream = client.readRowsCallable().call(request);
int rowCount = 0;
for (ReadRowsResponse readRowsResponse : serverStream) {
rowCount += readRowsResponse.getAvroRows().getRowCount();
rowCount += readRowsResponse.getRowCount();
}
return rowCount;
}
Expand Down Expand Up @@ -232,9 +231,7 @@ static ReadRowsRequest createRequest(String streamName, long offset) {
}

/**
 * Builds a {@code ReadRowsResponse} that reports the given number of rows.
 *
 * @param numberOfRows the row count to record in the response
 * @return the built response
 */
static ReadRowsResponse createResponse(int numberOfRows) {
// NOTE(review): rendered diff with +/- markers stripped. The three-line statement below is
// presumably the REMOVED version, which set the count on the nested AvroRows message.
return ReadRowsResponse.newBuilder()
.setAvroRows(AvroRows.newBuilder().setRowCount(numberOfRows))
.build();
// NOTE(review): presumably the ADDED replacement, which sets the row count directly on the
// response. Only one of the two return statements exists in either file version.
return ReadRowsResponse.newBuilder().setRowCount(numberOfRows).build();
}

RpcExpectation expectRequest(String streamName, long offset) {
Expand Down

0 comments on commit 84c92f9

Please sign in to comment.