Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update resumption strategy to use format-independent row count. #5658

Merged
merged 2 commits into from
Jul 10, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.google.api.gax.retrying.StreamResumptionStrategy;
import com.google.cloud.bigquery.storage.v1beta1.Storage.ReadRowsRequest;
import com.google.cloud.bigquery.storage.v1beta1.Storage.ReadRowsResponse;
import javax.annotation.Nonnull;

/**
* An implementation of a {@link StreamResumptionStrategy} for the ReadRows API. This class tracks
Expand All @@ -36,13 +37,15 @@ public class ReadRowsResumptionStrategy
private long rowsProcessed = 0;

@Override
@Nonnull
public StreamResumptionStrategy<ReadRowsRequest, ReadRowsResponse> createNew() {
return new ReadRowsResumptionStrategy();
}

@Override
@Nonnull
public ReadRowsResponse processResponse(ReadRowsResponse response) {
rowsProcessed += response.getAvroRows().getRowCount();
rowsProcessed += response.getRowCount();
return response;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package com.google.cloud.bigquery.storage.v1beta1.it;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import com.google.api.gax.rpc.ServerStream;
import com.google.cloud.ServiceOptions;
Expand Down Expand Up @@ -121,12 +120,12 @@ public Long call() throws Exception {
ExecutorService executor = Executors.newFixedThreadPool(tasks.size());
List<Future<Long>> results = executor.invokeAll(tasks);

long avroRowCount = 0;
long rowCount = 0;
for (Future<Long> result : results) {
avroRowCount += result.get();
rowCount += result.get();
}

assertEquals(313_797_035, avroRowCount);
assertEquals(313_797_035, rowCount);
}

private long readAllRowsFromStream(Stream stream) {
Expand All @@ -135,19 +134,13 @@ private long readAllRowsFromStream(Stream stream) {
ReadRowsRequest readRowsRequest =
ReadRowsRequest.newBuilder().setReadPosition(readPosition).build();

long avroRowCount = 0;
long rowCount = 0;
ServerStream<ReadRowsResponse> serverStream = client.readRowsCallable().call(readRowsRequest);
for (ReadRowsResponse response : serverStream) {
assertTrue(
String.format(
"Response is missing 'avro_rows'. Read %d rows so far from stream '%s'. ReadRows response:%n%s",
avroRowCount, stream.getName(), response.toString()),
response.hasAvroRows());
avroRowCount += response.getAvroRows().getRowCount();
rowCount += response.getRowCount();
}

LOG.info(
String.format("Read total of %d rows from stream '%s'.", avroRowCount, stream.getName()));
return avroRowCount;
LOG.info(String.format("Read total of %d rows from stream '%s'.", rowCount, stream.getName()));
return rowCount;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import com.google.api.gax.rpc.ServerStream;
import com.google.cloud.RetryOption;
Expand Down Expand Up @@ -149,18 +148,13 @@ public void testSimpleRead() {
ReadRowsRequest readRowsRequest =
ReadRowsRequest.newBuilder().setReadPosition(readPosition).build();

long avroRowCount = 0;
long rowCount = 0;
ServerStream<ReadRowsResponse> stream = client.readRowsCallable().call(readRowsRequest);
for (ReadRowsResponse response : stream) {
assertTrue(
String.format(
"Response is missing 'avro_rows'. Read %d rows so far. ReadRows response:%n%s",
avroRowCount, response.toString()),
response.hasAvroRows());
avroRowCount += response.getAvroRows().getRowCount();
rowCount += response.getRowCount();
}

assertEquals(164_656, avroRowCount);
assertEquals(164_656, rowCount);
}

@Test
Expand All @@ -187,31 +181,23 @@ public void testSimpleReadAndResume() {
// We have to read some number of rows in order to be able to resume. More details:
// https://cloud.google.com/bigquery/docs/reference/storage/rpc/google.cloud.bigquery.storage.v1beta1#google.cloud.bigquery.storage.v1beta1.ReadRowsRequest

long avroRowCount = ReadStreamToOffset(session.getStreams(0), /* rowOffset = */ 34_846);
long rowCount = ReadStreamToOffset(session.getStreams(0), /* rowOffset = */ 34_846);

StreamPosition readPosition =
StreamPosition.newBuilder()
.setStream(session.getStreams(0))
.setOffset(avroRowCount)
.build();
StreamPosition.newBuilder().setStream(session.getStreams(0)).setOffset(rowCount).build();

ReadRowsRequest readRowsRequest =
ReadRowsRequest.newBuilder().setReadPosition(readPosition).build();

ServerStream<ReadRowsResponse> stream = client.readRowsCallable().call(readRowsRequest);

for (ReadRowsResponse response : stream) {
assertTrue(
String.format(
"Response is missing 'avro_rows'. Read %d rows so far. ReadRows response:%n%s",
avroRowCount, response.toString()),
response.hasAvroRows());
avroRowCount += response.getAvroRows().getRowCount();
rowCount += response.getRowCount();
}

// Verifies that the number of rows skipped and read equals to the total number of rows in the
// table.
assertEquals(164_656, avroRowCount);
assertEquals(164_656, rowCount);
}

@Test
Expand Down Expand Up @@ -252,17 +238,11 @@ public void testFilter() throws IOException {
SimpleRowReader reader =
new SimpleRowReader(new Schema.Parser().parse(session.getAvroSchema().getSchema()));

long avroRowCount = 0;
long rowCount = 0;

ServerStream<ReadRowsResponse> stream = client.readRowsCallable().call(readRowsRequest);
for (ReadRowsResponse response : stream) {
assertTrue(
String.format(
"Response is missing 'avro_rows'. Read %d rows so far. ReadRows response:%n%s",
avroRowCount, response.toString()),
response.hasAvroRows());
avroRowCount += response.getAvroRows().getRowCount();

rowCount += response.getRowCount();
reader.processRows(
response.getAvroRows(),
new SimpleRowReader.AvroRowConsumer() {
Expand All @@ -276,7 +256,7 @@ public void accept(GenericData.Record record) {
});
}

assertEquals(1_333, avroRowCount);
assertEquals(1_333, rowCount);
}

@Test
Expand Down Expand Up @@ -336,15 +316,10 @@ public void testColumnSelection() throws IOException {

SimpleRowReader reader = new SimpleRowReader(avroSchema);

long avroRowCount = 0;
long rowCount = 0;
ServerStream<ReadRowsResponse> stream = client.readRowsCallable().call(readRowsRequest);
for (ReadRowsResponse response : stream) {
assertTrue(
String.format(
"Response is missing 'avro_rows'. Read %d rows so far. ReadRows response:%n%s",
avroRowCount, response.toString()),
response.hasAvroRows());
avroRowCount += response.getAvroRows().getRowCount();
rowCount += response.getRowCount();
reader.processRows(
response.getAvroRows(),
new SimpleRowReader.AvroRowConsumer() {
Expand All @@ -362,7 +337,7 @@ public void accept(GenericData.Record record) {
});
}

assertEquals(1_333, avroRowCount);
assertEquals(1_333, rowCount);
}

@Test
Expand Down Expand Up @@ -864,19 +839,19 @@ private long ReadStreamToOffset(Stream stream, long rowOffset) {
ReadRowsRequest readRowsRequest =
ReadRowsRequest.newBuilder().setReadPosition(readPosition).build();

long avroRowCount = 0;
long rowCount = 0;
ServerStream<ReadRowsResponse> serverStream = client.readRowsCallable().call(readRowsRequest);
Iterator<ReadRowsResponse> responseIterator = serverStream.iterator();

while (responseIterator.hasNext()) {
ReadRowsResponse response = responseIterator.next();
avroRowCount += response.getAvroRows().getRowCount();
if (avroRowCount >= rowOffset) {
rowCount += response.getRowCount();
if (rowCount >= rowOffset) {
return rowOffset;
}
}

return avroRowCount;
return rowCount;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import com.google.api.gax.grpc.GrpcTransportChannel;
import com.google.api.gax.rpc.FixedTransportChannelProvider;
import com.google.api.gax.rpc.ServerStream;
import com.google.cloud.bigquery.storage.v1beta1.AvroProto.AvroRows;
import com.google.cloud.bigquery.storage.v1beta1.BigQueryStorageClient;
import com.google.cloud.bigquery.storage.v1beta1.BigQueryStorageGrpc.BigQueryStorageImplBase;
import com.google.cloud.bigquery.storage.v1beta1.BigQueryStorageSettings;
Expand Down Expand Up @@ -167,7 +166,7 @@ private int getRowCount(ReadRowsRequest request) {
ServerStream<ReadRowsResponse> serverStream = client.readRowsCallable().call(request);
int rowCount = 0;
for (ReadRowsResponse readRowsResponse : serverStream) {
rowCount += readRowsResponse.getAvroRows().getRowCount();
rowCount += readRowsResponse.getRowCount();
}
return rowCount;
}
Expand Down Expand Up @@ -232,9 +231,7 @@ static ReadRowsRequest createRequest(String streamName, long offset) {
}

static ReadRowsResponse createResponse(int numberOfRows) {
return ReadRowsResponse.newBuilder()
.setAvroRows(AvroRows.newBuilder().setRowCount(numberOfRows))
.build();
return ReadRowsResponse.newBuilder().setRowCount(numberOfRows).build();
}

RpcExpectation expectRequest(String streamName, long offset) {
Expand Down