diff --git a/google-cloud-clients/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta1/stub/readrows/ReadRowsResumptionStrategy.java b/google-cloud-clients/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta1/stub/readrows/ReadRowsResumptionStrategy.java index 9684b8d2fc79..ecad11e15f6c 100644 --- a/google-cloud-clients/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta1/stub/readrows/ReadRowsResumptionStrategy.java +++ b/google-cloud-clients/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta1/stub/readrows/ReadRowsResumptionStrategy.java @@ -19,6 +19,7 @@ import com.google.api.gax.retrying.StreamResumptionStrategy; import com.google.cloud.bigquery.storage.v1beta1.Storage.ReadRowsRequest; import com.google.cloud.bigquery.storage.v1beta1.Storage.ReadRowsResponse; +import javax.annotation.Nonnull; /** * An implementation of a {@link StreamResumptionStrategy} for the ReadRows API. This class tracks @@ -36,13 +37,15 @@ public class ReadRowsResumptionStrategy private long rowsProcessed = 0; @Override + @Nonnull public StreamResumptionStrategy createNew() { return new ReadRowsResumptionStrategy(); } @Override + @Nonnull public ReadRowsResponse processResponse(ReadRowsResponse response) { - rowsProcessed += response.getAvroRows().getRowCount(); + rowsProcessed += response.getRowCount(); return response; } diff --git a/google-cloud-clients/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta1/it/ITBigQueryStorageLongRunningTest.java b/google-cloud-clients/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta1/it/ITBigQueryStorageLongRunningTest.java index 5f6df6ab361c..5043c6044672 100644 --- a/google-cloud-clients/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta1/it/ITBigQueryStorageLongRunningTest.java +++ b/google-cloud-clients/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta1/it/ITBigQueryStorageLongRunningTest.java @@ -17,7 +17,6 @@ package com.google.cloud.bigquery.storage.v1beta1.it; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; import com.google.api.gax.rpc.ServerStream; import com.google.cloud.ServiceOptions; @@ -121,12 +120,12 @@ public Long call() throws Exception { ExecutorService executor = Executors.newFixedThreadPool(tasks.size()); List> results = executor.invokeAll(tasks); - long avroRowCount = 0; + long rowCount = 0; for (Future result : results) { - avroRowCount += result.get(); + rowCount += result.get(); } - assertEquals(313_797_035, avroRowCount); + assertEquals(313_797_035, rowCount); } private long readAllRowsFromStream(Stream stream) { @@ -135,19 +134,13 @@ private long readAllRowsFromStream(Stream stream) { ReadRowsRequest readRowsRequest = ReadRowsRequest.newBuilder().setReadPosition(readPosition).build(); - long avroRowCount = 0; + long rowCount = 0; ServerStream serverStream = client.readRowsCallable().call(readRowsRequest); for (ReadRowsResponse response : serverStream) { - assertTrue( - String.format( - "Response is missing 'avro_rows'. Read %d rows so far from stream '%s'. ReadRows response:%n%s", - avroRowCount, stream.getName(), response.toString()), - response.hasAvroRows()); - avroRowCount += response.getAvroRows().getRowCount(); + rowCount += response.getRowCount(); } - LOG.info( - String.format("Read total of %d rows from stream '%s'.", avroRowCount, stream.getName())); - return avroRowCount; + LOG.info(String.format("Read total of %d rows from stream '%s'.", rowCount, stream.getName())); + return rowCount; } } diff --git a/google-cloud-clients/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta1/it/ITBigQueryStorageTest.java b/google-cloud-clients/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta1/it/ITBigQueryStorageTest.java index dfb186beb617..79727cd1f26f 100644 --- a/google-cloud-clients/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta1/it/ITBigQueryStorageTest.java +++ b/google-cloud-clients/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta1/it/ITBigQueryStorageTest.java @@ -21,7 +21,6 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; import com.google.api.gax.rpc.ServerStream; import com.google.cloud.RetryOption; @@ -149,18 +148,13 @@ public void testSimpleRead() { ReadRowsRequest readRowsRequest = ReadRowsRequest.newBuilder().setReadPosition(readPosition).build(); - long avroRowCount = 0; + long rowCount = 0; ServerStream stream = client.readRowsCallable().call(readRowsRequest); for (ReadRowsResponse response : stream) { - assertTrue( - String.format( - "Response is missing 'avro_rows'. Read %d rows so far. ReadRows response:%n%s", - avroRowCount, response.toString()), - response.hasAvroRows()); - avroRowCount += response.getAvroRows().getRowCount(); + rowCount += response.getRowCount(); } - assertEquals(164_656, avroRowCount); + assertEquals(164_656, rowCount); } @Test @@ -187,13 +181,10 @@ public void testSimpleReadAndResume() { // We have to read some number of rows in order to be able to resume. More details: // https://cloud.google.com/bigquery/docs/reference/storage/rpc/google.cloud.bigquery.storage.v1beta1#google.cloud.bigquery.storage.v1beta1.ReadRowsRequest - long avroRowCount = ReadStreamToOffset(session.getStreams(0), /* rowOffset = */ 34_846); + long rowCount = ReadStreamToOffset(session.getStreams(0), /* rowOffset = */ 34_846); StreamPosition readPosition = - StreamPosition.newBuilder() - .setStream(session.getStreams(0)) - .setOffset(avroRowCount) - .build(); + StreamPosition.newBuilder().setStream(session.getStreams(0)).setOffset(rowCount).build(); ReadRowsRequest readRowsRequest = ReadRowsRequest.newBuilder().setReadPosition(readPosition).build(); @@ -201,17 +192,12 @@ public void testSimpleReadAndResume() { ServerStream stream = client.readRowsCallable().call(readRowsRequest); for (ReadRowsResponse response : stream) { - assertTrue( - String.format( - "Response is missing 'avro_rows'. Read %d rows so far. ReadRows response:%n%s", - avroRowCount, response.toString()), - response.hasAvroRows()); - avroRowCount += response.getAvroRows().getRowCount(); + rowCount += response.getRowCount(); } // Verifies that the number of rows skipped and read equals to the total number of rows in the // table. - assertEquals(164_656, avroRowCount); + assertEquals(164_656, rowCount); } @Test @@ -252,17 +238,11 @@ public void testFilter() throws IOException { SimpleRowReader reader = new SimpleRowReader(new Schema.Parser().parse(session.getAvroSchema().getSchema())); - long avroRowCount = 0; + long rowCount = 0; ServerStream stream = client.readRowsCallable().call(readRowsRequest); for (ReadRowsResponse response : stream) { - assertTrue( - String.format( - "Response is missing 'avro_rows'. Read %d rows so far. ReadRows response:%n%s", - avroRowCount, response.toString()), - response.hasAvroRows()); - avroRowCount += response.getAvroRows().getRowCount(); - + rowCount += response.getRowCount(); reader.processRows( response.getAvroRows(), new SimpleRowReader.AvroRowConsumer() { @@ -276,7 +256,7 @@ public void accept(GenericData.Record record) { }); } - assertEquals(1_333, avroRowCount); + assertEquals(1_333, rowCount); } @Test @@ -336,15 +316,10 @@ public void testColumnSelection() throws IOException { SimpleRowReader reader = new SimpleRowReader(avroSchema); - long avroRowCount = 0; + long rowCount = 0; ServerStream stream = client.readRowsCallable().call(readRowsRequest); for (ReadRowsResponse response : stream) { - assertTrue( - String.format( - "Response is missing 'avro_rows'. Read %d rows so far. ReadRows response:%n%s", - avroRowCount, response.toString()), - response.hasAvroRows()); - avroRowCount += response.getAvroRows().getRowCount(); + rowCount += response.getRowCount(); reader.processRows( response.getAvroRows(), new SimpleRowReader.AvroRowConsumer() { @@ -362,7 +337,7 @@ public void accept(GenericData.Record record) { }); } - assertEquals(1_333, avroRowCount); + assertEquals(1_333, rowCount); } @Test @@ -864,19 +839,19 @@ private long ReadStreamToOffset(Stream stream, long rowOffset) { ReadRowsRequest readRowsRequest = ReadRowsRequest.newBuilder().setReadPosition(readPosition).build(); - long avroRowCount = 0; + long rowCount = 0; ServerStream serverStream = client.readRowsCallable().call(readRowsRequest); Iterator responseIterator = serverStream.iterator(); while (responseIterator.hasNext()) { ReadRowsResponse response = responseIterator.next(); - avroRowCount += response.getAvroRows().getRowCount(); - if (avroRowCount >= rowOffset) { + rowCount += response.getRowCount(); + if (rowCount >= rowOffset) { return rowOffset; } } - return avroRowCount; + return rowCount; } /** diff --git a/google-cloud-clients/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta1/stub/readrows/ReadRowsRetryTest.java b/google-cloud-clients/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta1/stub/readrows/ReadRowsRetryTest.java index 52da9b504eaa..714e30b39027 100644 --- a/google-cloud-clients/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta1/stub/readrows/ReadRowsRetryTest.java +++ b/google-cloud-clients/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta1/stub/readrows/ReadRowsRetryTest.java @@ -19,7 +19,6 @@ import com.google.api.gax.grpc.GrpcTransportChannel; import com.google.api.gax.rpc.FixedTransportChannelProvider; import com.google.api.gax.rpc.ServerStream; -import com.google.cloud.bigquery.storage.v1beta1.AvroProto.AvroRows; import com.google.cloud.bigquery.storage.v1beta1.BigQueryStorageClient; import com.google.cloud.bigquery.storage.v1beta1.BigQueryStorageGrpc.BigQueryStorageImplBase; import com.google.cloud.bigquery.storage.v1beta1.BigQueryStorageSettings; @@ -167,7 +166,7 @@ private int getRowCount(ReadRowsRequest request) { ServerStream serverStream = client.readRowsCallable().call(request); int rowCount = 0; for (ReadRowsResponse readRowsResponse : serverStream) { - rowCount += readRowsResponse.getAvroRows().getRowCount(); + rowCount += readRowsResponse.getRowCount(); } return rowCount; } @@ -232,9 +231,7 @@ static ReadRowsRequest createRequest(String streamName, long offset) { } static ReadRowsResponse createResponse(int numberOfRows) { - return ReadRowsResponse.newBuilder() - .setAvroRows(AvroRows.newBuilder().setRowCount(numberOfRows)) - .build(); + return ReadRowsResponse.newBuilder().setRowCount(numberOfRows).build(); } RpcExpectation expectRequest(String streamName, long offset) {