Format all Sink Classes
prasar-ashutosh committed Jun 8, 2023
1 parent 6a23b0a · commit 6320261
Showing 7 changed files with 208 additions and 206 deletions.
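The hunks below are whitespace-only: wrapped constructor arguments and fluent builder chains are re-indented onto a consistent continuation indent. A minimal sketch of the kind of change applied throughout, using a builder that appears in this diff (the indentation direction is illustrative; the page capture loses leading whitespace, so the exact spacing in the hunks is approximate):

```java
// Before: continuation hand-aligned under the call
PlannerOptions options = PlannerOptions.builder()
        .cleanupStagingData(true)
        .collectStatistics(true)
        .build();

// After: a fixed, shallower continuation indent
PlannerOptions options = PlannerOptions.builder()
    .cleanupStagingData(true)
    .collectStatistics(true)
    .build();
```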
File: AnsiSqlSink.java
@@ -235,44 +235,44 @@ public static RelationalSink get()
     private AnsiSqlSink()
     {
         super(
-                Collections.emptySet(),
-                Collections.emptyMap(),
-                Collections.emptyMap(),
-                SqlGenUtils.QUOTE_IDENTIFIER,
-                LOGICAL_PLAN_VISITOR_BY_CLASS,
-                (x, y, z) ->
-                {
-                    throw new UnsupportedOperationException();
-                },
-                (x, y, z) ->
-                {
-                    throw new UnsupportedOperationException();
-                },
-                (v, w, x, y, z) ->
-                {
-                    throw new UnsupportedOperationException();
-                });
+            Collections.emptySet(),
+            Collections.emptyMap(),
+            Collections.emptyMap(),
+            SqlGenUtils.QUOTE_IDENTIFIER,
+            LOGICAL_PLAN_VISITOR_BY_CLASS,
+            (x, y, z) ->
+            {
+                throw new UnsupportedOperationException();
+            },
+            (x, y, z) ->
+            {
+                throw new UnsupportedOperationException();
+            },
+            (v, w, x, y, z) ->
+            {
+                throw new UnsupportedOperationException();
+            });
     }
 
     protected AnsiSqlSink(
-            Set<Capability> capabilities,
-            Map<DataType, Set<DataType>> implicitDataTypeMapping,
-            Map<DataType, Set<DataType>> nonBreakingDataTypeMapping,
-            String quoteIdentifier,
-            Map<Class<?>, LogicalPlanVisitor<?>> logicalPlanVisitorByClass,
-            DatasetExists datasetExists,
-            ValidateMainDatasetSchema validateMainDatasetSchema,
-            ConstructDatasetFromDatabase constructDatasetFromDatabase)
+        Set<Capability> capabilities,
+        Map<DataType, Set<DataType>> implicitDataTypeMapping,
+        Map<DataType, Set<DataType>> nonBreakingDataTypeMapping,
+        String quoteIdentifier,
+        Map<Class<?>, LogicalPlanVisitor<?>> logicalPlanVisitorByClass,
+        DatasetExists datasetExists,
+        ValidateMainDatasetSchema validateMainDatasetSchema,
+        ConstructDatasetFromDatabase constructDatasetFromDatabase)
     {
         super(
-                capabilities,
-                implicitDataTypeMapping,
-                nonBreakingDataTypeMapping,
-                quoteIdentifier,
-                rightBiasedUnion(LOGICAL_PLAN_VISITOR_BY_CLASS, logicalPlanVisitorByClass),
-                datasetExists,
-                validateMainDatasetSchema,
-                constructDatasetFromDatabase);
+            capabilities,
+            implicitDataTypeMapping,
+            nonBreakingDataTypeMapping,
+            quoteIdentifier,
+            rightBiasedUnion(LOGICAL_PLAN_VISITOR_BY_CLASS, logicalPlanVisitorByClass),
+            datasetExists,
+            validateMainDatasetSchema,
+            constructDatasetFromDatabase);
     }
 
     @Override
@@ -306,4 +306,4 @@ private static Map<Class<?>, LogicalPlanVisitor<?>> rightBiasedUnion(Map<Class<?
         union.putAll(map2);
         return union;
     }
-}
+}
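For reference, the rightBiasedUnion helper closed by this hunk merges the base ANSI visitor map with a subclass-supplied map; because the second map is copied in last, subclass visitors win on key collisions. A sketch of the full method consistent with the visible tail (only the last three lines appear in the hunk; the initial copy of map1 is assumed):

```java
private static Map<Class<?>, LogicalPlanVisitor<?>> rightBiasedUnion(
    Map<Class<?>, LogicalPlanVisitor<?>> map1,
    Map<Class<?>, LogicalPlanVisitor<?>> map2)
{
    // Copy the left map first, then overwrite with the right map so that
    // entries from map2 take precedence on duplicate keys ("right-biased").
    Map<Class<?>, LogicalPlanVisitor<?>> union = new HashMap<>(map1);
    union.putAll(map2);
    return union;
}
```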
File: schema evolution test class (name not shown in this capture)
@@ -682,4 +682,4 @@ void testSnapshotMilestoningWithNullableColumnMissingInStagingTable()
         List<String> sqlsForSchemaEvolution = physicalPlanForSchemaEvolution.getSqlList();
         Assertions.assertEquals(0, sqlsForSchemaEvolution.size());
     }
-}
+}
File: RelationalIngestorAbstract.java
@@ -40,6 +40,8 @@
 import org.finos.legend.engine.persistence.components.relational.CaseConversion;
 import org.finos.legend.engine.persistence.components.relational.RelationalSink;
 import org.finos.legend.engine.persistence.components.relational.SqlPlan;
+import org.finos.legend.engine.persistence.components.relational.executor.RelationalExecutor;
+import org.finos.legend.engine.persistence.components.relational.jdbc.JdbcHelper;
 import org.finos.legend.engine.persistence.components.relational.sql.TabularData;
 import org.finos.legend.engine.persistence.components.relational.sqldom.SqlGen;
 import org.finos.legend.engine.persistence.components.relational.transformer.RelationalTransformer;
@@ -52,6 +54,7 @@
 import org.immutables.value.Value.Immutable;
 import org.immutables.value.Value.Style;
 
+import java.sql.Connection;
 import java.sql.Date;
 import java.time.Clock;
 import java.time.LocalDateTime;
@@ -72,11 +75,11 @@
 
 @Immutable
 @Style(
-        typeAbstract = "*Abstract",
-        typeImmutable = "*",
-        jdkOnly = true,
-        optionalAcceptNullable = true,
-        strictBuilder = true
+    typeAbstract = "*Abstract",
+    typeImmutable = "*",
+    jdkOnly = true,
+    optionalAcceptNullable = true,
+    strictBuilder = true
 )
 public abstract class RelationalIngestorAbstract
 {
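These are standard Immutables @Value.Style attributes: typeAbstract = "*Abstract" and typeImmutable = "*" make Immutables generate a concrete RelationalIngestor from this RelationalIngestorAbstract, jdkOnly restricts generated code to JDK collections, optionalAcceptNullable lets Optional properties be set from nullable values, and strictBuilder forbids setting a builder property twice. A hypothetical usage sketch, with property names taken from the abstract accessors visible in this diff (whether each is required is not shown here):

```java
// Sketch only: RelationalIngestor is the Immutables-generated concrete type.
// AnsiSqlSink.get() is grounded in the first hunk of this commit; the chosen
// property values are illustrative, not defaults.
RelationalIngestor ingestor = RelationalIngestor.builder()
    .relationalSink(AnsiSqlSink.get())
    .cleanupStagingData(true)
    .collectStatistics(true)
    .enableSchemaEvolution(false)
    .executionTimestampClock(Clock.systemUTC())
    .build();
```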
@@ -135,18 +138,18 @@ public Set<SchemaEvolutionCapability> schemaEvolutionCapabilitySet()
     protected PlannerOptions plannerOptions()
     {
         return PlannerOptions.builder()
-                .cleanupStagingData(cleanupStagingData())
-                .collectStatistics(collectStatistics())
-                .enableSchemaEvolution(enableSchemaEvolution())
-                .build();
+            .cleanupStagingData(cleanupStagingData())
+            .collectStatistics(collectStatistics())
+            .enableSchemaEvolution(enableSchemaEvolution())
+            .build();
     }
 
     @Derived
     protected TransformOptions transformOptions()
     {
         TransformOptions.Builder builder = TransformOptions.builder()
-                .executionTimestampClock(executionTimestampClock())
-                .batchIdPattern(BATCH_ID_PATTERN);
+            .executionTimestampClock(executionTimestampClock())
+            .batchIdPattern(BATCH_ID_PATTERN);
 
         relationalSink().optimizerForCaseConversion(caseConversion()).ifPresent(builder::addOptimizers);
 
@@ -212,17 +215,17 @@ private List<IngestorResult> ingest(RelationalConnection connection, Datasets da
 
         // generate sql plans
         RelationalGenerator generator = RelationalGenerator.builder()
-                .ingestMode(enrichedIngestMode)
-                .relationalSink(relationalSink())
-                .cleanupStagingData(cleanupStagingData())
-                .collectStatistics(collectStatistics())
-                .enableSchemaEvolution(enableSchemaEvolution())
-                .addAllSchemaEvolutionCapabilitySet(schemaEvolutionCapabilitySet())
-                .caseConversion(caseConversion())
-                .executionTimestampClock(executionTimestampClock())
-                .batchStartTimestampPattern(BATCH_START_TS_PATTERN)
-                .batchIdPattern(BATCH_ID_PATTERN)
-                .build();
+            .ingestMode(enrichedIngestMode)
+            .relationalSink(relationalSink())
+            .cleanupStagingData(cleanupStagingData())
+            .collectStatistics(collectStatistics())
+            .enableSchemaEvolution(enableSchemaEvolution())
+            .addAllSchemaEvolutionCapabilitySet(schemaEvolutionCapabilitySet())
+            .caseConversion(caseConversion())
+            .executionTimestampClock(executionTimestampClock())
+            .batchStartTimestampPattern(BATCH_START_TS_PATTERN)
+            .batchIdPattern(BATCH_ID_PATTERN)
+            .build();
 
         Planner planner = Planners.get(updatedDatasets, enrichedIngestMode, plannerOptions());
         GeneratorResult generatorResult = generator.generateOperations(updatedDatasets, resourcesBuilder.build(), planner, enrichedIngestMode);
@@ -245,7 +248,7 @@ private List<IngestorResult> ingest(RelationalConnection connection, Datasets da
     }
 
     private List<IngestorResult> performIngestion(Datasets datasets, Transformer<SqlGen, SqlPlan> transformer, Planner planner, Executor<SqlGen,
-            TabularData, SqlPlan> executor, GeneratorResult generatorResult, List<DataSplitRange> dataSplitRanges, IngestMode ingestMode)
+        TabularData, SqlPlan> executor, GeneratorResult generatorResult, List<DataSplitRange> dataSplitRanges, IngestMode ingestMode)
     {
         try
         {
@@ -261,12 +264,12 @@ private List<IngestorResult> performIngestion(Datasets datasets, Transformer<Sql
                 // Load main table, extract stats and update metadata table
                 Map<StatisticName, Object> statisticsResultMap = loadData(executor, generatorResult, placeHolderKeyValues);
                 IngestorResult result = IngestorResult.builder()
-                        .putAllStatisticByName(statisticsResultMap)
-                        .updatedDatasets(datasets)
-                        .batchId(Optional.ofNullable(placeHolderKeyValues.containsKey(BATCH_ID_PATTERN) ? Integer.valueOf(placeHolderKeyValues.get(BATCH_ID_PATTERN)) : null))
-                        .dataSplitRange(dataSplitRange)
-                        .schemaEvolutionSql(generatorResult.schemaEvolutionSql())
-                        .build();
+                    .putAllStatisticByName(statisticsResultMap)
+                    .updatedDatasets(datasets)
+                    .batchId(Optional.ofNullable(placeHolderKeyValues.containsKey(BATCH_ID_PATTERN) ? Integer.valueOf(placeHolderKeyValues.get(BATCH_ID_PATTERN)) : null))
+                    .dataSplitRange(dataSplitRange)
+                    .schemaEvolutionSql(generatorResult.schemaEvolutionSql())
+                    .build();
                 results.add(result);
                 dataSplitIndex++;
             }
@@ -291,12 +294,12 @@ private Map<StatisticName, Object> loadData(Executor<SqlGen, TabularData, SqlPla
     {
         // Extract preIngest Statistics
         Map<StatisticName, Object> statisticsResultMap = new HashMap<>(
-                executeStatisticsPhysicalPlan(executor, generatorResult.preIngestStatisticsSqlPlan(), placeHolderKeyValues));
+            executeStatisticsPhysicalPlan(executor, generatorResult.preIngestStatisticsSqlPlan(), placeHolderKeyValues));
         // Execute ingest SqlPlan
         executor.executePhysicalPlan(generatorResult.ingestSqlPlan(), placeHolderKeyValues);
         // Extract postIngest Statistics
         statisticsResultMap.putAll(
-                executeStatisticsPhysicalPlan(executor, generatorResult.postIngestStatisticsSqlPlan(), placeHolderKeyValues));
+            executeStatisticsPhysicalPlan(executor, generatorResult.postIngestStatisticsSqlPlan(), placeHolderKeyValues));
         // Execute metadata ingest SqlPlan
         if (generatorResult.metadataIngestSqlPlan().isPresent())
         {
@@ -311,10 +314,10 @@ private Datasets importExternalDataset(IngestMode ingestMode, Datasets datasets,
         DatasetReference mainDataSetReference = datasets.mainDataset().datasetReference();
 
         externalDatasetReference = externalDatasetReference
-                .withName(externalDatasetReference.name().isPresent() ? externalDatasetReference.name().get() : LogicalPlanUtils.generateTableNameWithSuffix(mainDataSetReference.name().orElseThrow(IllegalStateException::new), STAGING))
-                .withDatabase(externalDatasetReference.database().isPresent() ? externalDatasetReference.database().get() : mainDataSetReference.database().orElse(null))
-                .withGroup(externalDatasetReference.group().isPresent() ? externalDatasetReference.group().get() : mainDataSetReference.group().orElse(null))
-                .withAlias(externalDatasetReference.alias().isPresent() ? externalDatasetReference.alias().get() : mainDataSetReference.alias().orElseThrow(RuntimeException::new) + UNDERSCORE + STAGING);
+            .withName(externalDatasetReference.name().isPresent() ? externalDatasetReference.name().get() : LogicalPlanUtils.generateTableNameWithSuffix(mainDataSetReference.name().orElseThrow(IllegalStateException::new), STAGING))
+            .withDatabase(externalDatasetReference.database().isPresent() ? externalDatasetReference.database().get() : mainDataSetReference.database().orElse(null))
+            .withGroup(externalDatasetReference.group().isPresent() ? externalDatasetReference.group().get() : mainDataSetReference.group().orElse(null))
+            .withAlias(externalDatasetReference.alias().isPresent() ? externalDatasetReference.alias().get() : mainDataSetReference.alias().orElseThrow(RuntimeException::new) + UNDERSCORE + STAGING);
 
         // TODO : Auto infer schema in future
 
@@ -354,12 +357,12 @@ private boolean datasetEmpty(Dataset dataset, Transformer<SqlGen, SqlPlan> trans
         List<TabularData> results = executor.executePhysicalPlanAndGetResults(physicalPlanForCheckIsDataSetEmpty);
 
         String value = String.valueOf(results.stream()
-                .findFirst()
-                .map(TabularData::getData)
-                .flatMap(t -> t.stream().findFirst())
-                .map(Map::values)
-                .flatMap(t -> t.stream().findFirst())
-                .orElseThrow(IllegalStateException::new));
+            .findFirst()
+            .map(TabularData::getData)
+            .flatMap(t -> t.stream().findFirst())
+            .map(Map::values)
+            .flatMap(t -> t.stream().findFirst())
+            .orElseThrow(IllegalStateException::new));
         return !value.equals(TABLE_IS_NON_EMPTY);
     }
 
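The stream/Optional chain reformatted here recurs almost verbatim in executeStatisticsPhysicalPlan and getNextBatchId below: it drills into a result set shaped as one row with one column and fails fast if any level is missing. Isolated as a standalone sketch (firstCell is not a helper in this codebase, just an illustration; TabularData.getData is assumed to return List<Map<String, Object>>, consistent with how it is used above):

```java
// Extracts the lone value from a List<TabularData> shaped as one result set
// containing one row with one column, throwing if anything is absent.
private static Object firstCell(List<TabularData> results)
{
    return results.stream()
        .findFirst()                                  // first result set
        .map(TabularData::getData)                    // its rows
        .flatMap(rows -> rows.stream().findFirst())   // first row
        .map(Map::values)                             // that row's column values
        .flatMap(vals -> vals.stream().findFirst())   // first column value
        .orElseThrow(IllegalStateException::new);
}
```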
@@ -376,17 +379,17 @@ private Map<StatisticName, Object> executeStatisticsPhysicalPlan(Executor<SqlGen
                                                                      Map<String, String> placeHolderKeyValues)
     {
         return statisticsSqlPlan.keySet()
-                .stream()
-                .collect(Collectors.toMap(
-                        k -> k,
-                        k -> executor.executePhysicalPlanAndGetResults(statisticsSqlPlan.get(k), placeHolderKeyValues)
-                                .stream()
-                                .findFirst()
-                                .map(TabularData::getData)
-                                .flatMap(t -> t.stream().findFirst())
-                                .map(Map::values)
-                                .flatMap(t -> t.stream().findFirst())
-                                .orElseThrow(IllegalStateException::new)));
+            .stream()
+            .collect(Collectors.toMap(
+                k -> k,
+                k -> executor.executePhysicalPlanAndGetResults(statisticsSqlPlan.get(k), placeHolderKeyValues)
+                    .stream()
+                    .findFirst()
+                    .map(TabularData::getData)
+                    .flatMap(t -> t.stream().findFirst())
+                    .map(Map::values)
+                    .flatMap(t -> t.stream().findFirst())
+                    .orElseThrow(IllegalStateException::new)));
     }
 
     private Map<String, String> extractPlaceHolderKeyValues(Datasets datasets, Executor<SqlGen, TabularData, SqlPlan> executor,
@@ -432,19 +435,19 @@ else if (lowerBound instanceof Number)
     }
 
     private Optional<Long> getNextBatchId(Datasets datasets, Executor<SqlGen, TabularData, SqlPlan> executor,
-            Transformer<SqlGen, SqlPlan> transformer, IngestMode ingestMode)
+        Transformer<SqlGen, SqlPlan> transformer, IngestMode ingestMode)
     {
         if (ingestMode.accept(IngestModeVisitors.IS_INGEST_MODE_TEMPORAL))
         {
             LogicalPlan logicalPlanForNextBatchId = LogicalPlanFactory.getLogicalPlanForNextBatchId(datasets);
             List<TabularData> tabularData = executor.executePhysicalPlanAndGetResults(transformer.generatePhysicalPlan(logicalPlanForNextBatchId));
             Optional<Object> nextBatchId = Optional.ofNullable(tabularData.stream()
-                    .findFirst()
-                    .map(TabularData::getData)
-                    .flatMap(t -> t.stream().findFirst())
-                    .map(Map::values)
-                    .flatMap(t -> t.stream().findFirst())
-                    .orElseThrow(IllegalStateException::new));
+                .findFirst()
+                .map(TabularData::getData)
+                .flatMap(t -> t.stream().findFirst())
+                .map(Map::values)
+                .flatMap(t -> t.stream().findFirst())
+                .orElseThrow(IllegalStateException::new));
             if (nextBatchId.isPresent())
             {
                 if (nextBatchId.get() instanceof Integer)
@@ -461,7 +464,7 @@ private Optional<Long> getNextBatchId(Datasets datasets, Executor<SqlGen, Tabula
     }
 
     private Optional<Map<OptimizationFilter, Pair<Object, Object>>> getOptimizationFilterBounds(Datasets datasets, Executor<SqlGen, TabularData, SqlPlan> executor,
-            Transformer<SqlGen, SqlPlan> transformer, IngestMode ingestMode)
+        Transformer<SqlGen, SqlPlan> transformer, IngestMode ingestMode)
     {
         List<OptimizationFilter> filters = ingestMode.accept(IngestModeVisitors.RETRIEVE_OPTIMIZATION_FILTERS);
         if (!filters.isEmpty())
@@ -472,10 +475,10 @@ private Optional<Map<OptimizationFilter, Pair<Object, Object>>> getOptimizationF
                 LogicalPlan logicalPlanForMinAndMaxForField = LogicalPlanFactory.getLogicalPlanForMinAndMaxForField(datasets.stagingDataset(), filter.fieldName());
                 List<TabularData> tabularData = executor.executePhysicalPlanAndGetResults(transformer.generatePhysicalPlan(logicalPlanForMinAndMaxForField));
                 Map<String, Object> resultMap = tabularData.stream()
-                        .findFirst()
-                        .map(TabularData::getData)
-                        .flatMap(t -> t.stream().findFirst())
-                        .orElseThrow(IllegalStateException::new);
+                    .findFirst()
+                    .map(TabularData::getData)
+                    .flatMap(t -> t.stream().findFirst())
+                    .orElseThrow(IllegalStateException::new);
                 // Put into map only when not null
                 Object lower = resultMap.get(MIN_OF_FIELD);
                 Object upper = resultMap.get(MAX_OF_FIELD);
@@ -488,4 +491,4 @@ private Optional<Map<OptimizationFilter, Pair<Object, Object>>> getOptimizationF
         }
         return Optional.empty();
     }
-}
+}
[Diffs for the remaining changed files did not load in this capture.]
