@@ -30,7 +30,8 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, JoinedRow}
import org.apache.spark.sql.catalyst.util.{CharVarcharUtils, DateTimeUtils}
import org.apache.spark.sql.connector.catalog._
-import org.apache.spark.sql.connector.expressions.{BucketTransform, DaysTransform, HoursTransform, IdentityTransform, MonthsTransform, Transform, YearsTransform}
+import org.apache.spark.sql.connector.distributions.{Distribution, Distributions}
+import org.apache.spark.sql.connector.expressions.{BucketTransform, DaysTransform, HoursTransform, IdentityTransform, MonthsTransform, SortOrder, Transform, YearsTransform}
import org.apache.spark.sql.connector.read._
import org.apache.spark.sql.connector.write._
import org.apache.spark.sql.connector.write.streaming.{StreamingDataWriterFactory, StreamingWrite}
@@ -46,7 +47,9 @@ class InMemoryTable(
val name: String,
val schema: StructType,
override val partitioning: Array[Transform],
-override val properties: util.Map[String, String])
+override val properties: util.Map[String, String],
+val distribution: Distribution = Distributions.unspecified(),
+val ordering: Array[SortOrder] = Array.empty)
extends Table with SupportsRead with SupportsWrite with SupportsDelete
with SupportsMetadataColumns {

@@ -274,7 +277,11 @@ class InMemoryTable(
this
}

-override def build(): Write = new Write {
+override def build(): Write = new Write with RequiresDistributionAndOrdering {
+override def requiredDistribution: Distribution = distribution
+
+override def requiredOrdering: Array[SortOrder] = ordering
+
override def toBatch: BatchWrite = writer

override def toStreaming: StreamingWrite = streamingWriter match {
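For reference, this is roughly what a connector-side Write looks like once it opts into the new contract, mirroring the InMemoryTable change above. A minimal sketch only: the MyWrite class, its constructor arguments, and the comments are illustrative and not part of this PR.

import org.apache.spark.sql.connector.distributions.Distribution
import org.apache.spark.sql.connector.expressions.SortOrder
import org.apache.spark.sql.connector.write.{BatchWrite, RequiresDistributionAndOrdering, Write}

// Hypothetical connector Write that reports its required distribution and ordering;
// Spark injects the corresponding shuffle/sort while optimizing the write query.
class MyWrite(
    distribution: Distribution,   // e.g. Distributions.clustered(...) over the table keys
    ordering: Array[SortOrder],   // e.g. an ascending sort on an event-time column
    delegate: BatchWrite) extends Write with RequiresDistributionAndOrdering {

  override def requiredDistribution: Distribution = distribution

  override def requiredOrdering: Array[SortOrder] = ordering

  override def toBatch: BatchWrite = delegate
}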
@@ -24,7 +24,8 @@ import scala.collection.JavaConverters._

import org.apache.spark.sql.catalyst.analysis.{NamespaceAlreadyExistsException, NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException}
import org.apache.spark.sql.connector.catalog._
-import org.apache.spark.sql.connector.expressions.Transform
+import org.apache.spark.sql.connector.distributions.{Distribution, Distributions}
+import org.apache.spark.sql.connector.expressions.{SortOrder, Transform}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

@@ -69,13 +70,24 @@ class BasicInMemoryTableCatalog extends TableCatalog {
schema: StructType,
partitions: Array[Transform],
properties: util.Map[String, String]): Table = {
+createTable(ident, schema, partitions, properties, Distributions.unspecified(), Array.empty)
+}
+
+def createTable(
+ident: Identifier,
+schema: StructType,
+partitions: Array[Transform],
+properties: util.Map[String, String],
+distribution: Distribution,
+ordering: Array[SortOrder]): Table = {
if (tables.containsKey(ident)) {
throw new TableAlreadyExistsException(ident)
}

InMemoryTableCatalog.maybeSimulateFailedTableCreation(properties)

-val table = new InMemoryTable(s"$name.${ident.quoted}", schema, partitions, properties)
+val tableName = s"$name.${ident.quoted}"
+val table = new InMemoryTable(tableName, schema, partitions, properties, distribution, ordering)
tables.put(ident, table)
namespaces.putIfAbsent(ident.namespace.toList, Map())
table
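A hypothetical test-style use of the new overload (the catalog name, namespace, columns, and initialize call are invented for illustration; FieldReference and SortValue are sql-internal helpers, so a snippet like this is assumed to live alongside these test classes):

import java.util

import org.apache.spark.sql.connector.catalog.Identifier
import org.apache.spark.sql.connector.distributions.Distributions
import org.apache.spark.sql.connector.expressions.{FieldReference, NullOrdering, SortDirection, SortOrder, SortValue, Transform}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

// e.g. inside a test: a table that asks writes to be clustered by "key" and sorted by "ts"
val catalog = new InMemoryTableCatalog
catalog.initialize("testcat", CaseInsensitiveStringMap.empty())
catalog.createTable(
  Identifier.of(Array("ns"), "t"),
  new StructType().add("key", "int").add("ts", "timestamp"),
  Array.empty[Transform],
  new util.HashMap[String, String](),
  Distributions.clustered(Array(FieldReference("key"))),
  Array[SortOrder](SortValue(FieldReference("ts"), SortDirection.ASCENDING, NullOrdering.NULLS_FIRST)))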
@@ -0,0 +1,106 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.datasources.v2

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.Resolver
import org.apache.spark.sql.catalyst.expressions.{Ascending, Descending, Expression, NamedExpression, NullOrdering, NullsFirst, NullsLast, SortDirection, SortOrder}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, RepartitionByExpression, Sort}
import org.apache.spark.sql.connector.distributions.{ClusteredDistribution, OrderedDistribution, UnspecifiedDistribution}
import org.apache.spark.sql.connector.expressions.{Expression => V2Expression, FieldReference, IdentityTransform, NullOrdering => V2NullOrdering, SortDirection => V2SortDirection, SortValue}
import org.apache.spark.sql.connector.write.{RequiresDistributionAndOrdering, Write}
import org.apache.spark.sql.internal.SQLConf

object DistributionAndOrderingUtils {

def prepareQuery(write: Write, query: LogicalPlan, conf: SQLConf): LogicalPlan = write match {
case write: RequiresDistributionAndOrdering =>
val resolver = conf.resolver

val distribution = write.requiredDistribution match {
case d: OrderedDistribution =>
d.ordering.map(e => toCatalyst(e, query, resolver))
case d: ClusteredDistribution =>
d.clustering.map(e => toCatalyst(e, query, resolver))
case _: UnspecifiedDistribution =>
Array.empty[Expression]
}

val queryWithDistribution = if (distribution.nonEmpty) {
val numShufflePartitions = conf.numShufflePartitions
Contributor:
This looks like a limitation for data sources; with DSv1 they could inject arbitrary operations, including calling the repartition methods Dataset provides. Most repartition methods have a "numPartitions" parameter, and the same goes for the repartitionByRange methods.

Contributor:
For example, I have a data source written as DSv1 that provides the ability to read the state store of a streaming query and rewrite it. While the number of partitions in the state store is determined by the number of shuffle partitions in the streaming query, that value is not guaranteed to be the same across applications.

Furthermore, the data source supports rescaling, which should repartition to an arbitrary number of partitions. It would be odd if I had to tell users "you should change the Spark configuration to set the target number of partitions."

@HeartSaVioR (Contributor, Jan 11, 2021):
I have a commit to address this (HeartSaVioR@485c56b), but you deserve the full credit for this PR and I wouldn't like to take part of it.

That said, I prefer to handle this in a follow-up JIRA issue (I'll submit a new PR once this PR is merged), but I'm also OK with addressing it in this PR (I'd submit a new PR to your repo) if we prefer that.

@aokolnychyi (Contributor Author, Jan 12, 2021):
@HeartSaVioR, I think it is going to be useful for some data sources. I did not want to cover this in the first PRs as there was no consensus on the dev list around controlling the number of tasks. That's why I added this topic to the non-goals of the design doc.

One point to think about is who should control the parallelism. The parallelism should probably depend on the incoming data volume in most cases (except when the number of requested partitions is static, as in the case mentioned above). Without statistics about the number of incoming records or their shape, it will be hard for a data source to determine the right number of partitions.

That being said, I think making that number optional, as in your change, can be a reasonable starting point. However, I'd like us to think about how this will look in the future. Should Spark report stats about the incoming batch so that data sources can make a better estimate? What would that API look like?

Contributor:
I agree with @aokolnychyi that it should be Spark that decides these physical details (like numShufflePartitions), for better performance. It's an anti-pattern to let the data source decide it.

BTW, why do we use conf.numShufflePartitions here? We can use None so that AQE can decide the number of partitions, which is even better.

@aokolnychyi (Contributor Author, Jan 12, 2021):
I guess I missed the change from @viirya that made it optional, @cloud-fan. I can switch to that.

Contributor:
My concern is mostly the "static partitions" case I gave as an example, like the state data source. It's not a matter of whether it's an anti-pattern or not, because in that case the ability to restrict the number of partitions is not optional but required: the data should be partitioned exactly the same way Spark partitions the rows for a hash shuffle, and a partition shouldn't be written concurrently. I don't think end users should have to repartition manually in their queries to avoid breaking things.

That is easily achievable in DSv1 (I have an implementation based on DSv1 and want to migrate it to DSv2) because Spark provides the DataFrame to the data source on write. While I don't expect such flexibility from DSv2 (that behavior seems too open), I'm not sure this case is something we'd like to define as "not supported on DSv2, so live with DSv1".

Contributor:
Even without static partitioning, there may be more to think about: the data source may know physical details of the actual storage, which could help optimize writes or, in the opposite direction, justify throttling the parallelism so we don't effectively DDoS ourselves. I guess that's beyond the scope of this PR, but I wanted to bring it up as food for thought.

@aokolnychyi (Contributor Author, Jan 12, 2021):
I knew this would require discussion, so I propose to do it in a follow-up, as @HeartSaVioR suggested.

I don't feel strongly about this point, and it would be great to accommodate all use cases to drive the adoption of DSv2. However, we should be really careful, and I think we should continue the discussion.

I still believe the parallelism should depend on the data volume in the general case. That's why this is going to be useful only if Spark propagates stats about the incoming batch.
// the conversion to catalyst expressions above produces SortOrder expressions
// for OrderedDistribution and generic expressions for ClusteredDistribution
// this allows RepartitionByExpression to pick either range or hash partitioning
RepartitionByExpression(distribution, query, numShufflePartitions)
@sunchao (Member, Jan 8, 2021):
Just for my understanding, is it possible that in certain situations the repartition and/or sort can be avoided? For instance, when the input data to write is already in the correct shape.

Contributor Author:
The idea of doing this in the optimizer is that Spark will be smart enough to remove redundant sorts and repartitions. For example, I've extended EliminateSorts to cover more cases in PR #29089.

Also, checkWriteRequirements is written in a way that ensures there is only one shuffle and one sort node. We can successfully dedup either local or global sorts now.

The situation with repartition nodes is worse. Specifically, CollapseRepartition runs in the operator optimization batch before we construct writes, and it does not cover cases where there are filters or projections in between. That's why the tests for duplicated repartition nodes are ignored for now.

Member:
Thanks @aokolnychyi! This is very helpful. I guess #30093, which introduces the physical rule RemoveRedundantSorts, is also useful here. I'm not sure whether the CollapseRepartition issue can be handled with a similar approach.

Contributor Author:
Yeah, I've outlined a few options we have in this comment. Let me know what you think, @sunchao.
} else {
query
}

val ordering = write.requiredOrdering.toSeq
.map(e => toCatalyst(e, query, resolver))
.asInstanceOf[Seq[SortOrder]]

val queryWithDistributionAndOrdering = if (ordering.nonEmpty) {
Sort(ordering, global = false, queryWithDistribution)
} else {
queryWithDistribution
}

queryWithDistributionAndOrdering

case _ =>
query
}

private def toCatalyst(
expr: V2Expression,
query: LogicalPlan,
resolver: Resolver): Expression = {

// we cannot perform the resolution in the analyzer since we need to optimize expressions
// in nodes like OverwriteByExpression before constructing a logical write
def resolve(ref: FieldReference): NamedExpression = {
query.resolve(ref.parts, resolver) match {
case Some(attr) => attr
case None => throw new AnalysisException(s"Cannot resolve '$ref' using ${query.output}")
}
}

expr match {
case SortValue(child, direction, nullOrdering) =>
val catalystChild = toCatalyst(child, query, resolver)
SortOrder(catalystChild, toCatalyst(direction), toCatalyst(nullOrdering), Seq.empty)
case IdentityTransform(ref) =>
Member:
Just curious why other transforms are not supported yet.

Contributor Author:
We cannot map other transforms to valid Catalyst expressions, as Spark is not capable of resolving data source transforms. We need a function catalog for that.
resolve(ref)
case ref: FieldReference =>
resolve(ref)
case _ =>
throw new AnalysisException(s"$expr is not currently supported")
}
}

private def toCatalyst(direction: V2SortDirection): SortDirection = direction match {
case V2SortDirection.ASCENDING => Ascending
case V2SortDirection.DESCENDING => Descending
}

private def toCatalyst(nullOrdering: V2NullOrdering): NullOrdering = nullOrdering match {
case V2NullOrdering.NULLS_FIRST => NullsFirst
case V2NullOrdering.NULLS_LAST => NullsLast
}
}
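To make the effect of prepareQuery concrete, the following hypothetical snippet builds the plan shape it produces for a clustered distribution on key plus an ascending ordering on ts. The toy LocalRelation and the hard-coded 200 (standing in for conf.numShufflePartitions) are illustrative only; the rule derives everything from the Write and the incoming query.

import org.apache.spark.sql.catalyst.expressions.{Ascending, AttributeReference, NullsFirst, SortOrder}
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, RepartitionByExpression, Sort}
import org.apache.spark.sql.types.{IntegerType, TimestampType}

val key = AttributeReference("key", IntegerType)()
val ts = AttributeReference("ts", TimestampType)()
val query = LocalRelation(key, ts)   // stand-in for the incoming write query

// a ClusteredDistribution on key becomes a repartition by that expression...
val withDistribution = RepartitionByExpression(Seq(key), query, 200)
// ...and the required ordering becomes a task-local (global = false) sort on ts
val prepared = Sort(Seq(SortOrder(ts, Ascending, NullsFirst, Seq.empty)), global = false, withDistribution)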
@@ -40,7 +40,8 @@ object V2Writes extends Rule[LogicalPlan] with PredicateHelper {
case a @ AppendData(r: DataSourceV2Relation, query, options, _, None) =>
val writeBuilder = newWriteBuilder(r.table, query, options)
val write = writeBuilder.build()
-a.copy(write = Some(write))
+val newQuery = DistributionAndOrderingUtils.prepareQuery(write, query, conf)
+a.copy(write = Some(write), query = newQuery)

case o @ OverwriteByExpression(r: DataSourceV2Relation, deleteExpr, query, options, _, None) =>
// fail if any filter cannot be converted. correctness depends on removing all matching data.
@@ -63,7 +64,8 @@ object V2Writes extends Rule[LogicalPlan] with PredicateHelper {
throw new SparkException(s"Table does not support overwrite by expression: $table")
}

-o.copy(write = Some(write))
+val newQuery = DistributionAndOrderingUtils.prepareQuery(write, query, conf)
+o.copy(write = Some(write), query = newQuery)

case o @ OverwritePartitionsDynamic(r: DataSourceV2Relation, query, options, _, None) =>
val table = r.table
@@ -74,7 +76,8 @@
case _ =>
throw new SparkException(s"Table does not support dynamic partition overwrite: $table")
}
-o.copy(write = Some(write))
+val newQuery = DistributionAndOrderingUtils.prepareQuery(write, query, conf)
+o.copy(write = Some(write), query = newQuery)
}

private def isTruncate(filters: Array[Filter]): Boolean = {
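End to end, nothing changes for the user issuing the write: once the rule above runs, a plain v2 append to a table whose Write requires a distribution and ordering gets the shuffle and sort injected automatically. A hypothetical usage sketch, assuming an active SparkSession named spark and a testcat catalog whose table ns.t was created as in the test-catalog changes earlier in this diff:

// assumes spark.sql.catalog.testcat is set to a catalog implementation whose table ns.t
// reports a required distribution/ordering (for example, the in-memory test catalog above)
val df = spark.range(0, 1000)
  .selectExpr("CAST(id % 10 AS INT) AS key", "current_timestamp() AS ts")

// V2Writes builds the Write and calls DistributionAndOrderingUtils.prepareQuery, so the
// optimized plan contains a repartition on key and a task-local sort on ts before writing
df.writeTo("testcat.ns.t").append()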