Skip to content

Commit

Permalink
Added different Visitors for BigQuerySink
Browse files Browse the repository at this point in the history
  • Loading branch information
prasar-ashutosh committed Jun 13, 2023
1 parent 57ecb51 commit c90acfa
Show file tree
Hide file tree
Showing 28 changed files with 553 additions and 875 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Copyright 2023 Goldman Sachs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package org.finos.legend.engine.persistence.components.logicalplan.datasets;

import org.finos.legend.engine.persistence.components.logicalplan.LogicalPlanNode;
import org.finos.legend.engine.persistence.components.logicalplan.values.Value;
import org.immutables.value.Value.Immutable;
import org.immutables.value.Value.Parameter;
import org.immutables.value.Value.Style;

@Immutable
@Style(
typeAbstract = "*Abstract",
typeImmutable = "*",
jdkOnly = true,
optionalAcceptNullable = true,
strictBuilder = true
)
public interface PartitionKeyAbstract extends LogicalPlanNode
{
@Parameter(order = 0)
Value key();
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ public interface SchemaDefinitionAbstract extends Schema

List<ClusterKey> clusterKeys();

List<PartitionKey> partitionKeys();

Optional<ColumnStoreSpecification> columnStoreSpecification();

Optional<ShardSpecification> shardSpecification();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,11 @@ public enum FunctionName
UPPER,
ROW_NUMBER,
SUBSTRING,
PARSE_JSON;
PARSE_JSON,
DATE,
DATE_TRUNC,
DATETIME_TRUNC,
TIMESTAMP_TRUNC,
RANGE_BUCKET,
GENERATE_ARRAY;
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@
import org.finos.legend.engine.persistence.components.logicalplan.LogicalPlanFactory;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.ClusterKey;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.DataType;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.PartitionKey;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.SchemaDefinition;
import org.finos.legend.engine.persistence.components.logicalplan.operations.Alter;
import org.finos.legend.engine.persistence.components.logicalplan.operations.Create;
import org.finos.legend.engine.persistence.components.logicalplan.operations.Show;
import org.finos.legend.engine.persistence.components.logicalplan.operations.Delete;
import org.finos.legend.engine.persistence.components.logicalplan.operations.Truncate;
import org.finos.legend.engine.persistence.components.logicalplan.values.BatchEndTimestamp;
import org.finos.legend.engine.persistence.components.optimizer.Optimizer;
import org.finos.legend.engine.persistence.components.relational.CaseConversion;
Expand All @@ -35,8 +37,11 @@
import org.finos.legend.engine.persistence.components.relational.bigquery.sql.visitor.AlterVisitor;
import org.finos.legend.engine.persistence.components.relational.bigquery.sql.visitor.BatchEndTimestampVisitor;
import org.finos.legend.engine.persistence.components.relational.bigquery.sql.visitor.ClusterKeyVisitor;
import org.finos.legend.engine.persistence.components.relational.bigquery.sql.visitor.PartitionKeyVisitor;
import org.finos.legend.engine.persistence.components.relational.bigquery.sql.visitor.SQLCreateVisitor;
import org.finos.legend.engine.persistence.components.relational.bigquery.sql.visitor.SchemaDefinitionVisitor;
import org.finos.legend.engine.persistence.components.relational.bigquery.sql.visitor.DeleteVisitor;
import org.finos.legend.engine.persistence.components.relational.bigquery.sql.visitor.TruncateVisitor;
import org.finos.legend.engine.persistence.components.relational.sql.TabularData;
import org.finos.legend.engine.persistence.components.relational.sqldom.utils.SqlGenUtils;
import org.finos.legend.engine.persistence.components.relational.transformer.RelationalTransformer;
Expand Down Expand Up @@ -68,20 +73,26 @@ public class BigQuerySink extends AnsiSqlSink
static
{
Set<Capability> capabilities = new HashSet<>();
// TODO #1: To review the capabilities
capabilities.add(Capability.MERGE);
capabilities.add(Capability.ADD_COLUMN);
capabilities.add(Capability.IMPLICIT_DATA_TYPE_CONVERSION);
capabilities.add(Capability.DATA_TYPE_LENGTH_CHANGE);
CAPABILITIES = Collections.unmodifiableSet(capabilities);

// TODO #2: To review the visitors - Support for Clustering,Partition BY, table creation , Schema Definition, Alter table
Map<Class<?>, LogicalPlanVisitor<?>> logicalPlanVisitorByClass = new HashMap<>();
logicalPlanVisitorByClass.put(SchemaDefinition.class, new SchemaDefinitionVisitor());
logicalPlanVisitorByClass.put(Create.class, new SQLCreateVisitor());
logicalPlanVisitorByClass.put(ClusterKey.class, new ClusterKeyVisitor());
logicalPlanVisitorByClass.put(PartitionKey.class, new PartitionKeyVisitor());
logicalPlanVisitorByClass.put(Alter.class, new AlterVisitor());
logicalPlanVisitorByClass.put(Delete.class, new DeleteVisitor());
logicalPlanVisitorByClass.put(Truncate.class, new TruncateVisitor());
logicalPlanVisitorByClass.put(BatchEndTimestamp.class, new BatchEndTimestampVisitor());
LOGICAL_PLAN_VISITOR_BY_CLASS = Collections.unmodifiableMap(logicalPlanVisitorByClass);

// TODO #3: To review the Schema Evolution Support - Implicit & Explicit Types
Map<DataType, Set<DataType>> implicitDataTypeMapping = new HashMap<>();
implicitDataTypeMapping.put(DataType.DECIMAL, new HashSet<>(Arrays.asList(DataType.TINYINT, DataType.SMALLINT, DataType.INTEGER, DataType.INT, DataType.BIGINT, DataType.FLOAT, DataType.DOUBLE, DataType.REAL, DataType.NUMERIC, DataType.NUMBER)));
implicitDataTypeMapping.put(DataType.DOUBLE, new HashSet<>(Arrays.asList(DataType.TINYINT, DataType.SMALLINT, DataType.INTEGER, DataType.INT, DataType.FLOAT, DataType.REAL)));
Expand All @@ -100,6 +111,7 @@ public static RelationalSink get()
return INSTANCE;
}

// TODO #4: Review how can this be done with Simba Driver
public static Connection createConnection(String user,
String pwd,
String jdbcUrl,
Expand All @@ -124,14 +136,18 @@ public static Connection createConnection(String user,
}
}

// TODO #5: Another entry point for API invocation via authentication ?


private BigQuerySink()
{
super(
CAPABILITIES,
IMPLICIT_DATA_TYPE_MAPPING,
EXPLICIT_DATA_TYPE_MAPPING,
SqlGenUtils.QUOTE_IDENTIFIER,
SqlGenUtils.BACK_QUOTE_IDENTIFIER,
LOGICAL_PLAN_VISITOR_BY_CLASS,
// TODO # 6: Verify implementation of DatasetExists datasetExists,
(executor, sink, dataset) ->
{
//TODO: pass transformer as an argument
Expand All @@ -141,7 +157,9 @@ private BigQuerySink()
List<TabularData> results = executor.executePhysicalPlanAndGetResults(physicalPlanForDoesDatasetExist);
return results.size() > 0;
},
// TODO # 7: Verify implementation of ValidateMainDatasetSchema validateMainDatasetSchema,
(executor, sink, dataset) -> sink.validateDatasetSchema(dataset, new BigQueryDataTypeMapping()),
// TODO # 8: Verify implementation of ConstructDatasetFromDatabase constructDatasetFromDatabase)
(executor, sink, tableName, schemaName, databaseName) -> sink.constructDatasetFromDatabase(tableName, schemaName, databaseName, new BigQueryJdbcPropertiesToLogicalDataTypeMapping()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public VisitorResult visit(PhysicalPlanNode prev, BatchEndTimestamp current, Vis
}
else
{
prev.push(new Function(FunctionName.SYSDATE, null, context.quoteIdentifier()));
prev.push(new Function(FunctionName.CURRENT_DATETIME, null, context.quoteIdentifier()));
}
return new VisitorResult();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Copyright 2022 Goldman Sachs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package org.finos.legend.engine.persistence.components.relational.bigquery.sql.visitor;

import org.finos.legend.engine.persistence.components.logicalplan.LogicalPlanNode;
import org.finos.legend.engine.persistence.components.logicalplan.operations.Delete;
import org.finos.legend.engine.persistence.components.logicalplan.operations.Truncate;
import org.finos.legend.engine.persistence.components.physicalplan.PhysicalPlanNode;
import org.finos.legend.engine.persistence.components.relational.sqldom.schemaops.statements.DeleteStatement;
import org.finos.legend.engine.persistence.components.transformer.LogicalPlanVisitor;
import org.finos.legend.engine.persistence.components.transformer.VisitorContext;

import java.util.ArrayList;
import java.util.List;

public class DeleteVisitor implements LogicalPlanVisitor<Delete>
{

/*
DELETE always needs A WHERE CLAUSE IN BigQuery
If the condition is not provided, default to Truncate
*/

@Override
public VisitorResult visit(PhysicalPlanNode prev, Delete current, VisitorContext context)
{
if (!current.condition().isPresent())
{
return new TruncateVisitor().visit(prev, Truncate.of(current.dataset()), context);
}

DeleteStatement deleteStatement = new DeleteStatement();
prev.push(deleteStatement);

List<LogicalPlanNode> logicalPlanNodeList = new ArrayList<>();
logicalPlanNodeList.add(current.dataset());
if (current.condition().isPresent())
{
logicalPlanNodeList.add(current.condition().get());
}
return new VisitorResult(deleteStatement, logicalPlanNodeList);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright 2023 Goldman Sachs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package org.finos.legend.engine.persistence.components.relational.bigquery.sql.visitor;

import org.finos.legend.engine.persistence.components.logicalplan.LogicalPlanNode;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.PartitionKey;
import org.finos.legend.engine.persistence.components.physicalplan.PhysicalPlanNode;
import org.finos.legend.engine.persistence.components.relational.sqldom.constraints.table.PartitionKeyConstraint;
import org.finos.legend.engine.persistence.components.transformer.LogicalPlanVisitor;
import org.finos.legend.engine.persistence.components.transformer.VisitorContext;

import java.util.ArrayList;
import java.util.List;

public class PartitionKeyVisitor implements LogicalPlanVisitor<PartitionKey>
{
@Override
public VisitorResult visit(PhysicalPlanNode prev, PartitionKey current, VisitorContext context)
{
PartitionKeyConstraint partitionKeyConstraint = new PartitionKeyConstraint();
prev.push(partitionKeyConstraint);

List<LogicalPlanNode> logicalPlanNodes = new ArrayList<>();
logicalPlanNodes.add(current.key());

return new VisitorResult(partitionKeyConstraint, logicalPlanNodes);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import org.finos.legend.engine.persistence.components.logicalplan.modifiers.IfNotExistsTableModifier;
import org.finos.legend.engine.persistence.components.logicalplan.operations.Create;
import org.finos.legend.engine.persistence.components.physicalplan.PhysicalPlanNode;
import org.finos.legend.engine.persistence.components.relational.sqldom.schemaops.statements.CreateTable;
import org.finos.legend.engine.persistence.components.relational.bigquery.sqldom.schemaops.statements.CreateTable;
import org.finos.legend.engine.persistence.components.transformer.LogicalPlanVisitor;
import org.finos.legend.engine.persistence.components.transformer.VisitorContext;

Expand All @@ -43,6 +43,13 @@ public VisitorResult visit(PhysicalPlanNode prev, Create current, VisitorContext
logicalPlanNodes.add(IfNotExistsTableModifier.INSTANCE);
}

// Add Partition Keys
if (current.dataset().schema().partitionKeys() != null && !current.dataset().schema().partitionKeys().isEmpty())
{
logicalPlanNodes.addAll(current.dataset().schema().partitionKeys());
}

// Add Clustering Keys
if (current.dataset().schema().clusterKeys() != null && !current.dataset().schema().clusterKeys().isEmpty())
{
logicalPlanNodes.addAll(current.dataset().schema().clusterKeys());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,9 @@
import org.finos.legend.engine.persistence.components.optimizer.Optimizer;
import org.finos.legend.engine.persistence.components.physicalplan.PhysicalPlanNode;
import org.finos.legend.engine.persistence.components.relational.bigquery.sql.BigQueryDataTypeMapping;
import org.finos.legend.engine.persistence.components.relational.bigquery.sqldom.constraints.columns.PKColumnConstraint;
import org.finos.legend.engine.persistence.components.relational.sqldom.constraints.column.ColumnConstraint;
import org.finos.legend.engine.persistence.components.relational.sqldom.constraints.column.NotNullColumnConstraint;
import org.finos.legend.engine.persistence.components.relational.sqldom.constraints.column.PKColumnConstraint;
import org.finos.legend.engine.persistence.components.relational.sqldom.constraints.column.UniqueColumnConstraint;
import org.finos.legend.engine.persistence.components.relational.sqldom.constraints.table.PrimaryKeyTableConstraint;
import org.finos.legend.engine.persistence.components.relational.sqldom.constraints.table.TableConstraint;
import org.finos.legend.engine.persistence.components.relational.sqldom.schema.DataType;
Expand All @@ -36,6 +35,12 @@

public class SchemaDefinitionVisitor implements LogicalPlanVisitor<SchemaDefinition>
{
/*
IN BigQuery World
Project => Database
Dataset => Schema
Table Name => Table
*/

@Override
public VisitorResult visit(PhysicalPlanNode prev, SchemaDefinition current, VisitorContext context)
Expand All @@ -56,10 +61,6 @@ public VisitorResult visit(PhysicalPlanNode prev, SchemaDefinition current, Visi
{
columnConstraints.add(new PKColumnConstraint());
}
if (f.unique())
{
columnConstraints.add(new UniqueColumnConstraint());
}
Column column = new Column(f.name(), dataType, columnConstraints, context.quoteIdentifier());
for (Optimizer optimizer : context.optimizers())
{
Expand All @@ -70,7 +71,7 @@ public VisitorResult visit(PhysicalPlanNode prev, SchemaDefinition current, Visi

if (pkNum > 1)
{
TableConstraint constraint = new PrimaryKeyTableConstraint(pkFields.stream().map(Field::name).collect(Collectors.toList()), context.quoteIdentifier());
TableConstraint constraint = new PrimaryKeyTableConstraint(pkFields.stream().map(Field::name).collect(Collectors.toList()), context.quoteIdentifier(), true);
for (Optimizer optimizer : context.optimizers())
{
constraint = (TableConstraint) optimizer.optimize(constraint);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@
package org.finos.legend.engine.persistence.components.relational.bigquery.sql.visitor;

import org.finos.legend.engine.persistence.components.logicalplan.operations.Truncate;
import org.finos.legend.engine.persistence.components.optimizer.Optimizer;
import org.finos.legend.engine.persistence.components.physicalplan.PhysicalPlanNode;
import org.finos.legend.engine.persistence.components.relational.sqldom.schemaops.statements.DeleteStatement;
import org.finos.legend.engine.persistence.components.relational.sqldom.schemaops.statements.TruncateTable;
import org.finos.legend.engine.persistence.components.transformer.LogicalPlanVisitor;
import org.finos.legend.engine.persistence.components.transformer.VisitorContext;

Expand All @@ -28,9 +29,19 @@ public class TruncateVisitor implements LogicalPlanVisitor<Truncate>
@Override
public VisitorResult visit(PhysicalPlanNode prev, Truncate current, VisitorContext context)
{
DeleteStatement deleteStatement = new DeleteStatement();
prev.push(deleteStatement);
// TODO For partitioned Tables, if the table requires a partition filter, Truncate will fail.
// This should be the approach in that case:
// 1. UPDATE the table to remove the partition filter requirement
// 2. Truncate table
// 3. UPDATE the table to add the partition filter requirement

return new VisitorResult(deleteStatement, Collections.singletonList(current.dataset()));
TruncateTable truncateTable = new TruncateTable();
for (Optimizer optimizer : context.optimizers())
{
truncateTable = (TruncateTable) optimizer.optimize(truncateTable);
}
prev.push(truncateTable);

return new VisitorResult(truncateTable, Collections.singletonList(current.dataset()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,15 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package org.finos.legend.engine.persistence.components.ingestmode;
package org.finos.legend.engine.persistence.components.relational.bigquery.sqldom.constraints.columns;

import org.finos.legend.engine.persistence.components.relational.RelationalSink;
import org.finos.legend.engine.persistence.components.relational.bigquery.BigQuerySink;
import org.finos.legend.engine.persistence.components.relational.sqldom.constraints.column.ColumnConstraint;

public class AppendOnlyTest extends org.finos.legend.engine.persistence.components.ingestmode.nontemporal.AppendOnlyTest
public class PKColumnConstraint extends ColumnConstraint
{

@Override
public RelationalSink getRelationalSink()
public void genSql(StringBuilder builder)
{
return BigQuerySink.get();
builder.append("PRIMARY KEY NOT ENFORCED");
}
}
Loading

0 comments on commit c90acfa

Please sign in to comment.