Skip to content

Commit

Permalink
adding integration test
Browse files Browse the repository at this point in the history
  • Loading branch information
suryasoma committed Apr 14, 2023
1 parent d391d50 commit 316dbc8
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -196,4 +196,8 @@ public void shutdownNow() {
public boolean awaitTermination(long duration, TimeUnit unit) throws InterruptedException {
return stub.awaitTermination(duration, unit);
}

public BigQueryReadStubSettings getStubSettings() {
return stubSettings;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import com.google.api.gax.core.InstantiatingExecutorProvider;
import com.google.api.gax.rpc.ServerStream;
import com.google.cloud.RetryOption;
import com.google.cloud.ServiceOptions;
Expand All @@ -44,6 +46,7 @@
import com.google.cloud.bigquery.storage.v1.ReadRowsRequest;
import com.google.cloud.bigquery.storage.v1.ReadRowsResponse;
import com.google.cloud.bigquery.storage.v1.ReadSession;
import com.google.cloud.bigquery.storage.v1.BigQueryReadSettings;
import com.google.cloud.bigquery.storage.v1.ReadSession.TableModifiers;
import com.google.cloud.bigquery.storage.v1.ReadSession.TableReadOptions;
import com.google.cloud.bigquery.storage.v1.ReadStream;
Expand Down Expand Up @@ -806,6 +809,54 @@ public void testStructAndArraySqlTypes() throws InterruptedException, IOExceptio
assertEquals(rowAssertMessage, new Utf8("abc"), structRecord.get("str_field"));
}

@Test
public void testSimpleReadWithBackgroundExecutorProvider() throws IOException {
BigQueryReadSettings bigQueryReadSettings = BigQueryReadSettings
.newBuilder()
.setBackgroundExecutorProvider(
InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(14).build())
.build();
// Overriding the default client
client = BigQueryReadClient.create(bigQueryReadSettings);
assertTrue(
client.getStub().getStubSettings().getBackgroundExecutorProvider() instanceof InstantiatingExecutorProvider);
assertEquals(
14,
((InstantiatingExecutorProvider) client.getStub().getStubSettings().getBackgroundExecutorProvider())
.getExecutorThreadCount());
String table =
BigQueryResource.FormatTableResource(
/* projectId = */ "bigquery-public-data",
/* datasetId = */ "samples",
/* tableId = */ "shakespeare");

ReadSession session =
client.createReadSession(
/* parent = */ parentProjectId,
/* readSession = */ ReadSession.newBuilder()
.setTable(table)
.setDataFormat(DataFormat.AVRO)
.build(),
/* maxStreamCount = */ 1);
assertEquals(
String.format(
"Did not receive expected number of streams for table '%s' CreateReadSession response:%n%s",
table, session.toString()),
1,
session.getStreamsCount());

ReadRowsRequest readRowsRequest =
ReadRowsRequest.newBuilder().setReadStream(session.getStreams(0).getName()).build();

long rowCount = 0;
ServerStream<ReadRowsResponse> stream = client.readRowsCallable().call(readRowsRequest);
for (ReadRowsResponse response : stream) {
rowCount += response.getRowCount();
}

assertEquals(164_656, rowCount);
}

/**
* Reads to the specified row offset within the stream. If the stream does not have the desired
* rows to read, it will read all of them.
Expand Down

0 comments on commit 316dbc8

Please sign in to comment.