From 056bd25fe976c8c7085035a1b7196899cf4a72e7 Mon Sep 17 00:00:00 2001
From: Gaole Meng
Date: Wed, 25 Oct 2023 15:33:54 -0700
Subject: [PATCH] test: Add e2e test for default value use cases for default stream and exclusive stream (#2285)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

* chore(main): release 2.41.1 (#2222)

Co-authored-by: release-please[bot] <55107282+release-please[bot]@users.noreply.github.com>

* chore(main): release 2.41.1 (#2222)

Co-authored-by: release-please[bot] <55107282+release-please[bot]@users.noreply.github.com>

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* feat: expose configuration to config the default missing value interpretation

* Add e2e test for default value test

---------

Co-authored-by: release-please[bot] <55107282+release-please[bot]@users.noreply.github.com>
Co-authored-by: Owl Bot
---
 .../it/ITBigQueryWriteManualClientTest.java   | 140 +++++++++++++++++-
 1 file changed, 139 insertions(+), 1 deletion(-)

diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java
index c29c03d178..d08713de53 100644
--- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java
+++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java
@@ -25,9 +25,11 @@
 import com.google.api.core.ApiFuture;
 import com.google.cloud.ServiceOptions;
 import com.google.cloud.bigquery.*;
+import com.google.cloud.bigquery.Field.Mode;
 import com.google.cloud.bigquery.Schema;
 import com.google.cloud.bigquery.storage.test.Test.*;
 import com.google.cloud.bigquery.storage.v1.*;
+import com.google.cloud.bigquery.storage.v1.AppendRowsRequest.MissingValueInterpretation;
 import com.google.cloud.bigquery.storage.v1.Exceptions.AppendSerializationError;
 import com.google.cloud.bigquery.storage.v1.Exceptions.OffsetAlreadyExists;
 import com.google.cloud.bigquery.storage.v1.Exceptions.OffsetOutOfRange;
@@ -43,6 +45,10 @@
 import java.io.IOException;
 import java.math.BigDecimal;
 import java.sql.Timestamp;
+import java.text.ParseException;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.time.temporal.ChronoUnit;
 import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -63,14 +69,19 @@ public class ITBigQueryWriteManualClientTest {
   private static final String DATASET_EU = RemoteBigQueryHelper.generateDatasetName();
   private static final String TABLE = "testtable";
   private static final String TABLE2 = "complicatedtable";
+  private static final String DESCRIPTION = "BigQuery Write Java manual client test dataset";
 
   private static BigQueryWriteClient client;
 
   private static TableInfo tableInfo;
   private static TableInfo tableInfo2;
+  private static TableInfo tableInfoEU;
+
+  private static TableDefinition defaultValueTableDefinition;
 
   private static String tableId;
   private static String tableId2;
+  private static String tableIdEU;
 
   private static BigQuery bigquery;
 
@@ -126,6 +137,24 @@ public static void beforeClass() throws IOException {
                     .build(),
             innerTypeFieldBuilder.setMode(Field.Mode.NULLABLE).build())))
         .build();
+
+    defaultValueTableDefinition =
+        StandardTableDefinition.of(
+            Schema.of(
+                com.google.cloud.bigquery.Field.newBuilder(
+                        "foo_with_default", LegacySQLTypeName.STRING)
+                    .setDefaultValueExpression("'default_value_for_test'")
+                    .setMode(Field.Mode.NULLABLE)
+                    .build(),
+                com.google.cloud.bigquery.Field.newBuilder(
+                        "bar_without_default", LegacySQLTypeName.STRING)
+                    .setMode(Mode.NULLABLE)
+                    .build(),
+                com.google.cloud.bigquery.Field.newBuilder(
+                        "date_with_default_to_current", LegacySQLTypeName.TIMESTAMP)
+                    .setDefaultValueExpression("CURRENT_TIMESTAMP()")
+                    .setMode(Mode.NULLABLE)
+                    .build()));
     bigquery.create(tableInfo);
     bigquery.create(tableInfo2);
     tableId =
@@ -706,7 +735,12 @@ public void testJsonStreamWriterWithDefaultStream()
       assertEquals(2, currentRow.get(3).getRepeatedValue().size());
       assertEquals("Yg==", currentRow.get(3).getRepeatedValue().get(1).getStringValue());
       assertEquals(
-          Timestamp.valueOf("2022-02-06 07:24:47.84").getTime() * 1000,
+          Timestamp.valueOf("2022-02-06 07:24:47.84")
+                  .toLocalDateTime()
+                  .atZone(ZoneId.of("UTC"))
+                  .toInstant()
+                  .toEpochMilli()
+              * 1000,
           currentRow.get(4).getTimestampValue()); // timestamp long of "2022-02-06 07:24:47.84"
       assertEquals("bbb", iter.next().get(0).getStringValue());
       assertEquals("ccc", iter.next().get(0).getStringValue());
@@ -718,6 +752,110 @@ public void testJsonStreamWriterWithDefaultStream()
     }
   }
 
+  @Test
+  public void testJsonDefaultStreamOnTableWithDefaultValue_SchemaNotGiven()
+      throws IOException, InterruptedException, ExecutionException,
+          Descriptors.DescriptorValidationException, ParseException {
+    String tableName = "defaultStreamDefaultValue";
+    String defaultTableId =
+        String.format(
+            "projects/%s/datasets/%s/tables/%s",
+            ServiceOptions.getDefaultProjectId(), DATASET, tableName);
+    tableInfo =
+        TableInfo.newBuilder(TableId.of(DATASET, tableName), defaultValueTableDefinition).build();
+    bigquery.create(tableInfo);
+    try (JsonStreamWriter jsonStreamWriter =
+        JsonStreamWriter.newBuilder(defaultTableId, client)
+            .setDefaultMissingValueInterpretation(MissingValueInterpretation.DEFAULT_VALUE)
+            .build()) {
+      testJsonStreamWriterForDefaultValue(jsonStreamWriter);
+    }
+  }
+
+  @Test
+  public void testJsonExclusiveStreamOnTableWithDefaultValue_GiveTableSchema()
+      throws IOException, InterruptedException, ExecutionException,
+          Descriptors.DescriptorValidationException, ParseException {
+    String tableName = "exclusiveStreamDefaultValue";
+    String exclusiveTableId =
+        String.format(
+            "projects/%s/datasets/%s/tables/%s",
+            ServiceOptions.getDefaultProjectId(), DATASET, tableName);
+    tableInfo =
+        TableInfo.newBuilder(TableId.of(DATASET, tableName), defaultValueTableDefinition).build();
+    bigquery.create(tableInfo);
+    WriteStream writeStream =
+        client.createWriteStream(
+            CreateWriteStreamRequest.newBuilder()
+                .setParent(exclusiveTableId)
+                .setWriteStream(
+                    WriteStream.newBuilder().setType(WriteStream.Type.COMMITTED).build())
+                .build());
+    try (JsonStreamWriter jsonStreamWriter =
+        JsonStreamWriter.newBuilder(exclusiveTableId, writeStream.getTableSchema())
+            .setDefaultMissingValueInterpretation(MissingValueInterpretation.DEFAULT_VALUE)
+            .build()) {
+      testJsonStreamWriterForDefaultValue(jsonStreamWriter);
+    }
+  }
+
+  private void testJsonStreamWriterForDefaultValue(JsonStreamWriter jsonStreamWriter)
+      throws DescriptorValidationException, IOException, ExecutionException, InterruptedException,
+          ParseException {
+    // 1. row has both fields set.
+    JSONArray jsonArr1 = new JSONArray();
+    JSONObject row1 = new JSONObject();
+    row1.put("foo_with_default", "aaa");
+    row1.put("bar_without_default", "a");
+    row1.put("date_with_default_to_current", "2022-02-02 01:02:03");
+    jsonArr1.put(row1);
+    // 2. row with the column with default value unset
+    JSONObject row2 = new JSONObject();
+    row2.put("bar_without_default", "a");
+    jsonArr1.put(row2);
+    // 3. both value not set
+    JSONObject row3 = new JSONObject();
+    jsonArr1.put(row3);
+
+    // Start insertion and validation.
+    ApiFuture<AppendRowsResponse> response1 = jsonStreamWriter.append(jsonArr1, -1);
+    response1.get();
+    TableResult result =
+        bigquery.listTableData(tableInfo.getTableId(), BigQuery.TableDataListOption.startIndex(0L));
+    Iterator<FieldValueList> iter = result.getValues().iterator();
+
+    FieldValueList currentRow = iter.next();
+    assertEquals("aaa", currentRow.get(0).getStringValue());
+    assertEquals("a", currentRow.get(1).getStringValue());
+    assertEquals(
+        Timestamp.valueOf("2022-02-02 01:02:03")
+            .toLocalDateTime()
+            .atZone(ZoneId.of("UTC"))
+            .toInstant()
+            .toEpochMilli(),
+        Double.valueOf(currentRow.get(2).getStringValue()).longValue() * 1000);
+
+    currentRow = iter.next();
+    assertEquals("default_value_for_test", currentRow.get(0).getStringValue());
+    assertFalse(currentRow.get(2).getStringValue().isEmpty());
+    assertEquals("a", currentRow.get(1).getStringValue());
+    // Check whether the recorded value is up to date enough.
+    Instant parsedInstant =
+        Instant.ofEpochSecond(Double.valueOf(currentRow.get(2).getStringValue()).longValue());
+    assertTrue(parsedInstant.isAfter(Instant.now().minus(1, ChronoUnit.HOURS)));
+
+    currentRow = iter.next();
+    assertEquals("default_value_for_test", currentRow.get(0).getStringValue());
+    assertEquals(null, currentRow.get(1).getValue());
+    assertFalse(currentRow.get(2).getStringValue().isEmpty());
+    // Check whether the recorded value is up to date enough.
+    parsedInstant =
+        Instant.ofEpochSecond(Double.valueOf(currentRow.get(2).getStringValue()).longValue());
+    assertTrue(parsedInstant.isAfter(Instant.now().minus(1, ChronoUnit.HOURS)));
+
+    assertEquals(false, iter.hasNext());
+  }
+
   // This test runs about 1 min.
   @Test
   public void testJsonStreamWriterWithMessagesOver10M()
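The tests in this patch exercise the setting referenced in the commit message ("feat: expose configuration to config the default missing value interpretation"). A minimal standalone sketch of that configuration, assuming a destination table whose columns declare default value expressions; the project/dataset/table path below is a placeholder, not part of the patch:

import com.google.api.core.ApiFuture;
import com.google.cloud.bigquery.storage.v1.AppendRowsRequest.MissingValueInterpretation;
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
import com.google.cloud.bigquery.storage.v1.JsonStreamWriter;
import org.json.JSONArray;
import org.json.JSONObject;

public class DefaultValueWriteSketch {
  public static void main(String[] args) throws Exception {
    // Placeholder table path; the table's columns are assumed to declare default value
    // expressions (for example via setDefaultValueExpression, as in the test setup above).
    String tablePath = "projects/my-project/datasets/my_dataset/tables/my_table";
    try (BigQueryWriteClient client = BigQueryWriteClient.create();
        JsonStreamWriter writer =
            JsonStreamWriter.newBuilder(tablePath, client)
                // Ask the backend to fill columns omitted from the JSON payload with their
                // declared defaults instead of writing NULLs.
                .setDefaultMissingValueInterpretation(MissingValueInterpretation.DEFAULT_VALUE)
                .build()) {
      JSONArray rows = new JSONArray();
      rows.put(new JSONObject().put("bar_without_default", "a")); // other columns omitted
      ApiFuture<AppendRowsResponse> future = writer.append(rows);
      future.get(); // omitted columns are populated server-side from their default expressions
    }
  }
}

With DEFAULT_VALUE interpretation the omitted columns come back populated from their default expressions rather than as NULLs, which is what the assertions in testJsonStreamWriterForDefaultValue verify for both the default stream and an exclusive committed stream.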