diff --git a/google-cloud-bigquerystorage/clirr-ignored-differences.xml b/google-cloud-bigquerystorage/clirr-ignored-differences.xml index 424b0b6fe9..080a8c33f3 100644 --- a/google-cloud-bigquerystorage/clirr-ignored-differences.xml +++ b/google-cloud-bigquerystorage/clirr-ignored-differences.xml @@ -65,4 +65,15 @@ com/google/cloud/bigquery/storage/v1/Exceptions$AppendSerializtionError Exceptions$AppendSerializtionError(java.lang.String, java.util.Map) + + 7006 + com/google/cloud/bigquery/storage/v1/ConnectionWorker + com.google.cloud.bigquery.storage.v1.TableSchema getUpdatedSchema() + com.google.cloud.bigquery.storage.v1.ConnectionWorker$TableSchemaAndTimestamp + + + 7009 + com/google/cloud/bigquery/storage/v1/ConnectionWorker + com.google.cloud.bigquery.storage.v1.TableSchema getUpdatedSchema() + diff --git a/google-cloud-bigquerystorage/pom.xml b/google-cloud-bigquerystorage/pom.xml index 529beff3a2..aacde39430 100644 --- a/google-cloud-bigquerystorage/pom.xml +++ b/google-cloud-bigquerystorage/pom.xml @@ -152,6 +152,12 @@ junit test + + com.google.http-client + google-http-client + 1.42.3 + test + com.google.truth truth diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java index aef972470a..32a8c948e0 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java @@ -31,6 +31,7 @@ import io.grpc.Status.Code; import io.grpc.StatusRuntimeException; import java.io.IOException; +import java.time.Instant; import java.util.Comparator; import java.util.Deque; import java.util.HashMap; @@ -159,7 +160,7 @@ public class ConnectionWorker implements AutoCloseable { * Contains the updated TableSchema. 
*/ @GuardedBy("lock") - private TableSchema updatedSchema; + private TableSchemaAndTimestamp updatedSchema; /* * A client used to interact with BigQuery. @@ -608,7 +609,8 @@ private void requestCallback(AppendRowsResponse response) { AppendRequestAndResponse requestWrapper; this.lock.lock(); if (response.hasUpdatedSchema()) { - this.updatedSchema = response.getUpdatedSchema(); + this.updatedSchema = + TableSchemaAndTimestamp.create(Instant.now(), response.getUpdatedSchema()); } try { // Had a successful connection with at least one result, reset retries. @@ -720,7 +722,7 @@ private AppendRequestAndResponse pollInflightRequestQueue() { } /** Thread-safe getter of updated TableSchema */ - public synchronized TableSchema getUpdatedSchema() { + synchronized TableSchemaAndTimestamp getUpdatedSchema() { return this.updatedSchema; } @@ -818,4 +820,17 @@ public static void setOverwhelmedCountsThreshold(double newThreshold) { overwhelmedInflightCount = newThreshold; } } + + @AutoValue + abstract static class TableSchemaAndTimestamp { + // Shows the timestamp updated schema is reported from response + abstract Instant updateTimeStamp(); + + // The updated schema returned from server. 
+ abstract TableSchema updatedSchema(); + + static TableSchemaAndTimestamp create(Instant updateTimeStamp, TableSchema updatedSchema) { + return new AutoValue_ConnectionWorker_TableSchemaAndTimestamp(updateTimeStamp, updatedSchema); + } + } } diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java index fae883b131..dea49b62db 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java @@ -16,12 +16,17 @@ package com.google.cloud.bigquery.storage.v1; import com.google.api.core.ApiFuture; +import com.google.api.core.ApiFutures; import com.google.api.gax.batching.FlowController; import com.google.auto.value.AutoValue; import com.google.cloud.bigquery.storage.v1.ConnectionWorker.Load; +import com.google.cloud.bigquery.storage.v1.ConnectionWorker.TableSchemaAndTimestamp; +import com.google.common.base.Preconditions; import com.google.common.base.Stopwatch; import com.google.common.collect.ImmutableList; +import com.google.common.util.concurrent.MoreExecutors; import java.io.IOException; +import java.time.Instant; import java.util.Collections; import java.util.Comparator; import java.util.HashSet; @@ -33,10 +38,15 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.logging.Logger; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import javax.annotation.concurrent.GuardedBy; /** Pool of connections to accept appends and distirbute to different connections. 
*/ public class ConnectionWorkerPool { + static final Pattern STREAM_NAME_PATTERN = + Pattern.compile("projects/([^/]+)/datasets/([^/]+)/tables/([^/]+)/streams/([^/]+)"); + private static final Logger log = Logger.getLogger(ConnectionWorkerPool.class.getName()); /* * Max allowed inflight requests in the stream. Method append is blocked at this. @@ -65,6 +75,11 @@ public class ConnectionWorkerPool { private final Set connectionWorkerPool = Collections.synchronizedSet(new HashSet<>()); + /* + * Maps stream name to the most recent updated schema reported by the server for that stream. + */ + private Map<String, TableSchemaAndTimestamp> streamNameToUpdatedSchema = new ConcurrentHashMap<>(); + /** Enable test related logic. */ private static boolean enableTesting = false; @@ -246,7 +261,18 @@ public ApiFuture append( ApiFuture responseFuture = connectionWorker.append( streamWriter.getStreamName(), streamWriter.getProtoSchema(), rows, offset); - return responseFuture; + return ApiFutures.transform( + responseFuture, + // Record any updated schema reported by the server so writers on this pool can refresh. + (response) -> { + if (response.hasUpdatedSchema() && !response.getWriteStream().isEmpty()) { + streamNameToUpdatedSchema.put( + response.getWriteStream(), + TableSchemaAndTimestamp.create(Instant.now(), response.getUpdatedSchema())); + } + return response; + }, + MoreExecutors.directExecutor()); } /** @@ -392,6 +418,10 @@ public long getInflightWaitSeconds(StreamWriter streamWriter) { } } + TableSchemaAndTimestamp getUpdatedSchema(StreamWriter streamWriter) { + return streamNameToUpdatedSchema.get(streamWriter.getStreamName()); + } + /** Enable Test related logic. 
*/ public static void enableTestingLogic() { enableTesting = true; @@ -421,4 +451,15 @@ FlowController.LimitExceededBehavior limitExceededBehavior() { BigQueryWriteClient bigQueryWriteClient() { return client; } + + static String toTableName(String streamName) { + Matcher matcher = STREAM_NAME_PATTERN.matcher(streamName); + Preconditions.checkArgument(matcher.matches(), "Invalid stream name: %s.", streamName); + return "projects/" + + matcher.group(1) + + "/datasets/" + + matcher.group(2) + + "/tables/" + + matcher.group(3); + } } diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java index 77ae006eed..6380af4fc6 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java @@ -21,7 +21,6 @@ import com.google.api.gax.core.ExecutorProvider; import com.google.api.gax.rpc.TransportChannelProvider; import com.google.cloud.bigquery.storage.v1.Exceptions.AppendSerializtionError; -import com.google.cloud.bigquery.storage.v1.StreamWriter.SingleConnectionOrConnectionPool.Kind; import com.google.common.base.Preconditions; import com.google.protobuf.Descriptors; import com.google.protobuf.Descriptors.Descriptor; @@ -186,9 +185,8 @@ public ApiFuture append(JSONArray jsonArr, long offset) throws IOException, DescriptorValidationException { // Handle schema updates in a Thread-safe way by locking down the operation synchronized (this) { - // Update schema only work when connection pool is not enabled. - if (this.streamWriter.getConnectionOperationType() == Kind.CONNECTION_WORKER - && this.streamWriter.getUpdatedSchema() != null) { + // Create a new stream writer internally if a new updated schema is reported from backend. 
+ if (this.streamWriter.getUpdatedSchema() != null) { refreshWriter(this.streamWriter.getUpdatedSchema()); } diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java index d51c5d669c..744839f3db 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java @@ -22,12 +22,14 @@ import com.google.api.gax.rpc.TransportChannelProvider; import com.google.auto.value.AutoOneOf; import com.google.auto.value.AutoValue; +import com.google.cloud.bigquery.storage.v1.ConnectionWorker.TableSchemaAndTimestamp; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import io.grpc.Status; import io.grpc.Status.Code; import io.grpc.StatusRuntimeException; import java.io.IOException; +import java.time.Instant; import java.util.Map; import java.util.Objects; import java.util.UUID; @@ -85,6 +87,9 @@ public class StreamWriter implements AutoCloseable { private static final Map connectionPoolMap = new ConcurrentHashMap<>(); + /** Creation timestamp of this streamwriter */ + private final Instant creationTimestamp; + /** The maximum size of one request. Defined by the API. */ public static long getApiMaxRequestBytes() { return 10L * 1000L * 1000L; // 10 megabytes (https://en.wikipedia.org/wiki/Megabyte) @@ -147,11 +152,11 @@ long getInflightWaitSeconds(StreamWriter streamWriter) { return connectionWorker().getInflightWaitSeconds(); } - TableSchema getUpdatedSchema() { + TableSchemaAndTimestamp getUpdatedSchema(StreamWriter streamWriter) { if (getKind() == Kind.CONNECTION_WORKER_POOL) { - // TODO(gaole): implement updated schema support for multiplexing. 
- throw new IllegalStateException("getUpdatedSchema is not implemented for multiplexing."); + return connectionWorkerPool().getUpdatedSchema(streamWriter); } + // Single-connection case: return the updated schema tracked by the connection worker. return connectionWorker().getUpdatedSchema(); } @@ -255,6 +260,7 @@ private StreamWriter(Builder builder) throws IOException { client.close(); } } + this.creationTimestamp = Instant.now(); } @VisibleForTesting @@ -396,9 +402,25 @@ public static StreamWriter.Builder newBuilder(String streamName) { return new StreamWriter.Builder(streamName); } - /** Thread-safe getter of updated TableSchema */ + /** + * Thread-safe getter of updated TableSchema. + * + *

This will return the updated schema only when the creation timestamp of this writer is older + * than the updated schema. + */ public synchronized TableSchema getUpdatedSchema() { - return singleConnectionOrConnectionPool.getUpdatedSchema(); + TableSchemaAndTimestamp tableSchemaAndTimestamp = + singleConnectionOrConnectionPool.getUpdatedSchema(this); + if (tableSchemaAndTimestamp == null) { + return null; + } + return creationTimestamp.compareTo(tableSchemaAndTimestamp.updateTimeStamp()) < 0 + ? tableSchemaAndTimestamp.updatedSchema() + : null; + } + + Instant getCreationTimestamp() { + return creationTimestamp; } @VisibleForTesting diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPoolTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPoolTest.java index cba5bf3fe6..08543f539d 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPoolTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPoolTest.java @@ -16,6 +16,7 @@ package com.google.cloud.bigquery.storage.v1; import static com.google.common.truth.Truth.assertThat; +import static org.junit.Assert.assertThrows; import com.google.api.core.ApiFuture; import com.google.api.gax.batching.FlowController; @@ -311,6 +312,16 @@ public void testMultiStreamAppend_appendWhileClosing() throws Exception { assertThat(connectionWorkerPool.getTotalConnectionCount()).isEqualTo(0); } + @Test + public void testToTableName() { + assertThat(ConnectionWorkerPool.toTableName("projects/p/datasets/d/tables/t/streams/s")) + .isEqualTo("projects/p/datasets/d/tables/t"); + + IllegalArgumentException ex = + assertThrows( + IllegalArgumentException.class, () -> ConnectionWorkerPool.toTableName("projects/p/")); + } + private AppendRowsResponse createAppendResponse(long offset) { return AppendRowsResponse.newBuilder() 
.setAppendResult( diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java index d4c5614e3e..258a287a1c 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java @@ -19,6 +19,7 @@ import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; +import com.google.api.client.util.Sleeper; import com.google.api.core.ApiFuture; import com.google.api.gax.batching.FlowControlSettings; import com.google.api.gax.batching.FlowController; @@ -33,6 +34,7 @@ import com.google.cloud.bigquery.storage.test.Test.FlexibleType; import com.google.cloud.bigquery.storage.test.Test.FooType; import com.google.cloud.bigquery.storage.test.Test.UpdatedFooType; +import com.google.cloud.bigquery.storage.v1.ConnectionWorkerPool.Settings; import com.google.cloud.bigquery.storage.v1.Exceptions.AppendSerializtionError; import com.google.cloud.bigquery.storage.v1.TableFieldSchema.Mode; import com.google.protobuf.Descriptors.DescriptorValidationException; @@ -62,6 +64,7 @@ public class JsonStreamWriterTest { private static final Logger LOG = Logger.getLogger(JsonStreamWriterTest.class.getName()); private static final String TEST_STREAM = "projects/p/datasets/d/tables/t/streams/s"; + private static final String TEST_STREAM_2 = "projects/p/datasets/d2/tables/t2/streams/s2"; private static final String TEST_TABLE = "projects/p/datasets/d/tables/t"; private static final ExecutorProvider SINGLE_THREAD_EXECUTOR = InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(1).build(); @@ -77,8 +80,6 @@ public class JsonStreamWriterTest { .setMode(TableFieldSchema.Mode.NULLABLE) .setName("foo") .build(); - private final TableSchema 
TABLE_SCHEMA = TableSchema.newBuilder().addFields(0, FOO).build(); - private final TableFieldSchema BAR = TableFieldSchema.newBuilder() .setType(TableFieldSchema.Type.STRING) @@ -91,10 +92,24 @@ public class JsonStreamWriterTest { .setMode(TableFieldSchema.Mode.NULLABLE) .setName("baz") .build(); + + private final TableSchema TABLE_SCHEMA = TableSchema.newBuilder().addFields(0, FOO).build(); + private final TableSchema TABLE_SCHEMA_2 = TableSchema.newBuilder().addFields(0, BAZ).build(); + private final TableSchema UPDATED_TABLE_SCHEMA = TableSchema.newBuilder().addFields(0, FOO).addFields(1, BAR).build(); private final TableSchema UPDATED_TABLE_SCHEMA_2 = TableSchema.newBuilder().addFields(0, FOO).addFields(1, BAR).addFields(2, BAZ).build(); + private final ProtoSchema PROTO_SCHEMA = + ProtoSchemaConverter.convert( + BQTableSchemaToProtoDescriptor.convertBQTableSchemaToProtoDescriptor(TABLE_SCHEMA)); + private final ProtoSchema PROTO_SCHEMA_2 = + ProtoSchemaConverter.convert( + BQTableSchemaToProtoDescriptor.convertBQTableSchemaToProtoDescriptor(TABLE_SCHEMA_2)); + private final ProtoSchema UPDATED_PROTO_SCHEMA = + ProtoSchemaConverter.convert( + BQTableSchemaToProtoDescriptor.convertBQTableSchemaToProtoDescriptor( + UPDATED_TABLE_SCHEMA)); private final TableFieldSchema TEST_INT = TableFieldSchema.newBuilder() @@ -109,6 +124,8 @@ public class JsonStreamWriterTest { .setName("test_string") .build(); + public JsonStreamWriterTest() throws DescriptorValidationException {} + @Before public void setUp() throws Exception { testBigQueryWrite = new FakeBigQueryWrite(); @@ -128,6 +145,7 @@ public void setUp() throws Exception { Instant time = Instant.now(); Timestamp timestamp = Timestamp.newBuilder().setSeconds(time.getEpochSecond()).setNanos(time.getNano()).build(); + StreamWriter.cleanUp(); } @After @@ -518,21 +536,9 @@ public void testSimpleSchemaUpdate() throws Exception { AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(0)).build()) 
.setUpdatedSchema(UPDATED_TABLE_SCHEMA) .build()); - testBigQueryWrite.addResponse( - AppendRowsResponse.newBuilder() - .setAppendResult( - AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(1)).build()) - .build()); - testBigQueryWrite.addResponse( - AppendRowsResponse.newBuilder() - .setAppendResult( - AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(2)).build()) - .build()); - testBigQueryWrite.addResponse( - AppendRowsResponse.newBuilder() - .setAppendResult( - AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(3)).build()) - .build()); + testBigQueryWrite.addResponse(createAppendResponse(1)); + testBigQueryWrite.addResponse(createAppendResponse(2)); + testBigQueryWrite.addResponse(createAppendResponse(3)); // First append JSONObject foo = new JSONObject(); foo.put("foo", "aaa"); @@ -687,6 +693,252 @@ public void testWithoutIgnoreUnknownFieldsUpdateSecondSuccess() throws Exception } } + @Test + public void testSchemaUpdateInMultiplexing_singleConnection() throws Exception { + // Set min connection count to be 1 to force sharing connection. + ConnectionWorkerPool.setOptions( + Settings.builder().setMinConnectionsPerRegion(1).setMaxConnectionsPerRegion(1).build()); + // The following two writers have different stream name and schema, but will share the same + // connection . 
+ JsonStreamWriter writer1 = + getTestJsonStreamWriterBuilder(TEST_STREAM, TABLE_SCHEMA) + .setEnableConnectionPool(true) + .setLocation("us") + .build(); + JsonStreamWriter writer2 = + getTestJsonStreamWriterBuilder(TEST_STREAM_2, TABLE_SCHEMA_2) + .setEnableConnectionPool(true) + .setLocation("us") + .build(); + + testBigQueryWrite.addResponse( + AppendRowsResponse.newBuilder() + .setAppendResult( + AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(0)).build()) + .setUpdatedSchema(UPDATED_TABLE_SCHEMA) + .setWriteStream(TEST_STREAM) + .build()); + testBigQueryWrite.addResponse(createAppendResponse(1)); + testBigQueryWrite.addResponse(createAppendResponse(2)); + testBigQueryWrite.addResponse(createAppendResponse(3)); + // Append request with old schema for writer 1. + JSONObject foo = new JSONObject(); + foo.put("foo", "aaa"); + JSONArray jsonArr = new JSONArray(); + jsonArr.put(foo); + + // Append request with old schema for writer 2. + JSONObject baz = new JSONObject(); + baz.put("baz", "bbb"); + JSONArray jsonArr2 = new JSONArray(); + jsonArr2.put(baz); + + // Append request with new schema. + JSONObject updatedFoo = new JSONObject(); + updatedFoo.put("foo", "aaa"); + updatedFoo.put("bar", "bbb"); + JSONArray updatedJsonArr = new JSONArray(); + updatedJsonArr.put(updatedFoo); + + // This append will trigger new schema update. + ApiFuture appendFuture1 = writer1.append(jsonArr); + // This append be put onto the same connection as the first one. + ApiFuture appendFuture2 = writer2.append(jsonArr2); + + // Sleep for a small period of time to make sure the updated schema is stored. + Sleeper.DEFAULT.sleep(300); + // Back to writer1 here, we are expected to use the updated schema already. + // Both of the following append will be parsed correctly. 
+ ApiFuture appendFuture3 = writer1.append(updatedJsonArr); + ApiFuture appendFuture4 = writer1.append(jsonArr); + + assertEquals(0L, appendFuture1.get().getAppendResult().getOffset().getValue()); + assertEquals(1L, appendFuture2.get().getAppendResult().getOffset().getValue()); + assertEquals(2L, appendFuture3.get().getAppendResult().getOffset().getValue()); + assertEquals(3L, appendFuture4.get().getAppendResult().getOffset().getValue()); + + // The 1st schema comes from writer1's initial schema + assertEquals( + testBigQueryWrite.getAppendRequests().get(0).getProtoRows().getWriterSchema(), + PROTO_SCHEMA); + // The 2nd schema comes from writer2's initial schema + assertEquals( + testBigQueryWrite.getAppendRequests().get(1).getProtoRows().getWriterSchema(), + PROTO_SCHEMA_2); + // The 3rd schema comes from writer1's updated schema + assertEquals( + testBigQueryWrite.getAppendRequests().get(2).getProtoRows().getWriterSchema(), + UPDATED_PROTO_SCHEMA); + // The 4th schema should be empty as schema update is already done for writer 1. + assertEquals( + testBigQueryWrite.getAppendRequests().get(3).getProtoRows().getWriterSchema(), + ProtoSchema.getDefaultInstance()); + writer1.close(); + writer2.close(); + } + + @Test + public void testSchemaUpdateInMultiplexing_multipleWriterForSameStreamName() throws Exception { + // Set min connection count to be 1 to force sharing connection. + ConnectionWorkerPool.setOptions( + Settings.builder().setMinConnectionsPerRegion(1).setMaxConnectionsPerRegion(1).build()); + + // Create two writers writing to the same stream. + JsonStreamWriter writer1 = + getTestJsonStreamWriterBuilder(TEST_STREAM, TABLE_SCHEMA) + .setEnableConnectionPool(true) + .setLocation("us") + .build(); + JsonStreamWriter writer2 = + getTestJsonStreamWriterBuilder(TEST_STREAM, TABLE_SCHEMA) + .setEnableConnectionPool(true) + .setLocation("us") + .build(); + + // Trigger schema update in the second request. 
+ testBigQueryWrite.addResponse(createAppendResponse(0)); + testBigQueryWrite.addResponse( + AppendRowsResponse.newBuilder() + .setAppendResult( + AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(1)).build()) + .setUpdatedSchema(UPDATED_TABLE_SCHEMA) + .setWriteStream(TEST_STREAM) + .build()); + testBigQueryWrite.addResponse(createAppendResponse(2)); + testBigQueryWrite.addResponse(createAppendResponse(3)); + // Append request with old schema. + JSONObject foo = new JSONObject(); + foo.put("foo", "aaa"); + JSONArray jsonArr = new JSONArray(); + jsonArr.put(foo); + + // Append request with new schema. + JSONObject updatedFoo = new JSONObject(); + updatedFoo.put("foo", "aaa"); + updatedFoo.put("bar", "bbb"); + JSONArray updatedJsonArr = new JSONArray(); + updatedJsonArr.put(updatedFoo); + + // Normal append, nothing happens + ApiFuture appendFuture1 = writer1.append(jsonArr); + // This append triggers updated schema + ApiFuture appendFuture2 = writer2.append(jsonArr); + + // Sleep for a small period of time to make sure the updated schema is stored. + Sleeper.DEFAULT.sleep(300); + // From now on everyone should be able to use the new schema. + ApiFuture appendFuture3 = writer1.append(updatedJsonArr); + ApiFuture appendFuture4 = writer2.append(updatedJsonArr); + + assertEquals(0L, appendFuture1.get().getAppendResult().getOffset().getValue()); + assertEquals(1L, appendFuture2.get().getAppendResult().getOffset().getValue()); + assertEquals(2L, appendFuture3.get().getAppendResult().getOffset().getValue()); + assertEquals(3L, appendFuture4.get().getAppendResult().getOffset().getValue()); + + // The 1st schema comes from writer1's initial schema + assertEquals( + testBigQueryWrite.getAppendRequests().get(0).getProtoRows().getWriterSchema(), + PROTO_SCHEMA); + // The 2nd append trigger no schema change. 
+ assertEquals( + testBigQueryWrite.getAppendRequests().get(1).getProtoRows().getWriterSchema(), + ProtoSchema.getDefaultInstance()); + assertEquals( + testBigQueryWrite.getAppendRequests().get(2).getProtoRows().getWriterSchema(), + UPDATED_PROTO_SCHEMA); + // The next request after schema update will back to empty. + assertEquals( + testBigQueryWrite.getAppendRequests().get(3).getProtoRows().getWriterSchema(), + ProtoSchema.getDefaultInstance()); + writer1.close(); + writer2.close(); + } + + @Test + public void testSchemaUpdateInMultiplexing_IgnoreUpdateIfTimeStampNewer() throws Exception { + // Set min connection count to be 1 to force sharing connection. + ConnectionWorkerPool.setOptions( + Settings.builder().setMinConnectionsPerRegion(1).setMaxConnectionsPerRegion(1).build()); + // The following two writers have different stream name and schema, but will share the same + // connection . + JsonStreamWriter writer1 = + getTestJsonStreamWriterBuilder(TEST_STREAM, TABLE_SCHEMA) + .setEnableConnectionPool(true) + .setLocation("us") + .build(); + + testBigQueryWrite.addResponse( + AppendRowsResponse.newBuilder() + .setAppendResult( + AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(0)).build()) + .setUpdatedSchema(UPDATED_TABLE_SCHEMA) + .setWriteStream(TEST_STREAM) + .build()); + testBigQueryWrite.addResponse(createAppendResponse(1)); + testBigQueryWrite.addResponse(createAppendResponse(2)); + testBigQueryWrite.addResponse(createAppendResponse(3)); + // Append request with old schema for writer 1. + JSONObject foo = new JSONObject(); + foo.put("foo", "aaa"); + JSONArray jsonArr = new JSONArray(); + jsonArr.put(foo); + + // Append request with old schema for writer 2. + JSONObject baz = new JSONObject(); + baz.put("baz", "bbb"); + JSONArray jsonArr2 = new JSONArray(); + jsonArr2.put(baz); + + // Append request with new schema. 
+ JSONObject updatedFoo = new JSONObject(); + updatedFoo.put("foo", "aaa"); + updatedFoo.put("bar", "bbb"); + JSONArray updatedJsonArr = new JSONArray(); + updatedJsonArr.put(updatedFoo); + + // This append will trigger new schema update. + ApiFuture appendFuture1 = writer1.append(jsonArr); + // Sleep for a small period of time to make sure the updated schema is stored. + Sleeper.DEFAULT.sleep(300); + // Write to writer 1 again, new schema should be used. + // The following two append will succeeds. + ApiFuture appendFuture2 = writer1.append(updatedJsonArr); + ApiFuture appendFuture3 = writer1.append(jsonArr); + + // Second phase of the test: create another writer. + // Expect the append went through without using the updated schema + JsonStreamWriter writer2 = + getTestJsonStreamWriterBuilder(TEST_STREAM, TABLE_SCHEMA_2) + .setEnableConnectionPool(true) + .setLocation("us") + .build(); + ApiFuture appendFuture4 = writer2.append(jsonArr2); + + assertEquals(0L, appendFuture1.get().getAppendResult().getOffset().getValue()); + assertEquals(1L, appendFuture2.get().getAppendResult().getOffset().getValue()); + assertEquals(2L, appendFuture3.get().getAppendResult().getOffset().getValue()); + assertEquals(3L, appendFuture4.get().getAppendResult().getOffset().getValue()); + + // The 1st schema comes from writer1's initial schema + assertEquals( + testBigQueryWrite.getAppendRequests().get(0).getProtoRows().getWriterSchema(), + PROTO_SCHEMA); + // The 2nd schema comes from updated schema + assertEquals( + testBigQueryWrite.getAppendRequests().get(1).getProtoRows().getWriterSchema(), + UPDATED_PROTO_SCHEMA); + // No new schema. 
+ assertEquals( + testBigQueryWrite.getAppendRequests().get(2).getProtoRows().getWriterSchema(), + ProtoSchema.getDefaultInstance()); + // The 4th schema come from the + assertEquals( + testBigQueryWrite.getAppendRequests().get(3).getProtoRows().getWriterSchema(), + PROTO_SCHEMA_2); + writer1.close(); + writer2.close(); + } + @Test public void testWithoutIgnoreUnknownFieldsUpdateFail() throws Exception { TableSchema tableSchema = TableSchema.newBuilder().addFields(0, TEST_INT).build(); @@ -886,4 +1138,11 @@ public void testWriterId() Assert.assertFalse(writer2.getWriterId().isEmpty()); Assert.assertNotEquals(writer1.getWriterId(), writer2.getWriterId()); } + + private AppendRowsResponse createAppendResponse(long offset) { + return AppendRowsResponse.newBuilder() + .setAppendResult( + AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(offset)).build()) + .build(); + } } diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java index 851abc9a49..134b438593 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java @@ -36,6 +36,7 @@ import com.google.protobuf.Any; import com.google.protobuf.DescriptorProtos; import com.google.protobuf.Descriptors; +import com.google.protobuf.Descriptors.DescriptorValidationException; import com.google.protobuf.Int64Value; import io.grpc.Status; import io.grpc.StatusRuntimeException; @@ -68,6 +69,31 @@ public class StreamWriterTest { private FakeBigQueryWrite testBigQueryWrite; private static MockServiceHelper serviceHelper; private BigQueryWriteClient client; + private final TableFieldSchema FOO = + TableFieldSchema.newBuilder() + .setType(TableFieldSchema.Type.STRING) + 
.setMode(TableFieldSchema.Mode.NULLABLE) + .setName("foo") + .build(); + private final TableFieldSchema BAR = + TableFieldSchema.newBuilder() + .setType(TableFieldSchema.Type.STRING) + .setMode(TableFieldSchema.Mode.NULLABLE) + .setName("bar") + .build(); + private final TableSchema TABLE_SCHEMA = TableSchema.newBuilder().addFields(0, FOO).build(); + private final ProtoSchema PROTO_SCHEMA = + ProtoSchemaConverter.convert( + BQTableSchemaToProtoDescriptor.convertBQTableSchemaToProtoDescriptor(TABLE_SCHEMA)); + + private final TableSchema UPDATED_TABLE_SCHEMA = + TableSchema.newBuilder().addFields(0, FOO).addFields(1, BAR).build(); + private final ProtoSchema UPDATED_PROTO_SCHEMA = + ProtoSchemaConverter.convert( + BQTableSchemaToProtoDescriptor.convertBQTableSchemaToProtoDescriptor( + UPDATED_TABLE_SCHEMA)); + + public StreamWriterTest() throws DescriptorValidationException {} @Before public void setUp() throws Exception { @@ -251,6 +277,52 @@ public void testAppendSuccess() throws Exception { writer.close(); } + @Test + public void testUpdatedSchemaFetch_multiplexing() throws Exception { + testUpdatedSchemaFetch(/*enableMultiplexing=*/ true); + } + + @Test + public void testUpdatedSchemaFetch_nonMultiplexing() throws Exception { + testUpdatedSchemaFetch(/*enableMultiplexing=*/ false); + } + + private void testUpdatedSchemaFetch(boolean enableMultiplexing) + throws IOException, ExecutionException, InterruptedException { + StreamWriter writer = + StreamWriter.newBuilder(TEST_STREAM_1) + .setCredentialsProvider(NoCredentialsProvider.create()) + .setChannelProvider(serviceHelper.createChannelProvider()) + .setWriterSchema(PROTO_SCHEMA) + .setEnableConnectionPool(enableMultiplexing) + .setLocation("us") + .build(); + testBigQueryWrite.addResponse( + AppendRowsResponse.newBuilder() + .setAppendResult( + AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(0)).build()) + .setUpdatedSchema(UPDATED_TABLE_SCHEMA) + .setWriteStream(TEST_STREAM_1) + .build()); 
+ + assertEquals(writer.getUpdatedSchema(), null); + AppendRowsResponse response = + writer.append(createProtoRows(new String[] {String.valueOf(0)}), 0).get(); + assertEquals(writer.getUpdatedSchema(), UPDATED_TABLE_SCHEMA); + + // Create another writer, although it's the same stream name but the time stamp is newer, thus + // the old updated schema won't get returned. + StreamWriter writer2 = + StreamWriter.newBuilder(TEST_STREAM_1) + .setCredentialsProvider(NoCredentialsProvider.create()) + .setChannelProvider(serviceHelper.createChannelProvider()) + .setWriterSchema(PROTO_SCHEMA) + .setEnableConnectionPool(enableMultiplexing) + .setLocation("us") + .build(); + assertEquals(writer2.getUpdatedSchema(), null); + } + @Test public void testNoSchema() throws Exception { StatusRuntimeException ex =