@@ -310,7 +310,7 @@ private[kafka010] class KafkaSource(
currentPartitionOffsets = Some(untilPartitionOffsets)
}

sqlContext.internalCreateDataFrame(rdd, schema)
sqlContext.internalCreateDataFrame(rdd, schema, isStreaming = true)
}

/** Stop this source and free any resources it has allocated. */
@@ -1175,14 +1175,14 @@ object DecimalAggregates extends Rule[LogicalPlan] {
*/
object ConvertToLocalRelation extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
case Project(projectList, LocalRelation(output, data))
case Project(projectList, LocalRelation(output, data, isStreaming))
if !projectList.exists(hasUnevaluableExpr) =>
val projection = new InterpretedProjection(projectList, output)
projection.initialize(0)
LocalRelation(projectList.map(_.toAttribute), data.map(projection))
LocalRelation(projectList.map(_.toAttribute), data.map(projection), isStreaming)

case Limit(IntegerLiteral(limit), LocalRelation(output, data)) =>
LocalRelation(output, data.take(limit))
case Limit(IntegerLiteral(limit), LocalRelation(output, data, isStreaming)) =>
LocalRelation(output, data.take(limit), isStreaming)
}

private def hasUnevaluableExpr(expr: Expression): Boolean = {
@@ -1207,7 +1207,7 @@ object ReplaceDistinctWithAggregate extends Rule[LogicalPlan] {
*/
object ReplaceDeduplicateWithAggregate extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
case Deduplicate(keys, child, streaming) if !streaming =>
case Deduplicate(keys, child) if !child.isStreaming =>
val keyExprIds = keys.map(_.exprId)
val aggCols = child.output.map { attr =>
if (keyExprIds.contains(attr.exprId)) {
@@ -43,7 +43,10 @@ object LocalRelation {
}
}

case class LocalRelation(output: Seq[Attribute], data: Seq[InternalRow] = Nil)
case class LocalRelation(output: Seq[Attribute],
Contributor: Can you add docs to explain what isStreaming is?

Contributor (author): Done. (I think this is a correct summary?)

Contributor: Make sure this is the same as the updated isStreaming docs (see my other comments).

data: Seq[InternalRow] = Nil,
// Indicates whether this relation has data from a streaming source.
override val isStreaming: Boolean = false)
extends LeafNode with analysis.MultiInstanceRelation {

// A local relation must have resolved output.
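As a usage note, here is a small sketch of how the new parameter is exercised; it mirrors the test suites changed further down, and assumes the catalyst expression DSL import those suites use:

```scala
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation

// Default: a plain test relation is a batch relation.
val batchRel = LocalRelation('a.int, 'b.int)
// Explicitly flagged: no rows, but marked as having data from a streaming source.
val streamingRel = LocalRelation(Seq('a.int, 'b.int), isStreaming = true)

assert(!batchRel.isStreaming)
assert(streamingRel.isStreaming)
```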
@@ -47,7 +47,7 @@ abstract class LogicalPlan
*/
def analyzed: Boolean = _analyzed

/** Returns true if this subtree contains any streaming data sources. */
/** Returns true if this subtree has data from a streaming data source. */
def isStreaming: Boolean = children.exists(_.isStreaming == true)

/**
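A minimal, self-contained model of the semantics this doc describes (plain Scala, not Spark's actual classes; the names are illustrative): leaf nodes declare the flag, and every other node reports true if any child does.

```scala
sealed trait Node {
  def children: Seq[Node]
  // Default: a subtree has data from a streaming source if any of its children does.
  def isStreaming: Boolean = children.exists(_.isStreaming)
}
case class Leaf(override val isStreaming: Boolean) extends Node { def children = Nil }
case class Join(left: Node, right: Node) extends Node { def children = Seq(left, right) }

// Joining a batch leaf with a streaming leaf yields a streaming subtree.
assert(Join(Leaf(isStreaming = false), Leaf(isStreaming = true)).isStreaming)
// A purely batch subtree stays batch.
assert(!Join(Leaf(isStreaming = false), Leaf(isStreaming = false)).isStreaming)
```

This is why flagging the leaf relations (LocalRelation, LogicalRDD, Range) is enough: everything built on top of them reports isStreaming = true with no further changes.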
@@ -429,9 +429,10 @@ case class Sort(

/** Factory for constructing new `Range` nodes. */
object Range {
def apply(start: Long, end: Long, step: Long, numSlices: Option[Int]): Range = {
def apply(start: Long, end: Long, step: Long,
numSlices: Option[Int], isStreaming: Boolean = false): Range = {
val output = StructType(StructField("id", LongType, nullable = false) :: Nil).toAttributes
new Range(start, end, step, numSlices, output)
new Range(start, end, step, numSlices, output, isStreaming)
}
def apply(start: Long, end: Long, step: Long, numSlices: Int): Range = {
Range(start, end, step, Some(numSlices))
@@ -443,7 +444,8 @@ case class Range(
end: Long,
step: Long,
numSlices: Option[Int],
output: Seq[Attribute])
output: Seq[Attribute],
override val isStreaming: Boolean)
Contributor: How can a Range have data from a streaming source?

Contributor (author): I don't think there's necessarily a reason it shouldn't be able to; streaming sources are free to define getBatch() however they'd like. Right now the only source actually doing that is a fake source in StreamSuite.

extends LeafNode with MultiInstanceRelation {

require(step != 0, s"step ($step) cannot be 0")
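To make the discussion above concrete, here is a hedged sketch of what such a test-only source could look like. It is illustrative only, not the actual StreamSuite code: the class name is made up, and it would have to live under the org.apache.spark.sql package because Dataset.ofRows is package-private.

```scala
import org.apache.spark.sql.{DataFrame, Dataset, SQLContext}
import org.apache.spark.sql.catalyst.plans.logical.Range
import org.apache.spark.sql.execution.streaming.{LongOffset, Offset, Source}
import org.apache.spark.sql.types.{LongType, StructField, StructType}

class FakeRangeSource(sqlContext: SQLContext) extends Source {
  override def schema: StructType =
    StructType(StructField("id", LongType, nullable = false) :: Nil)

  override def getOffset: Option[Offset] = Some(LongOffset(10))

  override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
    // Back the batch with a Range node that is explicitly flagged as streaming,
    // so everything planned on top of it keeps reporting isStreaming = true.
    val plan = Range(0, 10, 1, None, isStreaming = true)
    Dataset.ofRows(sqlContext.sparkSession, plan)
  }

  override def stop(): Unit = {}
}
```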
@@ -784,8 +786,7 @@ case class OneRowRelation() extends LeafNode {
/** A logical plan for `dropDuplicates`. */
case class Deduplicate(
keys: Seq[Attribute],
child: LogicalPlan,
streaming: Boolean) extends UnaryNode {
child: LogicalPlan) extends UnaryNode {

override def output: Seq[Attribute] = child.output
}
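With the flag gone from the node, the streaming/batch distinction is derived from the child plan instead. At the API level the same call covers both cases; a hedged illustration, assuming a SparkSession named `spark` (the rate source is used only as a convenient stream):

```scala
val batchDF  = spark.range(0, 100).toDF("id")
val streamDF = spark.readStream.format("rate").load()  // columns: timestamp, value

// child.isStreaming == false: ReplaceDeduplicateWithAggregate rewrites this into a regular Aggregate.
val batchDedup = batchDF.dropDuplicates("id")

// child.isStreaming == true: kept as Deduplicate and planned by StreamingDeduplicationStrategy
// as StreamingDeduplicateExec.
val streamDedup = streamDF.dropDuplicates("value")
```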
@@ -93,7 +93,7 @@ class ResolveInlineTablesSuite extends AnalysisTest with BeforeAndAfter {
val table = UnresolvedInlineTable(Seq("c1"),
Seq(Seq(Cast(lit("1991-12-06 00:00:00.0"), TimestampType))))
val withTimeZone = ResolveTimeZone(conf).apply(table)
val LocalRelation(output, data) = ResolveInlineTables(conf).apply(withTimeZone)
val LocalRelation(output, data, _) = ResolveInlineTables(conf).apply(withTimeZone)
val correct = Cast(lit("1991-12-06 00:00:00.0"), TimestampType)
.withTimeZone(conf.sessionLocalTimeZone).eval().asInstanceOf[Long]
assert(output.map(_.dataType) == Seq(TimestampType))
@@ -368,18 +368,18 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
Aggregate(
Seq(attributeWithWatermark),
aggExprs("c"),
Deduplicate(Seq(att), streamRelation, streaming = true)),
Deduplicate(Seq(att), streamRelation)),
outputMode = Append)

assertNotSupportedInStreamingPlan(
"Deduplicate - Deduplicate on streaming relation after aggregation",
Deduplicate(Seq(att), Aggregate(Nil, aggExprs("c"), streamRelation), streaming = true),
Deduplicate(Seq(att), Aggregate(Nil, aggExprs("c"), streamRelation)),
outputMode = Complete,
expectedMsgs = Seq("dropDuplicates"))

assertSupportedInStreamingPlan(
"Deduplicate - Deduplicate on batch relation inside a streaming query",
Deduplicate(Seq(att), batchRelation, streaming = false),
Deduplicate(Seq(att), batchRelation),
outputMode = Append
)

@@ -79,7 +79,7 @@ class ReplaceOperatorSuite extends PlanTest {
val input = LocalRelation('a.int, 'b.int)
val attrA = input.output(0)
val attrB = input.output(1)
val query = Deduplicate(Seq(attrA), input, streaming = false) // dropDuplicates("a")
val query = Deduplicate(Seq(attrA), input) // dropDuplicates("a")
val optimized = Optimize.execute(query.analyze)

val correctAnswer =
@@ -95,9 +95,9 @@
}

test("don't replace streaming Deduplicate") {
val input = LocalRelation('a.int, 'b.int)
val input = LocalRelation(Seq('a.int, 'b.int), isStreaming = true)
val attrA = input.output(0)
val query = Deduplicate(Seq(attrA), input, streaming = true) // dropDuplicates("a")
val query = Deduplicate(Seq(attrA), input) // dropDuplicates("a")
val optimized = Optimize.execute(query.analyze)

comparePlans(optimized, query)
@@ -73,10 +73,8 @@ class LogicalPlanSuite extends SparkFunSuite {

test("isStreaming") {
val relation = LocalRelation(AttributeReference("a", IntegerType, nullable = true)())
val incrementalRelation = new LocalRelation(
Seq(AttributeReference("a", IntegerType, nullable = true)())) {
override def isStreaming(): Boolean = true
}
val incrementalRelation = LocalRelation(
Seq(AttributeReference("a", IntegerType, nullable = true)()), isStreaming = true)

case class TestBinaryRelation(left: LogicalPlan, right: LogicalPlan) extends BinaryNode {
override def output: Seq[Attribute] = left.output ++ right.output
@@ -410,7 +410,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {

Dataset.ofRows(
sparkSession,
LogicalRDD(schema.toAttributes, parsed)(sparkSession))
LogicalRDD(schema.toAttributes, parsed, isStreaming = jsonDataset.isStreaming)(sparkSession))
}

/**
@@ -473,7 +473,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {

Dataset.ofRows(
sparkSession,
LogicalRDD(schema.toAttributes, parsed)(sparkSession))
LogicalRDD(schema.toAttributes, parsed, isStreaming = csvDataset.isStreaming)(sparkSession))
}

/**
@@ -371,14 +371,14 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
case (true, SaveMode.Overwrite) =>
// Get all input data source or hive relations of the query.
val srcRelations = df.logicalPlan.collect {
case LogicalRelation(src: BaseRelation, _, _) => src
case LogicalRelation(src: BaseRelation, _, _, _) => src
case relation: HiveTableRelation => relation.tableMeta.identifier
}

val tableRelation = df.sparkSession.table(tableIdentWithDB).queryExecution.analyzed
EliminateSubqueryAliases(tableRelation) match {
// check if the table is a data source table (the relation is a BaseRelation).
case LogicalRelation(dest: BaseRelation, _, _) if srcRelations.contains(dest) =>
case LogicalRelation(dest: BaseRelation, _, _, _) if srcRelations.contains(dest) =>
throw new AnalysisException(
s"Cannot overwrite table $tableName that is also being read from")
// check hive table relation when overwrite mode
7 changes: 4 additions & 3 deletions sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -569,7 +569,8 @@ class Dataset[T] private[sql](
logicalPlan.output,
internalRdd,
outputPartitioning,
physicalPlan.outputOrdering
physicalPlan.outputOrdering,
isStreaming
)(sparkSession)).as[T]
}

@@ -2233,7 +2234,7 @@ class Dataset[T] private[sql](
}
cols
}
Deduplicate(groupCols, logicalPlan, isStreaming)
Deduplicate(groupCols, logicalPlan)
}

/**
@@ -2993,7 +2994,7 @@
*/
def inputFiles: Array[String] = {
val files: Seq[String] = queryExecution.optimizedPlan.collect {
case LogicalRelation(fsBasedRelation: FileRelation, _, _) =>
case LogicalRelation(fsBasedRelation: FileRelation, _, _, _) =>
fsBasedRelation.inputFiles
case fr: FileRelation =>
fr.inputFiles
7 changes: 5 additions & 2 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -420,8 +420,11 @@ class SQLContext private[sql](val sparkSession: SparkSession)
* converted to Catalyst rows.
*/
private[sql]
def internalCreateDataFrame(catalystRows: RDD[InternalRow], schema: StructType) = {
sparkSession.internalCreateDataFrame(catalystRows, schema)
def internalCreateDataFrame(
catalystRows: RDD[InternalRow],
schema: StructType,
isStreaming: Boolean = false) = {
sparkSession.internalCreateDataFrame(catalystRows, schema, isStreaming)
}

/**
@@ -564,10 +564,14 @@ class SparkSession private(
*/
private[sql] def internalCreateDataFrame(
catalystRows: RDD[InternalRow],
schema: StructType): DataFrame = {
schema: StructType,
isStreaming: Boolean = false): DataFrame = {
// TODO: use MutableProjection when rowRDD is another DataFrame and the applied
// schema differs from the existing schema on any field data type.
val logicalPlan = LogicalRDD(schema.toAttributes, catalystRows)(self)
val logicalPlan = LogicalRDD(
schema.toAttributes,
catalystRows,
isStreaming = isStreaming)(self)
Contributor: Just isStreaming is fine; isStreaming = isStreaming is overkill. It's only useful when the value is a constant, e.g. isStreaming = true.

Contributor (author): It's necessary here because there are two other default arguments in the constructor.

Dataset.ofRows(self, logicalPlan)
}
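A small, self-contained illustration of the author's point; the method below is hypothetical, not Spark code. With other defaulted parameters in between, a positional argument would bind to the wrong slot (or not compile), so the named repetition is what lets the call skip the other defaults:

```scala
def describe(
    rows: Seq[Int],
    partitioning: String = "unknown",
    ordering: Seq[String] = Nil,
    isStreaming: Boolean = false): String =
  s"rows=${rows.size} partitioning=$partitioning ordering=$ordering isStreaming=$isStreaming"

val isStreaming = true
// describe(Seq(1, 2, 3), isStreaming)            // does not compile: Boolean where a String is expected
describe(Seq(1, 2, 3), isStreaming = isStreaming) // binds the intended parameter, keeps the other defaults
```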

@@ -125,7 +125,8 @@ case class LogicalRDD(
output: Seq[Attribute],
rdd: RDD[InternalRow],
outputPartitioning: Partitioning = UnknownPartitioning(0),
outputOrdering: Seq[SortOrder] = Nil)(session: SparkSession)
outputOrdering: Seq[SortOrder] = Nil,
override val isStreaming: Boolean = false)(session: SparkSession)
extends LeafNode with MultiInstanceRelation {

override protected final def otherCopyArgs: Seq[AnyRef] = session :: Nil
@@ -150,11 +151,12 @@ case class LogicalRDD(
output.map(rewrite),
rdd,
rewrittenPartitioning,
rewrittenOrdering
rewrittenOrdering,
isStreaming
)(session).asInstanceOf[this.type]
}

override protected def stringArgs: Iterator[Any] = Iterator(output)
override protected def stringArgs: Iterator[Any] = Iterator(output, isStreaming)

override def computeStats(): Statistics = Statistics(
// TODO: Instead of returning a default value here, find a way to return a meaningful size
@@ -94,10 +94,10 @@ case class OptimizeMetadataOnlyQuery(catalog: SessionCatalog) extends Rule[Logic
child transform {
case plan if plan eq relation =>
relation match {
case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _) =>
case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _, isStreaming) =>
val partAttrs = getPartitionAttrs(fsRelation.partitionSchema.map(_.name), l)
val partitionData = fsRelation.location.listFiles(Nil, Nil)
LocalRelation(partAttrs, partitionData.map(_.values))
LocalRelation(partAttrs, partitionData.map(_.values), isStreaming)

case relation: HiveTableRelation =>
val partAttrs = getPartitionAttrs(relation.tableMeta.partitionColumnNames, relation)
@@ -130,7 +130,7 @@ case class OptimizeMetadataOnlyQuery(catalog: SessionCatalog) extends Rule[Logic
object PartitionedRelation {

def unapply(plan: LogicalPlan): Option[(AttributeSet, LogicalPlan)] = plan match {
case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _)
case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _, _)
if fsRelation.partitionSchema.nonEmpty =>
val partAttrs = getPartitionAttrs(fsRelation.partitionSchema.map(_.name), l)
Some((AttributeSet(partAttrs), l))
@@ -221,12 +221,14 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
}

/**
* Used to plan aggregation queries that are computed incrementally as part of a
* Used to plan streaming aggregation queries that are computed incrementally as part of a
* [[StreamingQuery]]. Currently this rule is injected into the planner
* on-demand, only when planning in a [[org.apache.spark.sql.execution.streaming.StreamExecution]]
*/
object StatefulAggregationStrategy extends Strategy {
override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case _ if !plan.isStreaming => Nil

case EventTimeWatermark(columnName, delay, child) =>
EventTimeWatermarkExec(columnName, delay, planLater(child)) :: Nil
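For orientation, a hedged user-level example of a query this strategy plans and one the new guard now declines (assumes a SparkSession named `spark`; the rate source is only a convenient stream):

```scala
import org.apache.spark.sql.functions._

// Streaming aggregation with a watermark: plan.isStreaming is true, so the
// EventTimeWatermark and stateful aggregate cases above apply.
val streamingCounts = spark.readStream.format("rate").load()
  .withWatermark("timestamp", "10 seconds")
  .groupBy(window(col("timestamp"), "1 minute"))
  .count()

// Plain batch aggregation: plan.isStreaming is false, the new guard returns Nil,
// and the ordinary Aggregation strategy plans it instead.
val batchCounts = spark.range(0, 1000).groupBy((col("id") % 10).as("bucket")).count()
```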

@@ -248,7 +250,7 @@
*/
object StreamingDeduplicationStrategy extends Strategy {
override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case Deduplicate(keys, child, true) =>
case Deduplicate(keys, child) if child.isStreaming =>
StreamingDeduplicateExec(keys, planLater(child)) :: Nil

case _ => Nil
@@ -410,7 +412,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
execution.window.WindowExec(windowExprs, partitionSpec, orderSpec, planLater(child)) :: Nil
case logical.Sample(lb, ub, withReplacement, seed, child) =>
execution.SampleExec(lb, ub, withReplacement, seed, planLater(child)) :: Nil
case logical.LocalRelation(output, data) =>
case logical.LocalRelation(output, data, _) =>
LocalTableScanExec(output, data) :: Nil
case logical.LocalLimit(IntegerLiteral(limit), child) =>
execution.LocalLimitExec(limit, planLater(child)) :: Nil
@@ -455,7 +455,7 @@ case class DataSource(

val fileIndex = catalogTable.map(_.identifier).map { tableIdent =>
sparkSession.table(tableIdent).queryExecution.analyzed.collect {
case LogicalRelation(t: HadoopFsRelation, _, _) => t.location
case LogicalRelation(t: HadoopFsRelation, _, _, _) => t.location
}.head
}
// For partitioned relation r, r.schema's column ordering can be different from the column
@@ -136,12 +136,12 @@ case class DataSourceAnalysis(conf: SQLConf) extends Rule[LogicalPlan] with Cast
if query.resolved && DDLUtils.isDatasourceTable(tableDesc) =>
CreateDataSourceTableAsSelectCommand(tableDesc, mode, query)

case InsertIntoTable(l @ LogicalRelation(_: InsertableRelation, _, _),
case InsertIntoTable(l @ LogicalRelation(_: InsertableRelation, _, _, _),
parts, query, overwrite, false) if parts.isEmpty =>
InsertIntoDataSourceCommand(l, query, overwrite)

case i @ InsertIntoTable(
l @ LogicalRelation(t: HadoopFsRelation, _, table), parts, query, overwrite, _) =>
l @ LogicalRelation(t: HadoopFsRelation, _, table, _), parts, query, overwrite, _) =>
// If the InsertIntoTable command is for a partitioned HadoopFsRelation and
// the user has specified static partitions, we add a Project operator on top of the query
// to include those constant column values in the query result.
@@ -177,7 +177,7 @@

val outputPath = t.location.rootPaths.head
val inputPaths = actualQuery.collect {
case LogicalRelation(r: HadoopFsRelation, _, _) => r.location.rootPaths
case LogicalRelation(r: HadoopFsRelation, _, _, _) => r.location.rootPaths
}.flatten

val mode = if (overwrite) SaveMode.Overwrite else SaveMode.Append
@@ -268,29 +268,30 @@ case class DataSourceStrategy(conf: SQLConf) extends Strategy with Logging with
import DataSourceStrategy._

def apply(plan: LogicalPlan): Seq[execution.SparkPlan] = plan match {
case PhysicalOperation(projects, filters, l @ LogicalRelation(t: CatalystScan, _, _)) =>
case PhysicalOperation(projects, filters, l @ LogicalRelation(t: CatalystScan, _, _, _)) =>
pruneFilterProjectRaw(
l,
projects,
filters,
(requestedColumns, allPredicates, _) =>
toCatalystRDD(l, requestedColumns, t.buildScan(requestedColumns, allPredicates))) :: Nil

case PhysicalOperation(projects, filters, l @ LogicalRelation(t: PrunedFilteredScan, _, _)) =>
case PhysicalOperation(projects, filters,
l @ LogicalRelation(t: PrunedFilteredScan, _, _, _)) =>
pruneFilterProject(
l,
projects,
filters,
(a, f) => toCatalystRDD(l, a, t.buildScan(a.map(_.name).toArray, f))) :: Nil

case PhysicalOperation(projects, filters, l @ LogicalRelation(t: PrunedScan, _, _)) =>
case PhysicalOperation(projects, filters, l @ LogicalRelation(t: PrunedScan, _, _, _)) =>
pruneFilterProject(
l,
projects,
filters,
(a, _) => toCatalystRDD(l, a, t.buildScan(a.map(_.name).toArray))) :: Nil

case l @ LogicalRelation(baseRelation: TableScan, _, _) =>
case l @ LogicalRelation(baseRelation: TableScan, _, _, _) =>
RowDataSourceScanExec(
l.output,
l.output.indices,