From 7a581195eeaab802e9d917ffb4ae57aa5cea9979 Mon Sep 17 00:00:00 2001 From: prasar-ashutosh Date: Tue, 24 Sep 2024 14:46:17 +0530 Subject: [PATCH] UDF changes for Supporting Sql based digest calculation --- .../digest/DigestGenerationHandler.java | 1 + .../UDFBasedDigestGenStrategyAbstract.java | 3 + .../logicalplan/values/DigestUdfAbstract.java | 3 + .../logicalplan/values/FunctionName.java | 1 + .../ansi/sql/visitors/DigestUdfVisitor.java | 33 +- .../sql/visitor/DigestUdfVisitor.java | 9 + .../e2e/AppendOnlyExecutorTest.java | 8 +- .../e2e/AppendOnlyGeneratorTest.java | 8 +- .../components/e2e/BulkLoadExecutorTest.java | 8 +- .../components/e2e/BulkLoadGeneratorTest.java | 8 +- .../components/ingestmode/AppendOnlyTest.java | 6 +- .../components/ingestmode/BulkLoadTest.java | 21 +- .../relational/h2/H2DigestUtil.java | 10 +- .../h2/sql/visitor/DigestUdfVisitor.java | 15 +- .../components/H2DigestUtilTest.java | 5 +- .../ingestmode/bulkload/BulkLoadTest.java | 6 +- .../nontemporal/AppendOnlyTest.java | 6 +- .../components/ingestmode/AppendOnlyTest.java | 6 +- .../components/ingestmode/BulkLoadTest.java | 430 ++++++++++-------- 19 files changed, 343 insertions(+), 244 deletions(-) diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/ingestmode/digest/DigestGenerationHandler.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/ingestmode/digest/DigestGenerationHandler.java index 17bec00df1d..9ead1cc2b6d 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/ingestmode/digest/DigestGenerationHandler.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/ingestmode/digest/DigestGenerationHandler.java @@ -106,6 +106,7 @@ else if (value instanceof StagedFilesFieldValue) .addAllValues(filteredStagingFieldValues) .addAllFieldTypes(filteredStagingFieldTypes) .putAllTypeConversionUdfNames(udfBasedDigestGenStrategy.typeConversionUdfNames()) + .columnNameValueConcatUdfName(udfBasedDigestGenStrategy.columnNameValueConcatUdfName()) .build(); String digestField = udfBasedDigestGenStrategy.digestField(); diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/ingestmode/digest/UDFBasedDigestGenStrategyAbstract.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/ingestmode/digest/UDFBasedDigestGenStrategyAbstract.java index 5af41915ffe..94ed31135ee 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/ingestmode/digest/UDFBasedDigestGenStrategyAbstract.java +++ 
b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/ingestmode/digest/UDFBasedDigestGenStrategyAbstract.java @@ -18,6 +18,7 @@ import org.immutables.value.Value; import java.util.Map; +import java.util.Optional; import java.util.Set; @Value.Immutable @@ -38,6 +39,8 @@ public interface UDFBasedDigestGenStrategyAbstract extends DigestGenStrategy Map typeConversionUdfNames(); + Optional columnNameValueConcatUdfName(); + @Override default T accept(DigestGenStrategyVisitor visitor) { diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/logicalplan/values/DigestUdfAbstract.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/logicalplan/values/DigestUdfAbstract.java index 611eed08165..b375ed47f60 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/logicalplan/values/DigestUdfAbstract.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/logicalplan/values/DigestUdfAbstract.java @@ -19,6 +19,7 @@ import java.util.List; import java.util.Map; +import java.util.Optional; @org.immutables.value.Value.Immutable @org.immutables.value.Value.Style( @@ -40,4 +41,6 @@ public interface DigestUdfAbstract extends Value List fieldTypes(); Map typeConversionUdfNames(); + + Optional columnNameValueConcatUdfName(); } diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/logicalplan/values/FunctionName.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/logicalplan/values/FunctionName.java index bb0523793c3..2f3c045d1a1 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/logicalplan/values/FunctionName.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/logicalplan/values/FunctionName.java @@ -44,5 +44,6 @@ public enum FunctionName TO_JSON, CONVERT, STRUCT, + CONCAT, APPROX_COUNT_DISTINCT; } diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-ansi/src/main/java/org/finos/legend/engine/persistence/components/relational/ansi/sql/visitors/DigestUdfVisitor.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-ansi/src/main/java/org/finos/legend/engine/persistence/components/relational/ansi/sql/visitors/DigestUdfVisitor.java index 9e65b0fd529..b6b6a2f731c 100644 --- 
a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-ansi/src/main/java/org/finos/legend/engine/persistence/components/relational/ansi/sql/visitors/DigestUdfVisitor.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-ansi/src/main/java/org/finos/legend/engine/persistence/components/relational/ansi/sql/visitors/DigestUdfVisitor.java @@ -16,10 +16,7 @@ import org.finos.legend.engine.persistence.components.logicalplan.datasets.DataType; import org.finos.legend.engine.persistence.components.logicalplan.datasets.FieldType; -import org.finos.legend.engine.persistence.components.logicalplan.values.DigestUdf; -import org.finos.legend.engine.persistence.components.logicalplan.values.StringValue; -import org.finos.legend.engine.persistence.components.logicalplan.values.ToArrayFunction; -import org.finos.legend.engine.persistence.components.logicalplan.values.Value; +import org.finos.legend.engine.persistence.components.logicalplan.values.*; import org.finos.legend.engine.persistence.components.physicalplan.PhysicalPlanNode; import org.finos.legend.engine.persistence.components.relational.sqldom.schemaops.values.Udf; import org.finos.legend.engine.persistence.components.transformer.LogicalPlanVisitor; @@ -38,22 +35,34 @@ public VisitorResult visit(PhysicalPlanNode prev, DigestUdf current, VisitorCont { Udf udf = new Udf(context.quoteIdentifier(), current.udfName()); prev.push(udf); - List columnNameList = new ArrayList<>(); - List columnValueList = new ArrayList<>(); + List columns = new ArrayList<>(); for (int i = 0; i < current.values().size(); i++) { - columnNameList.add(StringValue.of(current.fieldNames().get(i))); - columnValueList.add(getColumnValueAsStringType(current.values().get(i), current.fieldTypes().get(i), current.typeConversionUdfNames())); + Value columnName = StringValue.of(current.fieldNames().get(i)); + Value columnValue = getColumnValueAsStringType(current.values().get(i), current.fieldTypes().get(i), current.typeConversionUdfNames()); + if (current.columnNameValueConcatUdfName().isPresent()) + { + columns.add(org.finos.legend.engine.persistence.components.logicalplan.values.Udf.builder().udfName(current.columnNameValueConcatUdfName().get()).addParameters(columnName, columnValue).build()); + } + else + { + columns.add(columnName); + columns.add(columnValue); + } } - ToArrayFunction toArrayColumnNames = ToArrayFunction.builder().addAllValues(columnNameList).build(); - ToArrayFunction toArrayColumnValues = ToArrayFunction.builder().addAllValues(columnValueList).build(); + Value mergeFunction = mergeColumnsFunction(columns); + return new VisitorResult(udf, Arrays.asList(mergeFunction)); + } - return new VisitorResult(udf, Arrays.asList(toArrayColumnNames, toArrayColumnValues)); + protected Value mergeColumnsFunction(List columns) + { + FunctionImpl concatFunction = FunctionImpl.builder().functionName(FunctionName.CONCAT).addAllValue(columns).build(); + return concatFunction; } protected Value getColumnValueAsStringType(Value value, FieldType dataType, Map typeConversionUdfNames) { throw new IllegalStateException("UDF is unsupported in ANSI sink"); } -} +} \ No newline at end of file diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/main/java/org/finos/legend/engine/persistence/components/relational/bigquery/sql/visitor/DigestUdfVisitor.java 
b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/main/java/org/finos/legend/engine/persistence/components/relational/bigquery/sql/visitor/DigestUdfVisitor.java index 05f474e6e13..b52db965dfc 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/main/java/org/finos/legend/engine/persistence/components/relational/bigquery/sql/visitor/DigestUdfVisitor.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/main/java/org/finos/legend/engine/persistence/components/relational/bigquery/sql/visitor/DigestUdfVisitor.java @@ -16,8 +16,10 @@ import org.finos.legend.engine.persistence.components.logicalplan.datasets.DataType; import org.finos.legend.engine.persistence.components.logicalplan.datasets.FieldType; +import org.finos.legend.engine.persistence.components.logicalplan.values.ToArrayFunction; import org.finos.legend.engine.persistence.components.logicalplan.values.Value; +import java.util.List; import java.util.Map; public class DigestUdfVisitor extends org.finos.legend.engine.persistence.components.relational.ansi.sql.visitors.DigestUdfVisitor @@ -35,4 +37,11 @@ protected Value getColumnValueAsStringType(Value value, FieldType dataType, Map< return value; } } + + @Override + protected Value mergeColumnsFunction(List columns) + { + ToArrayFunction toArrayFunction = ToArrayFunction.builder().addAllValues(columns).build(); + return toArrayFunction; + } } diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/test/java/org/finos/legend/engine/persistence/components/e2e/AppendOnlyExecutorTest.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/test/java/org/finos/legend/engine/persistence/components/e2e/AppendOnlyExecutorTest.java index a7f907ed045..7243bf809c4 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/test/java/org/finos/legend/engine/persistence/components/e2e/AppendOnlyExecutorTest.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/test/java/org/finos/legend/engine/persistence/components/e2e/AppendOnlyExecutorTest.java @@ -142,16 +142,16 @@ public void testMilestoningWithDigestGeneration() throws IOException, Interrupte "AS (\n" + " CAST(value AS STRING)\n" + ");\n"); - runQuery("CREATE FUNCTION demo.stringifyArr(arr1 ARRAY, arr2 ARRAY)\n" + + runQuery("CREATE FUNCTION demo.stringifyArr(args ARRAY)\n" + " RETURNS STRING\n" + " LANGUAGE js AS \"\"\"\n" + " let output = \"\"; \n" + - " for (const [index, element] of arr1.entries()) { output += arr1[index]; output += arr2[index]; }\n" + + " for (const [index, element] of args.entries()) { output += args[index]; }\n" + " return output;\n" + " \"\"\"; \n"); - runQuery("CREATE FUNCTION demo.LAKEHOUSE_MD5(arr1 ARRAY, arr2 ARRAY)\n" + + runQuery("CREATE FUNCTION demo.LAKEHOUSE_MD5(args ARRAY)\n" + "AS (\n" + - " TO_HEX(MD5(demo.stringifyArr(arr1, arr2)))\n" + + " TO_HEX(MD5(demo.stringifyArr(args)))\n" + ");\n"); // Pass 1 diff --git 
a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/test/java/org/finos/legend/engine/persistence/components/e2e/AppendOnlyGeneratorTest.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/test/java/org/finos/legend/engine/persistence/components/e2e/AppendOnlyGeneratorTest.java index cf2f4ae2eba..d80f5f7fbad 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/test/java/org/finos/legend/engine/persistence/components/e2e/AppendOnlyGeneratorTest.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/test/java/org/finos/legend/engine/persistence/components/e2e/AppendOnlyGeneratorTest.java @@ -111,16 +111,16 @@ public void testMilestoningWithDigestGenerationWithFieldsToExclude() throws IOEx "AS (\n" + " CAST(value AS STRING)\n" + ");\n"); - runQuery("CREATE FUNCTION demo.stringifyArr(arr1 ARRAY, arr2 ARRAY)\n" + + runQuery("CREATE FUNCTION demo.stringifyArr(args ARRAY)\n" + " RETURNS STRING\n" + " LANGUAGE js AS \"\"\"\n" + " let output = \"\"; \n" + - " for (const [index, element] of arr1.entries()) { output += arr1[index]; output += arr2[index]; }\n" + + " for (const [index, element] of args.entries()) { output += args[index]; }\n" + " return output;\n" + " \"\"\"; \n"); - runQuery("CREATE FUNCTION demo.LAKEHOUSE_MD5(arr1 ARRAY, arr2 ARRAY)\n" + + runQuery("CREATE FUNCTION demo.LAKEHOUSE_MD5(args ARRAY)\n" + "AS (\n" + - " TO_HEX(MD5(demo.stringifyArr(arr1, arr2)))\n" + + " TO_HEX(MD5(demo.stringifyArr(args)))\n" + ");\n"); // Pass 1 diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/test/java/org/finos/legend/engine/persistence/components/e2e/BulkLoadExecutorTest.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/test/java/org/finos/legend/engine/persistence/components/e2e/BulkLoadExecutorTest.java index b5ed31c45de..01c974ae7b2 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/test/java/org/finos/legend/engine/persistence/components/e2e/BulkLoadExecutorTest.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/test/java/org/finos/legend/engine/persistence/components/e2e/BulkLoadExecutorTest.java @@ -238,16 +238,16 @@ public void testMilestoningWithUdfBasedDigestGenerationWithFieldsToExclude() thr "AS (\n" + " CAST(value AS STRING)\n" + ");\n"); - runQuery("CREATE FUNCTION demo.stringifyArr(arr1 ARRAY, arr2 ARRAY)\n" + + runQuery("CREATE FUNCTION demo.stringifyArr(args ARRAY)\n" + " RETURNS STRING\n" + " LANGUAGE js AS \"\"\"\n" + " let output = \"\"; \n" + - " for (const [index, element] of arr1.entries()) { output += arr1[index]; output += arr2[index]; }\n" + + " for (const [index, element] of args.entries()) { output += args[index]; }\n" + " return output;\n" + " \"\"\"; \n"); - runQuery("CREATE FUNCTION demo.LAKEHOUSE_MD5(arr1 ARRAY, arr2 ARRAY)\n" + + runQuery("CREATE FUNCTION demo.LAKEHOUSE_MD5(args ARRAY)\n" + "AS (\n" + - " TO_HEX(MD5(demo.stringifyArr(arr1, arr2)))\n" + + " 
TO_HEX(MD5(demo.stringifyArr(args)))\n" + ");\n"); RelationalIngestor ingestor = RelationalIngestor.builder() diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/test/java/org/finos/legend/engine/persistence/components/e2e/BulkLoadGeneratorTest.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/test/java/org/finos/legend/engine/persistence/components/e2e/BulkLoadGeneratorTest.java index 9c4363c3fea..b2e74e65345 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/test/java/org/finos/legend/engine/persistence/components/e2e/BulkLoadGeneratorTest.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/test/java/org/finos/legend/engine/persistence/components/e2e/BulkLoadGeneratorTest.java @@ -193,16 +193,16 @@ public void testMilestoningWithDigestGeneration() throws IOException, Interrupte "AS (\n" + " CAST(value AS STRING)\n" + ");\n"); - runQuery("CREATE FUNCTION demo.stringifyArr(arr1 ARRAY, arr2 ARRAY)\n" + + runQuery("CREATE FUNCTION demo.stringifyArr(args ARRAY)\n" + " RETURNS STRING\n" + " LANGUAGE js AS \"\"\"\n" + " let output = \"\"; \n" + - " for (const [index, element] of arr1.entries()) { output += arr1[index]; output += arr2[index]; }\n" + + " for (const [index, element] of args.entries()) { output += args[index]; }\n" + " return output;\n" + " \"\"\"; \n"); - runQuery("CREATE FUNCTION demo.LAKEHOUSE_MD5(arr1 ARRAY, arr2 ARRAY)\n" + + runQuery("CREATE FUNCTION demo.LAKEHOUSE_MD5(args ARRAY)\n" + "AS (\n" + - " TO_HEX(MD5(demo.stringifyArr(arr1, arr2)))\n" + + " TO_HEX(MD5(demo.stringifyArr(args)))\n" + ");\n"); RelationalGenerator generator = RelationalGenerator.builder() diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/AppendOnlyTest.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/AppendOnlyTest.java index 3dc1f40b63b..0e3ec32dad1 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/AppendOnlyTest.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/AppendOnlyTest.java @@ -303,7 +303,7 @@ public void verifyAppendOnlyNoAuditingAllowDuplicatesNoVersioningNoFilterExistin List milestoningSqlList = operations.ingestSql(); String insertSql = "INSERT INTO `mydb`.`main` (`id`, `name`, `amount`, `biz_date`, `digest`, `batch_id`) " + - "(SELECT stage.`id`,stage.`name`,stage.`amount`,stage.`biz_date`,LAKEHOUSE_MD5(['amount','biz_date','id','name'],[stage.`amount`,stage.`biz_date`,stage.`id`,stage.`name`])," + + "(SELECT stage.`id`,stage.`name`,stage.`amount`,stage.`biz_date`,LAKEHOUSE_MD5(['amount',stage.`amount`,'biz_date',stage.`biz_date`,'id',stage.`id`,'name',stage.`name`])," + "(SELECT 
COALESCE(MAX(batch_metadata.`table_batch_id`),0)+1 FROM batch_metadata as batch_metadata WHERE UPPER(batch_metadata.`table_name`) = 'MAIN') " + "FROM `mydb`.`staging` as stage)"; Assertions.assertEquals(BigQueryTestArtifacts.expectedBaseTableCreateQueryWithNoPKs, preActionsSqlList.get(0)); @@ -339,7 +339,7 @@ public void verifyAppendOnlyWithAuditingFailOnDuplicatesAllVersionNoFilterExisti { String insertSql = "INSERT INTO `mydb`.`main` " + "(`id`, `name`, `amount`, `biz_date`, `digest`, `batch_update_time`, `batch_id`) " + - "(SELECT stage.`id`,stage.`name`,stage.`amount`,stage.`biz_date`,LAKEHOUSE_MD5(['biz_date','name'],[stage.`biz_date`,stage.`name`]),PARSE_DATETIME('%Y-%m-%d %H:%M:%E6S','2000-01-01 00:00:00.000000')," + + "(SELECT stage.`id`,stage.`name`,stage.`amount`,stage.`biz_date`,LAKEHOUSE_MD5(['biz_date',stage.`biz_date`,'name',stage.`name`]),PARSE_DATETIME('%Y-%m-%d %H:%M:%E6S','2000-01-01 00:00:00.000000')," + "(SELECT COALESCE(MAX(batch_metadata.`table_batch_id`),0)+1 FROM batch_metadata as batch_metadata WHERE UPPER(batch_metadata.`table_name`) = 'MAIN') " + "FROM `mydb`.`staging_temp_staging_lp_yosulf` as stage " + "WHERE (stage.`data_split` >= '{DATA_SPLIT_LOWER_BOUND_PLACEHOLDER}') AND (stage.`data_split` <= '{DATA_SPLIT_UPPER_BOUND_PLACEHOLDER}'))"; @@ -391,7 +391,7 @@ public void verifyAppendOnlyWithAuditingFailOnDuplicatesAllVersionNoFilterExisti String insertSql = "INSERT INTO `mydb`.`main` " + "(`id`, `name`, `amount`, `biz_date`, `digest`, `batch_update_time`, `batch_id`) " + "(SELECT stage.`id`,stage.`name`,stage.`amount`,stage.`biz_date`," + - "LAKEHOUSE_MD5(['amount','biz_date','name'],[doubleToString(stage.`amount`),dateToString(stage.`biz_date`),stage.`name`]),PARSE_DATETIME('%Y-%m-%d %H:%M:%E6S','2000-01-01 00:00:00.000000')," + + "LAKEHOUSE_MD5(['amount',doubleToString(stage.`amount`),'biz_date',dateToString(stage.`biz_date`),'name',stage.`name`]),PARSE_DATETIME('%Y-%m-%d %H:%M:%E6S','2000-01-01 00:00:00.000000')," + "(SELECT COALESCE(MAX(batch_metadata.`table_batch_id`),0)+1 FROM batch_metadata as batch_metadata WHERE UPPER(batch_metadata.`table_name`) = 'MAIN') " + "FROM `mydb`.`staging_temp_staging_lp_yosulf` as stage " + "WHERE (stage.`data_split` >= '{DATA_SPLIT_LOWER_BOUND_PLACEHOLDER}') AND (stage.`data_split` <= '{DATA_SPLIT_UPPER_BOUND_PLACEHOLDER}'))"; diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/BulkLoadTest.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/BulkLoadTest.java index e9463654f2a..8610febb0d7 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/BulkLoadTest.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/BulkLoadTest.java @@ -351,11 +351,11 @@ public void testBulkLoadWithDigestGeneratedAuditEnabledNoExtraOptions() "OPTIONS (uris=['/path/xyz/file1.csv','/path/xyz/file2.csv'], format='CSV')"; String expectedInsertSql = "INSERT INTO `my_db`.`my_name` " + - "(`col_int`, `col_string`, `col_decimal`, 
`col_datetime`, `col_variant`, `digest`, `batch_id`, `append_time`) " + - "(SELECT legend_persistence_temp.`col_int`,legend_persistence_temp.`col_string`,legend_persistence_temp.`col_decimal`,legend_persistence_temp.`col_datetime`,legend_persistence_temp.`col_variant`," + - "LAKEHOUSE_MD5(['col_datetime','col_decimal','col_int','col_string','col_variant'],[legend_persistence_temp.`col_datetime`,legend_persistence_temp.`col_decimal`,legend_persistence_temp.`col_int`,legend_persistence_temp.`col_string`,legend_persistence_temp.`col_variant`])," + - "(SELECT COALESCE(MAX(batch_metadata.`table_batch_id`),0)+1 FROM batch_metadata as batch_metadata WHERE UPPER(batch_metadata.`table_name`) = 'MY_NAME'),PARSE_DATETIME('%Y-%m-%d %H:%M:%E6S','2000-01-01 00:00:00.000000') " + - "FROM `my_db`.`my_name_temp_lp_yosulf` as legend_persistence_temp)"; + "(`col_int`, `col_string`, `col_decimal`, `col_datetime`, `col_variant`, `digest`, `batch_id`, `append_time`) " + + "(SELECT legend_persistence_temp.`col_int`,legend_persistence_temp.`col_string`,legend_persistence_temp.`col_decimal`,legend_persistence_temp.`col_datetime`,legend_persistence_temp.`col_variant`," + + "LAKEHOUSE_MD5(['col_datetime',legend_persistence_temp.`col_datetime`,'col_decimal',legend_persistence_temp.`col_decimal`,'col_int',legend_persistence_temp.`col_int`,'col_string',legend_persistence_temp.`col_string`,'col_variant',legend_persistence_temp.`col_variant`])," + + "(SELECT COALESCE(MAX(batch_metadata.`table_batch_id`),0)+1 FROM batch_metadata as batch_metadata WHERE UPPER(batch_metadata.`table_name`) = 'MY_NAME'),PARSE_DATETIME('%Y-%m-%d %H:%M:%E6S','2000-01-01 00:00:00.000000') " + + "FROM `my_db`.`my_name_temp_lp_yosulf` as legend_persistence_temp)"; Assertions.assertEquals(expectedCreateTableSql, preActionsSql.get(0)); Assertions.assertEquals(expectedCopySql, preActionsSql.get(2)); @@ -414,10 +414,11 @@ public void testBulkLoadWithDigestGeneratedAuditEnabledNoExtraOptionsUpperCase() "OPTIONS (uris=['/path/xyz/file1.csv','/path/xyz/file2.csv'], format='CSV')"; String expectedInsertSql = "INSERT INTO `MY_DB`.`MY_NAME` " + - "(`COL_INT`, `COL_STRING`, `COL_DECIMAL`, `COL_DATETIME`, `COL_VARIANT`, `DIGEST`, `BATCH_ID`, `APPEND_TIME`) " + - "(SELECT legend_persistence_temp.`COL_INT`,legend_persistence_temp.`COL_STRING`,legend_persistence_temp.`COL_DECIMAL`,legend_persistence_temp.`COL_DATETIME`,legend_persistence_temp.`COL_VARIANT`," + - "LAKEHOUSE_MD5(['COL_VARIANT'],[legend_persistence_temp.`COL_VARIANT`]),(SELECT COALESCE(MAX(BATCH_METADATA.`TABLE_BATCH_ID`),0)+1 FROM BATCH_METADATA as BATCH_METADATA WHERE UPPER(BATCH_METADATA.`TABLE_NAME`) = 'MY_NAME'),PARSE_DATETIME('%Y-%m-%d %H:%M:%E6S','2000-01-01 00:00:00.000000') " + - "FROM `MY_DB`.`MY_NAME_TEMP_LP_YOSULF` as legend_persistence_temp)"; + "(`COL_INT`, `COL_STRING`, `COL_DECIMAL`, `COL_DATETIME`, `COL_VARIANT`, `DIGEST`, `BATCH_ID`, `APPEND_TIME`) " + + "(SELECT legend_persistence_temp.`COL_INT`,legend_persistence_temp.`COL_STRING`,legend_persistence_temp.`COL_DECIMAL`,legend_persistence_temp.`COL_DATETIME`,legend_persistence_temp.`COL_VARIANT`," + + "LAKEHOUSE_MD5(['COL_VARIANT',legend_persistence_temp.`COL_VARIANT`])," + + "(SELECT COALESCE(MAX(BATCH_METADATA.`TABLE_BATCH_ID`),0)+1 FROM BATCH_METADATA as BATCH_METADATA WHERE UPPER(BATCH_METADATA.`TABLE_NAME`) = 'MY_NAME'),PARSE_DATETIME('%Y-%m-%d %H:%M:%E6S','2000-01-01 00:00:00.000000') " + + "FROM `MY_DB`.`MY_NAME_TEMP_LP_YOSULF` as legend_persistence_temp)"; Assertions.assertEquals(expectedCreateTableSql, 
preActionsSql.get(0)); Assertions.assertEquals(expectedCopySql, preActionsSql.get(2)); @@ -483,7 +484,7 @@ public void testBulkLoadWithDigestGeneratedAndTypeConversionUdfsAuditEnabledNoEx String expectedInsertSql = "INSERT INTO `MY_DB`.`MY_NAME` " + "(`COL_INT`, `COL_STRING`, `COL_DECIMAL`, `COL_DATETIME`, `COL_VARIANT`, `DIGEST`, `BATCH_ID`, `APPEND_TIME`) " + "(SELECT legend_persistence_temp.`COL_INT`,legend_persistence_temp.`COL_STRING`,legend_persistence_temp.`COL_DECIMAL`,legend_persistence_temp.`COL_DATETIME`,legend_persistence_temp.`COL_VARIANT`," + - "LAKEHOUSE_MD5(['COL_DATETIME','COL_DECIMAL','COL_STRING'],[timestampToString(legend_persistence_temp.`COL_DATETIME`),decimalToString(legend_persistence_temp.`COL_DECIMAL`),legend_persistence_temp.`COL_STRING`])" + + "LAKEHOUSE_MD5(['COL_DATETIME',timestampToString(legend_persistence_temp.`COL_DATETIME`),'COL_DECIMAL',decimalToString(legend_persistence_temp.`COL_DECIMAL`),'COL_STRING',legend_persistence_temp.`COL_STRING`])" + ",(SELECT COALESCE(MAX(BATCH_METADATA.`TABLE_BATCH_ID`),0)+1 FROM BATCH_METADATA as BATCH_METADATA WHERE UPPER(BATCH_METADATA.`TABLE_NAME`) = 'MY_NAME'),PARSE_DATETIME('%Y-%m-%d %H:%M:%E6S','2000-01-01 00:00:00.000000') " + "FROM `MY_DB`.`MY_NAME_TEMP_LP_YOSULF` as legend_persistence_temp)"; diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/main/java/org/finos/legend/engine/persistence/components/relational/h2/H2DigestUtil.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/main/java/org/finos/legend/engine/persistence/components/relational/h2/H2DigestUtil.java index 3a6c1db5e95..3ea2589e381 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/main/java/org/finos/legend/engine/persistence/components/relational/h2/H2DigestUtil.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/main/java/org/finos/legend/engine/persistence/components/relational/h2/H2DigestUtil.java @@ -36,17 +36,17 @@ public static void registerMD5Udf(JdbcHelper sink, String UdfName) sink.executeStatement("CREATE ALIAS " + UdfName + " FOR \"org.finos.legend.engine.persistence.components.relational.h2.H2DigestUtil.MD5\";"); } - public static String MD5(String[] columnNameList, String[] columnValueList) + public static String MD5(String[] columnNameAndValueList) { - return calculateMD5Digest(generateRowMap(columnNameList, columnValueList)); + return calculateMD5Digest(generateRowMap(columnNameAndValueList)); } - private static Map generateRowMap(String[] columnNameList, String[] columnValueList) + private static Map generateRowMap(String[] columnNameAndValueList) { Map map = new HashMap<>(); - for (int i = 0; i < columnNameList.length; i++) + for (int i = 0; i < columnNameAndValueList.length; i++) { - map.put(columnNameList[i], columnValueList[i]); + map.put(columnNameAndValueList[i], columnNameAndValueList[++i]); } return map; } diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/main/java/org/finos/legend/engine/persistence/components/relational/h2/sql/visitor/DigestUdfVisitor.java 
b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/main/java/org/finos/legend/engine/persistence/components/relational/h2/sql/visitor/DigestUdfVisitor.java index 800ac27937a..79b11953d98 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/main/java/org/finos/legend/engine/persistence/components/relational/h2/sql/visitor/DigestUdfVisitor.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/main/java/org/finos/legend/engine/persistence/components/relational/h2/sql/visitor/DigestUdfVisitor.java @@ -16,13 +16,9 @@ import org.finos.legend.engine.persistence.components.logicalplan.datasets.DataType; import org.finos.legend.engine.persistence.components.logicalplan.datasets.FieldType; -import org.finos.legend.engine.persistence.components.logicalplan.values.FieldValue; -import org.finos.legend.engine.persistence.components.logicalplan.values.FunctionImpl; -import org.finos.legend.engine.persistence.components.logicalplan.values.FunctionName; -import org.finos.legend.engine.persistence.components.logicalplan.values.ObjectValue; -import org.finos.legend.engine.persistence.components.logicalplan.values.StagedFilesFieldValue; -import org.finos.legend.engine.persistence.components.logicalplan.values.Value; +import org.finos.legend.engine.persistence.components.logicalplan.values.*; +import java.util.List; import java.util.Map; public class DigestUdfVisitor extends org.finos.legend.engine.persistence.components.relational.ansi.sql.visitors.DigestUdfVisitor @@ -47,4 +43,11 @@ protected Value getColumnValueAsStringType(Value value, FieldType dataType, Map< return FunctionImpl.builder().functionName(FunctionName.CONVERT).addValue(value, ObjectValue.of(DataType.VARCHAR.name())).build(); } } + + @Override + protected Value mergeColumnsFunction(List columns) + { + ToArrayFunction toArrayFunction = ToArrayFunction.builder().addAllValues(columns).build(); + return toArrayFunction; + } } diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/H2DigestUtilTest.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/H2DigestUtilTest.java index 0a0b29ea9ea..32009f26c42 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/H2DigestUtilTest.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/H2DigestUtilTest.java @@ -21,12 +21,11 @@ public class H2DigestUtilTest { private String expectedDigest = "fd40b241c6d2eb55348e3bc51e81925b"; - private String[] columns = new String[]{"COLUMN_1", "COLUMN_2", "COLUMN_3", "COLUMN_4", "COLUMN_5", "COLUMN_6"}; - private String[] values = new String[]{"test data", "true", "33", "1111", "1.5", null}; + private String[] columns = new String[]{"COLUMN_1", "test data", "COLUMN_2", "true", "COLUMN_3", "33", "COLUMN_4", "1111", "COLUMN_5", "1.5", "COLUMN_6", null}; @Test void testMD5() { - 
Assertions.assertEquals(expectedDigest, H2DigestUtil.MD5(columns, values)); + Assertions.assertEquals(expectedDigest, H2DigestUtil.MD5(columns)); } } diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/bulkload/BulkLoadTest.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/bulkload/BulkLoadTest.java index 39aedf767e6..134d22a7546 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/bulkload/BulkLoadTest.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/bulkload/BulkLoadTest.java @@ -305,7 +305,7 @@ public void testBulkLoadWithDigestGeneratedAuditEnabled() throws Exception String expectedIngestSql = "INSERT INTO \"TEST_DB\".\"TEST\".\"main\" " + "(\"col_int\", \"col_string\", \"col_decimal\", \"col_datetime\", \"digest\", \"batch_id\", \"append_time\") " + "SELECT CONVERT(\"col_int\",INTEGER),CONVERT(\"col_string\",VARCHAR),CONVERT(\"col_decimal\",DECIMAL(5,2)),CONVERT(\"col_datetime\",TIMESTAMP)," + - "LAKEHOUSE_MD5(ARRAY['col_datetime','col_decimal','col_int','col_string'],ARRAY[\"col_datetime\",\"col_decimal\",\"col_int\",\"col_string\"])," + + "LAKEHOUSE_MD5(ARRAY['col_datetime',\"col_datetime\",'col_decimal',\"col_decimal\",'col_int',\"col_int\",'col_string',\"col_string\"])," + "(SELECT COALESCE(MAX(batch_metadata.\"table_batch_id\"),0)+1 FROM batch_metadata as batch_metadata WHERE UPPER(batch_metadata.\"table_name\") = 'MAIN'),'2000-01-01 00:00:00.000000' FROM CSVREAD('src/test/resources/data/bulk-load/input/staged_file3.csv','col_int,col_string,col_decimal,col_datetime',NULL)"; Assertions.assertEquals(expectedCreateTableSql, preActionsSql.get(0)); @@ -385,7 +385,7 @@ public void testBulkLoadWithDigestGeneratedWithFieldsToExcludeAuditEnabled() thr String expectedIngestSql = "INSERT INTO \"TEST_DB\".\"TEST\".\"main\" " + "(\"col_int\", \"col_string\", \"col_decimal\", \"col_datetime\", \"digest\", \"batch_id\", \"append_time\") " + "SELECT CONVERT(\"col_int\",INTEGER),CONVERT(\"col_string\",VARCHAR),CONVERT(\"col_decimal\",DECIMAL(5,2)),CONVERT(\"col_datetime\",TIMESTAMP)," + - "LAKEHOUSE_MD5(ARRAY['col_decimal','col_int'],ARRAY[\"col_decimal\",\"col_int\"])," + + "LAKEHOUSE_MD5(ARRAY['col_decimal',\"col_decimal\",'col_int',\"col_int\"])," + "(SELECT COALESCE(MAX(batch_metadata.\"table_batch_id\"),0)+1 FROM batch_metadata as batch_metadata WHERE UPPER(batch_metadata.\"table_name\") = 'MAIN'),'2000-01-01 00:00:00.000000' FROM CSVREAD('src/test/resources/data/bulk-load/input/staged_file3.csv','col_int,col_string,col_decimal,col_datetime',NULL)"; Assertions.assertEquals(expectedCreateTableSql, preActionsSql.get(0)); @@ -466,7 +466,7 @@ public void testBulkLoadWithDigestGeneratedAuditEnabledUpperCase() throws Except String expectedIngestSql = "INSERT INTO \"TEST_DB\".\"TEST\".\"MAIN\" " + "(\"COL_INT\", \"COL_STRING\", \"COL_DECIMAL\", \"COL_DATETIME\", \"DIGEST\", \"BATCH_ID\", \"APPEND_TIME\") " + "SELECT 
CONVERT(\"COL_INT\",INTEGER),CONVERT(\"COL_STRING\",VARCHAR),CONVERT(\"COL_DECIMAL\",DECIMAL(5,2)),CONVERT(\"COL_DATETIME\",TIMESTAMP)," + - "LAKEHOUSE_MD5(ARRAY['COL_DATETIME','COL_DECIMAL','COL_INT','COL_STRING'],ARRAY[\"COL_DATETIME\",\"COL_DECIMAL\",\"COL_INT\",\"COL_STRING\"])," + + "LAKEHOUSE_MD5(ARRAY['COL_DATETIME',\"COL_DATETIME\",'COL_DECIMAL',\"COL_DECIMAL\",'COL_INT',\"COL_INT\",'COL_STRING',\"COL_STRING\"])," + "(SELECT COALESCE(MAX(BATCH_METADATA.\"TABLE_BATCH_ID\"),0)+1 FROM BATCH_METADATA as BATCH_METADATA WHERE UPPER(BATCH_METADATA.\"TABLE_NAME\") = 'MAIN'),'2000-01-01 00:00:00.000000' " + "FROM CSVREAD('src/test/resources/data/bulk-load/input/staged_file4.csv','COL_INT,COL_STRING,COL_DECIMAL,COL_DATETIME',NULL)"; diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/nontemporal/AppendOnlyTest.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/nontemporal/AppendOnlyTest.java index 2124532fb31..632f2944cea 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/nontemporal/AppendOnlyTest.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-h2/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/nontemporal/AppendOnlyTest.java @@ -480,7 +480,7 @@ void testAppendOnlyWithUDFDigestGeneration() throws Exception String expectedIngestSql = "INSERT INTO \"TEST\".\"MAIN\" " + "(\"NAME\", \"INCOME\", \"EXPIRY_DATE\", \"DIGEST\", \"BATCH_ID\") " + "(SELECT staging.\"NAME\" as \"NAME\",staging.\"INCOME\" as \"INCOME\",staging.\"EXPIRY_DATE\" as \"EXPIRY_DATE\"," + - "LAKEHOUSE_MD5(ARRAY['EXPIRY_DATE','INCOME','NAME'],ARRAY[CONVERT(staging.\"EXPIRY_DATE\",VARCHAR),CONVERT(staging.\"INCOME\",VARCHAR),CONVERT(staging.\"NAME\",VARCHAR)])," + + "LAKEHOUSE_MD5(ARRAY['EXPIRY_DATE',CONVERT(staging.\"EXPIRY_DATE\",VARCHAR),'INCOME',CONVERT(staging.\"INCOME\",VARCHAR),'NAME',CONVERT(staging.\"NAME\",VARCHAR)])," + "(SELECT COALESCE(MAX(BATCH_METADATA.\"TABLE_BATCH_ID\"),0)+1 FROM BATCH_METADATA as BATCH_METADATA WHERE UPPER(BATCH_METADATA.\"TABLE_NAME\") = 'MAIN') " + "FROM \"TEST\".\"STAGING\" as staging)"; @@ -568,7 +568,7 @@ void testAppendOnlyWithUDFDigestGenerationWithFieldsToExclude() throws Exception String expectedIngestSql = "INSERT INTO \"TEST\".\"main\" " + "(\"id\", \"name\", \"income\", \"start_time\", \"expiry_date\", \"version\", \"digest\", \"batch_update_time\", \"batch_id\") " + "(SELECT staging.\"id\" as \"id\",staging.\"name\" as \"name\",staging.\"income\" as \"income\",staging.\"start_time\" as \"start_time\",staging.\"expiry_date\" as \"expiry_date\",staging.\"version\" as \"version\"," + - "LAKEHOUSE_MD5(ARRAY['expiry_date','id','income','name','start_time'],ARRAY[CONVERT(staging.\"expiry_date\",VARCHAR),CONVERT(staging.\"id\",VARCHAR),CONVERT(staging.\"income\",VARCHAR),CONVERT(staging.\"name\",VARCHAR),CONVERT(staging.\"start_time\",VARCHAR)])," + + 
"LAKEHOUSE_MD5(ARRAY['expiry_date',CONVERT(staging.\"expiry_date\",VARCHAR),'id',CONVERT(staging.\"id\",VARCHAR),'income',CONVERT(staging.\"income\",VARCHAR),'name',CONVERT(staging.\"name\",VARCHAR),'start_time',CONVERT(staging.\"start_time\",VARCHAR)])," + "'2000-01-01 00:00:00.000000'," + "(SELECT COALESCE(MAX(batch_metadata.\"table_batch_id\"),0)+1 FROM batch_metadata as batch_metadata WHERE UPPER(batch_metadata.\"table_name\") = 'MAIN') " + "FROM \"TEST\".\"staging_temp_staging_lp_yosulf\" as staging WHERE (staging.\"data_split\" >= '{DATA_SPLIT_LOWER_BOUND_PLACEHOLDER}') AND (staging.\"data_split\" <= '{DATA_SPLIT_UPPER_BOUND_PLACEHOLDER}'))"; @@ -670,7 +670,7 @@ void testAppendOnlyWithUDFDigestGenerationWithFieldsToExcludeAndTypeConversionUd String expectedIngestSql = "INSERT INTO \"TEST\".\"main\" " + "(\"id\", \"name\", \"income\", \"start_time\", \"expiry_date\", \"version\", \"digest\", \"batch_update_time\", \"batch_id\") " + "(SELECT staging.\"id\" as \"id\",staging.\"name\" as \"name\",staging.\"income\" as \"income\",staging.\"start_time\" as \"start_time\",staging.\"expiry_date\" as \"expiry_date\",staging.\"version\" as \"version\"," + - "LAKEHOUSE_MD5(ARRAY['expiry_date','id','income','name','start_time'],ARRAY[CONVERT(staging.\"expiry_date\",VARCHAR),CONVERT(staging.\"id\",VARCHAR),CONVERT(staging.\"income\",VARCHAR),StringToString(staging.\"name\"),CONVERT(staging.\"start_time\",VARCHAR)])," + + "LAKEHOUSE_MD5(ARRAY['expiry_date',CONVERT(staging.\"expiry_date\",VARCHAR),'id',CONVERT(staging.\"id\",VARCHAR),'income',CONVERT(staging.\"income\",VARCHAR),'name',StringToString(staging.\"name\"),'start_time',CONVERT(staging.\"start_time\",VARCHAR)])," + "'2000-01-01 00:00:00.000000'," + "(SELECT COALESCE(MAX(batch_metadata.\"table_batch_id\"),0)+1 FROM batch_metadata as batch_metadata WHERE UPPER(batch_metadata.\"table_name\") = 'MAIN') " + "FROM \"TEST\".\"staging_temp_staging_lp_yosulf\" as staging WHERE (staging.\"data_split\" >= '{DATA_SPLIT_LOWER_BOUND_PLACEHOLDER}') AND (staging.\"data_split\" <= '{DATA_SPLIT_UPPER_BOUND_PLACEHOLDER}'))"; diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-snowflake/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/AppendOnlyTest.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-snowflake/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/AppendOnlyTest.java index 04b064a1abc..3a981643bf2 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-snowflake/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/AppendOnlyTest.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-snowflake/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/AppendOnlyTest.java @@ -61,7 +61,7 @@ public void verifyAppendOnlyNoAuditingAllowDuplicatesNoVersioningNoFilterExistin String insertSql = "INSERT INTO \"mydb\".\"main\" " + "(\"id\", \"name\", \"amount\", \"biz_date\", \"digest\", \"batch_id\") " + "(SELECT stage.\"id\",stage.\"name\",stage.\"amount\",stage.\"biz_date\"," + - "LAKEHOUSE_MD5(ARRAY_CONSTRUCT('amount','biz_date','id','name'),ARRAY_CONSTRUCT(stage.\"amount\",stage.\"biz_date\",stage.\"id\",stage.\"name\"))," + + 
"LAKEHOUSE_MD5(CONCAT('amount',stage.\"amount\",'biz_date',stage.\"biz_date\",'id',stage.\"id\",'name',stage.\"name\"))," + "(SELECT COALESCE(MAX(batch_metadata.\"table_batch_id\"),0)+1 FROM batch_metadata as batch_metadata WHERE UPPER(batch_metadata.\"table_name\") = 'MAIN') " + "FROM \"mydb\".\"staging\" as stage)"; Assertions.assertEquals(AnsiTestArtifacts.expectedBaseTableCreateQueryWithNoPKs, preActionsSqlList.get(0)); @@ -99,7 +99,7 @@ public void verifyAppendOnlyWithAuditingFailOnDuplicatesAllVersionNoFilterExisti String insertSql = "INSERT INTO \"mydb\".\"main\" " + "(\"id\", \"name\", \"amount\", \"biz_date\", \"digest\", \"batch_update_time\", \"batch_id\") " + "(SELECT stage.\"id\",stage.\"name\",stage.\"amount\",stage.\"biz_date\"," + - "LAKEHOUSE_MD5(ARRAY_CONSTRUCT('biz_date','name'),ARRAY_CONSTRUCT(stage.\"biz_date\",stage.\"name\"))," + + "LAKEHOUSE_MD5(CONCAT('biz_date',stage.\"biz_date\",'name',stage.\"name\"))," + "'2000-01-01 00:00:00.000000',(SELECT COALESCE(MAX(batch_metadata.\"table_batch_id\"),0)+1 FROM batch_metadata as batch_metadata WHERE UPPER(batch_metadata.\"table_name\") = 'MAIN') " + "FROM \"mydb\".\"staging_temp_staging_lp_yosulf\" as stage WHERE (stage.\"data_split\" >= '{DATA_SPLIT_LOWER_BOUND_PLACEHOLDER}') AND (stage.\"data_split\" <= '{DATA_SPLIT_UPPER_BOUND_PLACEHOLDER}'))"; @@ -152,7 +152,7 @@ public void verifyAppendOnlyWithAuditingFailOnDuplicatesAllVersionNoFilterExisti String insertSql = "INSERT INTO \"mydb\".\"main\" " + "(\"id\", \"name\", \"amount\", \"biz_date\", \"digest\", \"batch_update_time\", \"batch_id\") " + "(SELECT stage.\"id\",stage.\"name\",stage.\"amount\",stage.\"biz_date\"," + - "LAKEHOUSE_MD5(ARRAY_CONSTRUCT('amount','biz_date','name'),ARRAY_CONSTRUCT(doubleToString(stage.\"amount\"),dateToString(stage.\"biz_date\"),stage.\"name\"))," + + "LAKEHOUSE_MD5(CONCAT('amount',doubleToString(stage.\"amount\"),'biz_date',dateToString(stage.\"biz_date\"),'name',stage.\"name\"))," + "'2000-01-01 00:00:00.000000',(SELECT COALESCE(MAX(batch_metadata.\"table_batch_id\"),0)+1 FROM batch_metadata as batch_metadata WHERE UPPER(batch_metadata.\"table_name\") = 'MAIN') " + "FROM \"mydb\".\"staging_temp_staging_lp_yosulf\" as stage WHERE (stage.\"data_split\" >= '{DATA_SPLIT_LOWER_BOUND_PLACEHOLDER}') AND (stage.\"data_split\" <= '{DATA_SPLIT_UPPER_BOUND_PLACEHOLDER}'))"; diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-snowflake/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/BulkLoadTest.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-snowflake/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/BulkLoadTest.java index 74b4fd01f53..25106f3948d 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-snowflake/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/BulkLoadTest.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-snowflake/src/test/java/org/finos/legend/engine/persistence/components/ingestmode/BulkLoadTest.java @@ -80,30 +80,30 @@ public class BulkLoadTest .build(); private static Field col5 = Field.builder() - .name("col_decimal") - .type(FieldType.of(DataType.DECIMAL, 4, 1)) - .columnNumber(6) - .build(); + .name("col_decimal") + 
.type(FieldType.of(DataType.DECIMAL, 4, 1)) + .columnNumber(6) + .build(); private static Field col1NonNullable = Field.builder() - .name("col_int") - .type(FieldType.of(DataType.INT, Optional.empty(), Optional.empty())) - .nullable(false) - .build(); + .name("col_int") + .type(FieldType.of(DataType.INT, Optional.empty(), Optional.empty())) + .nullable(false) + .build(); private static Field col3NonNullable = Field.builder() - .name("col_bigint") - .type(FieldType.of(DataType.BIGINT, Optional.empty(), Optional.empty())) - .columnNumber(4) - .nullable(false) - .build(); + .name("col_bigint") + .type(FieldType.of(DataType.BIGINT, Optional.empty(), Optional.empty())) + .columnNumber(4) + .nullable(false) + .build(); private static Field col4NonNullable = Field.builder() - .name("col_variant") - .type(FieldType.of(DataType.VARIANT, Optional.empty(), Optional.empty())) - .columnNumber(5) - .nullable(false) - .build(); + .name("col_variant") + .type(FieldType.of(DataType.VARIANT, Optional.empty(), Optional.empty())) + .columnNumber(5) + .nullable(false) + .build(); private List filesList = Arrays.asList("/path/xyz/file1.csv", "/path/xyz/file2.csv"); @@ -122,12 +122,12 @@ public void testBulkLoadWithDigestNotGeneratedColumnNumbersDerived() Dataset stagedFilesDataset = StagedFilesDataset.builder() .stagedFilesDatasetProperties( SnowflakeStagedFilesDatasetProperties.builder() - .location("my_location") - .fileFormat(StandardFileFormat.builder() - .formatType(FileFormatType.CSV) - .putFormatOptions("FIELD_DELIMITER", ",") - .build()) - .addAllFilePatterns(filesList).build()) + .location("my_location") + .fileFormat(StandardFileFormat.builder() + .formatType(FileFormatType.CSV) + .putFormatOptions("FIELD_DELIMITER", ",") + .build()) + .addAllFilePatterns(filesList).build()) .schema(SchemaDefinition.builder().addAllFields(Arrays.asList(col1, col2)).build()) .build(); @@ -262,25 +262,25 @@ public void testBulkLoadWithDigestNotGeneratedColumnNumbersProvided() // Checking dry run String expectedDryRunPreActionSql = "CREATE TABLE IF NOT EXISTS \"my_db\".\"my_name_validation_lp_yosulf\"" + - "(\"col_bigint\" VARCHAR,\"col_variant\" VARCHAR,\"legend_persistence_file\" VARCHAR,\"legend_persistence_row_number\" BIGINT)"; + "(\"col_bigint\" VARCHAR,\"col_variant\" VARCHAR,\"legend_persistence_file\" VARCHAR,\"legend_persistence_row_number\" BIGINT)"; String expectedDryRunDeleteSql = "DELETE FROM \"my_db\".\"my_name_validation_lp_yosulf\" as my_name_validation_lp_yosulf"; String expectedDryRunLoadSQl = "COPY INTO \"my_db\".\"my_name_validation_lp_yosulf\" (\"col_bigint\", \"col_variant\", \"legend_persistence_file\", \"legend_persistence_row_number\") " + - "FROM (SELECT t.$4 as \"col_bigint\",t.$5 as \"col_variant\",METADATA$FILENAME,METADATA$FILE_ROW_NUMBER + 1 FROM my_location as t) " + - "FILES = ('/path/xyz/file1.csv', '/path/xyz/file2.csv') FILE_FORMAT = (TYPE = 'AVRO') ON_ERROR = 'ABORT_STATEMENT'"; + "FROM (SELECT t.$4 as \"col_bigint\",t.$5 as \"col_variant\",METADATA$FILENAME,METADATA$FILE_ROW_NUMBER + 1 FROM my_location as t) " + + "FILES = ('/path/xyz/file1.csv', '/path/xyz/file2.csv') FILE_FORMAT = (TYPE = 'AVRO') ON_ERROR = 'ABORT_STATEMENT'"; String expectedDryRunNullValidationSql = "SELECT my_name_validation_lp_yosulf.\"col_bigint\",my_name_validation_lp_yosulf.\"col_variant\",my_name_validation_lp_yosulf.\"legend_persistence_file\",my_name_validation_lp_yosulf.\"legend_persistence_row_number\" " + - "FROM \"my_db\".\"my_name_validation_lp_yosulf\" as my_name_validation_lp_yosulf " + - "WHERE 
(my_name_validation_lp_yosulf.\"col_bigint\" IS NULL) OR (my_name_validation_lp_yosulf.\"col_variant\" IS NULL) LIMIT 20"; + "FROM \"my_db\".\"my_name_validation_lp_yosulf\" as my_name_validation_lp_yosulf " + + "WHERE (my_name_validation_lp_yosulf.\"col_bigint\" IS NULL) OR (my_name_validation_lp_yosulf.\"col_variant\" IS NULL) LIMIT 20"; String expectedDryRunDatatypeValidationSql1 = "SELECT my_name_validation_lp_yosulf.\"col_bigint\",my_name_validation_lp_yosulf.\"col_variant\",my_name_validation_lp_yosulf.\"legend_persistence_file\",my_name_validation_lp_yosulf.\"legend_persistence_row_number\" " + - "FROM \"my_db\".\"my_name_validation_lp_yosulf\" as my_name_validation_lp_yosulf " + - "WHERE (NOT (my_name_validation_lp_yosulf.\"col_bigint\" IS NULL)) AND (TRY_CAST(my_name_validation_lp_yosulf.\"col_bigint\" AS BIGINT) IS NULL) LIMIT 20"; + "FROM \"my_db\".\"my_name_validation_lp_yosulf\" as my_name_validation_lp_yosulf " + + "WHERE (NOT (my_name_validation_lp_yosulf.\"col_bigint\" IS NULL)) AND (TRY_CAST(my_name_validation_lp_yosulf.\"col_bigint\" AS BIGINT) IS NULL) LIMIT 20"; String expectedDryRunDatatypeValidationSql2 = "SELECT my_name_validation_lp_yosulf.\"col_bigint\",my_name_validation_lp_yosulf.\"col_variant\",my_name_validation_lp_yosulf.\"legend_persistence_file\",my_name_validation_lp_yosulf.\"legend_persistence_row_number\" " + - "FROM \"my_db\".\"my_name_validation_lp_yosulf\" as my_name_validation_lp_yosulf " + - "WHERE (NOT (my_name_validation_lp_yosulf.\"col_variant\" IS NULL)) AND (TRY_CAST(my_name_validation_lp_yosulf.\"col_variant\" AS VARIANT) IS NULL) LIMIT 20"; + "FROM \"my_db\".\"my_name_validation_lp_yosulf\" as my_name_validation_lp_yosulf " + + "WHERE (NOT (my_name_validation_lp_yosulf.\"col_variant\" IS NULL)) AND (TRY_CAST(my_name_validation_lp_yosulf.\"col_variant\" AS VARIANT) IS NULL) LIMIT 20"; String expectedDryRunPostCleanupSql = "DROP TABLE IF EXISTS \"my_db\".\"my_name_validation_lp_yosulf\""; @@ -335,12 +335,12 @@ public void testBulkLoadWithUpperCaseConversionAndNoEventId() Map statsSql = operations.postIngestStatisticsSql(); String expectedCreateTableSql = "CREATE TABLE IF NOT EXISTS \"MY_DB\".\"MY_NAME\"" + - "(\"COL_INT\" INTEGER NOT NULL,\"COL_INTEGER\" INTEGER,\"DIGEST\" VARCHAR,\"BATCH_ID\" INTEGER,\"APPEND_TIME\" DATETIME)"; + "(\"COL_INT\" INTEGER NOT NULL,\"COL_INTEGER\" INTEGER,\"DIGEST\" VARCHAR,\"BATCH_ID\" INTEGER,\"APPEND_TIME\" DATETIME)"; String expectedIngestSql = "COPY INTO \"MY_DB\".\"MY_NAME\" " + "(\"COL_INT\", \"COL_INTEGER\", \"DIGEST\", \"BATCH_ID\", \"APPEND_TIME\") " + "FROM " + "(SELECT legend_persistence_stage.$1 as \"COL_INT\",legend_persistence_stage.$2 as \"COL_INTEGER\"," + - "LAKEHOUSE_MD5(ARRAY_CONSTRUCT('COL_INT','COL_INTEGER'),ARRAY_CONSTRUCT(CAST(legend_persistence_stage.$1 AS INTEGER),CAST(legend_persistence_stage.$2 AS INTEGER)))," + + "LAKEHOUSE_MD5(CONCAT('COL_INT',CAST(legend_persistence_stage.$1 AS INTEGER),'COL_INTEGER',CAST(legend_persistence_stage.$2 AS INTEGER)))," + "(SELECT COALESCE(MAX(BATCH_METADATA.\"TABLE_BATCH_ID\"),0)+1 FROM BATCH_METADATA as BATCH_METADATA WHERE UPPER(BATCH_METADATA.\"TABLE_NAME\") = 'MY_NAME'),'2000-01-01 00:00:00.000000' " + "FROM my_location as legend_persistence_stage) " + "FILES = ('/path/xyz/file1.csv', '/path/xyz/file2.csv') " + @@ -348,8 +348,8 @@ public void testBulkLoadWithUpperCaseConversionAndNoEventId() "ON_ERROR = 'ABORT_STATEMENT'"; String expectedMetadataIngestSql = "INSERT INTO BATCH_METADATA (\"TABLE_NAME\", \"TABLE_BATCH_ID\", \"BATCH_START_TS_UTC\", 
\"BATCH_END_TS_UTC\", \"BATCH_STATUS\", \"BATCH_SOURCE_INFO\") " + - "(SELECT 'MY_NAME',(SELECT COALESCE(MAX(BATCH_METADATA.\"TABLE_BATCH_ID\"),0)+1 FROM BATCH_METADATA as BATCH_METADATA WHERE UPPER(BATCH_METADATA.\"TABLE_NAME\") = 'MY_NAME')," + - "'2000-01-01 00:00:00.000000',SYSDATE(),'{BULK_LOAD_BATCH_STATUS_PLACEHOLDER}',PARSE_JSON('{\"file_paths\":[\"/path/xyz/file1.csv\",\"/path/xyz/file2.csv\"]}'))"; + "(SELECT 'MY_NAME',(SELECT COALESCE(MAX(BATCH_METADATA.\"TABLE_BATCH_ID\"),0)+1 FROM BATCH_METADATA as BATCH_METADATA WHERE UPPER(BATCH_METADATA.\"TABLE_NAME\") = 'MY_NAME')," + + "'2000-01-01 00:00:00.000000',SYSDATE(),'{BULK_LOAD_BATCH_STATUS_PLACEHOLDER}',PARSE_JSON('{\"file_paths\":[\"/path/xyz/file1.csv\",\"/path/xyz/file2.csv\"]}'))"; Assertions.assertEquals(expectedCreateTableSql, preActionsSql.get(0)); Assertions.assertEquals(expectedIngestSql, ingestSql.get(0)); @@ -363,27 +363,27 @@ public void testBulkLoadWithUpperCaseConversionAndNoEventId() // Checking dry run String expectedDryRunPreActionSql = "CREATE TABLE IF NOT EXISTS \"MY_DB\".\"MY_NAME_VALIDATION_LP_YOSULF\"" + - "(\"COL_INT\" VARCHAR,\"COL_INTEGER\" VARCHAR,\"LEGEND_PERSISTENCE_FILE\" VARCHAR,\"LEGEND_PERSISTENCE_ROW_NUMBER\" BIGINT)"; + "(\"COL_INT\" VARCHAR,\"COL_INTEGER\" VARCHAR,\"LEGEND_PERSISTENCE_FILE\" VARCHAR,\"LEGEND_PERSISTENCE_ROW_NUMBER\" BIGINT)"; String expectedDryRunDeleteSql = "DELETE FROM \"MY_DB\".\"MY_NAME_VALIDATION_LP_YOSULF\" as MY_NAME_validation_lp_yosulf"; String expectedDryRunLoadSQl = "COPY INTO \"MY_DB\".\"MY_NAME_VALIDATION_LP_YOSULF\" " + - "(\"COL_INT\", \"COL_INTEGER\", \"LEGEND_PERSISTENCE_FILE\", \"LEGEND_PERSISTENCE_ROW_NUMBER\") FROM " + - "(SELECT legend_persistence_stage.$1 as \"COL_INT\",legend_persistence_stage.$2 as \"COL_INTEGER\",METADATA$FILENAME,METADATA$FILE_ROW_NUMBER " + - "FROM my_location as legend_persistence_stage) " + - "FILES = ('/path/xyz/file1.csv', '/path/xyz/file2.csv') FILE_FORMAT = (FORMAT_NAME = 'my_file_format') ON_ERROR = 'ABORT_STATEMENT'"; + "(\"COL_INT\", \"COL_INTEGER\", \"LEGEND_PERSISTENCE_FILE\", \"LEGEND_PERSISTENCE_ROW_NUMBER\") FROM " + + "(SELECT legend_persistence_stage.$1 as \"COL_INT\",legend_persistence_stage.$2 as \"COL_INTEGER\",METADATA$FILENAME,METADATA$FILE_ROW_NUMBER " + + "FROM my_location as legend_persistence_stage) " + + "FILES = ('/path/xyz/file1.csv', '/path/xyz/file2.csv') FILE_FORMAT = (FORMAT_NAME = 'my_file_format') ON_ERROR = 'ABORT_STATEMENT'"; String expectedDryRunNullValidationSql = "SELECT MY_NAME_validation_lp_yosulf.\"COL_INT\",MY_NAME_validation_lp_yosulf.\"COL_INTEGER\",MY_NAME_validation_lp_yosulf.\"LEGEND_PERSISTENCE_FILE\",MY_NAME_validation_lp_yosulf.\"LEGEND_PERSISTENCE_ROW_NUMBER\" " + - "FROM \"MY_DB\".\"MY_NAME_VALIDATION_LP_YOSULF\" as MY_NAME_validation_lp_yosulf " + - "WHERE MY_NAME_validation_lp_yosulf.\"COL_INT\" IS NULL LIMIT 20"; + "FROM \"MY_DB\".\"MY_NAME_VALIDATION_LP_YOSULF\" as MY_NAME_validation_lp_yosulf " + + "WHERE MY_NAME_validation_lp_yosulf.\"COL_INT\" IS NULL LIMIT 20"; String expectedDryRunDatatypeValidationSql1 = "SELECT MY_NAME_validation_lp_yosulf.\"COL_INT\",MY_NAME_validation_lp_yosulf.\"COL_INTEGER\",MY_NAME_validation_lp_yosulf.\"LEGEND_PERSISTENCE_FILE\",MY_NAME_validation_lp_yosulf.\"LEGEND_PERSISTENCE_ROW_NUMBER\" " + - "FROM \"MY_DB\".\"MY_NAME_VALIDATION_LP_YOSULF\" as MY_NAME_validation_lp_yosulf " + - "WHERE (NOT (MY_NAME_validation_lp_yosulf.\"COL_INT\" IS NULL)) AND (TRY_CAST(MY_NAME_validation_lp_yosulf.\"COL_INT\" AS INTEGER) IS NULL) LIMIT 20"; + "FROM 
\"MY_DB\".\"MY_NAME_VALIDATION_LP_YOSULF\" as MY_NAME_validation_lp_yosulf " + + "WHERE (NOT (MY_NAME_validation_lp_yosulf.\"COL_INT\" IS NULL)) AND (TRY_CAST(MY_NAME_validation_lp_yosulf.\"COL_INT\" AS INTEGER) IS NULL) LIMIT 20"; String expectedDryRunDatatypeValidationSql2 = "SELECT MY_NAME_validation_lp_yosulf.\"COL_INT\",MY_NAME_validation_lp_yosulf.\"COL_INTEGER\",MY_NAME_validation_lp_yosulf.\"LEGEND_PERSISTENCE_FILE\",MY_NAME_validation_lp_yosulf.\"LEGEND_PERSISTENCE_ROW_NUMBER\" " + - "FROM \"MY_DB\".\"MY_NAME_VALIDATION_LP_YOSULF\" as MY_NAME_validation_lp_yosulf " + - "WHERE (NOT (MY_NAME_validation_lp_yosulf.\"COL_INTEGER\" IS NULL)) AND (TRY_CAST(MY_NAME_validation_lp_yosulf.\"COL_INTEGER\" AS INTEGER) IS NULL) LIMIT 20"; + "FROM \"MY_DB\".\"MY_NAME_VALIDATION_LP_YOSULF\" as MY_NAME_validation_lp_yosulf " + + "WHERE (NOT (MY_NAME_validation_lp_yosulf.\"COL_INTEGER\" IS NULL)) AND (TRY_CAST(MY_NAME_validation_lp_yosulf.\"COL_INTEGER\" AS INTEGER) IS NULL) LIMIT 20"; String expectedDryRunPostCleanupSql = "DROP TABLE IF EXISTS \"MY_DB\".\"MY_NAME_VALIDATION_LP_YOSULF\""; @@ -517,7 +517,7 @@ public void testBulkLoadWithDigest() "(\"col_int\", \"col_integer\", \"digest\", \"batch_id\", \"append_time\") " + "FROM " + "(SELECT legend_persistence_stage.$1 as \"col_int\",legend_persistence_stage.$2 as \"col_integer\"," + - "LAKEHOUSE_UDF(ARRAY_CONSTRUCT('col_int','col_integer'),ARRAY_CONSTRUCT(CAST(legend_persistence_stage.$1 AS INTEGER),CAST(legend_persistence_stage.$2 AS INTEGER)))," + + "LAKEHOUSE_UDF(CONCAT('col_int',CAST(legend_persistence_stage.$1 AS INTEGER),'col_integer',CAST(legend_persistence_stage.$2 AS INTEGER)))," + "(SELECT COALESCE(MAX(batch_metadata.\"table_batch_id\"),0)+1 FROM batch_metadata as batch_metadata WHERE UPPER(batch_metadata.\"table_name\") = 'MY_NAME'),'2000-01-01 00:00:00.000000' " + "FROM my_location as legend_persistence_stage) " + "FILES = ('/path/xyz/file1.csv', '/path/xyz/file2.csv') " + @@ -544,33 +544,33 @@ public void testBulkLoadWithDigestAndTypeConversionUdfs() typeConversionUdfs.put(DataType.VARIANT, "variantToString"); BulkLoad bulkLoad = BulkLoad.builder() - .batchIdField("batch_id") - .digestGenStrategy(UDFBasedDigestGenStrategy.builder().digestField("digest").digestUdfName("LAKEHOUSE_UDF").putAllTypeConversionUdfNames(typeConversionUdfs).addFieldsToExcludeFromDigest(col4.name()).build()) - .auditing(DateTimeAuditing.builder().dateTimeField(APPEND_TIME).build()) - .build(); + .batchIdField("batch_id") + .digestGenStrategy(UDFBasedDigestGenStrategy.builder().digestField("digest").digestUdfName("LAKEHOUSE_UDF").putAllTypeConversionUdfNames(typeConversionUdfs).addFieldsToExcludeFromDigest(col4.name()).build()) + .auditing(DateTimeAuditing.builder().dateTimeField(APPEND_TIME).build()) + .build(); Dataset stagedFilesDataset = StagedFilesDataset.builder() - .stagedFilesDatasetProperties( - SnowflakeStagedFilesDatasetProperties.builder() - .location("my_location") - .fileFormat(UserDefinedFileFormat.of("my_file_format")) - .addAllFilePaths(filesList).build()) - .schema(SchemaDefinition.builder().addAllFields(Arrays.asList(col1, col2, col3, col4, col5)).build()) - .build(); + .stagedFilesDatasetProperties( + SnowflakeStagedFilesDatasetProperties.builder() + .location("my_location") + .fileFormat(UserDefinedFileFormat.of("my_file_format")) + .addAllFilePaths(filesList).build()) + .schema(SchemaDefinition.builder().addAllFields(Arrays.asList(col1, col2, col3, col4, col5)).build()) + .build(); Dataset mainDataset = DatasetDefinition.builder() - 
.database("my_db").name("my_name").alias("my_alias") - .schema(SchemaDefinition.builder().build()) - .build(); + .database("my_db").name("my_name").alias("my_alias") + .schema(SchemaDefinition.builder().build()) + .build(); RelationalGenerator generator = RelationalGenerator.builder() - .ingestMode(bulkLoad) - .relationalSink(SnowflakeSink.get()) - .collectStatistics(true) - .executionTimestampClock(fixedClock_2000_01_01) - .ingestRequestId("task123") - .ingestRunId(ingestRunId) - .build(); + .ingestMode(bulkLoad) + .relationalSink(SnowflakeSink.get()) + .collectStatistics(true) + .executionTimestampClock(fixedClock_2000_01_01) + .ingestRequestId("task123") + .ingestRunId(ingestRunId) + .build(); GeneratorResult operations = generator.generateOperations(Datasets.of(mainDataset, stagedFilesDataset)); @@ -581,14 +581,14 @@ public void testBulkLoadWithDigestAndTypeConversionUdfs() String expectedCreateTableSql = "CREATE TABLE IF NOT EXISTS \"my_db\".\"my_name\"(\"col_int\" INTEGER,\"col_integer\" INTEGER,\"col_bigint\" BIGINT,\"col_variant\" VARIANT,\"col_decimal\" NUMBER(4,1),\"digest\" VARCHAR,\"batch_id\" INTEGER,\"append_time\" DATETIME)"; String expectedIngestSql = "COPY INTO \"my_db\".\"my_name\" " + - "(\"col_int\", \"col_integer\", \"col_bigint\", \"col_variant\", \"col_decimal\", \"digest\", \"batch_id\", \"append_time\") FROM " + - "(SELECT legend_persistence_stage.$1 as \"col_int\",legend_persistence_stage.$2 as \"col_integer\",legend_persistence_stage.$3 as \"col_bigint\",TO_VARIANT(PARSE_JSON(legend_persistence_stage.$4)) as \"col_variant\",legend_persistence_stage.$5 as \"col_decimal\"," + - "LAKEHOUSE_UDF(ARRAY_CONSTRUCT('col_bigint','col_decimal','col_int','col_integer'),ARRAY_CONSTRUCT(longToString(CAST(legend_persistence_stage.$3 AS BIGINT)),CAST(legend_persistence_stage.$5 AS NUMBER(4,1)),intToString(CAST(legend_persistence_stage.$1 AS INTEGER)),intToString(CAST(legend_persistence_stage.$2 AS INTEGER))))," + - "(SELECT COALESCE(MAX(batch_metadata.\"table_batch_id\"),0)+1 FROM batch_metadata as batch_metadata WHERE UPPER(batch_metadata.\"table_name\") = 'MY_NAME'),'2000-01-01 00:00:00.000000' " + - "FROM my_location as legend_persistence_stage) " + - "FILES = ('/path/xyz/file1.csv', '/path/xyz/file2.csv') " + - "FILE_FORMAT = (FORMAT_NAME = 'my_file_format') " + - "ON_ERROR = 'ABORT_STATEMENT'"; + "(\"col_int\", \"col_integer\", \"col_bigint\", \"col_variant\", \"col_decimal\", \"digest\", \"batch_id\", \"append_time\") FROM " + + "(SELECT legend_persistence_stage.$1 as \"col_int\",legend_persistence_stage.$2 as \"col_integer\",legend_persistence_stage.$3 as \"col_bigint\",TO_VARIANT(PARSE_JSON(legend_persistence_stage.$4)) as \"col_variant\",legend_persistence_stage.$5 as \"col_decimal\"," + + "LAKEHOUSE_UDF(CONCAT('col_bigint',longToString(CAST(legend_persistence_stage.$3 AS BIGINT)),'col_decimal',CAST(legend_persistence_stage.$5 AS NUMBER(4,1)),'col_int',intToString(CAST(legend_persistence_stage.$1 AS INTEGER)),'col_integer',intToString(CAST(legend_persistence_stage.$2 AS INTEGER))))," + + "(SELECT COALESCE(MAX(batch_metadata.\"table_batch_id\"),0)+1 FROM batch_metadata as batch_metadata WHERE UPPER(batch_metadata.\"table_name\") = 'MY_NAME'),'2000-01-01 00:00:00.000000' " + + "FROM my_location as legend_persistence_stage) " + + "FILES = ('/path/xyz/file1.csv', '/path/xyz/file2.csv') " + + "FILE_FORMAT = (FORMAT_NAME = 'my_file_format') " + + "ON_ERROR = 'ABORT_STATEMENT'"; Assertions.assertEquals(expectedCreateTableSql, preActionsSql.get(0)); 
Assertions.assertEquals(expectedIngestSql, ingestSql.get(0)); @@ -604,34 +604,34 @@ public void testBulkLoadWithDigestAndTypeConversionUdfs() public void testBulkLoadWithDigestAndForceOption() { BulkLoad bulkLoad = BulkLoad.builder() - .batchIdField("batch_id") - .digestGenStrategy(UDFBasedDigestGenStrategy.builder().digestField("digest").digestUdfName("LAKEHOUSE_UDF").build()) - .auditing(DateTimeAuditing.builder().dateTimeField(APPEND_TIME).build()) - .build(); + .batchIdField("batch_id") + .digestGenStrategy(UDFBasedDigestGenStrategy.builder().digestField("digest").digestUdfName("LAKEHOUSE_UDF").build()) + .auditing(DateTimeAuditing.builder().dateTimeField(APPEND_TIME).build()) + .build(); Dataset stagedFilesDataset = StagedFilesDataset.builder() - .stagedFilesDatasetProperties( - SnowflakeStagedFilesDatasetProperties.builder() - .location("my_location") - .fileFormat(UserDefinedFileFormat.of("my_file_format")) - .putCopyOptions("FORCE", true) - .addAllFilePatterns(filesList).build()) - .schema(SchemaDefinition.builder().addAllFields(Arrays.asList(col1, col2)).build()) - .build(); + .stagedFilesDatasetProperties( + SnowflakeStagedFilesDatasetProperties.builder() + .location("my_location") + .fileFormat(UserDefinedFileFormat.of("my_file_format")) + .putCopyOptions("FORCE", true) + .addAllFilePatterns(filesList).build()) + .schema(SchemaDefinition.builder().addAllFields(Arrays.asList(col1, col2)).build()) + .build(); Dataset mainDataset = DatasetDefinition.builder() - .database("my_db").name("my_name").alias("my_alias") - .schema(SchemaDefinition.builder().build()) - .build(); + .database("my_db").name("my_name").alias("my_alias") + .schema(SchemaDefinition.builder().build()) + .build(); RelationalGenerator generator = RelationalGenerator.builder() - .ingestMode(bulkLoad) - .relationalSink(SnowflakeSink.get()) - .collectStatistics(true) - .executionTimestampClock(fixedClock_2000_01_01) - .putAllAdditionalMetadata(Collections.singletonMap("watermark", "my_watermark_value")) - .ingestRunId(ingestRunId) - .build(); + .ingestMode(bulkLoad) + .relationalSink(SnowflakeSink.get()) + .collectStatistics(true) + .executionTimestampClock(fixedClock_2000_01_01) + .putAllAdditionalMetadata(Collections.singletonMap("watermark", "my_watermark_value")) + .ingestRunId(ingestRunId) + .build(); GeneratorResult operations = generator.generateOperations(Datasets.of(mainDataset, stagedFilesDataset)); @@ -643,22 +643,22 @@ public void testBulkLoadWithDigestAndForceOption() String expectedCreateTableSql = "CREATE TABLE IF NOT EXISTS \"my_db\".\"my_name\"(\"col_int\" INTEGER,\"col_integer\" INTEGER,\"digest\" VARCHAR,\"batch_id\" INTEGER,\"append_time\" DATETIME)"; String expectedIngestSql = "COPY INTO \"my_db\".\"my_name\" " + - "(\"col_int\", \"col_integer\", \"digest\", \"batch_id\", \"append_time\") " + - "FROM " + - "(SELECT legend_persistence_stage.$1 as \"col_int\",legend_persistence_stage.$2 as \"col_integer\"," + - "LAKEHOUSE_UDF(ARRAY_CONSTRUCT('col_int','col_integer'),ARRAY_CONSTRUCT(CAST(legend_persistence_stage.$1 AS INTEGER),CAST(legend_persistence_stage.$2 AS INTEGER)))," + - "(SELECT COALESCE(MAX(batch_metadata.\"table_batch_id\"),0)+1 FROM batch_metadata as batch_metadata WHERE UPPER(batch_metadata.\"table_name\") = 'MY_NAME'),'2000-01-01 00:00:00.000000' " + - "FROM my_location as legend_persistence_stage) " + - "PATTERN = '(/path/xyz/file1.csv)|(/path/xyz/file2.csv)' " + - "FILE_FORMAT = (FORMAT_NAME = 'my_file_format') " + - "FORCE = true, ON_ERROR = 'ABORT_STATEMENT'"; + "(\"col_int\", 
\"col_integer\", \"digest\", \"batch_id\", \"append_time\") " + + "FROM " + + "(SELECT legend_persistence_stage.$1 as \"col_int\",legend_persistence_stage.$2 as \"col_integer\"," + + "LAKEHOUSE_UDF(CONCAT('col_int',CAST(legend_persistence_stage.$1 AS INTEGER),'col_integer',CAST(legend_persistence_stage.$2 AS INTEGER)))," + + "(SELECT COALESCE(MAX(batch_metadata.\"table_batch_id\"),0)+1 FROM batch_metadata as batch_metadata WHERE UPPER(batch_metadata.\"table_name\") = 'MY_NAME'),'2000-01-01 00:00:00.000000' " + + "FROM my_location as legend_persistence_stage) " + + "PATTERN = '(/path/xyz/file1.csv)|(/path/xyz/file2.csv)' " + + "FILE_FORMAT = (FORMAT_NAME = 'my_file_format') " + + "FORCE = true, ON_ERROR = 'ABORT_STATEMENT'"; String expectedMetaIngestSql = "INSERT INTO batch_metadata " + - "(\"table_name\", \"table_batch_id\", \"batch_start_ts_utc\", \"batch_end_ts_utc\", \"batch_status\", \"batch_source_info\", \"additional_metadata\") " + - "(SELECT 'my_name',(SELECT COALESCE(MAX(batch_metadata.\"table_batch_id\"),0)+1 FROM batch_metadata as batch_metadata WHERE UPPER(batch_metadata.\"table_name\") = 'MY_NAME')," + - "'2000-01-01 00:00:00.000000',SYSDATE(),'{BULK_LOAD_BATCH_STATUS_PLACEHOLDER}'," + - "PARSE_JSON('{\"file_patterns\":[\"/path/xyz/file1.csv\",\"/path/xyz/file2.csv\"]}')," + - "PARSE_JSON('{\"watermark\":\"my_watermark_value\"}'))"; + "(\"table_name\", \"table_batch_id\", \"batch_start_ts_utc\", \"batch_end_ts_utc\", \"batch_status\", \"batch_source_info\", \"additional_metadata\") " + + "(SELECT 'my_name',(SELECT COALESCE(MAX(batch_metadata.\"table_batch_id\"),0)+1 FROM batch_metadata as batch_metadata WHERE UPPER(batch_metadata.\"table_name\") = 'MY_NAME')," + + "'2000-01-01 00:00:00.000000',SYSDATE(),'{BULK_LOAD_BATCH_STATUS_PLACEHOLDER}'," + + "PARSE_JSON('{\"file_patterns\":[\"/path/xyz/file1.csv\",\"/path/xyz/file2.csv\"]}')," + + "PARSE_JSON('{\"watermark\":\"my_watermark_value\"}'))"; Assertions.assertEquals(expectedCreateTableSql, preActionsSql.get(0)); Assertions.assertEquals(expectedIngestSql, ingestSql.get(0)); @@ -675,34 +675,34 @@ public void testBulkLoadWithDigestAndForceOption() public void testBulkLoadWithDigestWithFieldsToExcludeAndForceOption() { BulkLoad bulkLoad = BulkLoad.builder() - .batchIdField("batch_id") - .digestGenStrategy(UDFBasedDigestGenStrategy.builder().digestField("digest").digestUdfName("LAKEHOUSE_UDF").addAllFieldsToExcludeFromDigest(Arrays.asList(col1.name())).build()) - .auditing(DateTimeAuditing.builder().dateTimeField(APPEND_TIME).build()) - .build(); + .batchIdField("batch_id") + .digestGenStrategy(UDFBasedDigestGenStrategy.builder().digestField("digest").digestUdfName("LAKEHOUSE_UDF").addAllFieldsToExcludeFromDigest(Arrays.asList(col1.name())).build()) + .auditing(DateTimeAuditing.builder().dateTimeField(APPEND_TIME).build()) + .build(); Dataset stagedFilesDataset = StagedFilesDataset.builder() - .stagedFilesDatasetProperties( - SnowflakeStagedFilesDatasetProperties.builder() - .location("my_location") - .fileFormat(UserDefinedFileFormat.of("my_file_format")) - .putCopyOptions("FORCE", true) - .addAllFilePatterns(filesList).build()) - .schema(SchemaDefinition.builder().addAllFields(Arrays.asList(col1, col2)).build()) - .build(); + .stagedFilesDatasetProperties( + SnowflakeStagedFilesDatasetProperties.builder() + .location("my_location") + .fileFormat(UserDefinedFileFormat.of("my_file_format")) + .putCopyOptions("FORCE", true) + .addAllFilePatterns(filesList).build()) + 
.schema(SchemaDefinition.builder().addAllFields(Arrays.asList(col1, col2)).build()) + .build(); Dataset mainDataset = DatasetDefinition.builder() - .database("my_db").name("my_name").alias("my_alias") - .schema(SchemaDefinition.builder().build()) - .build(); + .database("my_db").name("my_name").alias("my_alias") + .schema(SchemaDefinition.builder().build()) + .build(); RelationalGenerator generator = RelationalGenerator.builder() - .ingestMode(bulkLoad) - .relationalSink(SnowflakeSink.get()) - .collectStatistics(true) - .executionTimestampClock(fixedClock_2000_01_01) - .ingestRequestId("task123") - .ingestRunId(ingestRunId) - .build(); + .ingestMode(bulkLoad) + .relationalSink(SnowflakeSink.get()) + .collectStatistics(true) + .executionTimestampClock(fixedClock_2000_01_01) + .ingestRequestId("task123") + .ingestRunId(ingestRunId) + .build(); GeneratorResult operations = generator.generateOperations(Datasets.of(mainDataset, stagedFilesDataset)); @@ -713,15 +713,15 @@ public void testBulkLoadWithDigestWithFieldsToExcludeAndForceOption() String expectedCreateTableSql = "CREATE TABLE IF NOT EXISTS \"my_db\".\"my_name\"(\"col_int\" INTEGER,\"col_integer\" INTEGER,\"digest\" VARCHAR,\"batch_id\" INTEGER,\"append_time\" DATETIME)"; String expectedIngestSql = "COPY INTO \"my_db\".\"my_name\" " + - "(\"col_int\", \"col_integer\", \"digest\", \"batch_id\", \"append_time\") " + - "FROM " + - "(SELECT legend_persistence_stage.$1 as \"col_int\",legend_persistence_stage.$2 as \"col_integer\"," + - "LAKEHOUSE_UDF(ARRAY_CONSTRUCT('col_integer'),ARRAY_CONSTRUCT(CAST(legend_persistence_stage.$2 AS INTEGER)))," + - "(SELECT COALESCE(MAX(batch_metadata.\"table_batch_id\"),0)+1 FROM batch_metadata as batch_metadata WHERE UPPER(batch_metadata.\"table_name\") = 'MY_NAME'),'2000-01-01 00:00:00.000000' " + - "FROM my_location as legend_persistence_stage) " + - "PATTERN = '(/path/xyz/file1.csv)|(/path/xyz/file2.csv)' " + - "FILE_FORMAT = (FORMAT_NAME = 'my_file_format') " + - "FORCE = true, ON_ERROR = 'ABORT_STATEMENT'"; + "(\"col_int\", \"col_integer\", \"digest\", \"batch_id\", \"append_time\") " + + "FROM " + + "(SELECT legend_persistence_stage.$1 as \"col_int\",legend_persistence_stage.$2 as \"col_integer\"," + + "LAKEHOUSE_UDF(CONCAT('col_integer',CAST(legend_persistence_stage.$2 AS INTEGER)))," + + "(SELECT COALESCE(MAX(batch_metadata.\"table_batch_id\"),0)+1 FROM batch_metadata as batch_metadata WHERE UPPER(batch_metadata.\"table_name\") = 'MY_NAME'),'2000-01-01 00:00:00.000000' " + + "FROM my_location as legend_persistence_stage) " + + "PATTERN = '(/path/xyz/file1.csv)|(/path/xyz/file2.csv)' " + + "FILE_FORMAT = (FORMAT_NAME = 'my_file_format') " + + "FORCE = true, ON_ERROR = 'ABORT_STATEMENT'"; Assertions.assertEquals(expectedCreateTableSql, preActionsSql.get(0)); Assertions.assertEquals(expectedIngestSql, ingestSql.get(0)); @@ -737,37 +737,37 @@ public void testBulkLoadWithDigestWithFieldsToExcludeAndForceOption() public void testBulkLoadWithDigestAndForceOptionAndOnErrorOption() { BulkLoad bulkLoad = BulkLoad.builder() - .batchIdField("batch_id") - .digestGenStrategy(UDFBasedDigestGenStrategy.builder().digestField("digest").digestUdfName("LAKEHOUSE_UDF").build()) - .auditing(DateTimeAuditing.builder().dateTimeField(APPEND_TIME).build()) - .build(); + .batchIdField("batch_id") + .digestGenStrategy(UDFBasedDigestGenStrategy.builder().digestField("digest").digestUdfName("LAKEHOUSE_UDF").build()) + .auditing(DateTimeAuditing.builder().dateTimeField(APPEND_TIME).build()) + .build(); Dataset stagedFilesDataset 
= StagedFilesDataset.builder() - .stagedFilesDatasetProperties( - SnowflakeStagedFilesDatasetProperties.builder() - .location("my_location") - .fileFormat(StandardFileFormat.builder() - .formatType(FileFormatType.CSV) - .putFormatOptions("FIELD_DELIMITER", ",") - .build()) - .putCopyOptions("ON_ERROR", "SKIP_FILE") - .addAllFilePatterns(filesList).build()) - .schema(SchemaDefinition.builder().addAllFields(Arrays.asList(col1, col2)).build()) - .build(); + .stagedFilesDatasetProperties( + SnowflakeStagedFilesDatasetProperties.builder() + .location("my_location") + .fileFormat(StandardFileFormat.builder() + .formatType(FileFormatType.CSV) + .putFormatOptions("FIELD_DELIMITER", ",") + .build()) + .putCopyOptions("ON_ERROR", "SKIP_FILE") + .addAllFilePatterns(filesList).build()) + .schema(SchemaDefinition.builder().addAllFields(Arrays.asList(col1, col2)).build()) + .build(); Dataset mainDataset = DatasetDefinition.builder() - .database("my_db").name("my_name").alias("my_alias") - .schema(SchemaDefinition.builder().build()) - .build(); + .database("my_db").name("my_name").alias("my_alias") + .schema(SchemaDefinition.builder().build()) + .build(); RelationalGenerator generator = RelationalGenerator.builder() - .ingestMode(bulkLoad) - .relationalSink(SnowflakeSink.get()) - .collectStatistics(true) - .executionTimestampClock(fixedClock_2000_01_01) - .ingestRequestId("task123") - .ingestRunId(ingestRunId) - .build(); + .ingestMode(bulkLoad) + .relationalSink(SnowflakeSink.get()) + .collectStatistics(true) + .executionTimestampClock(fixedClock_2000_01_01) + .ingestRequestId("task123") + .ingestRunId(ingestRunId) + .build(); GeneratorResult operations = generator.generateOperations(Datasets.of(mainDataset, stagedFilesDataset)); @@ -778,15 +778,15 @@ public void testBulkLoadWithDigestAndForceOptionAndOnErrorOption() String expectedCreateTableSql = "CREATE TABLE IF NOT EXISTS \"my_db\".\"my_name\"(\"col_int\" INTEGER,\"col_integer\" INTEGER,\"digest\" VARCHAR,\"batch_id\" INTEGER,\"append_time\" DATETIME)"; String expectedIngestSql = "COPY INTO \"my_db\".\"my_name\" " + - "(\"col_int\", \"col_integer\", \"digest\", \"batch_id\", \"append_time\") " + - "FROM " + - "(SELECT legend_persistence_stage.$1 as \"col_int\",legend_persistence_stage.$2 as \"col_integer\"," + - "LAKEHOUSE_UDF(ARRAY_CONSTRUCT('col_int','col_integer'),ARRAY_CONSTRUCT(CAST(legend_persistence_stage.$1 AS INTEGER),CAST(legend_persistence_stage.$2 AS INTEGER)))," + - "(SELECT COALESCE(MAX(batch_metadata.\"table_batch_id\"),0)+1 FROM batch_metadata as batch_metadata WHERE UPPER(batch_metadata.\"table_name\") = 'MY_NAME'),'2000-01-01 00:00:00.000000' " + - "FROM my_location as legend_persistence_stage) " + - "PATTERN = '(/path/xyz/file1.csv)|(/path/xyz/file2.csv)' " + - "FILE_FORMAT = (FIELD_DELIMITER = ',', TYPE = 'CSV') " + - "ON_ERROR = 'SKIP_FILE'"; + "(\"col_int\", \"col_integer\", \"digest\", \"batch_id\", \"append_time\") " + + "FROM " + + "(SELECT legend_persistence_stage.$1 as \"col_int\",legend_persistence_stage.$2 as \"col_integer\"," + + "LAKEHOUSE_UDF(CONCAT('col_int',CAST(legend_persistence_stage.$1 AS INTEGER),'col_integer',CAST(legend_persistence_stage.$2 AS INTEGER)))," + + "(SELECT COALESCE(MAX(batch_metadata.\"table_batch_id\"),0)+1 FROM batch_metadata as batch_metadata WHERE UPPER(batch_metadata.\"table_name\") = 'MY_NAME'),'2000-01-01 00:00:00.000000' " + + "FROM my_location as legend_persistence_stage) " + + "PATTERN = '(/path/xyz/file1.csv)|(/path/xyz/file2.csv)' " + + "FILE_FORMAT = (FIELD_DELIMITER = ',', TYPE = 
'CSV') " + + "ON_ERROR = 'SKIP_FILE'"; String expectedDryRunPreActionsSql = "CREATE TABLE IF NOT EXISTS \"my_db\".\"my_name_validation_lp_yosulf\"" + "(\"col_int\" INTEGER,\"col_integer\" INTEGER)"; @@ -810,4 +810,74 @@ public void testBulkLoadWithDigestAndForceOptionAndOnErrorOption() Assertions.assertNull(statsSql.get(ROWS_UPDATED)); Assertions.assertEquals("SELECT COUNT(*) as \"rowsInserted\" FROM \"my_db\".\"my_name\" as my_alias WHERE my_alias.\"batch_id\" = (SELECT COALESCE(MAX(batch_metadata.\"table_batch_id\"),0)+1 FROM batch_metadata as batch_metadata WHERE UPPER(batch_metadata.\"table_name\") = 'MY_NAME')", statsSql.get(ROWS_INSERTED)); } -} + + @Test + public void testBulkLoadWithDigestAndTypeConversionUdfsAndcolumnTransformationUdf() + { + Map typeConversionUdfs = new HashMap<>(); + typeConversionUdfs.put(DataType.INTEGER, "intToString"); + typeConversionUdfs.put(DataType.INT, "intToString"); + typeConversionUdfs.put(DataType.BIGINT, "longToString"); + typeConversionUdfs.put(DataType.VARIANT, "variantToString"); + + BulkLoad bulkLoad = BulkLoad.builder() + .batchIdField("batch_id") + .digestGenStrategy(UDFBasedDigestGenStrategy.builder() + .digestField("digest") + .digestUdfName("LAKEHOUSE_UDF") + .columnNameValueConcatUdfName("COLUMN_STRING_UDF") + .putAllTypeConversionUdfNames(typeConversionUdfs) + .addFieldsToExcludeFromDigest(col4.name()) + .build()) + .auditing(DateTimeAuditing.builder().dateTimeField(APPEND_TIME).build()) + .build(); + + Dataset stagedFilesDataset = StagedFilesDataset.builder() + .stagedFilesDatasetProperties( + SnowflakeStagedFilesDatasetProperties.builder() + .location("my_location") + .fileFormat(UserDefinedFileFormat.of("my_file_format")) + .addAllFilePaths(filesList).build()) + .schema(SchemaDefinition.builder().addAllFields(Arrays.asList(col1, col2, col3, col4, col5)).build()) + .build(); + + Dataset mainDataset = DatasetDefinition.builder() + .database("my_db").name("my_name").alias("my_alias") + .schema(SchemaDefinition.builder().build()) + .build(); + + RelationalGenerator generator = RelationalGenerator.builder() + .ingestMode(bulkLoad) + .relationalSink(SnowflakeSink.get()) + .collectStatistics(true) + .executionTimestampClock(fixedClock_2000_01_01) + .ingestRequestId("task123") + .ingestRunId(ingestRunId) + .build(); + + GeneratorResult operations = generator.generateOperations(Datasets.of(mainDataset, stagedFilesDataset)); + + List preActionsSql = operations.preActionsSql(); + List ingestSql = operations.ingestSql(); + Map statsSql = operations.postIngestStatisticsSql(); + + String expectedCreateTableSql = "CREATE TABLE IF NOT EXISTS \"my_db\".\"my_name\"(\"col_int\" INTEGER,\"col_integer\" INTEGER,\"col_bigint\" BIGINT,\"col_variant\" VARIANT,\"col_decimal\" NUMBER(4,1),\"digest\" VARCHAR,\"batch_id\" INTEGER,\"append_time\" DATETIME)"; + + String expectedIngestSql = "COPY INTO \"my_db\".\"my_name\" " + + "(\"col_int\", \"col_integer\", \"col_bigint\", \"col_variant\", \"col_decimal\", \"digest\", \"batch_id\", \"append_time\") " + + "FROM (SELECT legend_persistence_stage.$1 as \"col_int\",legend_persistence_stage.$2 as \"col_integer\",legend_persistence_stage.$3 as \"col_bigint\",TO_VARIANT(PARSE_JSON(legend_persistence_stage.$4)) as \"col_variant\",legend_persistence_stage.$5 as \"col_decimal\"," + + "LAKEHOUSE_UDF(CONCAT(COLUMN_STRING_UDF('col_bigint',longToString(CAST(legend_persistence_stage.$3 AS BIGINT))),COLUMN_STRING_UDF('col_decimal',CAST(legend_persistence_stage.$5 AS 
NUMBER(4,1))),COLUMN_STRING_UDF('col_int',intToString(CAST(legend_persistence_stage.$1 AS INTEGER))),COLUMN_STRING_UDF('col_integer',intToString(CAST(legend_persistence_stage.$2 AS INTEGER)))))," + + "(SELECT COALESCE(MAX(batch_metadata.\"table_batch_id\"),0)+1 FROM batch_metadata as batch_metadata WHERE UPPER(batch_metadata.\"table_name\") = 'MY_NAME'),'2000-01-01 00:00:00.000000' " + + "FROM my_location as legend_persistence_stage) " + + "FILES = ('/path/xyz/file1.csv', '/path/xyz/file2.csv') FILE_FORMAT = (FORMAT_NAME = 'my_file_format') ON_ERROR = 'ABORT_STATEMENT'"; + + Assertions.assertEquals(expectedCreateTableSql, preActionsSql.get(0)); + Assertions.assertEquals(expectedIngestSql, ingestSql.get(0)); + + Assertions.assertNull(statsSql.get(INCOMING_RECORD_COUNT)); + Assertions.assertNull(statsSql.get(ROWS_DELETED)); + Assertions.assertNull(statsSql.get(ROWS_TERMINATED)); + Assertions.assertNull(statsSql.get(ROWS_UPDATED)); + Assertions.assertEquals("SELECT COUNT(*) as \"rowsInserted\" FROM \"my_db\".\"my_name\" as my_alias WHERE my_alias.\"batch_id\" = (SELECT COALESCE(MAX(batch_metadata.\"table_batch_id\"),0)+1 FROM batch_metadata as batch_metadata WHERE UPPER(batch_metadata.\"table_name\") = 'MY_NAME')", statsSql.get(ROWS_INSERTED)); + } +} \ No newline at end of file
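
The expected COPY statements asserted above exercise three shapes of the generated digest expression, depending on which UDFs are configured on UDFBasedDigestGenStrategy. A distilled sketch of those three shapes, using the fixture names from these tests (LAKEHOUSE_UDF, COLUMN_STRING_UDF and intToString are UDF names registered by this test suite, not Snowflake built-ins):

    -- digestUdfName only: column names and casted values are interleaved into a single CONCAT
    LAKEHOUSE_UDF(CONCAT('col_int',CAST(legend_persistence_stage.$1 AS INTEGER),'col_integer',CAST(legend_persistence_stage.$2 AS INTEGER)))

    -- plus typeConversionUdfNames: each value is first routed through the UDF registered for its data type
    LAKEHOUSE_UDF(CONCAT('col_int',intToString(CAST(legend_persistence_stage.$1 AS INTEGER)),'col_integer',intToString(CAST(legend_persistence_stage.$2 AS INTEGER))))

    -- plus columnNameValueConcatUdfName: each column-name/value pair is wrapped before the outer CONCAT
    LAKEHOUSE_UDF(CONCAT(COLUMN_STRING_UDF('col_int',intToString(CAST(legend_persistence_stage.$1 AS INTEGER))),COLUMN_STRING_UDF('col_integer',intToString(CAST(legend_persistence_stage.$2 AS INTEGER)))))

Columns listed in fieldsToExcludeFromDigest are simply omitted from the CONCAT argument list, as the fields-to-exclude tests above assert.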