From 7b35934f100e4aa2faa430802856342aa41e45d6 Mon Sep 17 00:00:00 2001 From: Prashant Mishra Date: Wed, 8 Jun 2022 11:40:57 +0530 Subject: [PATCH 1/4] Added dryRun Logic in query RPC when the backend returns a null schema for the Fast query path --- .../google/cloud/bigquery/ConnectionImpl.java | 21 +++++++++++++++---- 1 file changed, 17 insertions(+), 4 deletions(-) diff --git a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/ConnectionImpl.java b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/ConnectionImpl.java index be5174b26..c02d59904 100644 --- a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/ConnectionImpl.java +++ b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/ConnectionImpl.java @@ -192,7 +192,7 @@ public BigQueryResult executeSelect(String sql) throws BigQuerySQLException { if (isFastQuerySupported()) { String projectId = bigQueryOptions.getProjectId(); QueryRequest queryRequest = createQueryRequest(connectionSettings, sql, null, null); - return queryRpc(projectId, queryRequest, false); + return queryRpc(projectId, queryRequest, sql, false); } // use jobs.insert otherwise com.google.api.services.bigquery.model.Job queryJob = @@ -236,7 +236,7 @@ public BigQueryResult executeSelect( final String projectId = bigQueryOptions.getProjectId(); final QueryRequest queryRequest = createQueryRequest(connectionSettings, sql, parameters, labelMap); - return queryRpc(projectId, queryRequest, parameters != null); + return queryRpc(projectId, queryRequest, sql, parameters != null); } // use jobs.insert otherwise com.google.api.services.bigquery.model.Job queryJob = @@ -289,7 +289,10 @@ public int size() { } private BigQueryResult queryRpc( - final String projectId, final QueryRequest queryRequest, Boolean hasQueryParameters) { + final String projectId, + final QueryRequest queryRequest, + String sql, + Boolean hasQueryParameters) { com.google.api.services.bigquery.model.QueryResponse results; try { results = @@ -324,6 +327,15 @@ private BigQueryResult queryRpc( Long pageRows = results.getRows() == null ? null : (long) (results.getRows().size()); JobId jobId = JobId.fromPb(results.getJobReference()); GetQueryResultsResponse firstPage = getQueryResultsFirstPage(jobId); + // We might get null schema from the backend occasionally. Ref: + // https://github.com/googleapis/java-bigquery/issues/2103/. Using queryDryRun in such cases + // to get the schema + if (firstPage.getSchema() == null) { // get schema using dry run + com.google.api.services.bigquery.model.Job dryRunJob = createDryRunJob(sql); + Schema schema = Schema.fromPb(dryRunJob.getStatistics().getQuery().getSchema()); + return getSubsequentQueryResultsWithJob( + totalRows, pageRows, jobId, firstPage, schema, hasQueryParameters); + } return getSubsequentQueryResultsWithJob( totalRows, pageRows, jobId, firstPage, hasQueryParameters); } @@ -1243,7 +1255,8 @@ com.google.api.services.bigquery.model.Job createQueryJob( } // Used by dryRun - private com.google.api.services.bigquery.model.Job createDryRunJob(String sql) { + @VisibleForTesting + com.google.api.services.bigquery.model.Job createDryRunJob(String sql) { com.google.api.services.bigquery.model.JobConfiguration configurationPb = new com.google.api.services.bigquery.model.JobConfiguration(); configurationPb.setDryRun(true); From 201c13a34e223c331ebecb4f6d30271db5ec7e5c Mon Sep 17 00:00:00 2001 From: Prashant Mishra Date: Wed, 8 Jun 2022 11:42:29 +0530 Subject: [PATCH 2/4] Added testFastQueryNullSchema UT and refactored to make TABLE_ROWS global to reduce code duplicity --- .../cloud/bigquery/ConnectionImplTest.java | 110 ++++++++++++------ 1 file changed, 76 insertions(+), 34 deletions(-) diff --git a/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/ConnectionImplTest.java b/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/ConnectionImplTest.java index e4fdc9731..9543ccebf 100644 --- a/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/ConnectionImplTest.java +++ b/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/ConnectionImplTest.java @@ -93,6 +93,26 @@ public class ConnectionImplTest { .setTotalRows(BigInteger.valueOf(1L)) .setSchema(FAST_QUERY_TABLESCHEMA); + private static final GetQueryResultsResponse GET_QUERY_RESULTS_RESPONSE_NULL_SCHEMA = + new GetQueryResultsResponse() + .setJobReference(QUERY_JOB.toPb()) + .setRows(ImmutableList.of(TABLE_ROW)) + .setJobComplete(false) + .setPageToken(PAGE_TOKEN) + .setTotalBytesProcessed(42L) + .setTotalRows(BigInteger.valueOf(1L)) + .setSchema(null); + + private static List TABLE_ROWS = + ImmutableList.of( + new TableRow() + .setF( + ImmutableList.of(new TableCell().setV("Value1"), new TableCell().setV("Value2"))), + new TableRow() + .setF( + ImmutableList.of( + new TableCell().setV("Value3"), new TableCell().setV("Value4")))); + private BigQueryOptions createBigQueryOptionsForProject( String project, BigQueryRpcFactory rpcFactory) { return BigQueryOptions.newBuilder() @@ -211,24 +231,13 @@ public void testQueryDryRun() throws BigQuerySQLException { @Test public void testParseDataTask() throws InterruptedException { - List tableRows = - ImmutableList.of( - new TableRow() - .setF( - ImmutableList.of( - new TableCell().setV("Value1"), new TableCell().setV("Value2"))), - new TableRow() - .setF( - ImmutableList.of( - new TableCell().setV("Value3"), new TableCell().setV("Value4")))); - BlockingQueue, Boolean>> pageCache = new LinkedBlockingDeque<>(2); BlockingQueue> rpcResponseQueue = new LinkedBlockingDeque<>(2); rpcResponseQueue.offer(Tuple.of(null, false)); // This call should populate page cache ConnectionImpl connectionSpy = Mockito.spy(connection); - connectionSpy.parseRpcDataAsync(tableRows, QUERY_SCHEMA, pageCache, rpcResponseQueue); + connectionSpy.parseRpcDataAsync(TABLE_ROWS, QUERY_SCHEMA, pageCache, rpcResponseQueue); Tuple, Boolean> fvlTupple = pageCache.take(); // wait for the parser thread to parse the data assertNotNull(fvlTupple); @@ -247,16 +256,6 @@ public void testParseDataTask() throws InterruptedException { @Test public void testPopulateBuffer() throws InterruptedException { - List tableRows = - ImmutableList.of( - new TableRow() - .setF( - ImmutableList.of( - new TableCell().setV("Value1"), new TableCell().setV("Value2"))), - new TableRow() - .setF( - ImmutableList.of( - new TableCell().setV("Value3"), new TableCell().setV("Value4")))); BlockingQueue, Boolean>> pageCache = new LinkedBlockingDeque<>(2); @@ -266,7 +265,7 @@ public void testPopulateBuffer() throws InterruptedException { // This call should populate page cache ConnectionImpl connectionSpy = Mockito.spy(connection); - connectionSpy.parseRpcDataAsync(tableRows, QUERY_SCHEMA, pageCache, rpcResponseQueue); + connectionSpy.parseRpcDataAsync(TABLE_ROWS, QUERY_SCHEMA, pageCache, rpcResponseQueue); verify(connectionSpy, times(1)) .parseRpcDataAsync( @@ -358,19 +357,62 @@ public void testLegacyQuerySinglePage() throws BigQuerySQLException { .createJobForQuery(any(com.google.api.services.bigquery.model.Job.class)); } + // calls executeSelect with a Fast query and emulates that no schema is returned with the first + // page + @Test + public void testFastQueryNullSchema() throws BigQuerySQLException { + ConnectionImpl connectionSpy = Mockito.spy(connection); + QueryRequest queryReqMock = new QueryRequest(); + com.google.api.services.bigquery.model.JobStatistics stats = + new com.google.api.services.bigquery.model.JobStatistics() + .setQuery(new JobStatistics2().setSchema(FAST_QUERY_TABLESCHEMA)); + com.google.api.services.bigquery.model.Job jobResponseMock = + new com.google.api.services.bigquery.model.Job() + // .setConfiguration(QUERY_JOB.g) + .setJobReference(QUERY_JOB.toPb()) + .setId(JOB) + .setStatus(new com.google.api.services.bigquery.model.JobStatus().setState("DONE")) + .setStatistics(stats); + // emulating a legacy query + doReturn(true).when(connectionSpy).isFastQuerySupported(); + com.google.api.services.bigquery.model.QueryResponse mockQueryRes = + new QueryResponse() + .setSchema(FAST_QUERY_TABLESCHEMA) + .setJobComplete(false) // so that it goes to the else part in queryRpc + .setTotalRows(new BigInteger(String.valueOf(4L))) + .setJobReference(QUERY_JOB.toPb()) + .setRows(TABLE_ROWS); + when(bigqueryRpcMock.queryRpc(any(String.class), any(QueryRequest.class))) + .thenReturn(mockQueryRes); + doReturn(GET_QUERY_RESULTS_RESPONSE_NULL_SCHEMA) // wiring the null schema for the test case + .when(connectionSpy) + .getQueryResultsFirstPage(any(JobId.class)); + doReturn(BQ_RS_MOCK_RES) + .when(connectionSpy) + .getSubsequentQueryResultsWithJob( + any(Long.class), + any(Long.class), + any(JobId.class), + any(GetQueryResultsResponse.class), + any(Schema.class), + any(Boolean.class)); + doReturn(jobResponseMock).when(connectionSpy).createDryRunJob(any(String.class)); + BigQueryResult res = connectionSpy.executeSelect(SQL_QUERY); + assertEquals(res.getTotalRows(), 2); + assertEquals(QUERY_SCHEMA, res.getSchema()); + verify(connectionSpy, times(1)) + .getSubsequentQueryResultsWithJob( + any(Long.class), + any(Long.class), + any(JobId.class), + any(GetQueryResultsResponse.class), + any(Schema.class), + any(Boolean.class)); + } + // exercises getSubsequentQueryResultsWithJob for fast running queries @Test public void testFastQueryLongRunning() throws SQLException { - List tableRows = - ImmutableList.of( - new TableRow() - .setF( - ImmutableList.of( - new TableCell().setV("Value1"), new TableCell().setV("Value2"))), - new TableRow() - .setF( - ImmutableList.of( - new TableCell().setV("Value3"), new TableCell().setV("Value4")))); ConnectionImpl connectionSpy = Mockito.spy(connection); // emulating a fast query doReturn(true).when(connectionSpy).isFastQuerySupported(); @@ -389,7 +431,7 @@ public void testFastQueryLongRunning() throws SQLException { .setJobComplete(false) .setTotalRows(new BigInteger(String.valueOf(4L))) .setJobReference(QUERY_JOB.toPb()) - .setRows(tableRows); + .setRows(TABLE_ROWS); when(bigqueryRpcMock.queryRpc(any(String.class), any(QueryRequest.class))) .thenReturn(mockQueryRes); BigQueryResult res = connectionSpy.executeSelect(SQL_QUERY); From 7f961b1adeb59c63a44c6a42a8c8fedad1deee28 Mon Sep 17 00:00:00 2001 From: Owl Bot Date: Tue, 14 Jun 2022 08:32:57 +0000 Subject: [PATCH 3/4] =?UTF-8?q?=F0=9F=A6=89=20Updates=20from=20OwlBot=20po?= =?UTF-8?q?st-processor?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index e963b29f5..1419a8f04 100644 --- a/README.md +++ b/README.md @@ -59,13 +59,13 @@ implementation 'com.google.cloud:google-cloud-bigquery' If you are using Gradle without BOM, add this to your dependencies ```Groovy -implementation 'com.google.cloud:google-cloud-bigquery:2.13.1' +implementation 'com.google.cloud:google-cloud-bigquery:2.13.2' ``` If you are using SBT, add this to your dependencies ```Scala -libraryDependencies += "com.google.cloud" % "google-cloud-bigquery" % "2.13.1" +libraryDependencies += "com.google.cloud" % "google-cloud-bigquery" % "2.13.2" ``` ## Authentication From c93813c178c2fac354a9203df7087875c49fc193 Mon Sep 17 00:00:00 2001 From: Prashant Mishra Date: Tue, 19 Jul 2022 23:41:53 +0530 Subject: [PATCH 4/4] Added additional logs for troubleshooting the jobComplete status --- .../com/google/cloud/bigquery/ConnectionImpl.java | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/ConnectionImpl.java b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/ConnectionImpl.java index c02d59904..3a9937a07 100644 --- a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/ConnectionImpl.java +++ b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/ConnectionImpl.java @@ -325,12 +325,24 @@ private BigQueryResult queryRpc( // and can be optimized here, but this is left as future work. Long totalRows = results.getTotalRows() == null ? null : results.getTotalRows().longValue(); Long pageRows = results.getRows() == null ? null : (long) (results.getRows().size()); + logger.log( + Level.WARNING, + "\n" + + String.format( + "results.getJobComplete(): %s, isSchemaNull: %s , totalRows: %s, pageRows: %s", + results.getJobComplete(), results.getSchema() == null, totalRows, pageRows)); JobId jobId = JobId.fromPb(results.getJobReference()); GetQueryResultsResponse firstPage = getQueryResultsFirstPage(jobId); // We might get null schema from the backend occasionally. Ref: // https://github.com/googleapis/java-bigquery/issues/2103/. Using queryDryRun in such cases // to get the schema if (firstPage.getSchema() == null) { // get schema using dry run + // Log the status if the job was complete complete + logger.log( + Level.WARNING, + "\n" + + "Received null schema, Using dryRun the get the Schema. jobComplete:" + + firstPage.getJobComplete()); com.google.api.services.bigquery.model.Job dryRunJob = createDryRunJob(sql); Schema schema = Schema.fromPb(dryRunJob.getStatistics().getQuery().getSchema()); return getSubsequentQueryResultsWithJob(