Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Add query dryRun logic to get the schema when null schema is returned from the backend #2106

Merged
merged 6 commits into from
Jul 20, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ public BigQueryResult executeSelect(String sql) throws BigQuerySQLException {
if (isFastQuerySupported()) {
String projectId = bigQueryOptions.getProjectId();
QueryRequest queryRequest = createQueryRequest(connectionSettings, sql, null, null);
return queryRpc(projectId, queryRequest, false);
return queryRpc(projectId, queryRequest, sql, false);
}
// use jobs.insert otherwise
com.google.api.services.bigquery.model.Job queryJob =
Expand Down Expand Up @@ -236,7 +236,7 @@ public BigQueryResult executeSelect(
final String projectId = bigQueryOptions.getProjectId();
final QueryRequest queryRequest =
createQueryRequest(connectionSettings, sql, parameters, labelMap);
return queryRpc(projectId, queryRequest, parameters != null);
return queryRpc(projectId, queryRequest, sql, parameters != null);
}
// use jobs.insert otherwise
com.google.api.services.bigquery.model.Job queryJob =
Expand Down Expand Up @@ -289,7 +289,10 @@ public int size() {
}

private BigQueryResult queryRpc(
final String projectId, final QueryRequest queryRequest, Boolean hasQueryParameters) {
final String projectId,
final QueryRequest queryRequest,
String sql,
Boolean hasQueryParameters) {
com.google.api.services.bigquery.model.QueryResponse results;
try {
results =
Expand Down Expand Up @@ -324,6 +327,15 @@ private BigQueryResult queryRpc(
Long pageRows = results.getRows() == null ? null : (long) (results.getRows().size());
JobId jobId = JobId.fromPb(results.getJobReference());
GetQueryResultsResponse firstPage = getQueryResultsFirstPage(jobId);
// We might get null schema from the backend occasionally. Ref:
// https://github.com/googleapis/java-bigquery/issues/2103/. Using queryDryRun in such cases
// to get the schema
if (firstPage.getSchema() == null) { // get schema using dry run
com.google.api.services.bigquery.model.Job dryRunJob = createDryRunJob(sql);
Schema schema = Schema.fromPb(dryRunJob.getStatistics().getQuery().getSchema());
return getSubsequentQueryResultsWithJob(
totalRows, pageRows, jobId, firstPage, schema, hasQueryParameters);
}
return getSubsequentQueryResultsWithJob(
totalRows, pageRows, jobId, firstPage, hasQueryParameters);
}
Expand Down Expand Up @@ -1243,7 +1255,8 @@ com.google.api.services.bigquery.model.Job createQueryJob(
}

// Used by dryRun
private com.google.api.services.bigquery.model.Job createDryRunJob(String sql) {
@VisibleForTesting
com.google.api.services.bigquery.model.Job createDryRunJob(String sql) {
com.google.api.services.bigquery.model.JobConfiguration configurationPb =
new com.google.api.services.bigquery.model.JobConfiguration();
configurationPb.setDryRun(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,26 @@ public class ConnectionImplTest {
.setTotalRows(BigInteger.valueOf(1L))
.setSchema(FAST_QUERY_TABLESCHEMA);

private static final GetQueryResultsResponse GET_QUERY_RESULTS_RESPONSE_NULL_SCHEMA =
new GetQueryResultsResponse()
.setJobReference(QUERY_JOB.toPb())
.setRows(ImmutableList.of(TABLE_ROW))
.setJobComplete(false)
.setPageToken(PAGE_TOKEN)
.setTotalBytesProcessed(42L)
.setTotalRows(BigInteger.valueOf(1L))
.setSchema(null);

private static List<TableRow> TABLE_ROWS =
ImmutableList.of(
new TableRow()
.setF(
ImmutableList.of(new TableCell().setV("Value1"), new TableCell().setV("Value2"))),
new TableRow()
.setF(
ImmutableList.of(
new TableCell().setV("Value3"), new TableCell().setV("Value4"))));

private BigQueryOptions createBigQueryOptionsForProject(
String project, BigQueryRpcFactory rpcFactory) {
return BigQueryOptions.newBuilder()
Expand Down Expand Up @@ -211,24 +231,13 @@ public void testQueryDryRun() throws BigQuerySQLException {

@Test
public void testParseDataTask() throws InterruptedException {
List<TableRow> tableRows =
ImmutableList.of(
new TableRow()
.setF(
ImmutableList.of(
new TableCell().setV("Value1"), new TableCell().setV("Value2"))),
new TableRow()
.setF(
ImmutableList.of(
new TableCell().setV("Value3"), new TableCell().setV("Value4"))));

BlockingQueue<Tuple<Iterable<FieldValueList>, Boolean>> pageCache =
new LinkedBlockingDeque<>(2);
BlockingQueue<Tuple<TableDataList, Boolean>> rpcResponseQueue = new LinkedBlockingDeque<>(2);
rpcResponseQueue.offer(Tuple.of(null, false));
// This call should populate page cache
ConnectionImpl connectionSpy = Mockito.spy(connection);
connectionSpy.parseRpcDataAsync(tableRows, QUERY_SCHEMA, pageCache, rpcResponseQueue);
connectionSpy.parseRpcDataAsync(TABLE_ROWS, QUERY_SCHEMA, pageCache, rpcResponseQueue);
Tuple<Iterable<FieldValueList>, Boolean> fvlTupple =
pageCache.take(); // wait for the parser thread to parse the data
assertNotNull(fvlTupple);
Expand All @@ -247,16 +256,6 @@ public void testParseDataTask() throws InterruptedException {

@Test
public void testPopulateBuffer() throws InterruptedException {
List<TableRow> tableRows =
ImmutableList.of(
new TableRow()
.setF(
ImmutableList.of(
new TableCell().setV("Value1"), new TableCell().setV("Value2"))),
new TableRow()
.setF(
ImmutableList.of(
new TableCell().setV("Value3"), new TableCell().setV("Value4"))));

BlockingQueue<Tuple<Iterable<FieldValueList>, Boolean>> pageCache =
new LinkedBlockingDeque<>(2);
Expand All @@ -266,7 +265,7 @@ public void testPopulateBuffer() throws InterruptedException {
// This call should populate page cache
ConnectionImpl connectionSpy = Mockito.spy(connection);

connectionSpy.parseRpcDataAsync(tableRows, QUERY_SCHEMA, pageCache, rpcResponseQueue);
connectionSpy.parseRpcDataAsync(TABLE_ROWS, QUERY_SCHEMA, pageCache, rpcResponseQueue);

verify(connectionSpy, times(1))
.parseRpcDataAsync(
Expand Down Expand Up @@ -358,19 +357,62 @@ public void testLegacyQuerySinglePage() throws BigQuerySQLException {
.createJobForQuery(any(com.google.api.services.bigquery.model.Job.class));
}

// calls executeSelect with a Fast query and emulates that no schema is returned with the first
// page
@Test
public void testFastQueryNullSchema() throws BigQuerySQLException {
ConnectionImpl connectionSpy = Mockito.spy(connection);
QueryRequest queryReqMock = new QueryRequest();
com.google.api.services.bigquery.model.JobStatistics stats =
new com.google.api.services.bigquery.model.JobStatistics()
.setQuery(new JobStatistics2().setSchema(FAST_QUERY_TABLESCHEMA));
com.google.api.services.bigquery.model.Job jobResponseMock =
new com.google.api.services.bigquery.model.Job()
// .setConfiguration(QUERY_JOB.g)
.setJobReference(QUERY_JOB.toPb())
.setId(JOB)
.setStatus(new com.google.api.services.bigquery.model.JobStatus().setState("DONE"))
.setStatistics(stats);
// emulating a legacy query
doReturn(true).when(connectionSpy).isFastQuerySupported();
com.google.api.services.bigquery.model.QueryResponse mockQueryRes =
new QueryResponse()
.setSchema(FAST_QUERY_TABLESCHEMA)
.setJobComplete(false) // so that it goes to the else part in queryRpc
.setTotalRows(new BigInteger(String.valueOf(4L)))
.setJobReference(QUERY_JOB.toPb())
.setRows(TABLE_ROWS);
when(bigqueryRpcMock.queryRpc(any(String.class), any(QueryRequest.class)))
.thenReturn(mockQueryRes);
doReturn(GET_QUERY_RESULTS_RESPONSE_NULL_SCHEMA) // wiring the null schema for the test case
.when(connectionSpy)
.getQueryResultsFirstPage(any(JobId.class));
doReturn(BQ_RS_MOCK_RES)
.when(connectionSpy)
.getSubsequentQueryResultsWithJob(
any(Long.class),
any(Long.class),
any(JobId.class),
any(GetQueryResultsResponse.class),
any(Schema.class),
any(Boolean.class));
doReturn(jobResponseMock).when(connectionSpy).createDryRunJob(any(String.class));
BigQueryResult res = connectionSpy.executeSelect(SQL_QUERY);
assertEquals(res.getTotalRows(), 2);
assertEquals(QUERY_SCHEMA, res.getSchema());
verify(connectionSpy, times(1))
.getSubsequentQueryResultsWithJob(
any(Long.class),
any(Long.class),
any(JobId.class),
any(GetQueryResultsResponse.class),
any(Schema.class),
any(Boolean.class));
}

// exercises getSubsequentQueryResultsWithJob for fast running queries
@Test
public void testFastQueryLongRunning() throws SQLException {
List<TableRow> tableRows =
ImmutableList.of(
new TableRow()
.setF(
ImmutableList.of(
new TableCell().setV("Value1"), new TableCell().setV("Value2"))),
new TableRow()
.setF(
ImmutableList.of(
new TableCell().setV("Value3"), new TableCell().setV("Value4"))));
ConnectionImpl connectionSpy = Mockito.spy(connection);
// emulating a fast query
doReturn(true).when(connectionSpy).isFastQuerySupported();
Expand All @@ -389,7 +431,7 @@ public void testFastQueryLongRunning() throws SQLException {
.setJobComplete(false)
.setTotalRows(new BigInteger(String.valueOf(4L)))
.setJobReference(QUERY_JOB.toPb())
.setRows(tableRows);
.setRows(TABLE_ROWS);
when(bigqueryRpcMock.queryRpc(any(String.class), any(QueryRequest.class)))
.thenReturn(mockQueryRes);
BigQueryResult res = connectionSpy.executeSelect(SQL_QUERY);
Expand Down