
Join with an existing delta table and a forEachBatch dataframe Resulting in a Cross Join when Inner join is needed? #372

Closed
kant777 opened this issue Apr 1, 2020 · 0 comments


kant777 commented Apr 1, 2020


I am using Apache Spark 2.4.5 with Delta Lake 0.5.0.

The error org.apache.spark.sql.AnalysisException: Detected implicit cartesian product for INNER join between logical plans occurs in the following code:

Dataset<Row> aggregatedDf = batchDF
        .selectExpr("CAST(value AS STRING) AS v1").as("srcDf")
        .join(resultTable.toDF().as("new_result"),
              col("new_result.v2").equalTo("srcDf.v1"));
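
Note that Column.equalTo(Object) wraps a non-Column argument in a literal, so the condition above compares new_result.v2 against the constant string "srcDf.v1", not against the column. The optimizer pushes that literal comparison down as a filter on one side, which leaves the join itself without any condition and trips the cartesian-product check. A minimal sketch of the same join with an explicit column-to-column condition, assuming that was the intent:

Dataset<Row> aggregatedDf = batchDF
        .selectExpr("CAST(value AS STRING) AS v1").as("srcDf")
        .join(resultTable.toDF().as("new_result"),
              // compare two columns, not a column and a string literal
              col("new_result.v2").equalTo(col("srcDf.v1")));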

Sample code to reproduce:

import io.delta.tables.DeltaTable;
import org.apache.spark.api.java.function.VoidFunction2;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.streaming.Trigger;

import java.util.concurrent.TimeoutException;

import static org.apache.spark.sql.functions.col;

public class Test {

    public static void main(String... args) throws StreamingQueryException, TimeoutException {
        SparkSession sparkSession = SparkSession.builder().appName("join_test").getOrCreate();

        Dataset<Row> kafkaDSA = sparkSession
                .readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "localhost:9092")
                .option("subscribe", "topicA1")
                .option("startingOffsets", "earliest")
                .option("maxOffsetsPerTrigger", "50000").load();

        kafkaDSA
                .selectExpr("CAST(value AS STRING) AS v1").as(Encoders.STRING())
                .writeStream()
                .trigger(Trigger.ProcessingTime(500))
                .format("delta")
                .outputMode("append")
                .foreachBatch((VoidFunction2<Dataset<String>, Long>) (batchDF, batchId) -> {
                    DeltaTable resultTable = DeltaTable.forPath("/tmp/delta/result").as("result");
                    Dataset<Row> aggregatedDf = batchDF
                            .selectExpr("CAST(value AS STRING) AS v1").as("srcDf")
                            // equalTo("result.v2") compares v1 against the literal
                            // string "result.v2", not against the result.v2 column
                            .join(resultTable.toDF(), col("srcDf.v1").equalTo("result.v2"));
                    aggregatedDf.count();
                    //aggregatedDf.select(col("result.v2"), col("srcDf.v1")).write().format("delta").mode("append").save("/tmp/delta/result");
                    //batchDF.write().format("delta").mode("append").save("/tmp/delta/result");
                })
                .option("checkpointLocation", "/tmp/delta/spark_checkpoint_topicA")
                .start();

        sparkSession.streams().awaitAnyTermination();
    }
}


The result table has the following schema.

scala> spark.read.format("delta").load("/tmp/delta/result").printSchema  
root
 |-- v2: string (nullable = true)
 |-- v1: string (nullable = true)

Sample data from the result table (note that the numbers below are encoded as strings):

scala> spark.read.format("delta").load("/tmp/delta/result").show(5)
+---+---+
| v2| v1|
+---+---+
|  7|  7|
|  7|  7|
|  7|  7|
|100|100|
|100|100|
+---+---+
only showing top 5 rows

Looking at stage 4, I am not sure why the union is happening.

[Screenshot: Spark UI, stage 4]

Here is the full execution plan from the UI

[Screenshots: full execution plan from the Spark UI]

The full error and stack trace:

org.apache.spark.sql.AnalysisException: Detected implicit cartesian product for INNER join between logical plans
Project
+- Filter (isnotnull(value#725) && (value#725 = result.v2))
   +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true, false) AS value#725]
      +- ExternalRDD [obj#724]
and
Project
+- Relation[v2#727,v1#728] parquet
Join condition is missing or trivial.
Either: use the CROSS JOIN syntax to allow cartesian products between these
relations, or: enable implicit cartesian products by setting the configuration
variable spark.sql.crossJoin.enabled=true;
	at org.apache.spark.sql.catalyst.optimizer.CheckCartesianProducts$$anonfun$apply$22.applyOrElse(Optimizer.scala:1295)
	at org.apache.spark.sql.catalyst.optimizer.CheckCartesianProducts$$anonfun$apply$22.applyOrElse(Optimizer.scala:1292)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:258)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:258)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:257)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDown(LogicalPlan.scala:29)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$class.transformDown(AnalysisHelper.scala:149)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:263)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:263)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:328)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:326)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDown(LogicalPlan.scala:29)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$class.transformDown(AnalysisHelper.scala:149)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:247)
	at org.apache.spark.sql.catalyst.optimizer.CheckCartesianProducts$.apply(Optimizer.scala:1292)
	at org.apache.spark.sql.catalyst.optimizer.CheckCartesianProducts$.apply(Optimizer.scala:1274)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:87)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:84)
	at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:57)
	at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:66)
	at scala.collection.mutable.WrappedArray.foldLeft(WrappedArray.scala:35)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:84)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:76)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:76)
	at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:67)
	at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:67)
	at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:73)
	at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:69)
	at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:78)
	at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:78)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3365)
	at org.apache.spark.sql.Dataset.count(Dataset.scala:2835)
	at Test.lambda$main$3dd8454f$1(Test.java:37)
	at org.apache.spark.sql.streaming.DataStreamWriter$$anonfun$foreachBatch$1.apply(DataStreamWriter.scala:390)
	at org.apache.spark.sql.streaming.DataStreamWriter$$anonfun$foreachBatch$1.apply(DataStreamWriter.scala:390)
	at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:35)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$5$$anonfun$apply$17.apply(MicroBatchExecution.scala:537)
	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:80)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$5.apply(MicroBatchExecution.scala:535)
	at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
	at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:534)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:198)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
	at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
	at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:166)
	at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160)
	at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:281)
	at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:193)
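
As the error message itself suggests, there are two escapes if a cartesian product is genuinely intended; a minimal sketch of both (for an inner join, fixing the join condition as shown above is the better remedy):

// explicit CROSS JOIN syntax:
Dataset<Row> crossed = batchDF
        .selectExpr("CAST(value AS STRING) AS v1")
        .crossJoin(resultTable.toDF());

// or enable implicit cartesian products globally (Spark 2.x):
sparkSession.conf().set("spark.sql.crossJoin.enabled", "true");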
@kant777 kant777 changed the title Join with an existing delta table with a forEachBatch dataframe Resulting in a Cross Join when Inner join is needed? Join with an existing delta table and a forEachBatch dataframe Resulting in a Cross Join when Inner join is needed? Apr 1, 2020
@kant777 kant777 closed this as completed Apr 1, 2020