Skip to content

Commit

Permalink
feat: expose settings to configure default missing value interpretati…
Browse files Browse the repository at this point in the history
…on. (#2230)

* chore(main): release 2.41.1 (#2222)

Co-authored-by: release-please[bot] <55107282+release-please[bot]@users.noreply.github.com>

* chore(main): release 2.41.1 (#2222)

Co-authored-by: release-please[bot] <55107282+release-please[bot]@users.noreply.github.com>

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* feat: expose configuration to configure the default missing value
interpretation

---------

Co-authored-by: release-please[bot] <55107282+release-please[bot]@users.noreply.github.com>
Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
  • Loading branch information
3 people authored Aug 17, 2023
1 parent df686d6 commit dc5ed73
Show file tree
Hide file tree
Showing 6 changed files with 213 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.google.api.gax.batching.FlowController;
import com.google.api.gax.rpc.FixedHeaderProvider;
import com.google.auto.value.AutoValue;
import com.google.cloud.bigquery.storage.v1.AppendRowsRequest.MissingValueInterpretation;
import com.google.cloud.bigquery.storage.v1.AppendRowsRequest.ProtoData;
import com.google.cloud.bigquery.storage.v1.Exceptions.AppendSerializationError;
import com.google.cloud.bigquery.storage.v1.StreamConnection.DoneCallback;
Expand Down Expand Up @@ -388,6 +389,11 @@ ApiFuture<AppendRowsResponse> append(StreamWriter streamWriter, ProtoRows rows,
requestBuilder.setWriteStream(streamWriter.getStreamName());
requestBuilder.putAllMissingValueInterpretations(
streamWriter.getMissingValueInterpretationMap());
if (streamWriter.getDefaultValueInterpretation()
!= MissingValueInterpretation.MISSING_VALUE_INTERPRETATION_UNSPECIFIED) {
requestBuilder.setDefaultMissingValueInterpretation(
streamWriter.getDefaultValueInterpretation());
}
return appendInternal(streamWriter, requestBuilder.build());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,22 @@ public Builder setCompressorName(String compressorName) {
return this;
}

/**
 * Configures how a column is interpreted when it has no entry in the
 * missing_value_interpretations map.
 *
 * <p>With {@code DEFAULT_VALUE}, a field that is missing from the json is always populated with
 * the default value, provided one is defined on the column.
 *
 * <p>With {@code NULL_VALUE}, the default value is never populated.
 *
 * @param defaultMissingValueInterpretation the interpretation forwarded to the wrapped
 *     schema-aware stream writer builder
 * @return this builder
 */
public Builder setDefaultMissingValueInterpretation(
    AppendRowsRequest.MissingValueInterpretation defaultMissingValueInterpretation) {
  // Pure delegation: the wrapped builder owns the setting.
  schemaAwareStreamWriterBuilder.setDefaultMissingValueInterpretation(
      defaultMissingValueInterpretation);
  return this;
}

/**
* Builds JsonStreamWriter
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.google.api.gax.core.CredentialsProvider;
import com.google.api.gax.core.ExecutorProvider;
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.cloud.bigquery.storage.v1.AppendRowsRequest.MissingValueInterpretation;
import com.google.cloud.bigquery.storage.v1.Exceptions.AppendSerializationError;
import com.google.cloud.bigquery.storage.v1.Exceptions.RowIndexToErrorException;
import com.google.common.base.Preconditions;
Expand Down Expand Up @@ -97,6 +98,8 @@ private SchemaAwareStreamWriter(Builder<T> builder)
builder.compressorName);
streamWriterBuilder.setEnableConnectionPool(builder.enableConnectionPool);
streamWriterBuilder.setLocation(builder.location);
streamWriterBuilder.setDefaultMissingValueInterpretation(
builder.defaultMissingValueInterpretation);
this.streamWriter = streamWriterBuilder.build();
this.streamName = builder.streamName;
this.tableSchema = builder.tableSchema;
Expand Down Expand Up @@ -433,6 +436,9 @@ public static final class Builder<T> {
private String location;
private String compressorName;

private AppendRowsRequest.MissingValueInterpretation defaultMissingValueInterpretation =
MissingValueInterpretation.MISSING_VALUE_INTERPRETATION_UNSPECIFIED;

private static final String streamPatternString =
"(projects/[^/]+/datasets/[^/]+/tables/[^/]+)/streams/[^/]+";
private static final String tablePatternString = "(projects/[^/]+/datasets/[^/]+/tables/[^/]+)";
Expand Down Expand Up @@ -627,6 +633,16 @@ public Builder<T> setCompressorName(String compressorName) {
return this;
}

/**
 * Sets the default missing value interpretation, used for any column that is not present in the
 * missing_value_interpretations map.
 *
 * @param defaultMissingValueInterpretation the interpretation to store on this builder
 * @return this builder, typed as {@code Builder<T>} so that chained calls keep the type
 *     parameter instead of degrading to the raw type
 */
public Builder<T> setDefaultMissingValueInterpretation(
    AppendRowsRequest.MissingValueInterpretation defaultMissingValueInterpretation) {
  this.defaultMissingValueInterpretation = defaultMissingValueInterpretation;
  return this;
}

/**
* Builds SchemaAwareStreamWriter
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.auto.value.AutoOneOf;
import com.google.auto.value.AutoValue;
import com.google.cloud.bigquery.storage.v1.AppendRowsRequest.MissingValueInterpretation;
import com.google.cloud.bigquery.storage.v1.ConnectionWorker.AppendRequestAndResponse;
import com.google.cloud.bigquery.storage.v1.ConnectionWorker.TableSchemaAndTimestamp;
import com.google.cloud.bigquery.storage.v1.StreamWriter.SingleConnectionOrConnectionPool.Kind;
Expand Down Expand Up @@ -90,6 +91,13 @@ public class StreamWriter implements AutoCloseable {
*/
private final String writerId = UUID.randomUUID().toString();

/**
* The default missing value interpretation, applied when a column has a default value defined but
* is not present in the missing value map.
*/
private AppendRowsRequest.MissingValueInterpretation defaultMissingValueInterpretation =
MissingValueInterpretation.MISSING_VALUE_INTERPRETATION_UNSPECIFIED;

/**
* Stream can access a single connection or a pool of connection depending on whether multiplexing
* is enabled.
Expand Down Expand Up @@ -201,6 +209,7 @@ public static SingleConnectionOrConnectionPool ofConnectionPool(
private StreamWriter(Builder builder) throws IOException {
this.streamName = builder.streamName;
this.writerSchema = builder.writerSchema;
this.defaultMissingValueInterpretation = builder.defaultMissingValueInterpretation;
BigQueryWriteSettings clientSettings = getBigQueryWriteSettings(builder);
if (!builder.enableConnectionPool) {
this.location = builder.location;
Expand Down Expand Up @@ -312,6 +321,10 @@ static boolean isDefaultStream(String streamName) {
return streamMatcher.find();
}

/** Returns the default missing value interpretation held by this writer. */
AppendRowsRequest.MissingValueInterpretation getDefaultValueInterpretation() {
  return this.defaultMissingValueInterpretation;
}

static BigQueryWriteSettings getBigQueryWriteSettings(Builder builder) throws IOException {
BigQueryWriteSettings.Builder settingsBuilder = null;
if (builder.client != null) {
Expand Down Expand Up @@ -602,6 +615,10 @@ public static final class Builder {

private String compressorName = null;

// Default missing value interpretation value.
private AppendRowsRequest.MissingValueInterpretation defaultMissingValueInterpretation =
MissingValueInterpretation.MISSING_VALUE_INTERPRETATION_UNSPECIFIED;

private Builder(String streamName) {
this.streamName = Preconditions.checkNotNull(streamName);
this.client = null;
Expand Down Expand Up @@ -729,6 +746,16 @@ public Builder setCompressorName(String compressorName) {
return this;
}

/**
 * Sets the default missing value interpretation, used for any column that is not present in the
 * missing_value_interpretations map.
 *
 * @param defaultMissingValueInterpretation the interpretation to use; must not be null
 * @return this builder
 * @throws NullPointerException if {@code defaultMissingValueInterpretation} is null
 */
public Builder setDefaultMissingValueInterpretation(
    AppendRowsRequest.MissingValueInterpretation defaultMissingValueInterpretation) {
  // Reject null up front: the append path only skips the
  // MISSING_VALUE_INTERPRETATION_UNSPECIFIED value, so a null stored here would be handed to
  // the request builder on every append instead of being reported at configuration time.
  this.defaultMissingValueInterpretation =
      java.util.Objects.requireNonNull(
          defaultMissingValueInterpretation, "defaultMissingValueInterpretation");
  return this;
}

/** Builds the {@code StreamWriter}. */
public StreamWriter build() throws IOException {
return new StreamWriter(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import com.google.cloud.bigquery.storage.test.Test.FooType;
import com.google.cloud.bigquery.storage.test.Test.RepetitionType;
import com.google.cloud.bigquery.storage.test.Test.UpdatedFooType;
import com.google.cloud.bigquery.storage.v1.AppendRowsRequest.MissingValueInterpretation;
import com.google.cloud.bigquery.storage.v1.ConnectionWorkerPool.Settings;
import com.google.cloud.bigquery.storage.v1.Exceptions.AppendSerializationError;
import com.google.cloud.bigquery.storage.v1.TableFieldSchema.Mode;
Expand All @@ -45,8 +46,10 @@
import java.io.IOException;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
Expand All @@ -64,6 +67,7 @@

@RunWith(JUnit4.class)
public class JsonStreamWriterTest {

private static final int NUMERIC_SCALE = 9;
private static final String TEST_STREAM = "projects/p/datasets/d/tables/t/streams/_default";
private static final String TEST_STREAM_2 = "projects/p/datasets/d2/tables/t2/streams/_default";
Expand Down Expand Up @@ -514,6 +518,9 @@ public void testSingleAppendMultipleSimpleJson() throws Exception {
.getSerializedRows(i),
expectedProto.toByteString());
}
assertEquals(
testBigQueryWrite.getAppendRequests().get(0).getDefaultMissingValueInterpretation(),
MissingValueInterpretation.MISSING_VALUE_INTERPRETATION_UNSPECIFIED);
}
}

Expand Down Expand Up @@ -1015,6 +1022,79 @@ public void testSchemaUpdateInMultiplexing_singleConnection() throws Exception {
writer2.close();
}

/**
 * Verifies that two JsonStreamWriters configured with different default missing value
 * interpretations each stamp their own value on the append requests they send, even when the
 * connection pool is limited to a single connection per region.
 */
@Test
public void testMissingValueInterpretation_multiplexingCase() throws Exception {
  // Cap the pool at one connection per region to force the writers to share a connection.
  ConnectionWorkerPool.setOptions(
      Settings.builder().setMinConnectionsPerRegion(1).setMaxConnectionsPerRegion(1).build());
  testBigQueryWrite.addResponse(
      WriteStream.newBuilder()
          .setName(TEST_STREAM)
          .setTableSchema(TABLE_SCHEMA)
          .setLocation("us")
          .build());
  testBigQueryWrite.addResponse(
      WriteStream.newBuilder()
          .setName(TEST_STREAM)
          .setTableSchema(TABLE_SCHEMA)
          .setLocation("us")
          .build());
  // The two writers target different stream names and use different default missing value
  // interpretations, but are expected to share the same connection.
  JsonStreamWriter writer1 =
      getTestJsonStreamWriterBuilder(TEST_STREAM)
          .setEnableConnectionPool(true)
          .setLocation("us")
          .setDefaultMissingValueInterpretation(MissingValueInterpretation.DEFAULT_VALUE)
          .build();
  JsonStreamWriter writer2 =
      getTestJsonStreamWriterBuilder(TEST_STREAM_2)
          .setEnableConnectionPool(true)
          .setLocation("us")
          .setDefaultMissingValueInterpretation(MissingValueInterpretation.NULL_VALUE)
          .build();

  long appendCountPerStream = 5;
  for (int i = 0; i < appendCountPerStream * 4; i++) {
    testBigQueryWrite.addResponse(createAppendResponse(i));
  }

  JSONObject foo = new JSONObject();
  foo.put("foo", "aaa");
  JSONArray jsonArr = new JSONArray();
  jsonArr.put(foo);

  // In total, issue `appendCountPerStream` * 4 append requests using the pattern
  // writer1, writer1, writer2, writer2. All four appends of a round are issued before any of
  // them is awaited, and the round is fully drained before the next one starts.
  for (int i = 0; i < appendCountPerStream; i++) {
    List<ApiFuture<AppendRowsResponse>> futures = new ArrayList<>();
    futures.add(writer1.append(jsonArr));
    futures.add(writer1.append(jsonArr));
    futures.add(writer2.append(jsonArr));
    futures.add(writer2.append(jsonArr));
    for (ApiFuture<AppendRowsResponse> future : futures) {
      future.get();
    }
  }

  for (int i = 0; i < appendCountPerStream * 4; i++) {
    AppendRowsRequest appendRowsRequest = testBigQueryWrite.getAppendRequests().get(i);
    // Positions 0 and 1 of every group of four came from writer1, positions 2 and 3 from writer2.
    MissingValueInterpretation expected =
        (i % 4 <= 1)
            ? MissingValueInterpretation.DEFAULT_VALUE
            : MissingValueInterpretation.NULL_VALUE;
    // JUnit order is (expected, actual) so that failure messages read correctly.
    assertEquals(expected, appendRowsRequest.getDefaultMissingValueInterpretation());
  }

  writer1.close();
  writer2.close();
}

@Test
public void testSchemaUpdateInMultiplexing_multipleWriterForSameStreamName() throws Exception {
// Set min connection count to be 1 to force sharing connection.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import com.google.api.gax.rpc.StatusCode.Code;
import com.google.api.gax.rpc.UnknownException;
import com.google.cloud.bigquery.storage.test.Test.FooType;
import com.google.cloud.bigquery.storage.v1.AppendRowsRequest.MissingValueInterpretation;
import com.google.cloud.bigquery.storage.v1.ConnectionWorkerPool.Settings;
import com.google.cloud.bigquery.storage.v1.Exceptions.StreamWriterClosedException;
import com.google.cloud.bigquery.storage.v1.StorageError.StorageErrorCode;
Expand Down Expand Up @@ -849,6 +850,73 @@ public void testProtoSchemaPiping_multiplexingCase() throws Exception {
appendRowsRequest.getProtoRows().getWriterSchema(), ProtoSchema.getDefaultInstance());
assertEquals(appendRowsRequest.getWriteStream(), TEST_STREAM_2);
}
assertEquals(
appendRowsRequest.getDefaultMissingValueInterpretation(),
MissingValueInterpretation.MISSING_VALUE_INTERPRETATION_UNSPECIFIED);
}

writer1.close();
writer2.close();
}

@Test
public void testDefaultValueInterpretation_multiplexingCase() throws Exception {
// Use the shared connection mode.
ConnectionWorkerPool.setOptions(
Settings.builder().setMinConnectionsPerRegion(1).setMaxConnectionsPerRegion(1).build());
ProtoSchema schema1 = createProtoSchema("Schema1");
ProtoSchema schema2 = createProtoSchema("Schema2");
StreamWriter writer1 =
StreamWriter.newBuilder(TEST_STREAM_1, client)
.setWriterSchema(schema1)
.setLocation("US")
.setEnableConnectionPool(true)
.setMaxInflightRequests(1)
.setDefaultMissingValueInterpretation(MissingValueInterpretation.DEFAULT_VALUE)
.build();
StreamWriter writer2 =
StreamWriter.newBuilder(TEST_STREAM_2, client)
.setWriterSchema(schema2)
.setMaxInflightRequests(1)
.setEnableConnectionPool(true)
.setLocation("US")
.setDefaultMissingValueInterpretation(MissingValueInterpretation.NULL_VALUE)
.build();

long appendCountPerStream = 5;
for (int i = 0; i < appendCountPerStream * 4; i++) {
testBigQueryWrite.addResponse(createAppendResponse(i));
}

// In total insert append `appendCountPerStream` * 4 requests.
// We insert using the pattern of streamWriter1, streamWriter1, streamWriter2, streamWriter2
for (int i = 0; i < appendCountPerStream; i++) {
ApiFuture<AppendRowsResponse> appendFuture1 =
writer1.append(createProtoRows(new String[] {String.valueOf(i)}), i * 4);
ApiFuture<AppendRowsResponse> appendFuture2 =
writer1.append(createProtoRows(new String[] {String.valueOf(i)}), i * 4 + 1);
ApiFuture<AppendRowsResponse> appendFuture3 =
writer2.append(createProtoRows(new String[] {String.valueOf(i)}), i * 4 + 2);
ApiFuture<AppendRowsResponse> appendFuture4 =
writer2.append(createProtoRows(new String[] {String.valueOf(i)}), i * 4 + 3);
appendFuture1.get();
appendFuture2.get();
appendFuture3.get();
appendFuture4.get();
}

for (int i = 0; i < appendCountPerStream * 4; i++) {
AppendRowsRequest appendRowsRequest = testBigQueryWrite.getAppendRequests().get(i);
assertEquals(i, appendRowsRequest.getOffset().getValue());
if (i % 4 <= 1) {
assertEquals(
appendRowsRequest.getDefaultMissingValueInterpretation(),
MissingValueInterpretation.DEFAULT_VALUE);
} else {
assertEquals(
appendRowsRequest.getDefaultMissingValueInterpretation(),
MissingValueInterpretation.NULL_VALUE);
}
}

writer1.close();
Expand Down

0 comments on commit dc5ed73

Please sign in to comment.