Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add connection worker skeleton used for multiplexing client #1778

Merged
merged 3 commits into from
Sep 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/.OwlBot.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ deep-preserve-regex:
- "/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1.*/Waiter.java"
- "/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/Exceptions.java"
- "/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java"
- "/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java"

deep-copy-regex:
- source: "/google/cloud/bigquery/storage/(v.*)/.*-java/proto-google-.*/src"
Expand Down
21 changes: 19 additions & 2 deletions google-cloud-bigquerystorage/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,15 @@
<groupId>org.codehaus.mojo</groupId>
<artifactId>flatten-maven-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<configuration>
<usedDependencies>
<dependency>com.google.auto.value:auto-value</dependency>
</usedDependencies>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>
Expand All @@ -63,6 +72,16 @@
<groupId>com.google.api</groupId>
<artifactId>api-common</artifactId>
</dependency>
<dependency>
<groupId>com.google.auto.value</groupId>
<artifactId>auto-value</artifactId>
<version>1.9</version>
</dependency>
<dependency>
<groupId>com.google.auto.value</groupId>
<artifactId>auto-value-annotations</artifactId>
<version>1.9</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
Expand All @@ -71,7 +90,6 @@
<groupId>com.google.api.grpc</groupId>
<artifactId>proto-google-common-protos</artifactId>
</dependency>

<dependency>
<groupId>com.google.api.grpc</groupId>
<artifactId>proto-google-cloud-bigquerystorage-v1beta1</artifactId>
Expand Down Expand Up @@ -134,7 +152,6 @@
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.google.truth</groupId>
<artifactId>truth</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
/*
* Copyright 2022 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigquery.storage.v1;

import com.google.api.core.ApiFuture;
import com.google.api.gax.batching.FlowController;
import com.google.auto.value.AutoValue;
import javax.annotation.concurrent.GuardedBy;

public class ConnectionWorkerPool {
/*
* Max allowed inflight requests in the stream. Method append is blocked at this.
*/
private final long maxInflightRequests;

/*
* Max allowed inflight bytes in the stream. Method append is blocked at this.
*/
private final long maxInflightBytes;

/*
* Behavior when inflight queue is exceeded. Only supports Block or Throw, default is Block.
*/
private final FlowController.LimitExceededBehavior limitExceededBehavior;

/*
* TraceId for debugging purpose.
*/
private final String traceId;

/*
* Tracks current inflight requests in the stream.
*/
@GuardedBy("lock")
private long inflightRequests = 0;

/*
* Tracks current inflight bytes in the stream.
*/
@GuardedBy("lock")
private long inflightBytes = 0;

/*
* Tracks how often the stream was closed due to a retriable error. Streaming will stop when the
* count hits a threshold. Streaming should only be halted, if it isn't possible to establish a
* connection. Keep track of the number of reconnections in succession. This will be reset if
* a row is successfully called back.
*/
@GuardedBy("lock")
private long conectionRetryCountWithoutCallback = 0;

/*
* If false, streamConnection needs to be reset.
*/
@GuardedBy("lock")
private boolean streamConnectionIsConnected = false;

/*
* A boolean to track if we cleaned up inflight queue.
*/
@GuardedBy("lock")
private boolean inflightCleanuped = false;

/*
* Indicates whether user has called Close() or not.
*/
@GuardedBy("lock")
private boolean userClosed = false;

/*
* The final status of connection. Set to nonnull when connection is permanently closed.
*/
@GuardedBy("lock")
private Throwable connectionFinalStatus = null;

/*
* Contains the updated TableSchema.
*/
@GuardedBy("lock")
private TableSchema updatedSchema;

/*
* A client used to interact with BigQuery.
*/
private BigQueryWriteClient client;

/*
* If true, the client above is created by this writer and should be closed.
*/
private boolean ownsBigQueryWriteClient = false;

/** Settings for connection pool. */
@AutoValue
public abstract static class Settings {
/**
* The minimum connections each pool created before trying to reuse the previously created
* connection in multiplexing mode.
*/
abstract int minConnectionsPerPool();

/** The maximum connections per connection pool. */
abstract int maxConnectionsPerPool();

public static Builder builder() {
return new AutoValue_ConnectionWorkerPool_Settings.Builder()
.setMinConnectionsPerPool(2)
.setMaxConnectionsPerPool(10);
}

/** Builder for the options to config {@link ConnectionWorkerPool}. */
@AutoValue.Builder
public abstract static class Builder {
public abstract Builder setMinConnectionsPerPool(int value);

public abstract Builder setMaxConnectionsPerPool(int value);

public abstract Settings build();
}
}

/** Static setting for connection pool. */
private static Settings settings = Settings.builder().build();

public ConnectionWorkerPool(
long maxInflightRequests,
long maxInflightBytes,
FlowController.LimitExceededBehavior limitExceededBehavior,
String traceId,
BigQueryWriteClient client,
boolean ownsBigQueryWriteClient) {
this.maxInflightRequests = maxInflightRequests;
this.maxInflightBytes = maxInflightBytes;
this.limitExceededBehavior = limitExceededBehavior;
this.traceId = traceId;
this.client = client;
this.ownsBigQueryWriteClient = ownsBigQueryWriteClient;
}

/**
* Sets static connection pool options.
*
* <p>Note: this method should be triggered prior to the construction of connection pool.
*/
public static void setOptions(Settings settings) {
ConnectionWorkerPool.settings = settings;
}

/** Distributes the writing of a message to an underlying connection. */
public ApiFuture<AppendRowsResponse> append(StreamWriter streamWriter, ProtoRows rows) {
throw new RuntimeException("Append is not implemented!");
}

/** Distributes the writing of a message to an underlying connection. */
public ApiFuture<AppendRowsResponse> append(
StreamWriter streamWriter, ProtoRows rows, long offset) {
throw new RuntimeException("append with offset is not implemented on connection pool!");
}

/** Close the stream writer. Shut down all resources. */
public void close(StreamWriter streamWriter) {
throw new RuntimeException("close is implemented on connection pool");
}
}