Skip to content

Commit 8833c97

Browse files
authored
fix: Close bq read client (#3644)
* Shutdown bqReadClient after high throughput read * Code formatted to google-java-format
1 parent fadd992 commit 8833c97

File tree

1 file changed

+13
-3
lines changed

1 file changed

+13
-3
lines changed

google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/ConnectionImpl.java

+13-3
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@ class ConnectionImpl implements Connection {
9797
private final Logger logger = Logger.getLogger(this.getClass().getName());
9898
private BigQueryReadClient bqReadClient;
9999
private static final long EXECUTOR_TIMEOUT_SEC = 10;
100+
private static final long BIGQUERY_TIMEOUT_SEC = 10;
100101
private BlockingQueue<AbstractList<FieldValue>>
101102
bufferFvl; // initialized lazily iff we end up using the tabledata.list end point
102103
private BlockingQueue<BigQueryResultImpl.Row>
@@ -148,8 +149,15 @@ public synchronized boolean close() throws BigQuerySQLException {
148149
flagEndOfStream(); // an End of Stream flag in the buffer so that the `ResultSet.next()` stops
149150
// advancing the cursor
150151
queryTaskExecutor.shutdownNow();
152+
boolean isBqReadClientTerminated = true;
151153
try {
152-
if (queryTaskExecutor.awaitTermination(EXECUTOR_TIMEOUT_SEC, TimeUnit.SECONDS)) {
154+
if (bqReadClient != null) {
155+
bqReadClient.shutdownNow();
156+
isBqReadClientTerminated =
157+
bqReadClient.awaitTermination(BIGQUERY_TIMEOUT_SEC, TimeUnit.SECONDS);
158+
}
159+
if (queryTaskExecutor.awaitTermination(EXECUTOR_TIMEOUT_SEC, TimeUnit.SECONDS)
160+
&& isBqReadClientTerminated) {
153161
return true;
154162
} // else queryTaskExecutor.isShutdown() will be returned outside this try block
155163
} catch (InterruptedException e) {
@@ -159,7 +167,9 @@ public synchronized boolean close() throws BigQuerySQLException {
159167
e); // Logging InterruptedException instead of throwing the exception back, close method
160168
// will return queryTaskExecutor.isShutdown()
161169
}
162-
return queryTaskExecutor.isShutdown(); // check if the executor has been shutdown
170+
171+
return queryTaskExecutor.isShutdown()
172+
&& isBqReadClientTerminated; // check if the executor has been shutdown
163173
}
164174

165175
/**
@@ -992,7 +1002,6 @@ BigQueryResult highThroughPutRead(
9921002
// DO a regex check using order by and use multiple streams
9931003
;
9941004
ReadSession readSession = bqReadClient.createReadSession(builder.build());
995-
9961005
bufferRow = new LinkedBlockingDeque<>(getBufferSize());
9971006
Map<String, Integer> arrowNameToIndex = new HashMap<>();
9981007
// deserialize and populate the buffer async, so that the client isn't blocked
@@ -1050,6 +1059,7 @@ private void processArrowStreamAsync(
10501059
"\n" + Thread.currentThread().getName() + " Interrupted @ markLast",
10511060
e);
10521061
}
1062+
bqReadClient.shutdownNow(); // Shutdown the read client
10531063
queryTaskExecutor.shutdownNow(); // Shutdown the thread pool
10541064
}
10551065
};

0 commit comments

Comments
 (0)