Skip to content

Commit

Permalink
test: Add e2e test for default value use cases for default stream and…
Browse files Browse the repository at this point in the history
… exclusive stream (#2285)

* chore(main): release 2.41.1 (#2222)

Co-authored-by: release-please[bot] <55107282+release-please[bot]@users.noreply.github.com>

* chore(main): release 2.41.1 (#2222)

Co-authored-by: release-please[bot] <55107282+release-please[bot]@users.noreply.github.com>

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* feat: expose configuration to config the default missing value
interpretation

* Add e2e test for default value test

---------

Co-authored-by: release-please[bot] <55107282+release-please[bot]@users.noreply.github.com>
Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
  • Loading branch information
3 people authored Oct 25, 2023
1 parent 12f3fce commit 056bd25
Showing 1 changed file with 139 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,11 @@
import com.google.api.core.ApiFuture;
import com.google.cloud.ServiceOptions;
import com.google.cloud.bigquery.*;
import com.google.cloud.bigquery.Field.Mode;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.storage.test.Test.*;
import com.google.cloud.bigquery.storage.v1.*;
import com.google.cloud.bigquery.storage.v1.AppendRowsRequest.MissingValueInterpretation;
import com.google.cloud.bigquery.storage.v1.Exceptions.AppendSerializationError;
import com.google.cloud.bigquery.storage.v1.Exceptions.OffsetAlreadyExists;
import com.google.cloud.bigquery.storage.v1.Exceptions.OffsetOutOfRange;
Expand All @@ -43,6 +45,10 @@
import java.io.IOException;
import java.math.BigDecimal;
import java.sql.Timestamp;
import java.text.ParseException;
import java.time.Instant;
import java.time.ZoneId;
import java.time.temporal.ChronoUnit;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
Expand All @@ -63,14 +69,19 @@ public class ITBigQueryWriteManualClientTest {
private static final String DATASET_EU = RemoteBigQueryHelper.generateDatasetName();
private static final String TABLE = "testtable";
private static final String TABLE2 = "complicatedtable";

private static final String DESCRIPTION = "BigQuery Write Java manual client test dataset";

private static BigQueryWriteClient client;
private static TableInfo tableInfo;
private static TableInfo tableInfo2;

private static TableInfo tableInfoEU;

private static TableDefinition defaultValueTableDefinition;
private static String tableId;
private static String tableId2;

private static String tableIdEU;
private static BigQuery bigquery;

Expand Down Expand Up @@ -126,6 +137,24 @@ public static void beforeClass() throws IOException {
.build(),
innerTypeFieldBuilder.setMode(Field.Mode.NULLABLE).build())))
.build();

defaultValueTableDefinition =
StandardTableDefinition.of(
Schema.of(
com.google.cloud.bigquery.Field.newBuilder(
"foo_with_default", LegacySQLTypeName.STRING)
.setDefaultValueExpression("'default_value_for_test'")
.setMode(Field.Mode.NULLABLE)
.build(),
com.google.cloud.bigquery.Field.newBuilder(
"bar_without_default", LegacySQLTypeName.STRING)
.setMode(Mode.NULLABLE)
.build(),
com.google.cloud.bigquery.Field.newBuilder(
"date_with_default_to_current", LegacySQLTypeName.TIMESTAMP)
.setDefaultValueExpression("CURRENT_TIMESTAMP()")
.setMode(Mode.NULLABLE)
.build()));
bigquery.create(tableInfo);
bigquery.create(tableInfo2);
tableId =
Expand Down Expand Up @@ -706,7 +735,12 @@ public void testJsonStreamWriterWithDefaultStream()
assertEquals(2, currentRow.get(3).getRepeatedValue().size());
assertEquals("Yg==", currentRow.get(3).getRepeatedValue().get(1).getStringValue());
assertEquals(
Timestamp.valueOf("2022-02-06 07:24:47.84").getTime() * 1000,
Timestamp.valueOf("2022-02-06 07:24:47.84")
.toLocalDateTime()
.atZone(ZoneId.of("UTC"))
.toInstant()
.toEpochMilli()
* 1000,
currentRow.get(4).getTimestampValue()); // timestamp long of "2022-02-06 07:24:47.84"
assertEquals("bbb", iter.next().get(0).getStringValue());
assertEquals("ccc", iter.next().get(0).getStringValue());
Expand All @@ -718,6 +752,110 @@ public void testJsonStreamWriterWithDefaultStream()
}
}

@Test
public void testJsonDefaultStreamOnTableWithDefaultValue_SchemaNotGiven()
throws IOException, InterruptedException, ExecutionException,
Descriptors.DescriptorValidationException, ParseException {
String tableName = "defaultStreamDefaultValue";
String defaultTableId =
String.format(
"projects/%s/datasets/%s/tables/%s",
ServiceOptions.getDefaultProjectId(), DATASET, tableName);
tableInfo =
TableInfo.newBuilder(TableId.of(DATASET, tableName), defaultValueTableDefinition).build();
bigquery.create(tableInfo);
try (JsonStreamWriter jsonStreamWriter =
JsonStreamWriter.newBuilder(defaultTableId, client)
.setDefaultMissingValueInterpretation(MissingValueInterpretation.DEFAULT_VALUE)
.build()) {
testJsonStreamWriterForDefaultValue(jsonStreamWriter);
}
}

@Test
public void testJsonExclusiveStreamOnTableWithDefaultValue_GiveTableSchema()
throws IOException, InterruptedException, ExecutionException,
Descriptors.DescriptorValidationException, ParseException {
String tableName = "exclusiveStreamDefaultValue";
String exclusiveTableId =
String.format(
"projects/%s/datasets/%s/tables/%s",
ServiceOptions.getDefaultProjectId(), DATASET, tableName);
tableInfo =
TableInfo.newBuilder(TableId.of(DATASET, tableName), defaultValueTableDefinition).build();
bigquery.create(tableInfo);
WriteStream writeStream =
client.createWriteStream(
CreateWriteStreamRequest.newBuilder()
.setParent(exclusiveTableId)
.setWriteStream(
WriteStream.newBuilder().setType(WriteStream.Type.COMMITTED).build())
.build());
try (JsonStreamWriter jsonStreamWriter =
JsonStreamWriter.newBuilder(exclusiveTableId, writeStream.getTableSchema())
.setDefaultMissingValueInterpretation(MissingValueInterpretation.DEFAULT_VALUE)
.build()) {
testJsonStreamWriterForDefaultValue(jsonStreamWriter);
}
}

private void testJsonStreamWriterForDefaultValue(JsonStreamWriter jsonStreamWriter)
throws DescriptorValidationException, IOException, ExecutionException, InterruptedException,
ParseException {
// 1. row has both fields set.
JSONArray jsonArr1 = new JSONArray();
JSONObject row1 = new JSONObject();
row1.put("foo_with_default", "aaa");
row1.put("bar_without_default", "a");
row1.put("date_with_default_to_current", "2022-02-02 01:02:03");
jsonArr1.put(row1);
// 2. row with the column with default value unset
JSONObject row2 = new JSONObject();
row2.put("bar_without_default", "a");
jsonArr1.put(row2);
// 3. both value not set
JSONObject row3 = new JSONObject();
jsonArr1.put(row3);

// Start insertion and validation.
ApiFuture<AppendRowsResponse> response1 = jsonStreamWriter.append(jsonArr1, -1);
response1.get();
TableResult result =
bigquery.listTableData(tableInfo.getTableId(), BigQuery.TableDataListOption.startIndex(0L));
Iterator<FieldValueList> iter = result.getValues().iterator();

FieldValueList currentRow = iter.next();
assertEquals("aaa", currentRow.get(0).getStringValue());
assertEquals("a", currentRow.get(1).getStringValue());
assertEquals(
Timestamp.valueOf("2022-02-02 01:02:03")
.toLocalDateTime()
.atZone(ZoneId.of("UTC"))
.toInstant()
.toEpochMilli(),
Double.valueOf(currentRow.get(2).getStringValue()).longValue() * 1000);

currentRow = iter.next();
assertEquals("default_value_for_test", currentRow.get(0).getStringValue());
assertFalse(currentRow.get(2).getStringValue().isEmpty());
assertEquals("a", currentRow.get(1).getStringValue());
// Check whether the recorded value is up to date enough.
Instant parsedInstant =
Instant.ofEpochSecond(Double.valueOf(currentRow.get(2).getStringValue()).longValue());
assertTrue(parsedInstant.isAfter(Instant.now().minus(1, ChronoUnit.HOURS)));

currentRow = iter.next();
assertEquals("default_value_for_test", currentRow.get(0).getStringValue());
assertEquals(null, currentRow.get(1).getValue());
assertFalse(currentRow.get(2).getStringValue().isEmpty());
// Check whether the recorded value is up to date enough.
parsedInstant =
Instant.ofEpochSecond(Double.valueOf(currentRow.get(2).getStringValue()).longValue());
assertTrue(parsedInstant.isAfter(Instant.now().minus(1, ChronoUnit.HOURS)));

assertEquals(false, iter.hasNext());
}

// This test runs about 1 min.
@Test
public void testJsonStreamWriterWithMessagesOver10M()
Expand Down

0 comments on commit 056bd25

Please sign in to comment.