Skip to content

Commit

Permalink
Added transactional sql executor for Big Query via Java API
Browse files Browse the repository at this point in the history
  • Loading branch information
shubham43 authored and prasar-ashutosh committed Jun 13, 2023
1 parent 31e6d5b commit d2c13f1
Show file tree
Hide file tree
Showing 28 changed files with 1,008 additions and 428 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public enum DataType

/**
 * Reports whether the given data type is one of the string-like data types
 * enumerated in the list built below.
 *
 * @param type the data type to check
 * @return {@code true} if {@code type} is contained in the list of string data types
 */
public static boolean isStringDatatype(DataType type)
{
// NOTE(review): the next two lines appear to be the removed and added sides of this diff hunk;
// they differ only in the explicit type argument versus the diamond operator.
List<DataType> stringDatatype = new ArrayList<DataType>(Arrays.asList(CHAR, CHARACTER, VARCHAR, LONGVARCHAR, NCHAR, NVARCHAR, LONGNVARCHAR, LONGTEXT, TEXT, JSON, STRING));
List<DataType> stringDatatype = new ArrayList<>(Arrays.asList(CHAR, CHARACTER, VARCHAR, LONGVARCHAR, NCHAR, NVARCHAR, LONGNVARCHAR, LONGTEXT, TEXT, JSON, STRING));
return stringDatatype.contains(type);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
import org.finos.legend.engine.persistence.components.relational.bigquery.optmizer.LowerCaseOptimizer;
import org.finos.legend.engine.persistence.components.relational.bigquery.optmizer.UpperCaseOptimizer;
import org.finos.legend.engine.persistence.components.relational.bigquery.sql.BigQueryDataTypeMapping;
import org.finos.legend.engine.persistence.components.relational.bigquery.sql.BigQueryJdbcPropertiesToLogicalDataTypeMapping;
import org.finos.legend.engine.persistence.components.relational.bigquery.sql.BigQueryDataTypeToLogicalDataTypeMapping;
import org.finos.legend.engine.persistence.components.relational.bigquery.sql.visitor.AlterVisitor;
import org.finos.legend.engine.persistence.components.relational.bigquery.sql.visitor.BatchEndTimestampVisitor;
import org.finos.legend.engine.persistence.components.relational.bigquery.sql.visitor.ClusterKeyVisitor;
Expand Down Expand Up @@ -111,34 +111,7 @@ public static RelationalSink get()
return INSTANCE;
}

// TODO #4: Review how this can be done with the Simba driver
/**
 * Opens a JDBC connection to {@code jdbcUrl}, passing the remaining arguments
 * to the driver as connection properties.
 *
 * @param user    value stored under the {@code "user"} property
 * @param pwd     value stored under the {@code "password"} property
 * @param jdbcUrl JDBC URL handed to {@link DriverManager#getConnection(String, Properties)}
 * @param account value stored under the {@code "account"} property
 * @param db      value stored under the {@code "db"} property
 * @param schema  value stored under the {@code "schema"} property
 * @return the connection returned by the driver manager
 * @throws IllegalArgumentException wrapping the {@link SQLException} if the connection cannot be obtained
 */
public static Connection createConnection(String user,
                                          String pwd,
                                          String jdbcUrl,
                                          String account,
                                          String db,
                                          String schema)
{
    // Property name / value pairs, applied in the same order as before.
    String[][] settings = {
        {"user", user},
        {"password", pwd},
        {"account", account},
        {"db", db},
        {"schema", schema}
    };

    Properties connectionProperties = new Properties();
    for (String[] setting : settings)
    {
        connectionProperties.put(setting[0], setting[1]);
    }

    Connection connection;
    try
    {
        connection = DriverManager.getConnection(jdbcUrl, connectionProperties);
    }
    catch (SQLException cause)
    {
        throw new IllegalArgumentException(cause);
    }
    return connection;
}

// TODO #5: Another entry point for API invocation via authentication ?


private BigQuerySink()
{
super(
Expand All @@ -148,19 +121,11 @@ private BigQuerySink()
SqlGenUtils.BACK_QUOTE_IDENTIFIER,
LOGICAL_PLAN_VISITOR_BY_CLASS,
// TODO # 6: Verify implementation of DatasetExists datasetExists,
(executor, sink, dataset) ->
{
//TODO: pass transformer as an argument
RelationalTransformer transformer = new RelationalTransformer(BigQuerySink.get());
LogicalPlan datasetExistLogicalPlan = LogicalPlanFactory.getLogicalPlanForDoesDatasetExist(dataset);
SqlPlan physicalPlanForDoesDatasetExist = transformer.generatePhysicalPlan(datasetExistLogicalPlan);
List<TabularData> results = executor.executePhysicalPlanAndGetResults(physicalPlanForDoesDatasetExist);
return results.size() > 0;
},
(executor, sink, dataset) -> sink.doesTableExist(dataset),
// TODO # 7: Verify implementation of ValidateMainDatasetSchema validateMainDatasetSchema,
(executor, sink, dataset) -> sink.validateDatasetSchema(dataset, new BigQueryDataTypeMapping()),
// TODO # 8: Verify implementation of ConstructDatasetFromDatabase constructDatasetFromDatabase)
(executor, sink, tableName, schemaName, databaseName) -> sink.constructDatasetFromDatabase(tableName, schemaName, databaseName, new BigQueryJdbcPropertiesToLogicalDataTypeMapping()));
(executor, sink, tableName, schemaName, databaseName) -> sink.constructDatasetFromDatabase(tableName, schemaName, databaseName, new BigQueryDataTypeToLogicalDataTypeMapping()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
// Copyright 2022 Goldman Sachs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package org.finos.legend.engine.persistence.components.relational.bigquery.executor;

import com.google.cloud.bigquery.*;
import org.eclipse.collections.api.factory.Lists;
import org.finos.legend.engine.persistence.components.executor.Executor;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.Dataset;
import org.finos.legend.engine.persistence.components.relational.SqlPlan;
import org.finos.legend.engine.persistence.components.relational.bigquery.BigQuerySink;
import org.finos.legend.engine.persistence.components.relational.sql.TabularData;
import org.finos.legend.engine.persistence.components.relational.sqldom.SqlGen;
import org.finos.legend.engine.persistence.components.relational.sqldom.schemaops.statements.DDLStatement;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.regex.Pattern;

/**
 * {@link Executor} that runs the SQL statements of a {@link SqlPlan} against BigQuery
 * by delegating to a {@link BigQueryHelper}.
 *
 * <p>Dataset-level operations (existence check, schema validation, dataset construction)
 * are delegated to the functions exposed by the {@link BigQuerySink}; transaction
 * lifecycle calls are forwarded to the helper.
 */
public class BigQueryExecutor implements Executor<SqlGen, TabularData, SqlPlan>
{
    private final BigQuerySink bigQuerySink;
    private final BigQueryHelper bigQueryHelper;

    /**
     * @param bigQuerySink   sink providing the dataset-level functions used by this executor
     * @param bigQueryHelper helper through which all SQL and transaction calls are issued
     */
    public BigQueryExecutor(BigQuerySink bigQuerySink, BigQueryHelper bigQueryHelper)
    {
        this.bigQuerySink = bigQuerySink;
        this.bigQueryHelper = bigQueryHelper;
    }

    /**
     * Executes every SQL statement of the plan.
     *
     * <p>If any operation of the plan is a {@link DDLStatement}, the statements are sent through
     * {@code executeQuery}; otherwise they are sent through {@code executeStatement}, the same
     * path the placeholder overload uses. Previously a plan without DDL was silently skipped.
     *
     * @param physicalPlan plan whose SQL list is executed in order
     */
    @Override
    public void executePhysicalPlan(SqlPlan physicalPlan)
    {
        boolean containsDDLStatements = physicalPlan.ops().stream().anyMatch(DDLStatement.class::isInstance);
        List<String> sqlList = physicalPlan.getSqlList();
        for (String sql : sqlList)
        {
            if (containsDDLStatements)
            {
                // NOTE(review): presumably DDL must bypass the helper's transaction - confirm against BigQueryHelper.
                bigQueryHelper.executeQuery(sql);
            }
            else
            {
                bigQueryHelper.executeStatement(sql);
            }
        }
    }

    /**
     * Executes every SQL statement of the plan after substituting placeholders.
     *
     * @param physicalPlan         plan whose SQL list is executed in order
     * @param placeholderKeyValues placeholder text mapped to its replacement; {@code null} or empty means no substitution
     */
    @Override
    public void executePhysicalPlan(SqlPlan physicalPlan, Map<String, String> placeholderKeyValues)
    {
        List<String> sqlList = physicalPlan.getSqlList();
        for (String sql : sqlList)
        {
            String enrichedSql = getEnrichedSql(placeholderKeyValues, sql);
            bigQueryHelper.executeStatement(enrichedSql);
        }
    }

    /**
     * Runs every SQL statement of the plan as a query and collects the non-empty results.
     *
     * @param physicalPlan plan whose SQL list is queried in order
     * @return one {@link TabularData} per statement that returned at least one row
     */
    @Override
    public List<TabularData> executePhysicalPlanAndGetResults(SqlPlan physicalPlan)
    {
        List<TabularData> resultSetList = new ArrayList<>();
        for (String sql : physicalPlan.getSqlList())
        {
            collectQueryResult(sql, resultSetList);
        }
        return resultSetList;
    }

    /**
     * Runs every SQL statement of the plan as a query, after substituting placeholders,
     * and collects the non-empty results.
     *
     * @param physicalPlan         plan whose SQL list is queried in order
     * @param placeholderKeyValues placeholder text mapped to its replacement; {@code null} or empty means no substitution
     * @return one {@link TabularData} per statement that returned at least one row
     */
    @Override
    public List<TabularData> executePhysicalPlanAndGetResults(SqlPlan physicalPlan, Map<String, String> placeholderKeyValues)
    {
        List<TabularData> resultSetList = new ArrayList<>();
        for (String sql : physicalPlan.getSqlList())
        {
            collectQueryResult(getEnrichedSql(placeholderKeyValues, sql), resultSetList);
        }
        return resultSetList;
    }

    /** Delegates the existence check to the sink's dataset-exists function. */
    @Override
    public boolean datasetExists(Dataset dataset)
    {
        return bigQuerySink.datasetExistsFn().apply(this, bigQueryHelper, dataset);
    }

    /** Delegates schema validation of the main dataset to the sink's validation function. */
    @Override
    public void validateMainDatasetSchema(Dataset dataset)
    {
        bigQuerySink.validateMainDatasetSchemaFn().execute(this, bigQueryHelper, dataset);
    }

    /** Delegates dataset construction from the database to the sink's construction function. */
    @Override
    public Dataset constructDatasetFromDatabase(String tableName, String schemaName, String databaseName)
    {
        return bigQuerySink.constructDatasetFromDatabaseFn().execute(this, bigQueryHelper, tableName, schemaName, databaseName);
    }

    /** Forwards to {@code BigQueryHelper.beginTransaction()}. */
    @Override
    public void begin()
    {
        bigQueryHelper.beginTransaction();
    }

    /** Forwards to {@code BigQueryHelper.commitTransaction()}. */
    @Override
    public void commit()
    {
        bigQueryHelper.commitTransaction();
    }

    /** Forwards to {@code BigQueryHelper.revertTransaction()}. */
    @Override
    public void revert()
    {
        bigQueryHelper.revertTransaction();
    }

    /** Forwards to {@code BigQueryHelper.closeTransactionManager()}. */
    @Override
    public void close()
    {
        bigQueryHelper.closeTransactionManager();
    }

    /** Runs one query and appends its rows to {@code resultSetList} unless the result is empty. */
    private void collectQueryResult(String sql, List<TabularData> resultSetList)
    {
        List<Map<String, Object>> queryResult = bigQueryHelper.executeQuery(sql);
        if (!queryResult.isEmpty())
        {
            resultSetList.add(new TabularData(queryResult));
        }
    }

    /**
     * Replaces every occurrence of each placeholder key in {@code sql} with its value.
     *
     * @param placeholderKeyValues placeholder text mapped to its replacement; may be {@code null} or empty
     * @param sql                  SQL text containing the placeholders
     * @return the SQL with all placeholders substituted, or {@code sql} unchanged when there is nothing to substitute
     */
    private String getEnrichedSql(Map<String, String> placeholderKeyValues, String sql)
    {
        if (placeholderKeyValues == null || placeholderKeyValues.isEmpty())
        {
            return sql;
        }
        String enrichedSql = sql;
        for (Map.Entry<String, String> entry : placeholderKeyValues.entrySet())
        {
            // String.replace is literal on both sides. replaceAll would interpret '$' and '\' in the
            // value as group references / escapes and either throw or corrupt the SQL.
            enrichedSql = enrichedSql.replace(entry.getKey(), entry.getValue());
        }
        return enrichedSql;
    }

}
Loading

0 comments on commit d2c13f1

Please sign in to comment.