Cherrypicking #31721 to release 2.58.0 (#31868)

Merged: 11 commits, Jul 15, 2024

CHANGES.md (2 additions & 0 deletions)
@@ -67,6 +67,7 @@
## New Features / Improvements

* Multiple RunInference instances can now share the same model instance by setting the model_identifier parameter (Python) ([#31665](https://github.com/apache/beam/issues/31665)).
* Added options to control the number of Storage API multiplexing connections ([#31721](https://github.com/apache/beam/pull/31721)).
* [IcebergIO] All specified catalog properties are passed through to the connector ([#31726](https://github.com/apache/beam/pull/31726)).
* Removed a 3rd party LGPL dependency from the Go SDK ([#31765](https://github.com/apache/beam/issues/31765)).
* Support for MapState and SetState when using Dataflow Runner v1 with Streaming Engine (Java) ([#18200](https://github.com/apache/beam/issues/18200)).
@@ -82,6 +83,7 @@

## Bugfixes

* Fixed a bug in BigQueryIO batch Storage Write API that frequently exhausted concurrent connections quota ([#31710](https://github.com/apache/beam/pull/31710)).
* Fixed X (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).

## Security Fixes

AppendClientInfo.java
@@ -40,8 +40,8 @@
*/
@AutoValue
abstract class AppendClientInfo {
-  private final Counter activeConnections =
-      Metrics.counter(AppendClientInfo.class, "activeConnections");
+  private final Counter activeStreamAppendClients =
+      Metrics.counter(AppendClientInfo.class, "activeStreamAppendClients");

abstract @Nullable BigQueryServices.StreamAppendClient getStreamAppendClient();

@@ -123,7 +123,7 @@ public AppendClientInfo withAppendClient(
writeStreamService.getStreamAppendClient(
streamName, getDescriptor(), useConnectionPool, missingValueInterpretation);

-    activeConnections.inc();
+    activeStreamAppendClients.inc();

return toBuilder().setStreamName(streamName).setStreamAppendClient(client).build();
}
@@ -133,7 +133,7 @@ public void close() {
BigQueryServices.StreamAppendClient client = getStreamAppendClient();
if (client != null) {
getCloseAppendClient().accept(client);
-      activeConnections.dec();
+      activeStreamAppendClients.dec();
}
}

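The change above renames the counter from `activeConnections` to `activeStreamAppendClients`: it is incremented when a stream append client is attached and decremented when the client is closed. As a rough illustration of how a pipeline author could inspect this counter, here is a hedged sketch using Beam's metrics query API. The helper class and method names are ours, not part of this PR, and whether the counter is actually reported (and whether attempted or committed values are available) depends on the runner.

```java
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.metrics.MetricQueryResults;
import org.apache.beam.sdk.metrics.MetricResult;
import org.apache.beam.sdk.metrics.MetricsFilter;

/** Illustrative helper (not part of this PR): prints the renamed counter if the runner reports it. */
class StreamAppendClientMetrics {
  static void printActiveStreamAppendClients(PipelineResult result) {
    // Query all metrics and match on the counter name declared in AppendClientInfo:
    // Metrics.counter(AppendClientInfo.class, "activeStreamAppendClients").
    MetricQueryResults metrics =
        result.metrics().queryMetrics(MetricsFilter.builder().build());
    for (MetricResult<Long> counter : metrics.getCounters()) {
      if ("activeStreamAppendClients".equals(counter.getName().getName())) {
        System.out.println(
            counter.getName().getNamespace() + ": " + counter.getAttempted());
      }
    }
  }
}
```

Matching on the metric name rather than the namespace avoids hard-coding the fully qualified name of the package-private AppendClientInfo class.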

BigQueryOptions.java
@@ -109,6 +109,28 @@ public interface BigQueryOptions

void setNumStorageWriteApiStreamAppendClients(Integer value);

@Description(
"When using the STORAGE_API_AT_LEAST_ONCE write method with multiplexing (i.e. useStorageApiConnectionPool=true), "
+ "this option sets the minimum number of connections each pool creates before any connections are shared. This is "
+ "on a per-worker, per-region basis. Note that in practice, the minimum number of connections created is the minimum "
+ "of this value and (numStorageWriteApiStreamAppendClients x num destinations). BigQuery will create this many "
+ "connections at first and will only create more connections if the current ones are \"overwhelmed\". Consider "
+ "increasing this value if you are running into performance issues.")
@Default.Integer(2)
Integer getMinConnectionPoolConnections();

void setMinConnectionPoolConnections(Integer value);

@Description(
"When using the STORAGE_API_AT_LEAST_ONCE write method with multiplexing (i.e. useStorageApiConnectionPool=true), "
+ "this option sets the maximum number of connections each pool creates. This is on a per-worker, per-region basis. "
+ "If writing to many dynamic destinations (>20) and experiencing performance issues or seeing append operations competing "
+ "for streams, consider increasing this value.")
@Default.Integer(20)
Integer getMaxConnectionPoolConnections();

void setMaxConnectionPoolConnections(Integer value);

@Description("The max number of messages inflight that we expect each connection will retain.")
@Default.Long(1000)
Long getStorageWriteMaxInflightRequests();
@@ -122,6 +144,11 @@ public interface BigQueryOptions

void setStorageWriteMaxInflightBytes(Long value);

@Description(
"Enables multiplexing mode, where multiple tables can share the same connection. Only available when writing with STORAGE_API_AT_LEAST_ONCE"
+ " mode. This is recommended if your write operation is creating 20+ connections. When using multiplexing, consider tuning "
+ "the number of connections created by the connection pool with minConnectionPoolConnections and maxConnectionPoolConnections. "
+ "For more information, see https://cloud.google.com/bigquery/docs/write-api-best-practices#connection_pool_management")
@Default.Boolean(false)
Boolean getUseStorageApiConnectionPool();

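To tie the new options together, here is a hedged sketch of how a pipeline might opt into multiplexing and tune the pool bounds introduced above. The flag names follow Beam's usual getter-to-flag convention, and the numeric values are illustrative, not recommendations; per the descriptions above, the settings only matter with the STORAGE_API_AT_LEAST_ONCE write method.

```java
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class MultiplexingOptionsExample {
  public static void main(String[] args) {
    // Equivalent command-line form:
    //   --useStorageApiConnectionPool=true \
    //   --minConnectionPoolConnections=4 \
    //   --maxConnectionPoolConnections=32
    BigQueryOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(BigQueryOptions.class);

    // Programmatic equivalents; the defaults added in this PR are 2 and 20.
    options.setUseStorageApiConnectionPool(true);
    options.setMinConnectionPoolConnections(4);
    options.setMaxConnectionPoolConnections(32);

    System.out.println(
        "multiplexing=" + options.getUseStorageApiConnectionPool()
            + " min=" + options.getMinConnectionPoolConnections()
            + " max=" + options.getMaxConnectionPoolConnections());
  }
}
```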

BigQueryServicesImpl.java
@@ -69,6 +69,7 @@
import com.google.cloud.bigquery.storage.v1.BigQueryReadSettings;
import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
import com.google.cloud.bigquery.storage.v1.BigQueryWriteSettings;
import com.google.cloud.bigquery.storage.v1.ConnectionWorkerPool;
import com.google.cloud.bigquery.storage.v1.CreateReadSessionRequest;
import com.google.cloud.bigquery.storage.v1.CreateWriteStreamRequest;
import com.google.cloud.bigquery.storage.v1.FinalizeWriteStreamRequest;
@@ -1423,6 +1424,14 @@ public StreamAppendClient getStreamAppendClient(
bqIOMetadata.getBeamJobId() == null ? "" : bqIOMetadata.getBeamJobId(),
bqIOMetadata.getBeamWorkerId() == null ? "" : bqIOMetadata.getBeamWorkerId());

ConnectionWorkerPool.setOptions(
ConnectionWorkerPool.Settings.builder()
.setMinConnectionsPerRegion(
options.as(BigQueryOptions.class).getMinConnectionPoolConnections())
.setMaxConnectionsPerRegion(
options.as(BigQueryOptions.class).getMaxConnectionPoolConnections())
.build());

StreamWriter streamWriter =
StreamWriter.newBuilder(streamName, newWriteClient)
.setExecutorProvider(
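For context on where the ConnectionWorkerPool settings above take effect, here is a hedged end-to-end sketch of an at-least-once Storage Write API write with multiplexing enabled. The project, dataset, table, and row contents are hypothetical, and the destination table is assumed to already exist (CREATE_NEVER); this is a sketch, not a definitive recipe.

```java
import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions;
import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;

public class AtLeastOncePoolExample {
  public static void main(String[] args) {
    BigQueryOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(BigQueryOptions.class);
    // Enable multiplexing so that the connection pool configured by
    // ConnectionWorkerPool.setOptions (min/max bounds from BigQueryOptions) is used.
    options.setUseStorageApiConnectionPool(true);

    Pipeline p = Pipeline.create(options);
    p.apply(Create.of(new TableRow().set("name", "example")).withCoder(TableRowJsonCoder.of()))
        .apply(
            BigQueryIO.writeTableRows()
                .to("my-project:my_dataset.my_table") // hypothetical destination
                .withMethod(BigQueryIO.Write.Method.STORAGE_API_AT_LEAST_ONCE)
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER));
    p.run().waitUntilFinish();
  }
}
```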