Skip to content

Commit

Permalink
Reorganize the flow of SFW clauses
Browse files Browse the repository at this point in the history
  • Loading branch information
johnedquinn committed Sep 8, 2022
1 parent b081a01 commit ccb9991
Showing 1 changed file with 48 additions and 92 deletions.
140 changes: 48 additions & 92 deletions lang/src/org/partiql/lang/eval/EvaluatingCompiler.kt
Original file line number Diff line number Diff line change
Expand Up @@ -1721,7 +1721,7 @@ internal class EvaluatingCompiler(
val letSourceThunks = selectExpr.fromLet?.let { compileLetSources(it) }
val sourceThunks = compileQueryWithoutProjection(selectExpr, fromSourceThunks, letSourceThunks)

val orderByThunk = selectExpr.order?.let { compileOrderByExpression(selectExpr, selectExpr.order, allFromSourceAliases) }
val orderByThunk = selectExpr.order?.let { compileOrderByExpression(selectExpr.order) }
val orderByLocationMeta = selectExpr.order?.metas?.sourceLocation

val offsetThunk = selectExpr.offset?.let { compileAstExpr(it) }
Expand Down Expand Up @@ -1753,32 +1753,32 @@ internal class EvaluatingCompiler(
groupByItems.isEmpty() && !hasAggregateCallSites ->
// Grouping is not needed -- simply project the results from the FROM clause directly.
thunkFactory.thunkEnv(metas) { env ->

val orderedRows = when (orderByThunk) {
null -> sourceThunks(env)
else -> evalOrderBy(sourceThunks(env), orderByThunk, orderByLocationMeta)
}

val projectedRows = orderedRows.map { (joinedValues, projectEnv) ->
val projectedRows = sourceThunks(env).map { (joinedValues, projectEnv) ->
selectProjectionThunk(projectEnv, joinedValues)
}

val quantifiedRows = when (selectExpr.setq ?: PartiqlAst.SetQuantifier.All()) {
// wrap the ExprValue to use ExprValue.equals as the equality
is PartiqlAst.SetQuantifier.Distinct -> projectedRows.filter(createUniqueExprValueFilter())
is PartiqlAst.SetQuantifier.All -> projectedRows
}.let { rowsWithOffsetAndLimit(it, env) }
}

// if order by is specified, return list otherwise bag
val orderedRows = when (orderByThunk) {
null -> quantifiedRows
else -> evalOrderBy(quantifiedRows, orderByThunk, orderByLocationMeta, env.session)
}

val offsetRows = rowsWithOffsetAndLimit(orderedRows, env)

// If order by is specified, return list otherwise bag
when (orderByThunk) {
null -> valueFactory.newBag(
quantifiedRows.map {
offsetRows.map {
// TODO make this expose the ordinal for ordered sequences
// make sure we don't expose the underlying value's name out of a SELECT
it.unnamedValue()
}
)
else -> valueFactory.newList(quantifiedRows.map { it.unnamedValue() })
else -> valueFactory.newList(offsetRows.map { it.unnamedValue() })
}
}
else -> {
Expand Down Expand Up @@ -1819,18 +1819,11 @@ internal class EvaluatingCompiler(
groupByItems.isEmpty() -> { // There are aggregates but no group by items
// Create a closure that groups all the rows in the FROM source into a single group.
thunkFactory.thunkEnv(metas) { env ->
// Evaluate the FROM clause
val orderedRows = when (orderByThunk) {
null -> sourceThunks(env)
else -> evalOrderBy(sourceThunks(env), orderByThunk, orderByLocationMeta)
}

val fromProductions: Sequence<FromProduction> =
rowsWithOffsetAndLimit(orderedRows, env)
val registers = createRegisterBank()
val fromProductions: Sequence<FromProduction> = rowsWithOffsetAndLimit(sourceThunks(env), env)

// note: the group key can be anything here because we only ever have a single
// group when aggregates are used without GROUP BY expression
val registers = createRegisterBank()
val syntheticGroup = Group(valueFactory.nullValue, registers)

// iterate over the values from the FROM clause and populate our
Expand All @@ -1847,10 +1840,14 @@ internal class EvaluatingCompiler(
listOf(syntheticGroup.key)
)

// if order by is specified, return list otherwise bag
val orderedRows = when (orderByThunk) {
null -> sequenceOf(groupResult)
else -> evalOrderBy(sequenceOf(groupResult), orderByThunk, orderByLocationMeta, env.session)
}

when (orderByThunk) {
null -> valueFactory.newBag(listOf(groupResult).asSequence())
else -> valueFactory.newList(listOf(groupResult).asSequence())
null -> valueFactory.newBag(orderedRows)
else -> valueFactory.newList(orderedRows)
}
}
}
Expand Down Expand Up @@ -1916,20 +1913,22 @@ internal class EvaluatingCompiler(
}

val groupByEnvValuePairs = env.groups.mapNotNull { g -> getGroupEnv(env, g.value) to g.value }.asSequence()
val orderedGroupEnvPairs = when (orderByThunk) {
null -> groupByEnvValuePairs
else -> evalOrderBy(groupByEnvValuePairs, orderByThunk, orderByLocationMeta)
}

// generate the final group by projection
val projectedRows = orderedGroupEnvPairs.mapNotNull { (groupByEnv, groupValue) ->
val projectedRows = groupByEnvValuePairs.mapNotNull { (groupByEnv, groupValue) ->
filterHavingAndProject(groupByEnv, groupValue)
}.asSequence().let { rowsWithOffsetAndLimit(it, env) }
}.asSequence()

val offsetLimitRows = rowsWithOffsetAndLimit(projectedRows, env)

val orderedRows = when (orderByThunk) {
null -> offsetLimitRows
else -> evalOrderBy(offsetLimitRows, orderByThunk, orderByLocationMeta, env.session)
}

// if order by is specified, return list otherwise bag
when (orderByThunk) {
null -> valueFactory.newBag(projectedRows)
else -> valueFactory.newList(projectedRows)
null -> valueFactory.newBag(orderedRows)
else -> valueFactory.newList(orderedRows)
}
}
}
Expand Down Expand Up @@ -2082,48 +2081,13 @@ internal class EvaluatingCompiler(
GroupKeyExprValue(valueFactory.ion, keyValues.asSequence(), uniqueNames)
}

/**
* Returns a modified environment where the local bindings contain the Projection Aliases
*/
private fun getOrderByEnv(projections: List<ProjectionElement>) = { env: Environment ->
val bindings = Bindings.buildLazyBindings<ExprValue> {
projections.forEach { projection ->
if (projection is SingleProjectionElement && projection.name.type == ExprValueType.STRING) {
addBinding(projection.name.stringValue()) {
projection.thunk.invoke(env)
}
}
}
}
env.nest(bindings)
}

/**
* Modifies the current environment to include the projection bindings and returns a compiled ORDER BY clause.
*/
private fun compileOrderByExpression(select: PartiqlAst.Expr.Select, order: PartiqlAst.OrderBy, fromAliases: Set<String>): List<CompiledOrderByItem> {
val projections = when (select.project) {
is PartiqlAst.Projection.ProjectList -> {
nestCompilationContext(ExpressionContext.SELECT_LIST, fromAliases) {
compileSelectListToProjectionElements(select.project)
}
}
else -> emptyList()
}
private fun compileOrderByExpression(order: PartiqlAst.OrderBy): List<CompiledOrderByItem> {
return order.sortSpecs.map {
it.orderingSpec
?: errNoContext(
"SortSpec.orderingSpec was not specified",
errorCode = ErrorCode.INTERNAL_ERROR,
internal = true
)

it.nullsSpec
?: errNoContext(
"SortSpec.nullsSpec was not specified",
errorCode = ErrorCode.INTERNAL_ERROR,
internal = true
)
it.orderingSpec ?: errNoContext("SortSpec.orderingSpec was not specified", ErrorCode.INTERNAL_ERROR, true)
it.nullsSpec ?: errNoContext("SortSpec.nullsSpec was not specified", ErrorCode.INTERNAL_ERROR, true)

val comparator = when (it.orderingSpec) {
is PartiqlAst.OrderingSpec.Asc ->
Expand All @@ -2138,35 +2102,27 @@ internal class EvaluatingCompiler(
}
}

CompiledOrderByItem(
comparator,
thunkFactory.thunkEnv(order.metas) { env ->
val orderByEnv = getOrderByEnv(projections)(env)
nestCompilationContext(ExpressionContext.NORMAL, emptySet()) {
compileAstExpr(it.expr)(orderByEnv)
}
}
)
CompiledOrderByItem(comparator, compileAstExpr(it.expr))
}
}

private fun <T> evalOrderBy(
rows: Sequence<T>,
orderByItems: List<CompiledOrderByItem>,
offsetLocationMeta: SourceLocationMeta?
offsetLocationMeta: SourceLocationMeta?,
session: EvaluationSession = EvaluationSession.standard()
): Sequence<T> {
val initialComparator: Comparator<T>? = null
val resultComparator = orderByItems.interruptibleFold(initialComparator) { intermediateComparator, orderByItem ->
if (intermediateComparator == null) {
return@interruptibleFold compareBy<T, ExprValue>(orderByItem.comparator) { row ->
val env = resolveEnvironment(row, offsetLocationMeta)
when (intermediateComparator) {
null -> return@interruptibleFold compareBy<T, ExprValue>(orderByItem.comparator) { row ->
val env = resolveEnvironment(row, offsetLocationMeta, session)
orderByItem.thunk(env)
}
else -> return@interruptibleFold intermediateComparator.thenBy(orderByItem.comparator) { row ->
val env = resolveEnvironment(row, offsetLocationMeta, session)
orderByItem.thunk(env)
}
}

return@interruptibleFold intermediateComparator.thenBy(orderByItem.comparator) { row ->
val env = resolveEnvironment(row, offsetLocationMeta)
orderByItem.thunk(env)
}
} ?: err(
"Order BY comparator cannot be null",
Expand All @@ -2178,9 +2134,9 @@ internal class EvaluatingCompiler(
return rows.sortedWith(resultComparator)
}

private fun <T> resolveEnvironment(envWrapper: T, offsetLocationMeta: SourceLocationMeta?): Environment {
private fun <T> resolveEnvironment(envWrapper: T, offsetLocationMeta: SourceLocationMeta?, session: EvaluationSession = EvaluationSession.standard()): Environment {
return when (envWrapper) {
is FromProduction -> envWrapper.env
is ExprValue -> Environment(envWrapper.bindings.delegate(session.globals), session = session)
is Pair<*, *> -> {
if (envWrapper.first is Environment) {
envWrapper.first as Environment
Expand Down

0 comments on commit ccb9991

Please sign in to comment.