Skip to content

Commit

Permalink
fix: Add query dryRun logic to get the schema when null schema is ret…
Browse files Browse the repository at this point in the history
…urned from the backend (#2106)

* Added dryRun Logic in query RPC when the backend returns a null schema for the Fast query path

* Added testFastQueryNullSchema UT and refactored to make TABLE_ROWS global to reduce code duplicity

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* Added additional logs for troubleshooting the jobComplete status

Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
  • Loading branch information
prash-mi and gcf-owl-bot[bot] authored Jul 20, 2022
1 parent 8b829ef commit c98d22b
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ public BigQueryResult executeSelect(String sql) throws BigQuerySQLException {
if (isFastQuerySupported()) {
String projectId = bigQueryOptions.getProjectId();
QueryRequest queryRequest = createQueryRequest(connectionSettings, sql, null, null);
return queryRpc(projectId, queryRequest, false);
return queryRpc(projectId, queryRequest, sql, false);
}
// use jobs.insert otherwise
com.google.api.services.bigquery.model.Job queryJob =
Expand Down Expand Up @@ -236,7 +236,7 @@ public BigQueryResult executeSelect(
final String projectId = bigQueryOptions.getProjectId();
final QueryRequest queryRequest =
createQueryRequest(connectionSettings, sql, parameters, labelMap);
return queryRpc(projectId, queryRequest, parameters != null);
return queryRpc(projectId, queryRequest, sql, parameters != null);
}
// use jobs.insert otherwise
com.google.api.services.bigquery.model.Job queryJob =
Expand Down Expand Up @@ -289,7 +289,10 @@ public int size() {
}

private BigQueryResult queryRpc(
final String projectId, final QueryRequest queryRequest, Boolean hasQueryParameters) {
final String projectId,
final QueryRequest queryRequest,
String sql,
Boolean hasQueryParameters) {
com.google.api.services.bigquery.model.QueryResponse results;
try {
results =
Expand Down Expand Up @@ -322,8 +325,29 @@ private BigQueryResult queryRpc(
// and can be optimized here, but this is left as future work.
Long totalRows = results.getTotalRows() == null ? null : results.getTotalRows().longValue();
Long pageRows = results.getRows() == null ? null : (long) (results.getRows().size());
logger.log(
Level.WARNING,
"\n"
+ String.format(
"results.getJobComplete(): %s, isSchemaNull: %s , totalRows: %s, pageRows: %s",
results.getJobComplete(), results.getSchema() == null, totalRows, pageRows));
JobId jobId = JobId.fromPb(results.getJobReference());
GetQueryResultsResponse firstPage = getQueryResultsFirstPage(jobId);
// We might get null schema from the backend occasionally. Ref:
// https://github.com/googleapis/java-bigquery/issues/2103/. Using queryDryRun in such cases
// to get the schema
if (firstPage.getSchema() == null) { // get schema using dry run
// Log the status if the job was complete complete
logger.log(
Level.WARNING,
"\n"
+ "Received null schema, Using dryRun the get the Schema. jobComplete:"
+ firstPage.getJobComplete());
com.google.api.services.bigquery.model.Job dryRunJob = createDryRunJob(sql);
Schema schema = Schema.fromPb(dryRunJob.getStatistics().getQuery().getSchema());
return getSubsequentQueryResultsWithJob(
totalRows, pageRows, jobId, firstPage, schema, hasQueryParameters);
}
return getSubsequentQueryResultsWithJob(
totalRows, pageRows, jobId, firstPage, hasQueryParameters);
}
Expand Down Expand Up @@ -1243,7 +1267,8 @@ com.google.api.services.bigquery.model.Job createQueryJob(
}

// Used by dryRun
private com.google.api.services.bigquery.model.Job createDryRunJob(String sql) {
@VisibleForTesting
com.google.api.services.bigquery.model.Job createDryRunJob(String sql) {
com.google.api.services.bigquery.model.JobConfiguration configurationPb =
new com.google.api.services.bigquery.model.JobConfiguration();
configurationPb.setDryRun(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,26 @@ public class ConnectionImplTest {
.setTotalRows(BigInteger.valueOf(1L))
.setSchema(FAST_QUERY_TABLESCHEMA);

private static final GetQueryResultsResponse GET_QUERY_RESULTS_RESPONSE_NULL_SCHEMA =
new GetQueryResultsResponse()
.setJobReference(QUERY_JOB.toPb())
.setRows(ImmutableList.of(TABLE_ROW))
.setJobComplete(false)
.setPageToken(PAGE_TOKEN)
.setTotalBytesProcessed(42L)
.setTotalRows(BigInteger.valueOf(1L))
.setSchema(null);

private static List<TableRow> TABLE_ROWS =
ImmutableList.of(
new TableRow()
.setF(
ImmutableList.of(new TableCell().setV("Value1"), new TableCell().setV("Value2"))),
new TableRow()
.setF(
ImmutableList.of(
new TableCell().setV("Value3"), new TableCell().setV("Value4"))));

private BigQueryOptions createBigQueryOptionsForProject(
String project, BigQueryRpcFactory rpcFactory) {
return BigQueryOptions.newBuilder()
Expand Down Expand Up @@ -211,24 +231,13 @@ public void testQueryDryRun() throws BigQuerySQLException {

@Test
public void testParseDataTask() throws InterruptedException {
List<TableRow> tableRows =
ImmutableList.of(
new TableRow()
.setF(
ImmutableList.of(
new TableCell().setV("Value1"), new TableCell().setV("Value2"))),
new TableRow()
.setF(
ImmutableList.of(
new TableCell().setV("Value3"), new TableCell().setV("Value4"))));

BlockingQueue<Tuple<Iterable<FieldValueList>, Boolean>> pageCache =
new LinkedBlockingDeque<>(2);
BlockingQueue<Tuple<TableDataList, Boolean>> rpcResponseQueue = new LinkedBlockingDeque<>(2);
rpcResponseQueue.offer(Tuple.of(null, false));
// This call should populate page cache
ConnectionImpl connectionSpy = Mockito.spy(connection);
connectionSpy.parseRpcDataAsync(tableRows, QUERY_SCHEMA, pageCache, rpcResponseQueue);
connectionSpy.parseRpcDataAsync(TABLE_ROWS, QUERY_SCHEMA, pageCache, rpcResponseQueue);
Tuple<Iterable<FieldValueList>, Boolean> fvlTupple =
pageCache.take(); // wait for the parser thread to parse the data
assertNotNull(fvlTupple);
Expand All @@ -247,16 +256,6 @@ public void testParseDataTask() throws InterruptedException {

@Test
public void testPopulateBuffer() throws InterruptedException {
List<TableRow> tableRows =
ImmutableList.of(
new TableRow()
.setF(
ImmutableList.of(
new TableCell().setV("Value1"), new TableCell().setV("Value2"))),
new TableRow()
.setF(
ImmutableList.of(
new TableCell().setV("Value3"), new TableCell().setV("Value4"))));

BlockingQueue<Tuple<Iterable<FieldValueList>, Boolean>> pageCache =
new LinkedBlockingDeque<>(2);
Expand All @@ -266,7 +265,7 @@ public void testPopulateBuffer() throws InterruptedException {
// This call should populate page cache
ConnectionImpl connectionSpy = Mockito.spy(connection);

connectionSpy.parseRpcDataAsync(tableRows, QUERY_SCHEMA, pageCache, rpcResponseQueue);
connectionSpy.parseRpcDataAsync(TABLE_ROWS, QUERY_SCHEMA, pageCache, rpcResponseQueue);

verify(connectionSpy, times(1))
.parseRpcDataAsync(
Expand Down Expand Up @@ -358,19 +357,62 @@ public void testLegacyQuerySinglePage() throws BigQuerySQLException {
.createJobForQuery(any(com.google.api.services.bigquery.model.Job.class));
}

// calls executeSelect with a Fast query and emulates that no schema is returned with the first
// page
@Test
public void testFastQueryNullSchema() throws BigQuerySQLException {
ConnectionImpl connectionSpy = Mockito.spy(connection);
QueryRequest queryReqMock = new QueryRequest();
com.google.api.services.bigquery.model.JobStatistics stats =
new com.google.api.services.bigquery.model.JobStatistics()
.setQuery(new JobStatistics2().setSchema(FAST_QUERY_TABLESCHEMA));
com.google.api.services.bigquery.model.Job jobResponseMock =
new com.google.api.services.bigquery.model.Job()
// .setConfiguration(QUERY_JOB.g)
.setJobReference(QUERY_JOB.toPb())
.setId(JOB)
.setStatus(new com.google.api.services.bigquery.model.JobStatus().setState("DONE"))
.setStatistics(stats);
// emulating a legacy query
doReturn(true).when(connectionSpy).isFastQuerySupported();
com.google.api.services.bigquery.model.QueryResponse mockQueryRes =
new QueryResponse()
.setSchema(FAST_QUERY_TABLESCHEMA)
.setJobComplete(false) // so that it goes to the else part in queryRpc
.setTotalRows(new BigInteger(String.valueOf(4L)))
.setJobReference(QUERY_JOB.toPb())
.setRows(TABLE_ROWS);
when(bigqueryRpcMock.queryRpc(any(String.class), any(QueryRequest.class)))
.thenReturn(mockQueryRes);
doReturn(GET_QUERY_RESULTS_RESPONSE_NULL_SCHEMA) // wiring the null schema for the test case
.when(connectionSpy)
.getQueryResultsFirstPage(any(JobId.class));
doReturn(BQ_RS_MOCK_RES)
.when(connectionSpy)
.getSubsequentQueryResultsWithJob(
any(Long.class),
any(Long.class),
any(JobId.class),
any(GetQueryResultsResponse.class),
any(Schema.class),
any(Boolean.class));
doReturn(jobResponseMock).when(connectionSpy).createDryRunJob(any(String.class));
BigQueryResult res = connectionSpy.executeSelect(SQL_QUERY);
assertEquals(res.getTotalRows(), 2);
assertEquals(QUERY_SCHEMA, res.getSchema());
verify(connectionSpy, times(1))
.getSubsequentQueryResultsWithJob(
any(Long.class),
any(Long.class),
any(JobId.class),
any(GetQueryResultsResponse.class),
any(Schema.class),
any(Boolean.class));
}

// exercises getSubsequentQueryResultsWithJob for fast running queries
@Test
public void testFastQueryLongRunning() throws SQLException {
List<TableRow> tableRows =
ImmutableList.of(
new TableRow()
.setF(
ImmutableList.of(
new TableCell().setV("Value1"), new TableCell().setV("Value2"))),
new TableRow()
.setF(
ImmutableList.of(
new TableCell().setV("Value3"), new TableCell().setV("Value4"))));
ConnectionImpl connectionSpy = Mockito.spy(connection);
// emulating a fast query
doReturn(true).when(connectionSpy).isFastQuerySupported();
Expand All @@ -389,7 +431,7 @@ public void testFastQueryLongRunning() throws SQLException {
.setJobComplete(false)
.setTotalRows(new BigInteger(String.valueOf(4L)))
.setJobReference(QUERY_JOB.toPb())
.setRows(tableRows);
.setRows(TABLE_ROWS);
when(bigqueryRpcMock.queryRpc(any(String.class), any(QueryRequest.class)))
.thenReturn(mockQueryRes);
BigQueryResult res = connectionSpy.executeSelect(SQL_QUERY);
Expand Down

0 comments on commit c98d22b

Please sign in to comment.