Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add update schema support for multiplexing #1867

Merged
merged 43 commits into from
Nov 12, 2022
Merged
Changes from all commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
5a63d95
feat: Split writer into connection worker and wrapper, this is a
GaoleMeng Sep 9, 2022
5a13302
feat: add connection worker pool skeleton, used for multiplexing client
GaoleMeng Sep 13, 2022
0297204
Merge branch 'main' into main
GaoleMeng Sep 14, 2022
8a81ad3
feat: add Load api for connection worker for multiplexing client
GaoleMeng Sep 14, 2022
68fd040
Merge remote-tracking branch 'upstream/main'
GaoleMeng Sep 14, 2022
3106dae
Merge remote-tracking branch 'upstream/main'
GaoleMeng Sep 15, 2022
5bf04e5
Merge branch 'googleapis:main' into main
GaoleMeng Sep 15, 2022
2fc7551
Merge branch 'main' of https://github.com/GaoleMeng/java-bigquerystorage
GaoleMeng Sep 15, 2022
7a6d919
feat: add multiplexing support to connection worker. We will treat every
GaoleMeng Sep 15, 2022
3ba7659
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Sep 16, 2022
f379a78
Updates from OwlBot post-processor
gcf-owl-bot[bot] Sep 16, 2022
9307776
Merge branch 'main' of https://github.com/GaoleMeng/java-bigquerystorage
GaoleMeng Sep 16, 2022
de73013
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Sep 16, 2022
19005a1
feat: port the multiplexing client core algorithm and basic tests
GaoleMeng Sep 19, 2022
c5d14ba
Merge branch 'googleapis:main' into main
GaoleMeng Sep 19, 2022
644360a
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Sep 20, 2022
3099d82
Merge branch 'googleapis:main' into main
GaoleMeng Sep 20, 2022
e707dd6
Merge branch 'googleapis:main' into main
GaoleMeng Sep 20, 2022
9e7a8fa
Merge branch 'main' of https://github.com/GaoleMeng/java-bigquerystorage
GaoleMeng Sep 20, 2022
31f1755
Merge branch 'googleapis:main' into main
GaoleMeng Sep 20, 2022
44c36fc
feat: wire multiplexing connection pool to stream writer
GaoleMeng Sep 20, 2022
87a4036
feat: some fixes for multiplexing client
GaoleMeng Sep 23, 2022
c92ea1b
Merge remote-tracking branch 'upstream/main'
GaoleMeng Sep 23, 2022
019520c
Merge branch 'googleapis:main' into main
GaoleMeng Sep 26, 2022
47893df
feat: fix some todos, and reject the mixed behavior of passed in clie…
GaoleMeng Sep 27, 2022
8bd4e6a
Merge remote-tracking branch 'upstream/main'
GaoleMeng Sep 27, 2022
83409b0
Merge remote-tracking branch 'upstream/main'
GaoleMeng Sep 27, 2022
f7dd72d
Merge branch 'googleapis:main' into main
GaoleMeng Sep 27, 2022
a48399f
Merge branch 'googleapis:main' into main
GaoleMeng Sep 29, 2022
6789bc9
feat: fix the bug that we may peek into the write_stream field but it's
GaoleMeng Sep 29, 2022
46b4e6c
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Sep 29, 2022
dfd4dd9
Merge branch 'googleapis:main' into main
GaoleMeng Sep 29, 2022
d68ae70
feat: fix the bug that we may peek into the write_stream field but it's
GaoleMeng Sep 29, 2022
2983fe9
Merge branch 'main' of https://github.com/GaoleMeng/java-bigquerystorage
GaoleMeng Sep 29, 2022
d406256
Merge branch 'googleapis:main' into main
GaoleMeng Oct 13, 2022
22e9e07
feat: add getInflightWaitSeconds implementation
GaoleMeng Oct 13, 2022
fdb4e1c
Merge branch 'googleapis:main' into main
GaoleMeng Oct 21, 2022
0469474
Merge branch 'googleapis:main' into main
GaoleMeng Nov 2, 2022
d1b7740
feat: Add schema comparision in connection loop to ensure schema upda…
GaoleMeng Nov 3, 2022
e4cd529
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Nov 4, 2022
74ff1c4
Merge branch 'googleapis:main' into main
GaoleMeng Nov 4, 2022
762f49e
feat: add schema update support to multiplexing
GaoleMeng Nov 5, 2022
de456c2
Merge branch 'googleapis:main' into main
GaoleMeng Nov 11, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions google-cloud-bigquerystorage/clirr-ignored-differences.xml
Original file line number Diff line number Diff line change
@@ -65,4 +65,15 @@
<className>com/google/cloud/bigquery/storage/v1/Exceptions$AppendSerializtionError</className>
<method>Exceptions$AppendSerializtionError(java.lang.String, java.util.Map)</method>
</difference>
<difference>
<differenceType>7006</differenceType>
<className>com/google/cloud/bigquery/storage/v1/ConnectionWorker</className>
<method>com.google.cloud.bigquery.storage.v1.TableSchema getUpdatedSchema()</method>
<to>com.google.cloud.bigquery.storage.v1.ConnectionWorker$TableSchemaAndTimestamp</to>
</difference>
<difference>
<differenceType>7009</differenceType>
<className>com/google/cloud/bigquery/storage/v1/ConnectionWorker</className>
<method>com.google.cloud.bigquery.storage.v1.TableSchema getUpdatedSchema()</method>
</difference>
</differences>
6 changes: 6 additions & 0 deletions google-cloud-bigquerystorage/pom.xml
Original file line number Diff line number Diff line change
@@ -152,6 +152,12 @@
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.http-client</groupId>
<artifactId>google-http-client</artifactId>
<version>1.42.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.truth</groupId>
<artifactId>truth</artifactId>
Original file line number Diff line number Diff line change
@@ -31,6 +31,7 @@
import io.grpc.Status.Code;
import io.grpc.StatusRuntimeException;
import java.io.IOException;
import java.time.Instant;
import java.util.Comparator;
import java.util.Deque;
import java.util.HashMap;
@@ -159,7 +160,7 @@ public class ConnectionWorker implements AutoCloseable {
* Contains the updated TableSchema.
*/
@GuardedBy("lock")
private TableSchema updatedSchema;
private TableSchemaAndTimestamp updatedSchema;

/*
* A client used to interact with BigQuery.
@@ -608,7 +609,8 @@ private void requestCallback(AppendRowsResponse response) {
AppendRequestAndResponse requestWrapper;
this.lock.lock();
if (response.hasUpdatedSchema()) {
this.updatedSchema = response.getUpdatedSchema();
this.updatedSchema =
TableSchemaAndTimestamp.create(Instant.now(), response.getUpdatedSchema());
}
try {
// Had a successful connection with at least one result, reset retries.
@@ -720,7 +722,7 @@ private AppendRequestAndResponse pollInflightRequestQueue() {
}

/** Thread-safe getter of updated TableSchema */
public synchronized TableSchema getUpdatedSchema() {
synchronized TableSchemaAndTimestamp getUpdatedSchema() {
return this.updatedSchema;
}

@@ -818,4 +820,17 @@ public static void setOverwhelmedCountsThreshold(double newThreshold) {
overwhelmedInflightCount = newThreshold;
}
}

@AutoValue
abstract static class TableSchemaAndTimestamp {
// Shows the timestamp updated schema is reported from response
abstract Instant updateTimeStamp();

// The updated schema returned from server.
abstract TableSchema updatedSchema();

static TableSchemaAndTimestamp create(Instant updateTimeStamp, TableSchema updatedSchema) {
return new AutoValue_ConnectionWorker_TableSchemaAndTimestamp(updateTimeStamp, updatedSchema);
}
}
}
Original file line number Diff line number Diff line change
@@ -16,12 +16,17 @@
package com.google.cloud.bigquery.storage.v1;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.gax.batching.FlowController;
import com.google.auto.value.AutoValue;
import com.google.cloud.bigquery.storage.v1.ConnectionWorker.Load;
import com.google.cloud.bigquery.storage.v1.ConnectionWorker.TableSchemaAndTimestamp;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.MoreExecutors;
import java.io.IOException;
import java.time.Instant;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
@@ -33,10 +38,15 @@
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Logger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.annotation.concurrent.GuardedBy;

/** Pool of connections to accept appends and distirbute to different connections. */
public class ConnectionWorkerPool {
static final Pattern STREAM_NAME_PATTERN =
Pattern.compile("projects/([^/]+)/datasets/([^/]+)/tables/([^/]+)/streams/([^/]+)");

private static final Logger log = Logger.getLogger(ConnectionWorkerPool.class.getName());
/*
* Max allowed inflight requests in the stream. Method append is blocked at this.
@@ -65,6 +75,11 @@ public class ConnectionWorkerPool {
private final Set<ConnectionWorker> connectionWorkerPool =
Collections.synchronizedSet(new HashSet<>());

/*
* Contains the mapping from stream name to updated schema.
*/
private Map<String, TableSchemaAndTimestamp> tableNameToUpdatedSchema = new ConcurrentHashMap<>();

/** Enable test related logic. */
private static boolean enableTesting = false;

@@ -246,7 +261,18 @@ public ApiFuture<AppendRowsResponse> append(
ApiFuture<AppendRowsResponse> responseFuture =
connectionWorker.append(
streamWriter.getStreamName(), streamWriter.getProtoSchema(), rows, offset);
return responseFuture;
return ApiFutures.transform(
responseFuture,
// Add callback for update schema
(response) -> {
if (response.getWriteStream() != "" && response.hasUpdatedSchema()) {
tableNameToUpdatedSchema.put(
response.getWriteStream(),
TableSchemaAndTimestamp.create(Instant.now(), response.getUpdatedSchema()));
}
return response;
},
MoreExecutors.directExecutor());
}

/**
@@ -392,6 +418,10 @@ public long getInflightWaitSeconds(StreamWriter streamWriter) {
}
}

TableSchemaAndTimestamp getUpdatedSchema(StreamWriter streamWriter) {
return tableNameToUpdatedSchema.getOrDefault(streamWriter.getStreamName(), null);
}

/** Enable Test related logic. */
public static void enableTestingLogic() {
enableTesting = true;
@@ -421,4 +451,15 @@ FlowController.LimitExceededBehavior limitExceededBehavior() {
BigQueryWriteClient bigQueryWriteClient() {
return client;
}

static String toTableName(String streamName) {
Matcher matcher = STREAM_NAME_PATTERN.matcher(streamName);
Preconditions.checkArgument(matcher.matches(), "Invalid stream name: %s.", streamName);
return "projects/"
+ matcher.group(1)
+ "/datasets/"
+ matcher.group(2)
+ "/tables/"
+ matcher.group(3);
}
}
Original file line number Diff line number Diff line change
@@ -21,7 +21,6 @@
import com.google.api.gax.core.ExecutorProvider;
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.cloud.bigquery.storage.v1.Exceptions.AppendSerializtionError;
import com.google.cloud.bigquery.storage.v1.StreamWriter.SingleConnectionOrConnectionPool.Kind;
import com.google.common.base.Preconditions;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Descriptors.Descriptor;
@@ -186,9 +185,8 @@ public ApiFuture<AppendRowsResponse> append(JSONArray jsonArr, long offset)
throws IOException, DescriptorValidationException {
// Handle schema updates in a Thread-safe way by locking down the operation
synchronized (this) {
// Update schema only work when connection pool is not enabled.
if (this.streamWriter.getConnectionOperationType() == Kind.CONNECTION_WORKER
&& this.streamWriter.getUpdatedSchema() != null) {
// Create a new stream writer internally if a new updated schema is reported from backend.
if (this.streamWriter.getUpdatedSchema() != null) {
refreshWriter(this.streamWriter.getUpdatedSchema());
}

Original file line number Diff line number Diff line change
@@ -22,12 +22,14 @@
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.auto.value.AutoOneOf;
import com.google.auto.value.AutoValue;
import com.google.cloud.bigquery.storage.v1.ConnectionWorker.TableSchemaAndTimestamp;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import io.grpc.Status;
import io.grpc.Status.Code;
import io.grpc.StatusRuntimeException;
import java.io.IOException;
import java.time.Instant;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
@@ -85,6 +87,9 @@ public class StreamWriter implements AutoCloseable {
private static final Map<ConnectionPoolKey, ConnectionWorkerPool> connectionPoolMap =
new ConcurrentHashMap<>();

/** Creation timestamp of this streamwriter */
private final Instant creationTimestamp;

/** The maximum size of one request. Defined by the API. */
public static long getApiMaxRequestBytes() {
return 10L * 1000L * 1000L; // 10 megabytes (https://en.wikipedia.org/wiki/Megabyte)
@@ -147,11 +152,11 @@ long getInflightWaitSeconds(StreamWriter streamWriter) {
return connectionWorker().getInflightWaitSeconds();
}

TableSchema getUpdatedSchema() {
TableSchemaAndTimestamp getUpdatedSchema(StreamWriter streamWriter) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I mean this is actually a breaking change? Dataflow will use this method.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Discussed offline, let's used timestamp on streamwriter when returning schema

if (getKind() == Kind.CONNECTION_WORKER_POOL) {
// TODO(gaole): implement updated schema support for multiplexing.
throw new IllegalStateException("getUpdatedSchema is not implemented for multiplexing.");
return connectionWorkerPool().getUpdatedSchema(streamWriter);
}
// Always populate MIN timestamp to w
return connectionWorker().getUpdatedSchema();
}

@@ -255,6 +260,7 @@ private StreamWriter(Builder builder) throws IOException {
client.close();
}
}
this.creationTimestamp = Instant.now();
}

@VisibleForTesting
@@ -396,9 +402,25 @@ public static StreamWriter.Builder newBuilder(String streamName) {
return new StreamWriter.Builder(streamName);
}

/** Thread-safe getter of updated TableSchema */
/**
* Thread-safe getter of updated TableSchema.
*
* <p>This will return the updated schema only when the creation timestamp of this writer is older
* than the updated schema.
*/
public synchronized TableSchema getUpdatedSchema() {
return singleConnectionOrConnectionPool.getUpdatedSchema();
TableSchemaAndTimestamp tableSchemaAndTimestamp =
singleConnectionOrConnectionPool.getUpdatedSchema(this);
if (tableSchemaAndTimestamp == null) {
return null;
}
return creationTimestamp.compareTo(tableSchemaAndTimestamp.updateTimeStamp()) < 0
? tableSchemaAndTimestamp.updatedSchema()
: null;
}

Instant getCreationTimestamp() {
return creationTimestamp;
}

@VisibleForTesting
Original file line number Diff line number Diff line change
@@ -16,6 +16,7 @@
package com.google.cloud.bigquery.storage.v1;

import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.assertThrows;

import com.google.api.core.ApiFuture;
import com.google.api.gax.batching.FlowController;
@@ -311,6 +312,16 @@ public void testMultiStreamAppend_appendWhileClosing() throws Exception {
assertThat(connectionWorkerPool.getTotalConnectionCount()).isEqualTo(0);
}

@Test
public void testToTableName() {
assertThat(ConnectionWorkerPool.toTableName("projects/p/datasets/d/tables/t/streams/s"))
.isEqualTo("projects/p/datasets/d/tables/t");

IllegalArgumentException ex =
assertThrows(
IllegalArgumentException.class, () -> ConnectionWorkerPool.toTableName("projects/p/"));
}

private AppendRowsResponse createAppendResponse(long offset) {
return AppendRowsResponse.newBuilder()
.setAppendResult(
Loading