From 7de570641d5bd099ed0e4b68f513aeb2b7ea3f1a Mon Sep 17 00:00:00 2001
From: Takuya UESHIN
Date: Tue, 24 Jun 2014 17:08:48 +0900
Subject: [PATCH 1/3] Make ScalaReflection be able to handle Generic case classes.

---
 .../spark/sql/catalyst/ScalaReflection.scala |  9 +++-
 .../sql/catalyst/ScalaReflectionSuite.scala  | 46 +++++++++++++++++++
 2 files changed, 53 insertions(+), 2 deletions(-)
 create mode 100644 sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
index 196695a0a188..932e12120d0f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
@@ -45,10 +45,15 @@ object ScalaReflection {
       val TypeRef(_, _, Seq(optType)) = t
       schemaFor(optType)
     case t if t <:< typeOf[Product] =>
-      val params = t.member("<init>": TermName).asMethod.paramss
+      val formalTypeArgs = t.typeSymbol.asClass.typeParams
+      val TypeRef(_, _, actualTypeArgs) = t
+      val params = t.member(nme.CONSTRUCTOR).asMethod.paramss
       StructType(
         params.head.map(p =>
-          StructField(p.name.toString, schemaFor(p.typeSignature), nullable = true)))
+          StructField(
+            p.name.toString,
+            schemaFor(p.typeSignature.substituteTypes(formalTypeArgs, actualTypeArgs)),
+            nullable = true)))
     // Need to decide if we actually need a special type here.
     case t if t <:< typeOf[Array[Byte]] => BinaryType
     case t if t <:< typeOf[Array[_]] =>
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala
new file mode 100644
index 000000000000..c4411b9c3fc8
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst
+
+import java.sql.Timestamp
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.types._
+
+case class GenericData[A](
+    genericField: A)
+
+class ScalaReflectionSuite extends FunSuite {
+
+  test("generic data") {
+    val schema = ScalaReflection.schemaFor[GenericData[Int]]
+    assert(schema ===
+      StructType(Seq(
+        StructField("genericField", IntegerType, nullable = true))))
+  }
+
+  test("tuple data") {
+    val schema = ScalaReflection.schemaFor[(Int, String)]
+    assert(schema ===
+      StructType(Seq(
+        StructField("_1", IntegerType, nullable = true),
+        StructField("_2", StringType, nullable = true))))
+  }
+}

From fac5faeba3e060fb8f26890f2dee07d67e9d1d4e Mon Sep 17 00:00:00 2001
From: Takuya UESHIN
Date: Thu, 26 Jun 2014 16:39:20 +0900
Subject: [PATCH 2/3] Remove unnecessary method receiver.

---
 .../org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala
index 70cb7fc5a100..c0438dbe52a4 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala
@@ -133,7 +133,7 @@ class ScalaReflectionSuite extends FunSuite {
   }
 
   test("generic data") {
-    val schema = ScalaReflection.schemaFor[GenericData[Int]]
+    val schema = schemaFor[GenericData[Int]]
     assert(schema === Schema(
       StructType(Seq(
         StructField("genericField", IntegerType, nullable = false))),
@@ -141,7 +141,7 @@ class ScalaReflectionSuite extends FunSuite {
   }
 
   test("tuple data") {
-    val schema = ScalaReflection.schemaFor[(Int, String)]
+    val schema = schemaFor[(Int, String)]
     assert(schema === Schema(
       StructType(Seq(
         StructField("_1", IntegerType, nullable = false),

From 32ef7c3a19d4d88328f086ebb385329a0dc06c88 Mon Sep 17 00:00:00 2001
From: Takuya UESHIN
Date: Sat, 28 Jun 2014 13:01:13 +0900
Subject: [PATCH 3/3] Add execution of `SHOW TABLES` before `TestHive.reset()`.

---
 .../org/apache/spark/sql/hive/execution/PruningSuite.scala | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala
index 4d7c84f44387..34d8a061ccc8 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala
@@ -26,6 +26,9 @@ import scala.collection.JavaConversions._
  * A set of test cases that validate partition and column pruning.
  */
 class PruningSuite extends HiveComparisonTest {
+  // MINOR HACK: You must run a query before calling reset the first time.
+  TestHive.hql("SHOW TABLES")
+
   // Column/partition pruning is not implemented for `InMemoryColumnarTableScan` yet, need to reset
   // the environment to ensure all referenced tables in this suites are not cached in-memory.
   // Refer to https://issues.apache.org/jira/browse/SPARK-2283 for details.
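
Note (illustrative, not part of the patches): the core of patch 1/3 is resolving a case
class' constructor parameter types with the formal type parameters substituted by the
actual type arguments, so that GenericData[Int] reports Int rather than A. Below is a
minimal standalone sketch of that technique, assuming Scala 2.10-style runtime reflection
as used in the patch; GenericSchemaSketch and constructorParamTypes are hypothetical names
introduced here for illustration, not Spark APIs.

    import scala.reflect.runtime.universe._

    case class GenericData[A](genericField: A)

    object GenericSchemaSketch {
      // Return the constructor parameter names and types of a (possibly generic)
      // case class, substituting formal type parameters with the actual type
      // arguments, mirroring the approach used in ScalaReflection.schemaFor.
      def constructorParamTypes[T: TypeTag]: List[(String, Type)] = {
        val t = typeOf[T]
        val formalTypeArgs = t.typeSymbol.asClass.typeParams
        val TypeRef(_, _, actualTypeArgs) = t
        val params = t.member(nme.CONSTRUCTOR).asMethod.paramss
        params.head.map { p =>
          p.name.toString -> p.typeSignature.substituteTypes(formalTypeArgs, actualTypeArgs)
        }
      }

      def main(args: Array[String]): Unit = {
        // Prints "genericField: Int" rather than "genericField: A".
        constructorParamTypes[GenericData[Int]].foreach {
          case (name, tpe) => println(s"$name: $tpe")
        }
      }
    }

Without the substituteTypes step, p.typeSignature of genericField is the unresolved type
parameter A, which is why schemaFor previously could not handle generic case classes.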