Persistence Component: API for extracting staging filters, Support Iceberg and Staging table creation (#2027)

* Make table creation optional and extract Staging Filters

* Add test for BigQuery

* Add test for Table Not created

* Changes for Supporting Iceberg table creation and tags in Snowflake

* Changes for Creating Staging Table as an optional Param

* Fix the imports in SnowflakeSink
prasar-ashutosh authored Jul 13, 2023
1 parent 96f18e6 commit 18fa480
Showing 72 changed files with 1,153 additions and 84 deletions.
@@ -14,6 +14,10 @@

package org.finos.legend.engine.persistence.components.common;

import java.util.Arrays;
import java.util.Map;
import java.util.stream.Collectors;

public enum FilterType
{
GREATER_THAN("GT"),
@@ -33,4 +37,13 @@ public String getType()
{
return type;
}

private static final Map<String, FilterType> BY_NAME = Arrays
.stream(FilterType.values())
.collect(Collectors.toMap(FilterType::getType, java.util.function.Function.identity()));

public static FilterType fromName(String name)
{
return BY_NAME.get(name);
}
}
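
The reverse lookup complements getType(): a short name persisted alongside a staging filter can be mapped back to its enum constant. A minimal usage sketch:

    // Round trip between the enum and its short name.
    String shortName = FilterType.GREATER_THAN.getType();   // "GT"
    FilterType parsed = FilterType.fromName(shortName);     // FilterType.GREATER_THAN
    // Unknown names yield null, since fromName is a plain map lookup with no validation.
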
@@ -16,8 +16,15 @@

import org.finos.legend.engine.persistence.components.logicalplan.LogicalPlanNode;

import java.util.Optional;

public interface Dataset extends LogicalPlanNode
{
default Optional<DatasetAdditionalProperties> datasetAdditionalProperties()
{
throw new UnsupportedOperationException();
}

default SchemaDefinition schema()
{
throw new UnsupportedOperationException();
@@ -0,0 +1,45 @@
// Copyright 2023 Goldman Sachs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package org.finos.legend.engine.persistence.components.logicalplan.datasets;

import org.finos.legend.engine.persistence.components.logicalplan.LogicalPlanNode;
import org.immutables.value.Value.Immutable;
import org.immutables.value.Value.Style;

import java.util.Map;
import java.util.Optional;

@Immutable
@Style(
typeAbstract = "*Abstract",
typeImmutable = "*",
jdkOnly = true,
optionalAcceptNullable = true,
strictBuilder = true
)
public interface DatasetAdditionalPropertiesAbstract extends LogicalPlanNode
{
Optional<TableType> tableType();

Optional<TableOrigin> tableOrigin();

Optional<String> externalVolume();

Optional<String> dataPath();

Optional<String> filePattern();

Map<String, String> tags();
}
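
With the @Style mapping (typeImmutable = "*"), Immutables generates a concrete DatasetAdditionalProperties class with a builder, so a Snowflake Iceberg table with tags can be described along these lines (a sketch; all values are illustrative):

    DatasetAdditionalProperties properties = DatasetAdditionalProperties.builder()
        .tableOrigin(TableOrigin.ICEBERG)         // see the TableOrigin enum added below
        .externalVolume("my_external_volume")     // illustrative external volume name
        .putTags("owner", "finance")              // illustrative tag key/value
        .build();
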
@@ -47,6 +47,8 @@ default String alias()

SchemaDefinition schema();

Optional<DatasetAdditionalProperties> datasetAdditionalProperties();

@Derived
default DatasetReference datasetReference()
{
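
With the datasetAdditionalProperties() accessor added to the concrete dataset contract above, a table definition can carry those properties. A minimal sketch, assuming the standard Immutables builder for DatasetDefinition picks up the new optional attribute (names and schema are illustrative):

    DatasetDefinition stagingTable = DatasetDefinition.builder()
        .database("my_db")
        .name("staging_orders")
        .schema(stagingSchema)                       // a SchemaDefinition assembled elsewhere
        .datasetAdditionalProperties(properties)     // e.g. the properties built in the previous sketch
        .build();
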
@@ -0,0 +1,20 @@
// Copyright 2023 Goldman Sachs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package org.finos.legend.engine.persistence.components.logicalplan.datasets;

public enum TableOrigin
{
NATIVE, ICEBERG, EXTERNAL, EXTERNAL_STAGE
}
@@ -0,0 +1,20 @@
// Copyright 2023 Goldman Sachs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package org.finos.legend.engine.persistence.components.logicalplan.datasets;

public enum TableType
{
TEMPORARY, TRANSIENT, REFERENCE, MEMORY, CACHED
}
@@ -36,6 +36,7 @@
import org.finos.legend.engine.persistence.components.logicalplan.datasets.Selection;
import org.finos.legend.engine.persistence.components.logicalplan.operations.Create;
import org.finos.legend.engine.persistence.components.logicalplan.operations.Insert;
import org.finos.legend.engine.persistence.components.logicalplan.operations.Operation;
import org.finos.legend.engine.persistence.components.logicalplan.values.BatchStartTimestamp;
import org.finos.legend.engine.persistence.components.logicalplan.values.FieldValue;
import org.finos.legend.engine.persistence.components.logicalplan.values.Value;
@@ -115,7 +116,13 @@ else if (!ingestMode().dataSplitField().isPresent())
@Override
public LogicalPlan buildLogicalPlanForPreActions(Resources resources)
{
return LogicalPlan.builder().addOps(Create.of(true, mainDataset())).build();
List<Operation> operations = new ArrayList<>();
operations.add(Create.of(true, mainDataset()));
if (options().createStagingDataset())
{
operations.add(Create.of(true, stagingDataset()));
}
return LogicalPlan.of(operations);
}

protected void addPostRunStatsForRowsInserted(Map<StatisticName, LogicalPlan> postRunStatisticsResult)
@@ -290,6 +290,10 @@ public LogicalPlan buildLogicalPlanForPreActions(Resources resources)
List<Operation> operations = new ArrayList<>();

operations.add(Create.of(true, mainDataset()));
if (options().createStagingDataset())
{
operations.add(Create.of(true, stagingDataset()));
}
operations.add(Create.of(true, metadataDataset().orElseThrow(IllegalStateException::new).get()));

if (ingestMode().validityMilestoning().validityDerivation() instanceof SourceSpecifiesFromDateTime)
@@ -90,11 +90,14 @@ public LogicalPlan buildLogicalPlanForIngest(Resources resources, Set<Capability
@Override
public LogicalPlan buildLogicalPlanForPreActions(Resources resources)
{
return LogicalPlan.builder()
.addOps(
Create.of(true, mainDataset()),
Create.of(true, metadataDataset().orElseThrow(IllegalStateException::new).get()))
.build();
List<Operation> operations = new ArrayList<>();
operations.add(Create.of(true, mainDataset()));
if (options().createStagingDataset())
{
operations.add(Create.of(true, stagingDataset()));
}
operations.add(Create.of(true, metadataDataset().orElseThrow(IllegalStateException::new).get()));
return LogicalPlan.of(operations);
}

/*
@@ -301,7 +301,13 @@ else if (!ingestMode().dataSplitField().isPresent() && !this.deleteIndicatorFiel
@Override
public LogicalPlan buildLogicalPlanForPreActions(Resources resources)
{
return LogicalPlan.builder().addOps(Create.of(true, mainDataset())).build();
List<Operation> operations = new ArrayList<>();
operations.add(Create.of(true, mainDataset()));
if (options().createStagingDataset())
{
operations.add(Create.of(true, stagingDataset()));
}
return LogicalPlan.of(operations);
}

public Optional<Condition> getDataSplitInRangeConditionForStatistics()
@@ -126,7 +126,13 @@ else if (!ingestMode().dataSplitField().isPresent())
@Override
public LogicalPlan buildLogicalPlanForPreActions(Resources resources)
{
return LogicalPlan.builder().addOps(Create.of(true, mainDataset())).build();
List<Operation> operations = new ArrayList<>();
operations.add(Create.of(true, mainDataset()));
if (options().createStagingDataset())
{
operations.add(Create.of(true, stagingDataset()));
}
return LogicalPlan.of(operations);
}

@Override
@@ -81,6 +81,12 @@ default boolean enableSchemaEvolution()
{
return false;
}

@Default
default boolean createStagingDataset()
{
return false;
}
}

private final Datasets datasets;
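
Because createStagingDataset() defaults to false, existing callers keep today's behaviour; opting in makes the planners' pre-actions also create the staging table, as the surrounding changes show. A minimal sketch, assuming the Immutables-generated builder for this options interface (typically exposed as PlannerOptions):

    // Opt in to staging table creation; all other options keep their defaults.
    PlannerOptions plannerOptions = PlannerOptions.builder()
        .createStagingDataset(true)
        .build();
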
@@ -116,12 +116,17 @@ public LogicalPlan buildLogicalPlanForIngest(Resources resources, Set<Capability
@Override
public LogicalPlan buildLogicalPlanForPreActions(Resources resources)
{
return LogicalPlan.builder().addOps(
Create.of(true, mainDataset()),
Create.of(true, metadataDataset().orElseThrow(IllegalStateException::new).get()))
.build();
List<Operation> operations = new ArrayList<>();
operations.add(Create.of(true, mainDataset()));
if (options().createStagingDataset())
{
operations.add(Create.of(true, stagingDataset()));
}
operations.add(Create.of(true, metadataDataset().orElseThrow(IllegalStateException::new).get()));
return LogicalPlan.of(operations);
}


/*
------------------
Upsert Logic:
@@ -91,11 +91,14 @@ public LogicalPlan buildLogicalPlanForIngest(Resources resources, Set<Capability
@Override
public LogicalPlan buildLogicalPlanForPreActions(Resources resources)
{
return LogicalPlan.builder()
.addOps(
Create.of(true, mainDataset()),
Create.of(true, metadataDataset().orElseThrow(IllegalStateException::new).get()))
.build();
List<Operation> operations = new ArrayList<>();
operations.add(Create.of(true, mainDataset()));
if (options().createStagingDataset())
{
operations.add(Create.of(true, stagingDataset()));
}
operations.add(Create.of(true, metadataDataset().orElseThrow(IllegalStateException::new).get()));
return LogicalPlan.of(operations);
}

/*
@@ -33,6 +33,8 @@
import org.finos.legend.engine.persistence.components.logicalplan.values.SumBinaryValueOperator;
import org.finos.legend.engine.persistence.components.logicalplan.values.Value;
import org.finos.legend.engine.persistence.components.common.DatasetFilter;
import org.finos.legend.engine.persistence.components.logicalplan.conditions.And;
import org.finos.legend.engine.persistence.components.logicalplan.values.SelectValue;

import java.util.ArrayList;
import java.util.List;
@@ -139,4 +141,38 @@ public Insert insertMetaData(StringValue mainTableName, BatchStartTimestamp batc
}
return Insert.of(metaDataset, Selection.builder().addAllFields(metaSelectFields).build(), metaInsertFields);
}


/*
SELECT STAGING_FILTERS FROM <batch_metadata> WHERE
TABLE_NAME = <mainTableName> AND
TABLE_BATCH_ID = (SELECT MAX(TABLE_BATCH_ID) from <batch_metadata> where TABLE_NAME = <mainTableName>)
LIMIT 1
*/
public Selection getLatestStagingFilters(StringValue mainTableName)
{
FieldValue stagingFiltersField = FieldValue.builder().datasetRef(metaDataset.datasetReference()).fieldName(dataset.stagingFiltersField()).build();
FieldValue tableNameField = FieldValue.builder().datasetRef(metaDataset.datasetReference()).fieldName(dataset.tableNameField()).build();
FunctionImpl tableNameInUpperCase = FunctionImpl.builder().functionName(FunctionName.UPPER).addValue(tableNameField).build();
StringValue mainTableNameInUpperCase = StringValue.builder().value(mainTableName.value().map(field -> field.toUpperCase()))
.alias(mainTableName.alias()).build();

Condition tableNameEqualsCondition = Equals.of(tableNameInUpperCase, mainTableNameInUpperCase);

FieldValue tableBatchIdField = FieldValue.builder().datasetRef(metaDataset.datasetReference()).fieldName(dataset.tableBatchIdField()).build();
FunctionImpl maxBatchId = FunctionImpl.builder().functionName(FunctionName.MAX).addValue(tableBatchIdField).build();
SelectValue maxBatchIdValue = SelectValue.of(Selection.builder().source(metaDataset.datasetReference()).condition(tableNameEqualsCondition).addFields(maxBatchId).build());
Condition maxBatchIdCondition = Equals.of(tableBatchIdField, maxBatchIdValue);

List<Condition> conditions = new ArrayList<>();
conditions.add(tableNameEqualsCondition);
conditions.add(maxBatchIdCondition);

return Selection.builder()
.source(metaDataset)
.condition(And.of(conditions))
.addFields(stagingFiltersField)
.limit(1)
.build();
}
}
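
For context, the returned Selection can be handed to a relational transformer and the generated SQL executed to read back the staging filters recorded for the latest batch. A rough usage sketch: the metadata utility instance here is an assumption, and the rendered SQL mirrors the comment above (the code additionally upper-cases both sides of the table-name comparison):

    // Illustrative only: metadataUtils is assumed to be an instance of this utility class.
    StringValue mainTableName = StringValue.builder().value("main_table").build();
    Selection latestStagingFilters = metadataUtils.getLatestStagingFilters(mainTableName);
    // Rendered roughly as:
    //   SELECT staging_filters FROM batch_metadata
    //   WHERE UPPER(table_name) = 'MAIN_TABLE'
    //     AND table_batch_id = (SELECT MAX(table_batch_id) FROM batch_metadata
    //                           WHERE UPPER(table_name) = 'MAIN_TABLE')
    //   LIMIT 1
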
@@ -39,6 +39,7 @@
import org.finos.legend.engine.persistence.components.logicalplan.datasets.JsonExternalDatasetReference;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.SchemaDefinition;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.SchemaReference;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.DatasetAdditionalProperties;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.Selection;
import org.finos.legend.engine.persistence.components.logicalplan.modifiers.IfExistsTableModifier;
import org.finos.legend.engine.persistence.components.logicalplan.modifiers.IfNotExistsTableModifier;
@@ -133,6 +134,7 @@
import org.finos.legend.engine.persistence.components.relational.ansi.sql.visitors.SumBinaryValueOperatorVisitor;
import org.finos.legend.engine.persistence.components.relational.ansi.sql.visitors.TableModifierVisitor;
import org.finos.legend.engine.persistence.components.relational.ansi.sql.visitors.TabularValuesVisitor;
import org.finos.legend.engine.persistence.components.relational.ansi.sql.visitors.DatasetAdditionalPropertiesVisitor;
import org.finos.legend.engine.persistence.components.relational.ansi.sql.visitors.TruncateVisitor;
import org.finos.legend.engine.persistence.components.relational.ansi.sql.visitors.TableConstraintVisitor;
import org.finos.legend.engine.persistence.components.relational.ansi.sql.visitors.WindowFunctionVisitor;
@@ -177,6 +179,7 @@ public class AnsiSqlSink extends RelationalSink
logicalPlanVisitorByClass.put(DatasetDefinition.class, new DatasetDefinitionVisitor());
logicalPlanVisitorByClass.put(DerivedDataset.class, new DerivedDatasetVisitor());
logicalPlanVisitorByClass.put(JsonExternalDatasetReference.class, new DatasetReferenceVisitor());
logicalPlanVisitorByClass.put(DatasetAdditionalProperties.class, new DatasetAdditionalPropertiesVisitor());

logicalPlanVisitorByClass.put(Not.class, new NotVisitor());
logicalPlanVisitorByClass.put(And.class, new AndVisitor());