From 909f3bbbc0aa5ec0d6c65e66fe2e4e450dfa2df0 Mon Sep 17 00:00:00 2001
From: Sean Zhong
Date: Thu, 1 Sep 2016 15:28:03 +0800
Subject: [PATCH 1/3] fix OOM issue when generating JSON

---
 .../org/apache/spark/sql/catalyst/trees/TreeNode.scala  | 5 ++++-
 .../src/test/scala/org/apache/spark/sql/QueryTest.scala | 9 ++++++++-
 2 files changed, 12 insertions(+), 2 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
index 24a2dc9d3b35f..1026fcc2a5217 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
@@ -604,6 +604,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
     }.toList
   }
 
+  // TODO: Fix toJSON so that we can more safely handle Map and Seq with loop.
   private def parseToJson(obj: Any): JValue = obj match {
     case b: Boolean => JBool(b)
     case b: Byte => JInt(b.toInt)
@@ -617,7 +618,9 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
     case s: String => JString(s)
     case u: UUID => JString(u.toString)
     case dt: DataType => dt.jsonValue
-    case m: Metadata => m.jsonValue
+    // SPARK-17356: When used with MLlib, Metadata may store a huge vector of data;
+    // transforming it to JSON may trigger an OutOfMemoryError.
+    case m: Metadata => Metadata.empty.jsonValue
     case s: StorageLevel =>
       ("useDisk" -> s.useDisk) ~ ("useMemory" -> s.useMemory) ~ ("useOffHeap" -> s.useOffHeap) ~
         ("deserialized" -> s.deserialized) ~ ("replication" -> s.replication)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
index c7af40227d45f..c01a1375f3e8d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
@@ -33,7 +33,7 @@ import org.apache.spark.sql.execution.aggregate.TypedAggregateExpression
 import org.apache.spark.sql.execution.columnar.InMemoryRelation
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.execution.streaming.MemoryPlan
-import org.apache.spark.sql.types.ObjectType
+import org.apache.spark.sql.types.{Metadata, ObjectType}
 
 abstract class QueryTest extends PlanTest {
 
@@ -274,6 +274,13 @@ abstract class QueryTest extends PlanTest {
     val normalized1 = logicalPlan.transformAllExpressions {
       case udf: ScalaUDF => udf.copy(function = null)
       case gen: UserDefinedGenerator => gen.copy(function = null)
+      // SPARK-17356: When used with MLlib, Metadata may store a huge vector of data;
+      // transforming it to JSON may trigger an OutOfMemoryError.
+      case a @ Alias(child, name) if a.explicitMetadata.isDefined =>
+        Alias(child, name)(a.exprId, a.qualifier, Some(Metadata.empty), a.isGenerated)
+      case a: AttributeReference if a.metadata != Metadata.empty =>
+        AttributeReference(a.name, a.dataType, a.nullable, Metadata.empty)(a.exprId, a.qualifier,
+          a.isGenerated)
     }
 
     // RDDs/data are not serializable to JSON, so we need to collect LogicalPlans that contains
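The TreeNode.scala change above keeps the Metadata match arm in parseToJson but serializes Metadata.empty in its place, so the (possibly enormous) real metadata never reaches the JSON builder. Below is a minimal, dependency-free sketch of that idea; ToyMetadata and toyParseToJson are hypothetical stand-ins for Spark's Metadata and TreeNode.parseToJson, and the real code emits json4s JValues rather than plain strings:

    object MetadataJsonSketch {
      // Stand-in for org.apache.spark.sql.types.Metadata.
      case class ToyMetadata(entries: Map[String, Any]) {
        def jsonValue: String =
          entries.map { case (k, v) => "\"" + k + "\": \"" + v + "\"" }
            .mkString("{", ", ", "}")
      }

      object ToyMetadata {
        val empty: ToyMetadata = ToyMetadata(Map.empty)
      }

      // Mirrors the shape of TreeNode.parseToJson: one match arm per field type.
      def toyParseToJson(obj: Any): String = obj match {
        case b: Boolean => b.toString
        case i: Int => i.toString
        case s: String => "\"" + s + "\""
        // SPARK-17356: serialize an empty stand-in, never the real metadata,
        // which may wrap a huge ML attribute vector.
        case _: ToyMetadata => ToyMetadata.empty.jsonValue
        case other => "\"" + other + "\""
      }

      def main(args: Array[String]): Unit = {
        val huge = ToyMetadata(Map("ml_attr" -> Seq.fill(1000000)(1.0)))
        println(toyParseToJson(huge)) // prints {}, regardless of metadata size
      }
    }

The trade-off is that toJSON becomes lossy for plans that carry metadata, which is exactly what the QueryTest.scala change above compensates for on the test side.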
From 39f3c63cbde086f44b8777d4ff708daa3bef2f18 Mon Sep 17 00:00:00 2001
From: Sean Zhong
Date: Fri, 2 Sep 2016 11:24:18 +0800
Subject: [PATCH 2/3] log warning if schema doesn't contain column for
 corrupted record

---
 sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
index c01a1375f3e8d..d361f61764d1f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
@@ -274,8 +274,9 @@ abstract class QueryTest extends PlanTest {
     val normalized1 = logicalPlan.transformAllExpressions {
       case udf: ScalaUDF => udf.copy(function = null)
       case gen: UserDefinedGenerator => gen.copy(function = null)
-      // SPARK-17356: When used with MLlib, Metadata may store a huge vector of data;
-      // transforming it to JSON may trigger an OutOfMemoryError.
+      // After SPARK-17356, the JSON representation no longer carries the Metadata. We need to
+      // remove the Metadata from the normalized plan so that we can compare this plan with the
+      // JSON-deserialized plan.
       case a @ Alias(child, name) if a.explicitMetadata.isDefined =>
         Alias(child, name)(a.exprId, a.qualifier, Some(Metadata.empty), a.isGenerated)
       case a: AttributeReference if a.metadata != Metadata.empty =>

From 20fa7e37946267b7a126653ba81270a404497e45 Mon Sep 17 00:00:00 2001
From: Sean Zhong
Date: Tue, 6 Sep 2016 07:32:19 +0800
Subject: [PATCH 3/3] address comment

---
 .../scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala | 1 -
 1 file changed, 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
index 1026fcc2a5217..66b5d5d41cfab 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
@@ -604,7 +604,6 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
     }.toList
   }
 
-  // TODO: Fix toJSON so that we can more safely handle Map and Seq with loop.
   private def parseToJson(obj: Any): JValue = obj match {
     case b: Boolean => JBool(b)
     case b: Byte => JInt(b.toInt)
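Taken together, the series follows the usual round-trip-test pattern: once toJSON drops Metadata, the test must normalize the original plan the same way before comparing it against the JSON-deserialized plan, which is what the Alias and AttributeReference cases in QueryTest.scala do. A dependency-free sketch of that normalize-then-compare step follows, with hypothetical ToyAttr and toyRoundTrip names rather than Spark's AttributeReference and JSON machinery:

    object NormalizeSketch {
      // Plays the role of AttributeReference: a name, a type, and attached metadata.
      case class ToyAttr(name: String, dataType: String, metadata: Map[String, String])

      // Simulates plan -> JSON -> plan: after SPARK-17356 the metadata
      // always comes back empty.
      def toyRoundTrip(attr: ToyAttr): ToyAttr = attr.copy(metadata = Map.empty)

      // So the test strips metadata from the original before comparing.
      def normalize(attr: ToyAttr): ToyAttr = attr.copy(metadata = Map.empty)

      def main(args: Array[String]): Unit = {
        val original = ToyAttr("features", "vector", Map("ml_attr" -> "huge attribute group"))
        assert(normalize(original) == toyRoundTrip(original))
        println("normalized plan matches the round-tripped plan")
      }
    }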