Persistence Component - BigQuery support (#1904)
prasar-ashutosh committed Jul 4, 2023
1 parent e0cc125 commit 594db31
Showing 139 changed files with 10,492 additions and 261 deletions.
@@ -33,7 +33,12 @@
import org.finos.legend.engine.persistence.components.ingestmode.validitymilestoning.derivation.SourceSpecifiesFromAndThruDateTimeAbstract;
import org.finos.legend.engine.persistence.components.ingestmode.validitymilestoning.derivation.SourceSpecifiesFromDateTimeAbstract;
import org.finos.legend.engine.persistence.components.ingestmode.validitymilestoning.derivation.ValidityDerivationVisitor;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.*;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.DataType;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.Dataset;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.DatasetDefinition;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.Field;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.FieldType;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.SchemaDefinition;

import java.util.ArrayList;
import java.util.List;
@@ -22,7 +22,8 @@
import java.util.List;
import java.util.stream.Collectors;

import static org.finos.legend.engine.persistence.components.util.LogicalPlanUtils.*;
import static org.finos.legend.engine.persistence.components.util.LogicalPlanUtils.SUPPORTED_DATA_TYPES_FOR_OPTIMIZATION_COLUMNS;
import static org.finos.legend.engine.persistence.components.util.LogicalPlanUtils.findCommonPrimaryFieldsBetweenMainAndStaging;

public class IngestModeOptimizationColumnHandler implements IngestModeVisitor<IngestMode>
{
@@ -92,9 +93,9 @@ private List<OptimizationFilter> deriveOptimizationFilters(UnitemporalDeltaAbstr
List<Field> primaryKeys = findCommonPrimaryFieldsBetweenMainAndStaging(datasets.mainDataset(), datasets.stagingDataset());
List<Field> comparablePrimaryKeys = primaryKeys.stream().filter(field -> SUPPORTED_DATA_TYPES_FOR_OPTIMIZATION_COLUMNS.contains(field.type().dataType())).collect(Collectors.toList());
optimizationFilters = new ArrayList<>();
for (Field field: comparablePrimaryKeys)
for (Field field : comparablePrimaryKeys)
{
OptimizationFilter filter = OptimizationFilter.of(field.name(),field.name().toUpperCase() + "_LOWER", field.name().toUpperCase() + "_UPPER");
OptimizationFilter filter = OptimizationFilter.of(field.name(), field.name().toUpperCase() + "_LOWER", field.name().toUpperCase() + "_UPPER");
optimizationFilters.add(filter);
}
}
@@ -18,39 +18,37 @@
import org.finos.legend.engine.persistence.components.logicalplan.conditions.Equals;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.Dataset;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.Selection;
import org.finos.legend.engine.persistence.components.logicalplan.values.*;
import org.finos.legend.engine.persistence.components.util.LogicalPlanUtils;
import org.finos.legend.engine.persistence.components.logicalplan.values.FieldValue;
import org.finos.legend.engine.persistence.components.logicalplan.values.FunctionImpl;
import org.finos.legend.engine.persistence.components.logicalplan.values.FunctionName;
import org.finos.legend.engine.persistence.components.logicalplan.values.ObjectValue;
import org.finos.legend.engine.persistence.components.logicalplan.values.Order;
import org.finos.legend.engine.persistence.components.logicalplan.values.OrderedField;
import org.finos.legend.engine.persistence.components.logicalplan.values.Value;
import org.finos.legend.engine.persistence.components.logicalplan.values.WindowFunction;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

public class DatasetFilterAndDeduplicator implements VersioningStrategyVisitor<Dataset>
public class DatasetDeduplicator implements VersioningStrategyVisitor<Dataset>
{

Dataset stagingDataset;
List<String> primaryKeys;
Optional<Condition> stagingDatasetFilter;

private static final String ROW_NUMBER = "legend_persistence_row_num";

public DatasetFilterAndDeduplicator(Dataset stagingDataset, List<String> primaryKeys)
public DatasetDeduplicator(Dataset stagingDataset, List<String> primaryKeys)
{
this.stagingDataset = stagingDataset;
this.primaryKeys = primaryKeys;
this.stagingDatasetFilter = LogicalPlanUtils.getDatasetFilterCondition(stagingDataset);
}

@Override
public Dataset visitNoVersioningStrategy(NoVersioningStrategyAbstract noVersioningStrategy)
{
Dataset enrichedStagingDataset = this.stagingDataset;
if (this.stagingDatasetFilter.isPresent())
{
enrichedStagingDataset = filterDataset();
}
return enrichedStagingDataset;
return this.stagingDataset;
}

@Override
@@ -78,7 +76,6 @@ public Dataset visitMaxVersionStrategy(MaxVersionStrategyAbstract maxVersionStra
Selection selectionWithRowNumber = Selection.builder()
.source(stagingDataset)
.addAllFields(allColumnsWithRowNumber)
.condition(stagingDatasetFilter)
.alias(stagingDataset.datasetReference().alias())
.build();

@@ -91,22 +88,6 @@ public Dataset visitMaxVersionStrategy(MaxVersionStrategyAbstract maxVersionStra
.alias(stagingDataset.datasetReference().alias())
.build();
}
else if (this.stagingDatasetFilter.isPresent())
{
enrichedStagingDataset = filterDataset();
}
return enrichedStagingDataset;
}

private Dataset filterDataset()
{
List<Value> allColumns = new ArrayList<>(stagingDataset.schemaReference().fieldValues());
Selection selection = Selection.builder()
.source(this.stagingDataset)
.addAllFields(allColumns)
.condition(this.stagingDatasetFilter.get())
.alias(stagingDataset.datasetReference().alias())
.build();
return selection;
}
}
@@ -105,17 +105,10 @@ public static LogicalPlan getLogicalPlanForNextBatchId(Datasets datasets)
public static LogicalPlan getLogicalPlanForMinAndMaxForField(Dataset dataset, String fieldName)
{
FieldValue field = FieldValue.builder().datasetRef(dataset.datasetReference()).fieldName(fieldName).build();
Selection.Builder selectionBuilder = Selection.builder()
Selection selection = Selection.builder()
.addFields(FunctionImpl.builder().functionName(FunctionName.MIN).addValue(field).alias(MIN_OF_FIELD).build())
.addFields(FunctionImpl.builder().functionName(FunctionName.MAX).addValue(field).alias(MAX_OF_FIELD).build())
.source(dataset);

Optional<Condition> filterCondition = LogicalPlanUtils.getDatasetFilterCondition(dataset);
if (filterCondition.isPresent())
{
selectionBuilder = selectionBuilder.condition(filterCondition);
}

return LogicalPlan.builder().addOps(selectionBuilder.build()).build();
.source(dataset).build();
return LogicalPlan.builder().addOps(selection).build();
}
}
@@ -56,6 +56,8 @@ public enum DataType
LONGNVARCHAR,
UNDEFINED,
INT64,
FLOAT64,
BYTES,
STRING,
BOOL,
LONGTEXT,
@@ -67,13 +69,13 @@

public static boolean isStringDatatype(DataType type)
{
List<DataType> stringDatatype = new ArrayList<DataType>(Arrays.asList(CHAR, CHARACTER, VARCHAR, LONGVARCHAR, NCHAR, NVARCHAR, LONGNVARCHAR, LONGTEXT, TEXT, JSON, STRING));
List<DataType> stringDatatype = new ArrayList<>(Arrays.asList(CHAR, CHARACTER, VARCHAR, LONGVARCHAR, NCHAR, NVARCHAR, LONGNVARCHAR, LONGTEXT, TEXT, JSON, STRING));
return stringDatatype.contains(type);
}

public static Set<DataType> getComparableDataTypes()
{
return new HashSet<>(Arrays.asList(INT, INTEGER, BIGINT, TINYINT, SMALLINT, INT64, REAL, DECIMAL, FLOAT, DOUBLE, NUMBER, NUMERIC,
return new HashSet<>(Arrays.asList(INT, INTEGER, BIGINT, TINYINT, SMALLINT, INT64, FLOAT64, REAL, DECIMAL, FLOAT, DOUBLE, NUMBER, NUMERIC,
TIME, TIMESTAMP, TIMESTAMP_NTZ, TIMESTAMP_TZ, TIMESTAMP_LTZ, DATETIME, TIMESTAMPTZ, DATE));
}
}
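
A quick illustration of how the new BigQuery types behave in the helpers above, as a minimal standalone sketch (the class name is illustrative and not part of this commit): FLOAT64 participates in the comparable-type check, while BYTES is a new constant that is deliberately not treated as a string type.

import org.finos.legend.engine.persistence.components.logicalplan.datasets.DataType;

public class BigQueryDataTypeCheck
{
    public static void main(String[] args)
    {
        // FLOAT64 was added to the comparable data types.
        System.out.println(DataType.getComparableDataTypes().contains(DataType.FLOAT64)); // true
        // BYTES is a new enum constant but is not a string data type.
        System.out.println(DataType.isStringDatatype(DataType.BYTES)); // false
    }
}
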
@@ -0,0 +1,35 @@
// Copyright 2023 Goldman Sachs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package org.finos.legend.engine.persistence.components.logicalplan.datasets;

import org.finos.legend.engine.persistence.components.logicalplan.LogicalPlanNode;
import org.finos.legend.engine.persistence.components.logicalplan.values.Value;
import org.immutables.value.Value.Immutable;
import org.immutables.value.Value.Parameter;
import org.immutables.value.Value.Style;

@Immutable
@Style(
typeAbstract = "*Abstract",
typeImmutable = "*",
jdkOnly = true,
optionalAcceptNullable = true,
strictBuilder = true
)
public interface PartitionKeyAbstract extends LogicalPlanNode
{
@Parameter(order = 0)
Value key();
}
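
The Immutables annotations above generate a concrete PartitionKey type with an of(...) factory for the single @Parameter attribute. A minimal sketch of constructing one from a field value, assuming the standard Immutables conventions and the FieldValue builder seen elsewhere in this commit; the "biz_date" column and the omission of a dataset reference are illustrative, not taken from this change.

import org.finos.legend.engine.persistence.components.logicalplan.datasets.PartitionKey;
import org.finos.legend.engine.persistence.components.logicalplan.values.FieldValue;

public class PartitionKeyExample
{
    public static void main(String[] args)
    {
        // Partition the target table on an illustrative "biz_date" column.
        FieldValue bizDate = FieldValue.builder().fieldName("biz_date").build();
        PartitionKey partitionKey = PartitionKey.of(bizDate);
        System.out.println(partitionKey.key());
    }
}
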
@@ -36,6 +36,8 @@ public interface SchemaDefinitionAbstract extends Schema

List<ClusterKey> clusterKeys();

List<PartitionKey> partitionKeys();

Optional<ColumnStoreSpecification> columnStoreSpecification();

Optional<ShardSpecification> shardSpecification();
@@ -28,5 +28,12 @@ public enum FunctionName
UPPER,
ROW_NUMBER,
SUBSTRING,
PARSE_JSON;
PARSE_JSON,
DATE,
DATE_TRUNC,
DATETIME_TRUNC,
TIMESTAMP_TRUNC,
RANGE_BUCKET,
GENERATE_ARRAY,
PARSE_DATETIME;
}
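
The new names cover functions emitted when generating BigQuery SQL (date and datetime truncation, range bucketing, array generation, datetime parsing). A hedged sketch of how one of them plugs into the existing FunctionImpl builder, following the builder pattern used in the min/max plan earlier in this diff; the field name and alias are illustrative, and how the BigQuery sink ultimately renders the call is not shown in this hunk.

import org.finos.legend.engine.persistence.components.logicalplan.values.FieldValue;
import org.finos.legend.engine.persistence.components.logicalplan.values.FunctionImpl;
import org.finos.legend.engine.persistence.components.logicalplan.values.FunctionName;

public class DateFunctionExample
{
    public static void main(String[] args)
    {
        FieldValue eventTime = FieldValue.builder().fieldName("event_time").build();
        // Models a call such as DATE(event_time), aliased as event_date.
        FunctionImpl eventDate = FunctionImpl.builder()
                .functionName(FunctionName.DATE)
                .addValue(eventTime)
                .alias("event_date")
                .build();
        System.out.println(eventDate);
    }
}
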
@@ -20,7 +20,7 @@
import org.finos.legend.engine.persistence.components.common.StatisticName;
import org.finos.legend.engine.persistence.components.ingestmode.NontemporalDelta;
import org.finos.legend.engine.persistence.components.ingestmode.audit.AuditingVisitors;
import org.finos.legend.engine.persistence.components.ingestmode.deduplication.DatasetFilterAndDeduplicator;
import org.finos.legend.engine.persistence.components.ingestmode.deduplication.DatasetDeduplicator;
import org.finos.legend.engine.persistence.components.ingestmode.deduplication.VersioningConditionVisitor;
import org.finos.legend.engine.persistence.components.ingestmode.merge.MergeStrategyVisitors;
import org.finos.legend.engine.persistence.components.logicalplan.LogicalPlan;
@@ -93,7 +93,7 @@ class NontemporalDeltaPlanner extends Planner

// Perform Deduplication & Filtering of Staging Dataset
this.enrichedStagingDataset = ingestMode().versioningStrategy()
.accept(new DatasetFilterAndDeduplicator(stagingDataset(), primaryKeys));
.accept(new DatasetDeduplicator(stagingDataset(), primaryKeys));
}

@Override
@@ -194,6 +194,12 @@ private Merge getMergeOperation()
versioningCondition = this.versioningCondition;
}

if (ingestMode().auditing().accept(AUDIT_ENABLED))
{
String auditField = ingestMode().auditing().accept(AuditingVisitors.EXTRACT_AUDIT_FIELD).orElseThrow(IllegalStateException::new);
keyValuePairs.add(Pair.of(FieldValue.builder().datasetRef(mainDataset().datasetReference()).fieldName(auditField).build(), batchStartTimestamp));
}

Merge merge = Merge.builder()
.dataset(mainDataset())
.usingDataset(stagingDataset)
@@ -203,13 +209,6 @@ private Merge getMergeOperation()
.matchedCondition(versioningCondition)
.build();

if (ingestMode().auditing().accept(AUDIT_ENABLED))
{
String auditField = ingestMode().auditing().accept(AuditingVisitors.EXTRACT_AUDIT_FIELD).orElseThrow(IllegalStateException::new);
keyValuePairs.add(Pair.of(FieldValue.builder().datasetRef(mainDataset().datasetReference()).fieldName(auditField).build(), batchStartTimestamp));
merge = merge.withUnmatchedKeyValuePairs(keyValuePairs);
}

return merge;
}

@@ -210,7 +210,7 @@ protected void addPreRunStatsForRowsDeleted(Map<StatisticName, LogicalPlan> preR

protected void addPostRunStatsForIncomingRecords(Map<StatisticName, LogicalPlan> postRunStatisticsResult)
{
Optional<Condition> filterCondition = LogicalPlanUtils.getDatasetFilterCondition(stagingDataset());
Optional<Condition> filterCondition = Optional.empty();
if (dataSplitExecutionSupported())
{
Optional<Condition> dataSplitInRangeCondition = getDataSplitInRangeConditionForStatistics();
@@ -18,7 +18,7 @@
import org.finos.legend.engine.persistence.components.common.Resources;
import org.finos.legend.engine.persistence.components.common.StatisticName;
import org.finos.legend.engine.persistence.components.ingestmode.UnitemporalDelta;
import org.finos.legend.engine.persistence.components.ingestmode.deduplication.DatasetFilterAndDeduplicator;
import org.finos.legend.engine.persistence.components.ingestmode.deduplication.DatasetDeduplicator;
import org.finos.legend.engine.persistence.components.ingestmode.deduplication.VersioningConditionVisitor;
import org.finos.legend.engine.persistence.components.ingestmode.merge.MergeStrategyVisitors;
import org.finos.legend.engine.persistence.components.logicalplan.LogicalPlan;
@@ -84,7 +84,7 @@ class UnitemporalDeltaPlanner extends UnitemporalPlanner
this.dataSplitInRangeCondition = ingestMode.dataSplitField().map(field -> LogicalPlanUtils.getDataSplitInRangeCondition(stagingDataset(), field));
// Perform Deduplication & Filtering of Staging Dataset
this.enrichedStagingDataset = ingestMode().versioningStrategy()
.accept(new DatasetFilterAndDeduplicator(stagingDataset(), primaryKeys));
.accept(new DatasetDeduplicator(stagingDataset(), primaryKeys));
this.versioningCondition = ingestMode().versioningStrategy()
.accept(new VersioningConditionVisitor(mainDataset(), stagingDataset(), false, ingestMode().digestField()));
this.inverseVersioningCondition = ingestMode.versioningStrategy()
@@ -244,21 +244,15 @@ public static void replaceField(List<Value> fieldsList, String oldFieldName, Str
});
}

public static Optional<Condition> getDatasetFilterCondition(Dataset dataSet)
public static Condition getDatasetFilterCondition(DerivedDataset derivedDataset)
{
Optional<Condition> filter = Optional.empty();
if (dataSet instanceof DerivedDataset)
List<DatasetFilter> datasetFilters = derivedDataset.datasetFilters();
List<Condition> conditions = new ArrayList<>();
for (DatasetFilter datasetFilter: datasetFilters)
{
DerivedDataset derivedDataset = (DerivedDataset) dataSet;
List<DatasetFilter> datasetFilters = derivedDataset.datasetFilters();
List<Condition> conditions = new ArrayList<>();
for (DatasetFilter datasetFilter: datasetFilters)
{
conditions.add(datasetFilter.mapFilterToCondition(dataSet.datasetReference()));
}
filter = Optional.of(And.of(conditions));
conditions.add(datasetFilter.mapFilterToCondition(derivedDataset.datasetReference()));
}
return filter;
return And.of(conditions);
}
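
Because the method now accepts a DerivedDataset directly and returns a plain Condition, the instanceof probing and Optional wrapping that used to live here move to callers. A minimal sketch of the expected call pattern; the helper class and method names are illustrative and the import paths are inferred from the surrounding code, not taken from this commit.

import java.util.Optional;

import org.finos.legend.engine.persistence.components.logicalplan.conditions.Condition;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.Dataset;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.DerivedDataset;
import org.finos.legend.engine.persistence.components.util.LogicalPlanUtils;

public class DatasetFilterConditionCaller
{
    // Derive a filter condition only when the dataset actually carries filters.
    public static Optional<Condition> filterConditionFor(Dataset dataset)
    {
        if (dataset instanceof DerivedDataset)
        {
            return Optional.of(LogicalPlanUtils.getDatasetFilterCondition((DerivedDataset) dataset));
        }
        return Optional.empty();
    }
}
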

public static List<DatasetFilter> getDatasetFilters(Dataset dataSet)
@@ -320,7 +314,7 @@ public static Selection getRecordCount(Dataset dataset, String alias)
public static Selection getRecordCount(Dataset dataset, String alias, Optional<Condition> condition)
{
return Selection.builder()
.source(dataset.datasetReference())
.source(dataset)
.addFields(FunctionImpl.builder().functionName(FunctionName.COUNT).alias(alias).addValue(All.INSTANCE).build())
.condition(condition)
.build();