Skip to content

Commit

Permalink
Dedicated Visitor for Derived Dataset to staging filtering for all schemes. Added tests for Big Query
Browse files Browse the repository at this point in the history
  • Loading branch information
prasar-ashutosh committed Jun 13, 2023
1 parent e68b762 commit 9345f66
Show file tree
Hide file tree
Showing 36 changed files with 541 additions and 213 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,38 +19,29 @@
import org.finos.legend.engine.persistence.components.logicalplan.datasets.Dataset;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.Selection;
import org.finos.legend.engine.persistence.components.logicalplan.values.*;
import org.finos.legend.engine.persistence.components.util.LogicalPlanUtils;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

public class DatasetFilterAndDeduplicator implements VersioningStrategyVisitor<Dataset>
public class DatasetDeduplicator implements VersioningStrategyVisitor<Dataset>
{

Dataset stagingDataset;
List<String> primaryKeys;
Optional<Condition> stagingDatasetFilter;

private static final String ROW_NUMBER = "legend_persistence_row_num";

public DatasetFilterAndDeduplicator(Dataset stagingDataset, List<String> primaryKeys)
public DatasetDeduplicator(Dataset stagingDataset, List<String> primaryKeys)
{
this.stagingDataset = stagingDataset;
this.primaryKeys = primaryKeys;
this.stagingDatasetFilter = LogicalPlanUtils.getDatasetFilterCondition(stagingDataset);
}

@Override
public Dataset visitNoVersioningStrategy(NoVersioningStrategyAbstract noVersioningStrategy)
{
Dataset enrichedStagingDataset = this.stagingDataset;
if (this.stagingDatasetFilter.isPresent())
{
enrichedStagingDataset = filterDataset();
}
return enrichedStagingDataset;
return this.stagingDataset;
}

@Override
Expand Down Expand Up @@ -78,7 +69,6 @@ public Dataset visitMaxVersionStrategy(MaxVersionStrategyAbstract maxVersionStra
Selection selectionWithRowNumber = Selection.builder()
.source(stagingDataset)
.addAllFields(allColumnsWithRowNumber)
.condition(stagingDatasetFilter)
.alias(stagingDataset.datasetReference().alias())
.build();

Expand All @@ -91,22 +81,6 @@ public Dataset visitMaxVersionStrategy(MaxVersionStrategyAbstract maxVersionStra
.alias(stagingDataset.datasetReference().alias())
.build();
}
else if (this.stagingDatasetFilter.isPresent())
{
enrichedStagingDataset = filterDataset();
}
return enrichedStagingDataset;
}

private Dataset filterDataset()
{
List<Value> allColumns = new ArrayList<>(stagingDataset.schemaReference().fieldValues());
Selection selection = Selection.builder()
.source(this.stagingDataset)
.addAllFields(allColumns)
.condition(this.stagingDatasetFilter.get())
.alias(stagingDataset.datasetReference().alias())
.build();
return selection;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -105,17 +105,10 @@ public static LogicalPlan getLogicalPlanForNextBatchId(Datasets datasets)
public static LogicalPlan getLogicalPlanForMinAndMaxForField(Dataset dataset, String fieldName)
{
FieldValue field = FieldValue.builder().datasetRef(dataset.datasetReference()).fieldName(fieldName).build();
Selection.Builder selectionBuilder = Selection.builder()
Selection selection = Selection.builder()
.addFields(FunctionImpl.builder().functionName(FunctionName.MIN).addValue(field).alias(MIN_OF_FIELD).build())
.addFields(FunctionImpl.builder().functionName(FunctionName.MAX).addValue(field).alias(MAX_OF_FIELD).build())
.source(dataset);

Optional<Condition> filterCondition = LogicalPlanUtils.getDatasetFilterCondition(dataset);
if (filterCondition.isPresent())
{
selectionBuilder = selectionBuilder.condition(filterCondition);
}

return LogicalPlan.builder().addOps(selectionBuilder.build()).build();
.source(dataset).build();
return LogicalPlan.builder().addOps(selection).build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import org.finos.legend.engine.persistence.components.common.StatisticName;
import org.finos.legend.engine.persistence.components.ingestmode.NontemporalDelta;
import org.finos.legend.engine.persistence.components.ingestmode.audit.AuditingVisitors;
import org.finos.legend.engine.persistence.components.ingestmode.deduplication.DatasetFilterAndDeduplicator;
import org.finos.legend.engine.persistence.components.ingestmode.deduplication.DatasetDeduplicator;
import org.finos.legend.engine.persistence.components.ingestmode.deduplication.VersioningConditionVisitor;
import org.finos.legend.engine.persistence.components.ingestmode.merge.MergeStrategyVisitors;
import org.finos.legend.engine.persistence.components.logicalplan.LogicalPlan;
Expand Down Expand Up @@ -93,7 +93,7 @@ class NontemporalDeltaPlanner extends Planner

// Perform Deduplication of Staging Dataset (dataset filters are applied later, when the DerivedDataset is visited)
this.enrichedStagingDataset = ingestMode().versioningStrategy()
.accept(new DatasetFilterAndDeduplicator(stagingDataset(), primaryKeys));
.accept(new DatasetDeduplicator(stagingDataset(), primaryKeys));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ protected void addPreRunStatsForRowsDeleted(Map<StatisticName, LogicalPlan> preR

protected void addPostRunStatsForIncomingRecords(Map<StatisticName, LogicalPlan> postRunStatisticsResult)
{
Optional<Condition> filterCondition = LogicalPlanUtils.getDatasetFilterCondition(stagingDataset());
Optional<Condition> filterCondition = Optional.empty();
if (dataSplitExecutionSupported())
{
Optional<Condition> dataSplitInRangeCondition = getDataSplitInRangeConditionForStatistics();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import org.finos.legend.engine.persistence.components.common.Resources;
import org.finos.legend.engine.persistence.components.common.StatisticName;
import org.finos.legend.engine.persistence.components.ingestmode.UnitemporalDelta;
import org.finos.legend.engine.persistence.components.ingestmode.deduplication.DatasetFilterAndDeduplicator;
import org.finos.legend.engine.persistence.components.ingestmode.deduplication.DatasetDeduplicator;
import org.finos.legend.engine.persistence.components.ingestmode.deduplication.VersioningConditionVisitor;
import org.finos.legend.engine.persistence.components.ingestmode.merge.MergeStrategyVisitors;
import org.finos.legend.engine.persistence.components.logicalplan.LogicalPlan;
Expand Down Expand Up @@ -84,7 +84,7 @@ class UnitemporalDeltaPlanner extends UnitemporalPlanner
this.dataSplitInRangeCondition = ingestMode.dataSplitField().map(field -> LogicalPlanUtils.getDataSplitInRangeCondition(stagingDataset(), field));
// Perform Deduplication of Staging Dataset (dataset filters are applied later, when the DerivedDataset is visited)
this.enrichedStagingDataset = ingestMode().versioningStrategy()
.accept(new DatasetFilterAndDeduplicator(stagingDataset(), primaryKeys));
.accept(new DatasetDeduplicator(stagingDataset(), primaryKeys));
this.versioningCondition = ingestMode().versioningStrategy()
.accept(new VersioningConditionVisitor(mainDataset(), stagingDataset(), false, ingestMode().digestField()));
this.inverseVersioningCondition = ingestMode.versioningStrategy()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ public static Selection getRecordCount(Dataset dataset, String alias)
public static Selection getRecordCount(Dataset dataset, String alias, Optional<Condition> condition)
{
return Selection.builder()
.source(dataset.datasetReference())
.source(dataset)
.addFields(FunctionImpl.builder().functionName(FunctionName.COUNT).alias(alias).addValue(All.INSTANCE).build())
.condition(condition)
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,32 @@

package org.finos.legend.engine.persistence.components.relational.ansi.sql.visitors;

import org.finos.legend.engine.persistence.components.logicalplan.conditions.Condition;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.DerivedDataset;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.Selection;
import org.finos.legend.engine.persistence.components.logicalplan.values.Value;
import org.finos.legend.engine.persistence.components.physicalplan.PhysicalPlanNode;
import org.finos.legend.engine.persistence.components.transformer.LogicalPlanVisitor;
import org.finos.legend.engine.persistence.components.transformer.VisitorContext;
import org.finos.legend.engine.persistence.components.util.LogicalPlanUtils;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

/**
 * Visitor for {@link DerivedDataset} nodes.
 *
 * <p>Rewrites the derived dataset as a {@link Selection} over its dataset reference and
 * hands that selection to {@link SelectionVisitor}.
 */
public class DerivedDatasetVisitor implements LogicalPlanVisitor<DerivedDataset>
{
    /**
     * Builds a selection of every field value in the derived dataset's schema reference,
     * sourced from its dataset reference, restricted by the dataset filter condition and
     * carrying the dataset reference's alias, then delegates to {@link SelectionVisitor}.
     *
     * @param prev    physical plan node passed through to the selection visitor
     * @param current derived dataset being visited
     * @param context visitor context passed through to the selection visitor
     * @return the result produced by {@link SelectionVisitor} for the generated selection
     */
    @Override
    public VisitorResult visit(PhysicalPlanNode prev, DerivedDataset current, VisitorContext context)
    {
        Optional<Condition> filterCondition = LogicalPlanUtils.getDatasetFilterCondition(current);
        List<Value> allColumns = new ArrayList<>(current.schemaReference().fieldValues());

        // Hand the Optional to the builder as-is: an absent filter then produces an
        // unfiltered selection rather than a NoSuchElementException from Optional.get().
        Selection selection = Selection.builder()
            .source(current.datasetReference())
            .addAllFields(allColumns)
            .condition(filterCondition)
            .alias(current.datasetReference().alias())
            .build();

        return new SelectionVisitor().visit(prev, selection, context);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public VisitorResult visit(PhysicalPlanNode prev, Create current, VisitorContext
prev.push(createTable);

List<LogicalPlanNode> logicalPlanNodes = new ArrayList<>();
logicalPlanNodes.add(current.dataset());
logicalPlanNodes.add(current.dataset().datasetReference());
logicalPlanNodes.add(current.dataset().schema());

if (current.ifNotExists())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,20 @@
package org.finos.legend.engine.persistence.components.relational.ansi.sql.visitors;

import org.finos.legend.engine.persistence.components.logicalplan.LogicalPlanNode;
import org.finos.legend.engine.persistence.components.logicalplan.conditions.And;
import org.finos.legend.engine.persistence.components.logicalplan.conditions.Condition;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.Dataset;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.DerivedDataset;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.Selection;
import org.finos.legend.engine.persistence.components.physicalplan.PhysicalPlanNode;
import org.finos.legend.engine.persistence.components.relational.sqldom.schemaops.statements.SelectStatement;
import org.finos.legend.engine.persistence.components.transformer.LogicalPlanVisitor;
import org.finos.legend.engine.persistence.components.transformer.VisitorContext;
import org.finos.legend.engine.persistence.components.util.LogicalPlanUtils;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

public class SelectionVisitor implements LogicalPlanVisitor<Selection>
{
Expand All @@ -36,7 +41,30 @@ public VisitorResult visit(PhysicalPlanNode prev, Selection current, VisitorCont
prev.push(selectStatement);

List<LogicalPlanNode> logicalPlanNodeList = new ArrayList<>();
current.source().ifPresent(logicalPlanNodeList::add);
List<Condition> conditions = new ArrayList<>();
current.condition().ifPresent(conditions::add);

if (current.source().isPresent())
{
Dataset dataset = current.source().get();
/* Optimize Scenarios where using Derived Dataset:
Convert unnecessary inner queries like this
select id from (select * from table where condition)
to
select id from table where condition
*/
if (dataset instanceof DerivedDataset)
{
DerivedDataset derivedDataset = (DerivedDataset) dataset;
Optional<Condition> filterCondition = LogicalPlanUtils.getDatasetFilterCondition(derivedDataset);
filterCondition.ifPresent(conditions::add);
logicalPlanNodeList.add(derivedDataset.datasetReference());
}
else
{
logicalPlanNodeList.add(dataset);
}
}

if (current.fields().isEmpty())
{
Expand All @@ -54,7 +82,11 @@ public VisitorResult visit(PhysicalPlanNode prev, Selection current, VisitorCont
selectStatement.setLimit(current.limit().get());
}

current.condition().ifPresent(logicalPlanNodeList::add);
if (!conditions.isEmpty())
{
logicalPlanNodeList.add(And.of(conditions));
}

current.groupByFields().ifPresent(logicalPlanNodeList::addAll);
current.quantifier().ifPresent(logicalPlanNodeList::add);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,11 +270,12 @@ public void verifyNontemporalDeltaWithNoVersionAndStagingFilter(GeneratorResult
"sink.\"amount\" = (SELECT stage.\"amount\" FROM \"mydb\".\"staging\" as stage WHERE (((sink.\"id\" = stage.\"id\") AND (sink.\"name\" = stage.\"name\")) AND (sink.\"digest\" <> stage.\"digest\")) AND ((stage.\"biz_date\" > '2020-01-01') AND (stage.\"biz_date\" < '2020-01-03')))," +
"sink.\"biz_date\" = (SELECT stage.\"biz_date\" FROM \"mydb\".\"staging\" as stage WHERE (((sink.\"id\" = stage.\"id\") AND (sink.\"name\" = stage.\"name\")) AND (sink.\"digest\" <> stage.\"digest\")) AND ((stage.\"biz_date\" > '2020-01-01') AND (stage.\"biz_date\" < '2020-01-03')))," +
"sink.\"digest\" = (SELECT stage.\"digest\" FROM \"mydb\".\"staging\" as stage WHERE (((sink.\"id\" = stage.\"id\") AND (sink.\"name\" = stage.\"name\")) AND (sink.\"digest\" <> stage.\"digest\")) AND ((stage.\"biz_date\" > '2020-01-01') AND (stage.\"biz_date\" < '2020-01-03'))) " +
"WHERE EXISTS (SELECT stage.\"id\",stage.\"name\",stage.\"amount\",stage.\"biz_date\",stage.\"digest\" FROM \"mydb\".\"staging\" as stage WHERE (((sink.\"id\" = stage.\"id\") AND (sink.\"name\" = stage.\"name\")) AND (sink.\"digest\" <> stage.\"digest\")) AND ((stage.\"biz_date\" > '2020-01-01') AND (stage.\"biz_date\" < '2020-01-03')))";
"WHERE EXISTS (SELECT * FROM \"mydb\".\"staging\" as stage WHERE (((sink.\"id\" = stage.\"id\") AND (sink.\"name\" = stage.\"name\")) AND (sink.\"digest\" <> stage.\"digest\")) AND ((stage.\"biz_date\" > '2020-01-01') AND (stage.\"biz_date\" < '2020-01-03')))";

String insertSql = "INSERT INTO \"mydb\".\"main\" (\"id\", \"name\", \"amount\", \"biz_date\", \"digest\") " +
"(SELECT * FROM (SELECT stage.\"id\",stage.\"name\",stage.\"amount\",stage.\"biz_date\",stage.\"digest\" FROM \"mydb\".\"staging\" as stage WHERE (stage.\"biz_date\" > '2020-01-01') AND (stage.\"biz_date\" < '2020-01-03')) as stage " +
"WHERE NOT (EXISTS (SELECT * FROM \"mydb\".\"main\" as sink WHERE (sink.\"id\" = stage.\"id\") AND (sink.\"name\" = stage.\"name\"))))";
"(SELECT * FROM \"mydb\".\"staging\" as stage WHERE (NOT (EXISTS " +
"(SELECT * FROM \"mydb\".\"main\" as sink WHERE (sink.\"id\" = stage.\"id\") AND " +
"(sink.\"name\" = stage.\"name\")))) AND ((stage.\"biz_date\" > '2020-01-01') AND (stage.\"biz_date\" < '2020-01-03')))";

Assertions.assertEquals(AnsiTestArtifacts.expectedBaseTablePlusDigestCreateQuery, preActionsSqlList.get(0));
Assertions.assertEquals(updateSql, milestoningSqlList.get(0));
Expand Down Expand Up @@ -333,11 +334,12 @@ public void verifyNontemporalDeltaWithMaxVersioningNoDedupAndStagingFilters(Gene
"sink.\"biz_date\" = (SELECT stage.\"biz_date\" FROM \"mydb\".\"staging\" as stage WHERE (((sink.\"id\" = stage.\"id\") AND (sink.\"name\" = stage.\"name\")) AND (stage.\"version\" > sink.\"version\")) AND (stage.\"snapshot_id\" > 18972))," +
"sink.\"digest\" = (SELECT stage.\"digest\" FROM \"mydb\".\"staging\" as stage WHERE (((sink.\"id\" = stage.\"id\") AND (sink.\"name\" = stage.\"name\")) AND (stage.\"version\" > sink.\"version\")) AND (stage.\"snapshot_id\" > 18972))," +
"sink.\"version\" = (SELECT stage.\"version\" FROM \"mydb\".\"staging\" as stage WHERE (((sink.\"id\" = stage.\"id\") AND (sink.\"name\" = stage.\"name\")) AND (stage.\"version\" > sink.\"version\")) AND (stage.\"snapshot_id\" > 18972)) " +
"WHERE EXISTS (SELECT stage.\"id\",stage.\"name\",stage.\"amount\",stage.\"biz_date\",stage.\"digest\",stage.\"version\" FROM \"mydb\".\"staging\" as stage WHERE (((sink.\"id\" = stage.\"id\") AND (sink.\"name\" = stage.\"name\")) AND (stage.\"version\" > sink.\"version\")) AND (stage.\"snapshot_id\" > 18972))";
"WHERE EXISTS (SELECT * FROM \"mydb\".\"staging\" as stage WHERE (((sink.\"id\" = stage.\"id\") AND (sink.\"name\" = stage.\"name\")) AND (stage.\"version\" > sink.\"version\")) AND (stage.\"snapshot_id\" > 18972))";

String insertSql = "INSERT INTO \"mydb\".\"main\" (\"id\", \"name\", \"amount\", \"biz_date\", \"digest\", \"version\") " +
"(SELECT * FROM (SELECT stage.\"id\",stage.\"name\",stage.\"amount\",stage.\"biz_date\",stage.\"digest\",stage.\"version\" FROM \"mydb\".\"staging\" as stage WHERE stage.\"snapshot_id\" > 18972) as stage " +
"WHERE NOT (EXISTS (SELECT * FROM \"mydb\".\"main\" as sink WHERE (sink.\"id\" = stage.\"id\") AND (sink.\"name\" = stage.\"name\"))))";
"(SELECT * FROM \"mydb\".\"staging\" as stage WHERE (NOT (EXISTS " +
"(SELECT * FROM \"mydb\".\"main\" as sink WHERE (sink.\"id\" = stage.\"id\") AND " +
"(sink.\"name\" = stage.\"name\")))) AND (stage.\"snapshot_id\" > 18972))";

Assertions.assertEquals(AnsiTestArtifacts.expectedBaseTablePlusDigestPlusVersionCreateQuery, preActionsSqlList.get(0));
Assertions.assertEquals(updateSql, milestoningSqlList.get(0));
Expand Down
Loading

0 comments on commit 9345f66

Please sign in to comment.