From c194d5e4827955427b1ffac5bb582b994ae20cac Mon Sep 17 00:00:00 2001
From: Xiangrui Meng
Date: Wed, 17 Sep 2014 14:41:49 -0700
Subject: [PATCH 01/18] add metadata field to StructField and Attribute

---
 .../expressions/namedExpressions.scala | 18 ++++++++++++------
 .../spark/sql/catalyst/types/dataTypes.scala | 12 +++++++++---
 .../org/apache/spark/sql/json/JsonRDD.scala | 13 ++++---------
 3 files changed, 25 insertions(+), 18 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
index 7c4b9d4847e26..755b94c2a123b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
@@ -40,6 +40,7 @@ abstract class NamedExpression extends Expression {
   def name: String
   def exprId: ExprId
   def qualifiers: Seq[String]
+  def metadata: Map[String, Any] = Map.empty
 
   def toAttribute: Attribute
 
@@ -112,9 +113,13 @@ case class Alias(child: Expression, name: String)
  * qualified way. Consider the examples tableName.name, subQueryAlias.name.
  * tableName and subQueryAlias are possible qualifiers.
  */
-case class AttributeReference(name: String, dataType: DataType, nullable: Boolean = true)
-    (val exprId: ExprId = NamedExpression.newExprId, val qualifiers: Seq[String] = Nil)
-  extends Attribute with trees.LeafNode[Expression] {
+case class AttributeReference(
+    name: String,
+    dataType: DataType,
+    nullable: Boolean = true,
+    override val metadata: Map[String, Any] = Map.empty)(
+    val exprId: ExprId = NamedExpression.newExprId,
+    val qualifiers: Seq[String] = Nil) extends Attribute with trees.LeafNode[Expression] {
 
   override def references = AttributeSet(this :: Nil)
 
@@ -131,7 +136,8 @@ case class AttributeReference(name: String, dataType: DataType, nullable: Boolea
     h
   }
-  override def newInstance = AttributeReference(name, dataType, nullable)(qualifiers = qualifiers)
+  override def newInstance =
+    AttributeReference(name, dataType, nullable, metadata)(qualifiers = qualifiers)
 
   /**
   * Returns a copy of this [[AttributeReference]] with changed nullability.
   */
@@ -140,7 +146,7 @@ case class AttributeReference(name: String, dataType: DataType, nullable: Boolea
     if (nullable == newNullability) {
       this
     } else {
-      AttributeReference(name, dataType, newNullability)(exprId, qualifiers)
+      AttributeReference(name, dataType, newNullability, metadata)(exprId, qualifiers)
     }
   }
 
@@ -151,7 +157,7 @@ case class AttributeReference(name: String, dataType: DataType, nullable: Boolea
     if (newQualifiers == qualifiers) {
       this
     } else {
-      AttributeReference(name, dataType, nullable)(exprId, newQualifiers)
+      AttributeReference(name, dataType, nullable, metadata)(exprId, newQualifiers)
     }
   }
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala
index 49520b7678e90..663e532bcff82 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala
@@ -296,8 +296,14 @@ case class ArrayType(elementType: DataType, containsNull: Boolean) extends DataT
  * @param name The name of this field.
  * @param dataType The data type of this field.
  * @param nullable Indicates if values of this field can be `null` values.
+ * @param metadata The metadata of this field, which is a map from string to simple type that can be
+ *                 serialized to JSON automatically.
  */
-case class StructField(name: String, dataType: DataType, nullable: Boolean) {
+case class StructField(
+    name: String,
+    dataType: DataType,
+    nullable: Boolean,
+    metadata: Map[String, Any] = Map.empty) {
 
   private[sql] def buildFormattedString(prefix: String, builder: StringBuilder): Unit = {
     builder.append(s"${prefix}-- ${name}: ${dataType.simpleString} (nullable = ${nullable})\n")
@@ -307,7 +313,7 @@ case class StructField(name: String, dataType: DataType, nullable: Boolean) {
 
 object StructType {
   protected[sql] def fromAttributes(attributes: Seq[Attribute]): StructType =
-    StructType(attributes.map(a => StructField(a.name, a.dataType, a.nullable)))
+    StructType(attributes.map(a => StructField(a.name, a.dataType, a.nullable, a.metadata)))
 }
 
 case class StructType(fields: Seq[StructField]) extends DataType {
@@ -342,7 +348,7 @@ case class StructType(fields: Seq[StructField]) extends DataType {
   }
 
   protected[sql] def toAttributes =
-    fields.map(f => AttributeReference(f.name, f.dataType, f.nullable)())
+    fields.map(f => AttributeReference(f.name, f.dataType, f.nullable, f.metadata)())
 
   def treeString: String = {
     val builder = new StringBuilder
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
index 0f27fd13e7379..dcb283d4c9ef2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
@@ -112,10 +112,7 @@ private[sql] object JsonRDD extends Logging {
         }
       }.flatMap(field => field).toSeq
 
-      StructType(
-        (topLevelFields ++ structFields).sortBy {
-          case StructField(name, _, _) => name
-        })
+      StructType((topLevelFields ++ structFields).sortBy(_.name))
     }
 
     makeStruct(resolved.keySet.toSeq, Nil)
@@ -123,7 +120,7 @@ private[sql] object JsonRDD extends Logging {
 
   private[sql] def nullTypeToStringType(struct: StructType): StructType = {
     val fields = struct.fields.map {
-      case StructField(fieldName, dataType, nullable) => {
+      case StructField(fieldName, dataType, nullable, _) => {
        val newType = dataType match {
          case NullType => StringType
          case ArrayType(NullType, containsNull) => ArrayType(StringType, containsNull)
@@ -158,9 +155,7 @@ private[sql] object JsonRDD extends Logging {
           StructField(name, dataType, true)
         }
       }
-      StructType(newFields.toSeq.sortBy {
-        case StructField(name, _, _) => name
-      })
+      StructType(newFields.toSeq.sortBy(_.name))
     }
     case (ArrayType(elementType1, containsNull1), ArrayType(elementType2, containsNull2)) =>
       ArrayType(compatibleType(elementType1, elementType2), containsNull1 || containsNull2)
@@ -385,7 +380,7 @@ private[sql] object JsonRDD extends Logging {
 
     // TODO: Reuse the row instead of creating a new one for every record.
     val row = new GenericMutableRow(schema.fields.length)
     schema.fields.zipWithIndex.foreach {
-      case (StructField(name, dataType, _), i) =>
+      case (StructField(name, dataType, _, _), i) =>
         row.update(i, json.get(name).flatMap(v => Option(v)).map(
           enforceCorrectType(_, dataType)).orNull)
     }

From 367d237b3d5e445a67e6a8b9c9ae79abff26a045 Mon Sep 17 00:00:00 2001
From: Xiangrui Meng
Date: Tue, 7 Oct 2014 13:17:02 -0700
Subject: [PATCH 02/18] add test

---
 .../spark/sql/catalyst/ScalaReflection.scala | 2 +-
 .../sql/catalyst/expressions/Expression.scala | 1 +
 .../sql/catalyst/expressions/generators.scala | 2 +-
 .../expressions/namedExpressions.scala | 3 ++-
 .../plans/logical/basicOperators.scala | 2 +-
 .../spark/sql/catalyst/types/dataTypes.scala | 3 ++-
 .../org/apache/spark/sql/MetadataSuite.scala | 27 +++++++++++++++++++
 7 files changed, 35 insertions(+), 5 deletions(-)
 create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/MetadataSuite.scala

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
index 88a8fa7c28e0f..51ba0c3dac321 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
@@ -62,7 +62,7 @@ object ScalaReflection {
         params.head.map { p =>
           val Schema(dataType, nullable) =
             schemaFor(p.typeSignature.substituteTypes(formalTypeArgs, actualTypeArgs))
-          StructField(p.name.toString, dataType, nullable)
+          StructField(p.name.toString, dataType, nullable, Map.empty)
         }), nullable = true)
     // Need to decide if we actually need a special type here.
     case t if t <:< typeOf[Array[Byte]] => Schema(BinaryType, nullable = true)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
index 70507e7ee2be8..ab0179b14b592 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
@@ -41,6 +41,7 @@ abstract class Expression extends TreeNode[Expression] {
    */
  def foldable: Boolean = false
  def nullable: Boolean
+  def metadata: Map[String, Any] = Map.empty
  def references: AttributeSet = AttributeSet(children.flatMap(_.references.iterator))

  /** Returns the result of evaluating this expression on a given input Row */
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala
index 9c865254e0be9..ab0701fd9a80b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala
@@ -43,7 +43,7 @@ abstract class Generator extends Expression {
   override type EvaluatedType = TraversableOnce[Row]
 
   override lazy val dataType =
-    ArrayType(StructType(output.map(a => StructField(a.name, a.dataType, a.nullable))))
+    ArrayType(StructType(output.map(a => StructField(a.name, a.dataType, a.nullable, a.metadata))))
 
   override def nullable = false
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
index 755b94c2a123b..5564fa2b09bb6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
@@ -40,7 +40,6 @@ abstract class NamedExpression extends Expression {
   def name: String
   def exprId: ExprId
   def qualifiers: Seq[String]
-  def metadata: Map[String, Any] = Map.empty
 
   def toAttribute: Attribute
 
@@ -99,6 +98,8 @@ case class Alias(child: Expression, name: String)
   override def toString: String = s"$child AS $name#${exprId.id}$typeSuffix"
 
   override protected final def otherCopyArgs = exprId :: qualifiers :: Nil
+
+  override def metadata: Map[String, Any] = child.metadata
 }
 
 /**
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
index 5d10754c7b028..6cd2b456ec7c6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
@@ -163,7 +163,7 @@ case class LowerCaseSchema(child: LogicalPlan) extends UnaryNode {
   protected def lowerCaseSchema(dataType: DataType): DataType = dataType match {
     case StructType(fields) =>
       StructType(fields.map(f =>
-        StructField(f.name.toLowerCase(), lowerCaseSchema(f.dataType), f.nullable)))
+        StructField(f.name.toLowerCase(), lowerCaseSchema(f.dataType), f.nullable, f.metadata)))
     case ArrayType(elemType, containsNull) => ArrayType(lowerCaseSchema(elemType), containsNull)
     case otherType => otherType
   }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala
index 663e532bcff82..99bd12dfa5e19 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala
@@ -57,7 +57,8 @@ object DataType extends RegexParsers {
   protected lazy val structField: Parser[StructField] =
     ("StructField(" ~> "[a-zA-Z0-9_]*".r) ~ ("," ~> dataType) ~ ("," ~> boolVal <~ ")") ^^ {
       case name ~ tpe ~ nullable =>
-        StructField(name, tpe, nullable = nullable)
+        // TODO: parse metadata
+        StructField(name, tpe, nullable = nullable, Map.empty)
     }
 
   protected lazy val boolVal: Parser[Boolean] =
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/MetadataSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/MetadataSuite.scala
new file mode 100644
index 0000000000000..3512998f9d832
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/MetadataSuite.scala
@@ -0,0 +1,27 @@
+package org.apache.spark.sql
+
+import org.apache.spark.sql.test.TestSQLContext
+import org.scalatest.FunSuite
+
+case class Person(name: String, age: Int)
+
+class MetadataSuite extends FunSuite {
+
+  test("metadata") {
+    val sqlContext = TestSQLContext
+    import sqlContext._
+    val members = sqlContext.sparkContext.makeRDD(Seq(
+      Person("mike", 10),
+      Person("jim", 20)))
+    val table: SchemaRDD = sqlContext.createSchemaRDD(members)
+    val schema: StructType = table.schema
+    println("schema: " + schema)
+    val ageField = schema("age").copy(metadata = Map("desc" -> "age (must be nonnegative)"))
+    val newSchema = schema.copy(Seq(schema("name"), ageField))
+    val newTable = sqlContext.applySchema(table, newSchema)
+    val selectByExprAgeField = newTable.select('age).schema("age")
+    assert(selectByExprAgeField.metadata.nonEmpty)
+    val selectByNameAttrAgeField = newTable.select("age".attr).schema("age")
+    assert(selectByNameAttrAgeField.metadata.nonEmpty)
+  }
+}

From d65072e483da9fd2dbd4999a4976befe2ce054d1 Mon Sep 17 00:00:00 2001
From: Xiangrui Meng
Date: Tue, 7 Oct 2014 14:46:48 -0700
Subject: [PATCH 03/18] remove Map.empty

---
 .../spark/sql/catalyst/ScalaReflection.scala | 4 ++--
 .../sql/catalyst/expressions/Expression.scala | 4 +++-
 .../expressions/namedExpressions.scala | 7 +++----
 .../plans/logical/basicOperators.scala | 9 +++++----
 .../spark/sql/catalyst/types/dataTypes.scala | 7 ++++---
 .../org/apache/spark/sql/MetadataSuite.scala | 19 +++++++++++++------
 6 files changed, 30 insertions(+), 20 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
index 51ba0c3dac321..18e10c95754ff 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
@@ -43,7 +43,7 @@ object ScalaReflection {
   /** Returns a Sequence of attributes for the given case class type. */
   def attributesFor[T: TypeTag]: Seq[Attribute] = schemaFor[T] match {
     case Schema(s: StructType, _) =>
-      s.fields.map(f => AttributeReference(f.name, f.dataType, f.nullable)())
+      s.fields.map(f => AttributeReference(f.name, f.dataType, f.nullable, f.metadata)())
   }
 
   /** Returns a catalyst DataType and its nullability for the given Scala Type using reflection. */
@@ -62,7 +62,7 @@ object ScalaReflection {
         params.head.map { p =>
           val Schema(dataType, nullable) =
             schemaFor(p.typeSignature.substituteTypes(formalTypeArgs, actualTypeArgs))
-          StructField(p.name.toString, dataType, nullable, Map.empty)
+          StructField(p.name.toString, dataType, nullable)
         }), nullable = true)
     // Need to decide if we actually need a special type here.
     case t if t <:< typeOf[Array[Byte]] => Schema(BinaryType, nullable = true)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
index ab0179b14b592..6371582ddc6c9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
@@ -41,9 +41,11 @@ abstract class Expression extends TreeNode[Expression] {
    */
  def foldable: Boolean = false
  def nullable: Boolean
-  def metadata: Map[String, Any] = Map.empty
  def references: AttributeSet = AttributeSet(children.flatMap(_.references.iterator))

+  /** Returns the metadata when an expression is a reference to another expression with metadata. */
+  def metadata: Map[String, Any] = Map.empty
+
  /** Returns the result of evaluating this expression on a given input Row */
  def eval(input: Row = null): EvaluatedType

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
index 5564fa2b09bb6..3167e4ba49a87 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
@@ -85,11 +85,11 @@ case class Alias(child: Expression, name: String)
 
   override def dataType = child.dataType
   override def nullable = child.nullable
-
+  override def metadata: Map[String, Any] = child.metadata
 
   override def toAttribute = {
     if (resolved) {
-      AttributeReference(name, child.dataType, child.nullable)(exprId, qualifiers)
+      AttributeReference(name, child.dataType, child.nullable, child.metadata)(exprId, qualifiers)
     } else {
       UnresolvedAttribute(name)
     }
@@ -98,8 +98,6 @@ case class Alias(child: Expression, name: String)
 
   override def toString: String = s"$child AS $name#${exprId.id}$typeSuffix"
 
   override protected final def otherCopyArgs = exprId :: qualifiers :: Nil
-
-  override def metadata: Map[String, Any] = child.metadata
 }
 
 /**
@@ -108,6 +106,7 @@ case class Alias(child: Expression, name: String)
  * @param name The name of this attribute, should only be used during analysis or for debugging.
  * @param dataType The [[DataType]] of this attribute.
  * @param nullable True if null is a valid value for this attribute.
+ * @param metadata The metadata of this attribute.
  * @param exprId A globally unique id used to check if different AttributeReferences refer to the
  *               same attribute.
 * @param qualifiers a list of strings that can be used to referred to this attribute in a fully
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
index 6cd2b456ec7c6..c10e751ee3917 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
@@ -163,7 +163,7 @@ case class LowerCaseSchema(child: LogicalPlan) extends UnaryNode {
   protected def lowerCaseSchema(dataType: DataType): DataType = dataType match {
     case StructType(fields) =>
       StructType(fields.map(f =>
-        StructField(f.name.toLowerCase(), lowerCaseSchema(f.dataType), f.nullable, f.metadata)))
+        StructField(f.name.toLowerCase, lowerCaseSchema(f.dataType), f.nullable, f.metadata)))
     case ArrayType(elemType, containsNull) => ArrayType(lowerCaseSchema(elemType), containsNull)
     case otherType => otherType
   }
@@ -173,9 +173,10 @@ case class LowerCaseSchema(child: LogicalPlan) extends UnaryNode {
       AttributeReference(
         a.name.toLowerCase,
         lowerCaseSchema(a.dataType),
-        a.nullable)(
-        a.exprId,
-        a.qualifiers)
+        a.nullable,
+        a.metadata)(
+        a.exprId,
+        a.qualifiers)
       case other => other
   }
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala
index 99bd12dfa5e19..05ac9d23b0284 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala
@@ -298,7 +298,8 @@ case class ArrayType(elementType: DataType, containsNull: Boolean) extends DataT
  * @param dataType The data type of this field.
  * @param nullable Indicates if values of this field can be `null` values.
  * @param metadata The metadata of this field, which is a map from string to simple type that can be
- *                 serialized to JSON automatically.
+ *                 serialized to JSON automatically. The metadata should be preserved during
+ *                 transformation if the content of the column is not modified, e.g., in selection.
  */
 case class StructField(
     name: String,
@@ -330,8 +331,8 @@ case class StructType(fields: Seq[StructField]) extends DataType {
    * have a name matching the given name, `null` will be returned.
    */
   def apply(name: String): StructField = {
-    nameToField.get(name).getOrElse(
-      throw new IllegalArgumentException(s"Field ${name} does not exist."))
+    nameToField.getOrElse(name,
+      throw new IllegalArgumentException(s"Field $name does not exist."))
   }
 
   /**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/MetadataSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/MetadataSuite.scala
index 3512998f9d832..f289461d8034a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/MetadataSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/MetadataSuite.scala
@@ -13,15 +13,22 @@ class MetadataSuite extends FunSuite {
     val members = sqlContext.sparkContext.makeRDD(Seq(
       Person("mike", 10),
       Person("jim", 20)))
-    val table: SchemaRDD = sqlContext.createSchemaRDD(members)
-    val schema: StructType = table.schema
+    val person: SchemaRDD = sqlContext.createSchemaRDD(members)
+    val schema: StructType = person.schema
     println("schema: " + schema)
-    val ageField = schema("age").copy(metadata = Map("desc" -> "age (must be nonnegative)"))
+    val ageField = schema("age").copy(metadata = Map("doc" -> "age (must be nonnegative)"))
     val newSchema = schema.copy(Seq(schema("name"), ageField))
-    val newTable = sqlContext.applySchema(table, newSchema)
+    val newTable = sqlContext.applySchema(person, newSchema)
+    newTable.registerTempTable("person")
     val selectByExprAgeField = newTable.select('age).schema("age")
-    assert(selectByExprAgeField.metadata.nonEmpty)
+    assert(selectByExprAgeField.metadata.contains("doc"))
     val selectByNameAttrAgeField = newTable.select("age".attr).schema("age")
-    assert(selectByNameAttrAgeField.metadata.nonEmpty)
+    assert(selectByNameAttrAgeField.metadata.contains("doc"))
+    val selectAgeBySQL = sql("SELECT age FROM person").schema("age")
+    println(selectAgeBySQL)
+    assert(selectAgeBySQL.metadata.contains("doc"))
+    val selectStarBySQL = sql("SELECT * FROM person").schema("age")
+    println(selectStarBySQL)
+    assert(selectStarBySQL.metadata.contains("doc"))
   }
 }

From 67fdebb7412484c6674052d2ac6c94573e846ce5 Mon Sep 17 00:00:00 2001
From: Xiangrui Meng
Date: Tue, 7 Oct 2014 15:06:12 -0700
Subject: [PATCH 04/18] add test on join

---
 .../org/apache/spark/sql/MetadataSuite.scala | 37 ++++++++++++-------
 1 file changed, 24 insertions(+), 13 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/MetadataSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/MetadataSuite.scala
index f289461d8034a..94ca949ab40f7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/MetadataSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/MetadataSuite.scala
@@ -3,26 +3,31 @@ package org.apache.spark.sql
 import org.apache.spark.sql.test.TestSQLContext
 import org.scalatest.FunSuite
 
-case class Person(name: String, age: Int)
+case class Person(id: Int, name: String, age: Int)
+
+case class Score(personId: Int, score: Double)
 
 class MetadataSuite extends FunSuite {
 
   test("metadata") {
     val sqlContext = TestSQLContext
     import sqlContext._
-    val members = sqlContext.sparkContext.makeRDD(Seq(
-      Person("mike", 10),
-      Person("jim", 20)))
-    val person: SchemaRDD = sqlContext.createSchemaRDD(members)
-    val schema: StructType = person.schema
-    println("schema: " + schema)
-    val ageField = schema("age").copy(metadata = Map("doc" -> "age (must be nonnegative)"))
-    val newSchema = schema.copy(Seq(schema("name"), ageField))
-    val newTable = sqlContext.applySchema(person, newSchema)
-    newTable.registerTempTable("person")
-    val selectByExprAgeField = newTable.select('age).schema("age")
+    val person = sqlContext.sparkContext.makeRDD(Seq(
+      Person(0, "mike", 10),
+      Person(1, "jim", 20))).toSchemaRDD
+    val score = sqlContext.sparkContext.makeRDD(Seq(
+      Score(0, 4.0),
+      Score(1, 5.0))).toSchemaRDD
+    val personSchema: StructType = person.schema
+    println("schema: " + personSchema)
+    val ageField = personSchema("age").copy(metadata = Map("doc" -> "age (must be nonnegative)"))
+    val newPersonSchema = personSchema.copy(Seq(personSchema("id"), personSchema("name"), ageField))
+    val newPerson = sqlContext.applySchema(person, newPersonSchema)
+    newPerson.registerTempTable("person")
+    score.registerTempTable("score")
+    val selectByExprAgeField = newPerson.select('age).schema("age")
     assert(selectByExprAgeField.metadata.contains("doc"))
-    val selectByNameAttrAgeField = newTable.select("age".attr).schema("age")
+    val selectByNameAttrAgeField = newPerson.select("age".attr).schema("age")
     assert(selectByNameAttrAgeField.metadata.contains("doc"))
     val selectAgeBySQL = sql("SELECT age FROM person").schema("age")
     println(selectAgeBySQL)
@@ -30,5 +35,11 @@ class MetadataSuite extends FunSuite {
     val selectStarBySQL = sql("SELECT * FROM person").schema("age")
     println(selectStarBySQL)
     assert(selectStarBySQL.metadata.contains("doc"))
+    val selectStarJoinBySQL = sql("SELECT * FROM person JOIN score ON id = personId").schema("age")
+    println(selectStarJoinBySQL)
+    assert(selectStarJoinBySQL.metadata.contains("doc"))
+    val selectAgeJoinBySQL = sql("SELECT age, score FROM person JOIN score ON id = personId").schema("age")
+    println(selectAgeJoinBySQL)
+    assert(selectAgeJoinBySQL.metadata.contains("doc"))
   }
 }

From d8af0edc105e767e22d3d2696587a502741b9416 Mon Sep 17 00:00:00 2001
From: Xiangrui Meng
Date: Tue, 7 Oct 2014 17:19:33 -0700
Subject: [PATCH 05/18] move tests to SQLQuerySuite

---
 .../org/apache/spark/sql/MetadataSuite.scala | 45 -------------------
 .../org/apache/spark/sql/SQLQuerySuite.scala | 21 +++++++++
 .../scala/org/apache/spark/sql/TestData.scala | 11 +++++
 3 files changed, 32 insertions(+), 45 deletions(-)
 delete mode 100644 sql/core/src/test/scala/org/apache/spark/sql/MetadataSuite.scala

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/MetadataSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/MetadataSuite.scala
deleted file mode 100644
index 94ca949ab40f7..0000000000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/MetadataSuite.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-package org.apache.spark.sql
-
-import org.apache.spark.sql.test.TestSQLContext
-import org.scalatest.FunSuite
-
-case class Person(id: Int, name: String, age: Int)
-
-case class Score(personId: Int, score: Double)
-
-class MetadataSuite extends FunSuite {
-
-  test("metadata") {
-    val sqlContext = TestSQLContext
-    import sqlContext._
-    val person = sqlContext.sparkContext.makeRDD(Seq(
-      Person(0, "mike", 10),
-      Person(1, "jim", 20))).toSchemaRDD
-    val score = sqlContext.sparkContext.makeRDD(Seq(
-      Score(0, 4.0),
-      Score(1, 5.0))).toSchemaRDD
-    val personSchema: StructType = person.schema
-    println("schema: " + personSchema)
-    val ageField = personSchema("age").copy(metadata = Map("doc" -> "age (must be nonnegative)"))
-    val newPersonSchema = personSchema.copy(Seq(personSchema("id"), personSchema("name"), ageField))
-    val newPerson = sqlContext.applySchema(person, newPersonSchema)
-    newPerson.registerTempTable("person")
-    score.registerTempTable("score")
-    val selectByExprAgeField = newPerson.select('age).schema("age")
-    assert(selectByExprAgeField.metadata.contains("doc"))
-    val selectByNameAttrAgeField = newPerson.select("age".attr).schema("age")
-    assert(selectByNameAttrAgeField.metadata.contains("doc"))
-    val selectAgeBySQL = sql("SELECT age FROM person").schema("age")
-    println(selectAgeBySQL)
-    assert(selectAgeBySQL.metadata.contains("doc"))
-    val selectStarBySQL = sql("SELECT * FROM person").schema("age")
-    println(selectStarBySQL)
-    assert(selectStarBySQL.metadata.contains("doc"))
-    val selectStarJoinBySQL = sql("SELECT * FROM person JOIN score ON id = personId").schema("age")
-    println(selectStarJoinBySQL)
-    assert(selectStarJoinBySQL.metadata.contains("doc"))
-    val selectAgeJoinBySQL = sql("SELECT age, score FROM person JOIN score ON id = personId").schema("age")
-    println(selectAgeJoinBySQL)
-    assert(selectAgeJoinBySQL.metadata.contains("doc"))
-  }
-}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 67563b6c55f4b..e4b8aeff60c63 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -673,4 +673,25 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll {
       sql("SELECT CAST(TRUE AS STRING), CAST(FALSE AS STRING) FROM testData LIMIT 1"),
       ("true", "false") :: Nil)
   }
+
+  test("metadata is propagated correctly") {
+    val person = sql("SELECT * FROM person")
+    val schema = person.schema
+    val docKey = "doc"
+    val docValue = "first name"
+    val schemaWithMeta = new StructType(Seq(
+      schema("id"), schema("name").copy(metadata = Map(docKey -> docValue)), schema("age")))
+    val personWithMeta = applySchema(person, schemaWithMeta)
+    def validateMetadata(rdd: SchemaRDD): Unit = {
+      assert(rdd.schema("name").metadata(docKey) === docValue)
+    }
+    personWithMeta.registerTempTable("personWithMeta")
+    validateMetadata(personWithMeta.select('name))
+    validateMetadata(personWithMeta.select("name".attr))
+    validateMetadata(personWithMeta.select('id, 'name))
+    validateMetadata(sql("SELECT * FROM personWithMeta"))
+    validateMetadata(sql("SELECT id, name FROM personWithMeta"))
+    validateMetadata(sql("SELECT * FROM personWithMeta JOIN salary ON id = personId"))
+    validateMetadata(sql("SELECT name, salary FROM personWithMeta JOIN salary ON id = personId"))
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala b/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala
index eb33a61c6e811..9600ebbd0da59 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala
@@ -156,4 +156,15 @@ object TestData {
   // An RDD with 4 elements and 8 partitions
   val withEmptyParts = TestSQLContext.sparkContext.parallelize((1 to 4).map(IntField), 8)
   withEmptyParts.registerTempTable("withEmptyParts")
+
+  case class Person(id: Int, name: String, age: Int)
+  case class Salary(personId: Int, salary: Double)
+  val person = TestSQLContext.sparkContext.parallelize(
+    Person(0, "mike", 30) ::
+    Person(1, "jim", 20) :: Nil)
+  person.registerTempTable("person")
+  val salary = TestSQLContext.sparkContext.parallelize(
+    Salary(0, 2000.0) ::
+    Salary(1, 1000.0) :: Nil)
+  salary.registerTempTable("salary")
 }

From 7e5a322eec77b3228f60f74ff8f573324b82c67c Mon Sep 17 00:00:00 2001
From: Xiangrui Meng
Date: Tue, 7 Oct 2014 19:46:53 -0700
Subject: [PATCH 06/18] do not output metadata in StructField.toString

---
 .../apache/spark/sql/catalyst/types/dataTypes.scala | 13 ++++++++-----
 1 file changed, 8 insertions(+), 5 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala
index 5d9deab4da184..1f4c9ce53ee53 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala
@@ -57,9 +57,8 @@ object DataType extends RegexParsers {
   protected lazy val structField: Parser[StructField] =
     ("StructField(" ~> "[a-zA-Z0-9_]*".r) ~ ("," ~> dataType) ~ ("," ~> boolVal <~ ")") ^^ {
-      case name ~ tpe ~ nullable =>
-        // TODO: parse metadata
-        StructField(name, tpe, nullable = nullable, Map.empty)
+      // metadata is not included in StructField.toString
+      case name ~ tpe ~ nullable => StructField(name, tpe, nullable = nullable, Map.empty)
     }
 
   protected lazy val boolVal: Parser[Boolean] =
@@ -336,6 +335,11 @@ case class StructField(
     builder.append(s"${prefix}-- ${name}: ${dataType.simpleString} (nullable = ${nullable})\n")
     DataType.buildFormattedString(dataType, s"$prefix |", builder)
   }
+
+  override def toString: String = {
+    // TODO: Remove this function after SPARK-3713.
+    s"StructField($name,$dataType,$nullable)"
+  }
 }
 
 object StructType {
@@ -356,8 +360,7 @@ case class StructType(fields: Seq[StructField]) extends DataType {
    * have a name matching the given name, `null` will be returned.
    */
   def apply(name: String): StructField = {
-    nameToField.getOrElse(name,
-      throw new IllegalArgumentException(s"Field $name does not exist."))
+    nameToField.getOrElse(name, throw new IllegalArgumentException(s"Field $name does not exist."))
   }
 
   /**

From 618e349f1a5b1494c0c8fcbf075768d6b08d91f9 Mon Sep 17 00:00:00 2001
From: Xiangrui Meng
Date: Wed, 8 Oct 2014 23:53:09 -0700
Subject: [PATCH 07/18] make tests work in scala

---
 .../apache/spark/sql/catalyst/types/dataTypes.scala | 10 ++++++----
 .../scala/org/apache/spark/sql/DataTypeSuite.scala | 3 ++-
 2 files changed, 8 insertions(+), 5 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala
index 0b8eb00b087b9..db6fd891717dd 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala
@@ -70,10 +70,11 @@ object DataType {
   private def parseStructField(json: JValue): StructField = json match {
     case JSortedObject(
+        ("metadata", metadata: JObject),
         ("name", JString(name)),
         ("nullable", JBool(nullable)),
         ("type", dataType: JValue)) =>
-      StructField(name, parseDataType(dataType), nullable)
+      StructField(name, parseDataType(dataType), nullable, metadata.values)
   }
 
   @deprecated("Use DataType.fromJson instead")
@@ -393,14 +394,15 @@ case class StructField(
   }
 
   override def toString: String = {
-    // TODO: Remove this function after SPARK-3713.
+    // Do not add metadata to be consistent with CaseClassStringParser.
     s"StructField($name,$dataType,$nullable)"
   }
-
+
   private[sql] def jsonValue: JValue = {
     ("name" -> name) ~
       ("type" -> dataType.jsonValue) ~
-      ("nullable" -> nullable)
+      ("nullable" -> nullable) ~
+      ("metadata" -> Extraction.decompose(metadata)(DefaultFormats))
   }
 }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataTypeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataTypeSuite.scala
index 100ecb45e9e88..a44e50af6a8cb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataTypeSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataTypeSuite.scala
@@ -82,5 +82,6 @@ class DataTypeSuite extends FunSuite {
   checkDataTypeJsonRepr(
     StructType(Seq(
       StructField("a", IntegerType, nullable = true),
-      StructField("b", ArrayType(DoubleType), nullable = false))))
+      StructField("b", ArrayType(DoubleType), nullable = false),
+      StructField("c", DoubleType, nullable = false, metadata = Map("name" -> "age")))))
 }

From 905bb893fdffd4baffb41b0c93365efe8da09c27 Mon Sep 17 00:00:00 2001
From: Xiangrui Meng
Date: Thu, 9 Oct 2014 00:29:38 -0700
Subject: [PATCH 08/18] java conversions

---
 .../expressions/namedExpressions.scala | 1 +
 .../apache/spark/sql/api/java/DataType.java | 24 +++++++++++++++----
 .../spark/sql/api/java/StructField.java | 20 ++++++++++++++--
 .../sql/types/util/DataTypeConversions.scala | 6 +++--
 .../ScalaSideDataTypeConversionSuite.scala | 3 ++-
 5 files changed, 44 insertions(+), 10 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
index d04071375bb21..b2340370a4934 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
@@ -134,6 +134,7 @@ case class AttributeReference(
     var h = 17
     h = h * 37 + exprId.hashCode()
     h = h * 37 + dataType.hashCode()
+    h = h * 37 + metadata.hashCode()
     h
   }
 
diff --git a/sql/core/src/main/java/org/apache/spark/sql/api/java/DataType.java b/sql/core/src/main/java/org/apache/spark/sql/api/java/DataType.java
index 37b4c8ffcba0b..7750a781996ff 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/api/java/DataType.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/api/java/DataType.java
@@ -17,9 +17,7 @@
 
 package org.apache.spark.sql.api.java;
 
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
+import java.util.*;
 
 /**
  * The base type of all Spark SQL data types.
@@ -146,15 +144,31 @@ public static MapType createMapType(
    * Creates a StructField by specifying the name ({@code name}), data type ({@code dataType}) and
    * whether values of this field can be null values ({@code nullable}).
    */
-  public static StructField createStructField(String name, DataType dataType, boolean nullable) {
+  public static StructField createStructField(
+      String name,
+      DataType dataType,
+      boolean nullable,
+      Map<String, Object> metadata) {
     if (name == null) {
       throw new IllegalArgumentException("name should not be null.");
     }
     if (dataType == null) {
       throw new IllegalArgumentException("dataType should not be null.");
    }
+    if (metadata == null) {
+      throw new IllegalArgumentException("metadata should not be null.");
+    }
+
+    return new StructField(name, dataType, nullable, metadata);
+  }
 
-    return new StructField(name, dataType, nullable);
+  /**
+   * Creates a StructField with empty metadata.
+   *
+   * @see #createStructField(String, DataType, boolean, java.util.Map)
+   */
+  public static StructField createStructField(String name, DataType dataType, boolean nullable) {
+    return createStructField(name, dataType, nullable, new HashMap<String, Object>());
   }
 
   /**
diff --git a/sql/core/src/main/java/org/apache/spark/sql/api/java/StructField.java b/sql/core/src/main/java/org/apache/spark/sql/api/java/StructField.java
index b48e2a2c5f953..86b6b3ca4f93d 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/api/java/StructField.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/api/java/StructField.java
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.api.java;
 
+import java.util.Map;
+
 /**
  * A StructField object represents a field in a StructType object.
  * A StructField object comprises three fields, {@code String name}, {@code DataType dataType},
@@ -24,20 +26,28 @@
  * The field of {@code dataType} specifies the data type of a StructField.
  * The field of {@code nullable} specifies if values of a StructField can contain {@code null}
  * values.
+ * The field of {@code metadata} provides extra information of the StructField, which is a map from
+ * string to simple type that can be serialized to JSON automatically.
  *
  * To create a {@link StructField},
- * {@link DataType#createStructField(String, DataType, boolean)}
+ * {@link DataType#createStructField(String, DataType, boolean, Map)}
 * should be used.
 */
 public class StructField {
   private String name;
   private DataType dataType;
   private boolean nullable;
+  private Map<String, Object> metadata;
 
-  protected StructField(String name, DataType dataType, boolean nullable) {
+  protected StructField(
+      String name,
+      DataType dataType,
+      boolean nullable,
+      Map<String, Object> metadata) {
     this.name = name;
     this.dataType = dataType;
     this.nullable = nullable;
+    this.metadata = metadata;
   }
 
   public String getName() {
@@ -52,6 +62,10 @@ public boolean isNullable() {
     return nullable;
   }
 
+  public Map<String, Object> getMetadata() {
+    return metadata;
+  }
+
   @Override
   public boolean equals(Object o) {
     if (this == o) return true;
@@ -62,6 +76,7 @@ public boolean equals(Object o) {
     if (nullable != that.nullable) return false;
     if (!dataType.equals(that.dataType)) return false;
     if (!name.equals(that.name)) return false;
+    if (!metadata.equals(that.metadata)) return false;
 
     return true;
   }
@@ -71,6 +86,7 @@ public int hashCode() {
     int result = name.hashCode();
     result = 31 * result + dataType.hashCode();
     result = 31 * result + (nullable ? 1 : 0);
+    result = 31 * result + metadata.hashCode();
 
     return result;
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/types/util/DataTypeConversions.scala b/sql/core/src/main/scala/org/apache/spark/sql/types/util/DataTypeConversions.scala
index 77353f4eb0227..37703abb62463 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/types/util/DataTypeConversions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/types/util/DataTypeConversions.scala
@@ -31,7 +31,8 @@ protected[sql] object DataTypeConversions {
     JDataType.createStructField(
       scalaStructField.name,
       asJavaDataType(scalaStructField.dataType),
-      scalaStructField.nullable)
+      scalaStructField.nullable,
+      scalaStructField.metadata.asJava.asInstanceOf[java.util.Map[String, Object]])
   }
 
   /**
@@ -67,7 +68,8 @@ protected[sql] object DataTypeConversions {
     StructField(
       javaStructField.getName,
       asScalaDataType(javaStructField.getDataType),
-      javaStructField.isNullable)
+      javaStructField.isNullable,
+      javaStructField.getMetadata.asScala.toMap)
   }
 
   /**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/api/java/ScalaSideDataTypeConversionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/api/java/ScalaSideDataTypeConversionSuite.scala
index ff1debff0f8c1..212f06a22c10f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/api/java/ScalaSideDataTypeConversionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/api/java/ScalaSideDataTypeConversionSuite.scala
@@ -70,7 +70,8 @@ class ScalaSideDataTypeConversionSuite extends FunSuite {
       SStructField("simpleArray", simpleScalaArrayType, true) ::
      SStructField("simpleMap", simpleScalaMapType, true) ::
      SStructField("simpleStruct", simpleScalaStructType, true) ::
-      SStructField("boolean", org.apache.spark.sql.BooleanType, false) :: Nil)
+      SStructField("boolean", org.apache.spark.sql.BooleanType, false) ::
+      SStructField("withMeta", org.apache.spark.sql.DoubleType, false, Map("name" -> "age")) :: Nil)
     checkDataType(complexScalaStructType)
 
     // Complex ArrayType.

From 93518fbfcef06621b81ea33439833a6e2c158bc7 Mon Sep 17 00:00:00 2001
From: Xiangrui Meng
Date: Thu, 9 Oct 2014 00:45:48 -0700
Subject: [PATCH 09/18] support metadata in python

---
 python/pyspark/sql.py | 15 +++++++++++----
 1 file changed, 11 insertions(+), 4 deletions(-)

diff --git a/python/pyspark/sql.py b/python/pyspark/sql.py
index d3d36eb995ab6..827daf37a7c6d 100644
--- a/python/pyspark/sql.py
+++ b/python/pyspark/sql.py
@@ -305,12 +305,15 @@ class StructField(DataType):
 
     """
 
-    def __init__(self, name, dataType, nullable):
+    def __init__(self, name, dataType, nullable, metadata={}):
         """Creates a StructField
         :param name: the name of this field.
         :param dataType: the data type of this field.
         :param nullable: indicates whether values of this field can be null.
+        :param metadata: metadata of this field, which is a map from string
+                         to simple type that can be serialized to JSON
+                         automatically.
 
         >>> (StructField("f1", StringType, True)
         ...  == StructField("f1", StringType, True))
@@ -322,6 +325,7 @@ def __init__(self, name, dataType, nullable):
         self.name = name
         self.dataType = dataType
         self.nullable = nullable
+        self.metadata = metadata
 
     def __repr__(self):
         return "StructField(%s,%s,%s)" % (self.name, self.dataType,
@@ -330,13 +334,15 @@ def __repr__(self):
     def jsonValue(self):
         return {"name": self.name,
                 "type": self.dataType.jsonValue(),
-                "nullable": self.nullable}
+                "nullable": self.nullable,
+                "metadata": self.metadata}
 
     @classmethod
     def fromJson(cls, json):
         return StructField(json["name"],
                            _parse_datatype_json_value(json["type"]),
-                           json["nullable"])
+                           json["nullable"],
+                           json["metadata"])
 
 
 class StructType(DataType):
@@ -415,7 +421,8 @@ def _parse_datatype_json_string(json_string):
     ...     StructField("simpleArray", simple_arraytype, True),
     ...     StructField("simpleMap", simple_maptype, True),
     ...     StructField("simpleStruct", simple_structtype, True),
-    ...     StructField("boolean", BooleanType(), False)])
+    ...     StructField("boolean", BooleanType(), False),
+    ...     StructField("withMeta", DoubleType(), False, {"name": "age"})])
     >>> check_datatype(complex_structtype)
     True
     >>> # Complex ArrayType.

From 60614c72f242852dde90c04d3222f305437e3a63 Mon Sep 17 00:00:00 2001
From: Xiangrui Meng
Date: Tue, 14 Oct 2014 01:05:55 -0700
Subject: [PATCH 10/18] add metadata

---
 .../spark/sql/catalyst/util/Metadata.scala | 160 ++++++++++++++++++
 .../sql/catalyst/util/MetadataSuite.scala | 60 +++++++
 2 files changed, 220 insertions(+)
 create mode 100644 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/Metadata.scala
 create mode 100644 sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/MetadataSuite.scala

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/Metadata.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/Metadata.scala
new file mode 100644
index 0000000000000..bf5250e2c7341
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/Metadata.scala
@@ -0,0 +1,160 @@
+package org.apache.spark.sql.catalyst.util
+
+import scala.collection.mutable
+
+import org.json4s._
+import org.json4s.jackson.JsonMethods._
+
+import scala.reflect.ClassTag
+
+sealed class Metadata private[util] (val map: Map[String, Any]) extends Serializable {
+
+  def getInt(key: String): Int = get(key)
+
+  def getDouble(key: String): Double = get(key)
+
+  def getBoolean(key: String): Boolean = get(key)
+
+  def getString(key: String): String = get(key)
+
+  def getMetadata(key: String): Metadata = get(key)
+
+  def getIntArray(key: String): Array[Int] = getArray(key)
+
+  def getDoubleArray(key: String): Array[Double] = getArray(key)
+
+  def getBooleanArray(key: String): Array[Boolean] = getArray(key)
+
+  def getStringArray(key: String): Array[String] = getArray(key)
+
+  def getMetadataArray(key: String): Array[Metadata] = getArray(key)
+
+  def toJson: String = {
+    compact(render(Metadata.toJValue(this)))
+  }
+
+  private def get[T](key: String): T = {
+    map(key).asInstanceOf[T]
+  }
+
+  private def getArray[T: ClassTag](key: String): Array[T] = {
+    map(key).asInstanceOf[Seq[T]].toArray
+  }
+
+  override def toString: String = toJson
+}
+
+object Metadata {
+
+  def empty: Metadata = new Metadata(Map.empty)
+
+  def fromJson(json: String): Metadata = {
+    val map = parse(json).values.asInstanceOf[Map[String, Any]]
+    fromMap(map.toMap)
+  }
+
+  private def fromMap(map: Map[String, Any]): Metadata = {
+    val builder = new MetadataBuilder
+    map.foreach {
+      case (key, value: Int) =>
+        builder.putInt(key, value)
+      case (key, value: BigInt) =>
+        builder.putInt(key, value.toInt)
+      case (key, value: Double) =>
+        builder.putDouble(key, value)
+      case (key, value: Boolean) =>
+        builder.putBoolean(key, value)
+      case (key, value: String) =>
+        builder.putString(key, value)
+      case (key, value: Map[_, _]) =>
+        builder.putMetadata(key, fromMap(value.asInstanceOf[Map[String, Any]]))
+      case (key, value: Seq[_]) =>
+        if (value.isEmpty) {
+          builder.putIntArray(key, Seq.empty)
+        } else {
+          value.head match {
+            case _: Int =>
+              builder.putIntArray(key, value.asInstanceOf[Seq[Int]].toSeq)
+            case _: BigInt =>
+              builder.putIntArray(key, value.asInstanceOf[Seq[BigInt]].map(_.toInt).toSeq)
+            case _: Double =>
+              builder.putDoubleArray(key, value.asInstanceOf[Seq[Double]].toSeq)
+            case _: Boolean =>
+              builder.putBooleanArray(key, value.asInstanceOf[Seq[Boolean]].toSeq)
+            case _: String =>
+              builder.putStringArray(key, value.asInstanceOf[Seq[String]].toSeq)
+            case _: Map[String, Any] =>
+              builder.putMetadataArray(
+                key, value.asInstanceOf[Seq[Map[String, Any]]].map(fromMap).toSeq)
+            case other =>
+              throw new RuntimeException(s"Do not support array of type ${other.getClass}.")
+          }
+        }
+      case other =>
+        throw new RuntimeException(s"Do not support type ${other.getClass}.")
+    }
+    builder.build()
+  }
+
+  private def toJValue(obj: Any): JValue = {
+    obj match {
+      case map: Map[_, _] =>
+        val fields = map.toList.map { case (k: String, v) => (k, toJValue(v)) }
+        JObject(fields)
+      case arr: Seq[_] =>
+        val values = arr.toList.map(toJValue)
+        JArray(values)
+      case x: Int =>
+        JInt(x)
+      case x: Double =>
+        JDouble(x)
+      case x: Boolean =>
+        JBool(x)
+      case x: String =>
+        JString(x)
+      case x: Metadata =>
+        toJValue(x.map)
+      case other =>
+        throw new RuntimeException(s"Do not support type ${other.getClass}.")
+    }
+  }
+}
+
+class MetadataBuilder {
+
+  private val map: mutable.Map[String, Any] = mutable.Map.empty
+
+  def withMetadata(metadata: Metadata): this.type = {
+    map ++= metadata.map
+    this
+  }
+
+  def putInt(key: String, value: Int): this.type = put(key, value)
+
+  def putDouble(key: String, value: Double): this.type = put(key, value)
+
+  def putBoolean(key: String, value: Boolean): this.type = put(key, value)
+
+  def putString(key: String, value: String): this.type = put(key, value)
+
+  def putMetadata(key: String, value: Metadata): this.type = put(key, value)
+
+  def putIntArray(key: String, value: Seq[Int]): this.type = put(key, value)
+
+  def putDoubleArray(key: String, value: Seq[Double]): this.type = put(key, value)
+
+  def putBooleanArray(key: String, value: Seq[Boolean]): this.type = put(key, value)
+
+  def putStringArray(key: String, value: Seq[String]): this.type = put(key, value)
+
+  def putMetadataArray(key: String, value: Seq[Metadata]): this.type = put(key, value)
+
+  def build(): Metadata = {
+    new Metadata(map.toMap)
+  }
+
+  private def put(key: String, value: Any): this.type = {
+    map.put(key, value)
+    this
+  }
+}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/MetadataSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/MetadataSuite.scala
new file mode 100644
index 0000000000000..1df7e22da212c
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/MetadataSuite.scala
@@ -0,0 +1,60 @@
+package org.apache.spark.sql.catalyst.util
+
+import org.json4s.jackson.JsonMethods._
+import org.scalatest.FunSuite
+
+class MetadataSuite extends FunSuite {
+
+  val baseMetadata = new MetadataBuilder()
+    .putString("purpose", "ml")
+    .build()
+
+  val summary = new MetadataBuilder()
+    .putInt("numFeatures", 10)
+    .build()
+
+  val age = new MetadataBuilder()
+    .putString("name", "age")
+    .putInt("index", 1)
+    .putBoolean("categorical", false)
+    .putDouble("average", 45.0)
+    .build()
+
+  val gender = new MetadataBuilder()
+    .putString("name", "gender")
+    .putInt("index", 5)
+    .putBoolean("categorical", true)
+    .putStringArray("categories", Seq("male", "female"))
+    .build()
+
+  val metadata = new MetadataBuilder()
+    .withMetadata(baseMetadata)
+    .putMetadata("summary", summary)
+    .putIntArray("int[]", Seq(0, 1))
+    .putDoubleArray("double[]", Seq(3.0, 4.0))
+    .putBooleanArray("boolean[]", Seq(true, false))
+    .putMetadataArray("features", Seq(age, gender))
+    .build()
+
+  test("metadata builder and getters") {
+    assert(age.getInt("index") === 1)
+    assert(age.getDouble("average") === 45.0)
+    assert(age.getBoolean("categorical") === false)
+    assert(age.getString("name") === "age")
+    assert(metadata.getString("purpose") === "ml")
+    assert(metadata.getMetadata("summary") === summary)
+    assert(metadata.getIntArray("int[]").toSeq === Seq(0, 1))
+    assert(metadata.getDoubleArray("double[]").toSeq === Seq(3.0, 4.0))
+    assert(metadata.getBooleanArray("boolean[]").toSeq === Seq(true, false))
+    assert(gender.getStringArray("categories").toSeq === Seq("male", "female"))
+    assert(metadata.getMetadataArray("features").toSeq === Seq(age, gender))
+  }
+
+  test("metadata json conversion") {
+    val json = metadata.toJson
+    withClue("toJson must produce a valid JSON string") {
+      parse(json)
+    }
+    assert(Metadata.fromJson(json) === metadata)
+  }
+}

From 60cc131da9c29874d0060f0041b7dc0b0a6d0304 Mon Sep 17 00:00:00 2001
From: Xiangrui Meng
Date: Tue, 14 Oct 2014 10:57:30 -0700
Subject: [PATCH 11/18] add doc and header

---
 .../spark/sql/catalyst/util/Metadata.scala | 169 ++++++++++++++----
 .../sql/catalyst/util/MetadataSuite.scala | 72 +++++---
 2 files changed, 178 insertions(+), 63 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/Metadata.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/Metadata.scala
index bf5250e2c7341..0b91cd01f7711 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/Metadata.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/Metadata.scala
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.spark.sql.catalyst.util
 
 import scala.collection.mutable
@@ -5,61 +22,99 @@ import scala.collection.mutable
 
 import org.json4s._
 import org.json4s.jackson.JsonMethods._
 
-import scala.reflect.ClassTag
-
-sealed class Metadata private[util] (val map: Map[String, Any]) extends Serializable {
-
-  def getInt(key: String): Int = get(key)
-
+/**
+ * Metadata is a wrapper over Map[String, Any] that limits the value type to simple ones: Boolean,
+ * Long, Double, String, Metadata, Array[Boolean], Array[Long], Array[Double], Array[String], and
+ * Array[Metadata]. JSON is used for serialization.
+ *
+ * The default constructor is private. Users should use either [[MetadataBuilder]] or
+ * [[Metadata$#fromJson]] to create Metadata instances.
+ *
+ * @param map an immutable map that stores the data
+ */
+sealed class Metadata private[util] (private[util] val map: Map[String, Any]) extends Serializable {
+
+  /** Gets a Long. */
+  def getLong(key: String): Long = get(key)
+
+  /** Gets a Double. */
   def getDouble(key: String): Double = get(key)
 
+  /** Gets a Boolean. */
   def getBoolean(key: String): Boolean = get(key)
 
+  /** Gets a String. */
   def getString(key: String): String = get(key)
 
+  /** Gets a Metadata. */
   def getMetadata(key: String): Metadata = get(key)
 
-  def getIntArray(key: String): Array[Int] = getArray(key)
+  /** Gets a Long array. */
+  def getLongArray(key: String): Array[Long] = get(key)
 
-  def getDoubleArray(key: String): Array[Double] = getArray(key)
+  /** Gets a Double array. */
+  def getDoubleArray(key: String): Array[Double] = get(key)
 
-  def getBooleanArray(key: String): Array[Boolean] = getArray(key)
+  /** Gets a Boolean array. */
+  def getBooleanArray(key: String): Array[Boolean] = get(key)
 
-  def getStringArray(key: String): Array[String] = getArray(key)
+  /** Gets a String array. */
+  def getStringArray(key: String): Array[String] = get(key)
 
-  def getMetadataArray(key: String): Array[Metadata] = getArray(key)
+  /** Gets a Metadata array. */
+  def getMetadataArray(key: String): Array[Metadata] = get(key)
 
+  /** Converts to its JSON representation. */
   def toJson: String = {
     compact(render(Metadata.toJValue(this)))
   }
 
-  private def get[T](key: String): T = {
-    map(key).asInstanceOf[T]
-  }
+  override def toString: String = toJson
 
-  private def getArray[T: ClassTag](key: String): Array[T] = {
-    map(key).asInstanceOf[Seq[T]].toArray
+  override def equals(obj: Any): Boolean = {
+    obj match {
+      case that: Metadata =>
+        if (map.keySet == that.map.keySet) {
+          map.keys.forall { k =>
+            (map(k), that.map(k)) match {
+              case (v0: Array[_], v1: Array[_]) =>
+                v0.view == v1.view
+              case (v0, v1) =>
+                v0 == v1
+            }
+          }
+        } else {
+          false
+        }
+      case other =>
+        false
+    }
   }
 
-  override def toString: String = toJson
+  override def hashCode: Int = Metadata.hash(this)
+
+  private def get[T](key: String): T = {
+    map(key).asInstanceOf[T]
+  }
 }
 
 object Metadata {
 
+  /** Returns an empty Metadata. */
   def empty: Metadata = new Metadata(Map.empty)
 
+  /** Creates a Metadata instance from JSON. */
   def fromJson(json: String): Metadata = {
     val map = parse(json).values.asInstanceOf[Map[String, Any]]
     fromMap(map.toMap)
   }
 
+  /** Creates a Metadata instance from Map[String, Any]. */
   private def fromMap(map: Map[String, Any]): Metadata = {
     val builder = new MetadataBuilder
     map.foreach {
-      case (key, value: Int) =>
-        builder.putInt(key, value)
       case (key, value: BigInt) =>
-        builder.putInt(key, value.toInt)
+        builder.putLong(key, value.toLong)
       case (key, value: Double) =>
         builder.putDouble(key, value)
       case (key, value: Boolean) =>
         builder.putBoolean(key, value)
       case (key, value: String) =>
         builder.putString(key, value)
       case (key, value: Map[_, _]) =>
         builder.putMetadata(key, fromMap(value.asInstanceOf[Map[String, Any]]))
       case (key, value: Seq[_]) =>
         if (value.isEmpty) {
-          builder.putIntArray(key, Seq.empty)
+          // If it is an empty array, we cannot infer its element type. We put an empty Array[Long].
+          builder.putLongArray(key, Array.empty)
         } else {
           value.head match {
-            case _: Int =>
-              builder.putIntArray(key, value.asInstanceOf[Seq[Int]].toSeq)
             case _: BigInt =>
-              builder.putIntArray(key, value.asInstanceOf[Seq[BigInt]].map(_.toInt).toSeq)
+              builder.putLongArray(key, value.asInstanceOf[Seq[BigInt]].map(_.toLong).toArray)
             case _: Double =>
-              builder.putDoubleArray(key, value.asInstanceOf[Seq[Double]].toSeq)
+              builder.putDoubleArray(key, value.asInstanceOf[Seq[Double]].toArray)
             case _: Boolean =>
-              builder.putBooleanArray(key, value.asInstanceOf[Seq[Boolean]].toSeq)
+              builder.putBooleanArray(key, value.asInstanceOf[Seq[Boolean]].toArray)
             case _: String =>
-              builder.putStringArray(key, value.asInstanceOf[Seq[String]].toSeq)
-            case _: Map[String, Any] =>
+              builder.putStringArray(key, value.asInstanceOf[Seq[String]].toSeq.toArray)
+            case _: Map[_, _] =>
               builder.putMetadataArray(
-                key, value.asInstanceOf[Seq[Map[String, Any]]].map(fromMap).toSeq)
+                key, value.asInstanceOf[Seq[Map[String, Any]]].map(fromMap).toArray)
             case other =>
               throw new RuntimeException(s"Do not support array of type ${other.getClass}.")
           }
         }
       case other =>
         throw new RuntimeException(s"Do not support type ${other.getClass}.")
     }
     builder.build()
   }
 
+  /** Converts to JSON AST. */
   private def toJValue(obj: Any): JValue = {
     obj match {
       case map: Map[_, _] =>
-        val fields = map.toList.map { case (k: String, v) => (k, toJValue(v)) }
+        val fields = map.toList.map { case (k: String, v) => (k, toJValue(v))}
         JObject(fields)
-      case arr: Seq[_] =>
+      case arr: Array[_] =>
         val values = arr.toList.map(toJValue)
         JArray(values)
-      case x: Int =>
+      case x: Long =>
         JInt(x)
       case x: Double =>
         JDouble(x)
       case x: Boolean =>
         JBool(x)
       case x: String =>
         JString(x)
       case x: Metadata =>
         toJValue(x.map)
       case other =>
         throw new RuntimeException(s"Do not support type ${other.getClass}.")
     }
   }
+
+  /** Computes the hash code for the types we support. */
+  private def hash(obj: Any): Int = {
+    obj match {
+      case map: Map[_, _] =>
+        map.mapValues(hash).##
+      case arr: Array[_] =>
+        // Seq.empty[T] has the same hashCode regardless of T.
+        arr.toSeq.map(hash).##
+      case x: Long =>
+        x.##
+      case x: Double =>
+        x.##
+      case x: Boolean =>
+        x.##
+      case x: String =>
+        x.##
+      case x: Metadata =>
+        hash(x.map)
+      case other =>
+        throw new RuntimeException(s"Do not support type ${other.getClass}.")
+    }
+  }
 }
 
+/**
+ * Builder for [[Metadata]]. If there is a key collision, the latter will overwrite the former.
+ */
 class MetadataBuilder {
 
   private val map: mutable.Map[String, Any] = mutable.Map.empty
 
+  /** Include the content of an existing [[Metadata]] instance. */
   def withMetadata(metadata: Metadata): this.type = {
     map ++= metadata.map
     this
   }
 
-  def putInt(key: String, value: Int): this.type = put(key, value)
+  /** Puts a Long. */
+  def putLong(key: String, value: Long): this.type = put(key, value)
 
+  /** Puts a Double. */
   def putDouble(key: String, value: Double): this.type = put(key, value)
 
+  /** Puts a Boolean. */
   def putBoolean(key: String, value: Boolean): this.type = put(key, value)
 
+  /** Puts a String. */
   def putString(key: String, value: String): this.type = put(key, value)
 
+  /** Puts a [[Metadata]]. */
   def putMetadata(key: String, value: Metadata): this.type = put(key, value)
 
-  def putIntArray(key: String, value: Seq[Int]): this.type = put(key, value)
+  /** Puts a Long array. */
+  def putLongArray(key: String, value: Array[Long]): this.type = put(key, value)
 
-  def putDoubleArray(key: String, value: Seq[Double]): this.type = put(key, value)
+  /** Puts a Double array. */
+  def putDoubleArray(key: String, value: Array[Double]): this.type = put(key, value)
 
-  def putBooleanArray(key: String, value: Seq[Boolean]): this.type = put(key, value)
+  /** Puts a Boolean array. */
+  def putBooleanArray(key: String, value: Array[Boolean]): this.type = put(key, value)
 
-  def putStringArray(key: String, value: Seq[String]): this.type = put(key, value)
+  /** Puts a String array. */
+  def putStringArray(key: String, value: Array[String]): this.type = put(key, value)
 
-  def putMetadataArray(key: String, value: Seq[Metadata]): this.type = put(key, value)
+  /** Puts a [[Metadata]] array. */
+  def putMetadataArray(key: String, value: Array[Metadata]): this.type = put(key, value)
 
+  /** Builds the [[Metadata]] instance. */
   def build(): Metadata = {
     new Metadata(map.toMap)
   }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/MetadataSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/MetadataSuite.scala
index 1df7e22da212c..55fb0f8ec2785 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/MetadataSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/MetadataSuite.scala
@@ -1,60 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.spark.sql.catalyst.util
 
-import org.json4s.jackson.JsonMethods._
+import org.json4s.jackson.JsonMethods.parse
 import org.scalatest.FunSuite
 
 class MetadataSuite extends FunSuite {
 
   val baseMetadata = new MetadataBuilder()
-      .putString("purpose", "ml")
-      .build()
+    .putString("purpose", "ml")
+    .putBoolean("isBase", true)
+    .build()
 
   val summary = new MetadataBuilder()
-      .putInt("numFeatures", 10)
-      .build()
+    .putLong("numFeatures", 10L)
+    .build()
 
   val age = new MetadataBuilder()
-      .putString("name", "age")
-      .putInt("index", 1)
-      .putBoolean("categorical", false)
-      .putDouble("average", 45.0)
-      .build()
+    .putString("name", "age")
+    .putLong("index", 1L)
+    .putBoolean("categorical", false)
+    .putDouble("average", 45.0)
+    .build()
 
   val gender = new MetadataBuilder()
-      .putString("name", "gender")
-      .putInt("index", 5)
-      .putBoolean("categorical", true)
-      .putStringArray("categories", Seq("male", "female"))
-      .build()
+    .putString("name", "gender")
+    .putLong("index", 5)
+    .putBoolean("categorical", true)
+    .putStringArray("categories", Array("male", "female"))
+    .build()
 
   val metadata = new MetadataBuilder()
-      .withMetadata(baseMetadata)
-      .putMetadata("summary", summary)
-      .putIntArray("int[]", Seq(0, 1))
-      .putDoubleArray("double[]", Seq(3.0, 4.0))
-      .putBooleanArray("boolean[]", Seq(true, false))
-      .putMetadataArray("features", Seq(age, gender))
-      .build()
+    .withMetadata(baseMetadata)
+    .putBoolean("isBase", false) // overwrite an existing key
+    .putMetadata("summary", summary)
+    .putLongArray("long[]", Array(0L, 1L))
+    .putDoubleArray("double[]", Array(3.0, 4.0))
+    .putBooleanArray("boolean[]", Array(true, false))
+    .putMetadataArray("features", Array(age, gender))
+    .build()
 
   test("metadata builder and getters") {
-    assert(age.getInt("index") === 1)
+    assert(age.getLong("index") === 1L)
     assert(age.getDouble("average") === 45.0)
     assert(age.getBoolean("categorical") === false)
     assert(age.getString("name") === "age")
     assert(metadata.getString("purpose") === "ml")
+    assert(metadata.getBoolean("isBase") === false)
     assert(metadata.getMetadata("summary") === summary)
-    assert(metadata.getIntArray("int[]").toSeq === Seq(0, 1))
+    assert(metadata.getLongArray("long[]").toSeq === Seq(0L, 1L))
     assert(metadata.getDoubleArray("double[]").toSeq === Seq(3.0, 4.0))
     assert(metadata.getBooleanArray("boolean[]").toSeq === Seq(true, false))
     assert(gender.getStringArray("categories").toSeq === Seq("male", "female"))
     assert(metadata.getMetadataArray("features").toSeq === Seq(age, gender))
   }
 
   test("metadata json conversion") {
     val json = metadata.toJson
     withClue("toJson must produce a valid JSON string") {
       parse(json)
     }
-    assert(Metadata.fromJson(json) === metadata)
+    val parsed = Metadata.fromJson(json)
+    assert(parsed === metadata)
+    assert(parsed.## === metadata.##)
   }
 }

From 1fcbf13818383e8cc536360e73ae788faae594d3 Mon Sep 17 00:00:00 2001
From: Xiangrui Meng
Date: Tue, 14 Oct 2014 14:05:20 -0700
Subject: [PATCH 12/18] change metadata type in StructField for Scala/Java

---
 .../sql/catalyst/expressions/Expression.scala | 3 +-
 .../expressions/namedExpressions.scala | 5 +-
 .../spark/sql/catalyst/types/dataTypes.scala | 8 ++-
 .../spark/sql/catalyst/util/Metadata.scala | 59 +++++++++----------
 .../sql/catalyst/util/MetadataSuite.scala | 2 +-
 .../apache/spark/sql/api/java/DataType.java | 8 ++-
 .../spark/sql/api/java/StructField.java | 8 ++-
 .../sql/types/util/DataTypeConversions.scala | 4 +-
 .../org/apache/spark/sql/DataTypeSuite.scala | 6 +-
 .../org/apache/spark/sql/SQLQuerySuite.scala | 22 +++----
 .../ScalaSideDataTypeConversionSuite.scala | 6 +-
 11 files changed, 74
insertions(+), 57 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala index 5e3ea7dee1262..9776f51a1c0ec 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala @@ -21,6 +21,7 @@ import org.apache.spark.sql.catalyst.errors.TreeNodeException import org.apache.spark.sql.catalyst.trees import org.apache.spark.sql.catalyst.trees.TreeNode import org.apache.spark.sql.catalyst.types.{DataType, FractionalType, IntegralType, NumericType, NativeType} +import org.apache.spark.sql.catalyst.util.Metadata abstract class Expression extends TreeNode[Expression] { self: Product => @@ -44,7 +45,7 @@ abstract class Expression extends TreeNode[Expression] { def references: AttributeSet = AttributeSet(children.flatMap(_.references.iterator)) /** Returns the metadata when an expression is a reference to another expression with metadata. */ - def metadata: Map[String, Any] = Map.empty + def metadata: Metadata = Metadata.empty /** Returns the result of evaluating this expression on a given input Row */ def eval(input: Row = null): EvaluatedType diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala index b2340370a4934..ceb395f3bb041 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala @@ -21,6 +21,7 @@ import org.apache.spark.sql.catalyst.trees import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute import org.apache.spark.sql.catalyst.errors.TreeNodeException import org.apache.spark.sql.catalyst.types._ +import org.apache.spark.sql.catalyst.util.Metadata object NamedExpression { private val curId = new java.util.concurrent.atomic.AtomicLong() @@ -86,7 +87,7 @@ case class Alias(child: Expression, name: String) override def dataType = child.dataType override def nullable = child.nullable - override def metadata: Map[String, Any] = child.metadata + override def metadata: Metadata = child.metadata override def toAttribute = { if (resolved) { @@ -118,7 +119,7 @@ case class AttributeReference( name: String, dataType: DataType, nullable: Boolean = true, - override val metadata: Map[String, Any] = Map.empty)( + override val metadata: Metadata = Metadata.empty)( val exprId: ExprId = NamedExpression.newExprId, val qualifiers: Seq[String] = Nil) extends Attribute with trees.LeafNode[Expression] { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala index db6fd891717dd..970d98626fb7a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala @@ -19,6 +19,8 @@ package org.apache.spark.sql.catalyst.types import java.sql.Timestamp +import org.apache.spark.sql.catalyst.util.Metadata + import scala.math.Numeric.{BigDecimalAsIfIntegral, DoubleAsIfIntegral, FloatAsIfIntegral} import scala.reflect.ClassTag import scala.reflect.runtime.universe.{TypeTag, runtimeMirror, typeTag} @@ -74,7 +76,7 @@ object DataType 
{ ("name", JString(name)), ("nullable", JBool(nullable)), ("type", dataType: JValue)) => - StructField(name, parseDataType(dataType), nullable, metadata.values) + StructField(name, parseDataType(dataType), nullable, Metadata.fromJObject(metadata)) } @deprecated("Use DataType.fromJson instead") @@ -386,7 +388,7 @@ case class StructField( name: String, dataType: DataType, nullable: Boolean, - metadata: Map[String, Any] = Map.empty) { + metadata: Metadata = Metadata.empty) { private[sql] def buildFormattedString(prefix: String, builder: StringBuilder): Unit = { builder.append(s"$prefix-- $name: ${dataType.typeName} (nullable = $nullable)\n") @@ -402,7 +404,7 @@ case class StructField( ("name" -> name) ~ ("type" -> dataType.jsonValue) ~ ("nullable" -> nullable) ~ - ("metadata" -> Extraction.decompose(metadata)(DefaultFormats)) + ("metadata" -> metadata.jsonValue) } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/Metadata.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/Metadata.scala index 0b91cd01f7711..e16edd94fb24d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/Metadata.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/Metadata.scala @@ -65,11 +65,9 @@ sealed class Metadata private[util] (private[util] val map: Map[String, Any]) ex def getMetadataArray(key: String): Array[Metadata] = get(key) /** Converts to its JSON representation. */ - def toJson: String = { - compact(render(Metadata.toJValue(this))) - } + def json: String = compact(render(jsonValue)) - override def toString: String = toJson + override def toString: String = json override def equals(obj: Any): Boolean = { obj match { @@ -96,6 +94,8 @@ sealed class Metadata private[util] (private[util] val map: Map[String, Any]) ex private def get[T](key: String): T = { map(key).asInstanceOf[T] } + + private[sql] def jsonValue: JValue = Metadata.toJsonValue(this) } object Metadata { @@ -105,41 +105,40 @@ object Metadata { /** Creates a Metadata instance from JSON. */ def fromJson(json: String): Metadata = { - val map = parse(json).values.asInstanceOf[Map[String, Any]] - fromMap(map.toMap) + fromJObject(parse(json).asInstanceOf[JObject]) } - /** Creates a Metadata instance from Map[String, Any]. */ - private def fromMap(map: Map[String, Any]): Metadata = { + /** Creates a Metadata instance from JSON AST. */ + private[sql] def fromJObject(jObj: JObject): Metadata = { val builder = new MetadataBuilder - map.foreach { - case (key, value: BigInt) => + jObj.obj.foreach { + case (key, JInt(value)) => builder.putLong(key, value.toLong) - case (key, value: Double) => + case (key, JDouble(value)) => builder.putDouble(key, value) - case (key, value: Boolean) => + case (key, JBool(value)) => builder.putBoolean(key, value) - case (key, value: String) => + case (key, JString(value)) => builder.putString(key, value) - case (key, value: Map[_, _]) => - builder.putMetadata(key, fromMap(value.asInstanceOf[Map[String, Any]])) - case (key, value: Seq[_]) => + case (key, o: JObject) => + builder.putMetadata(key, fromJObject(o)) + case (key, JArray(value)) => if (value.isEmpty) { // If it is an empty array, we cannot infer its element type. We put an empty Array[Long]. 
builder.putLongArray(key, Array.empty) } else { value.head match { - case _: BigInt => - builder.putLongArray(key, value.asInstanceOf[Seq[BigInt]].map(_.toLong).toArray) - case _: Double => - builder.putDoubleArray(key, value.asInstanceOf[Seq[Double]].toArray) - case _: Boolean => - builder.putBooleanArray(key, value.asInstanceOf[Seq[Boolean]].toArray) - case _: String => - builder.putStringArray(key, value.asInstanceOf[Seq[String]].toSeq.toArray) - case _: Map[_, _] => + case _: JInt => + builder.putLongArray(key, value.asInstanceOf[List[JInt]].map(_.num.toLong).toArray) + case _: JDouble => + builder.putDoubleArray(key, value.asInstanceOf[List[JDouble]].map(_.num).toArray) + case _: JBool => + builder.putBooleanArray(key, value.asInstanceOf[List[JBool]].map(_.value).toArray) + case _: JString => + builder.putStringArray(key, value.asInstanceOf[List[JString]].map(_.s).toArray) + case _: JObject => builder.putMetadataArray( - key, value.asInstanceOf[Seq[Map[String, Any]]].map(fromMap).toArray) + key, value.asInstanceOf[List[JObject]].map(fromJObject).toArray) case other => throw new RuntimeException(s"Do not support array of type ${other.getClass}.") } @@ -151,13 +150,13 @@ object Metadata { } /** Converts to JSON AST. */ - private def toJValue(obj: Any): JValue = { + private def toJsonValue(obj: Any): JValue = { obj match { case map: Map[_, _] => - val fields = map.toList.map { case (k: String, v) => (k, toJValue(v))} + val fields = map.toList.map { case (k: String, v) => (k, toJsonValue(v))} JObject(fields) case arr: Array[_] => - val values = arr.toList.map(toJValue) + val values = arr.toList.map(toJsonValue) JArray(values) case x: Long => JInt(x) @@ -168,7 +167,7 @@ object Metadata { case x: String => JString(x) case x: Metadata => - toJValue(x.map) + toJsonValue(x.map) case other => throw new RuntimeException(s"Do not support type ${other.getClass}.") } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/MetadataSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/MetadataSuite.scala index 55fb0f8ec2785..0063d31666c85 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/MetadataSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/MetadataSuite.scala @@ -71,7 +71,7 @@ class MetadataSuite extends FunSuite { } test("metadata json conversion") { - val json = metadata.toJson + val json = metadata.json withClue("toJson must produce a valid JSON string") { parse(json) } diff --git a/sql/core/src/main/java/org/apache/spark/sql/api/java/DataType.java b/sql/core/src/main/java/org/apache/spark/sql/api/java/DataType.java index 7750a781996ff..cbb9b5a80d5b5 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/api/java/DataType.java +++ b/sql/core/src/main/java/org/apache/spark/sql/api/java/DataType.java @@ -17,6 +17,8 @@ package org.apache.spark.sql.api.java; +import org.apache.spark.sql.catalyst.util.Metadata; + import java.util.*; /** @@ -148,7 +150,7 @@ public static StructField createStructField( String name, DataType dataType, boolean nullable, - Map metadata) { + Metadata metadata) { if (name == null) { throw new IllegalArgumentException("name should not be null."); } @@ -165,10 +167,10 @@ public static StructField createStructField( /** * Creates a StructField with empty metadata. 
* - * @see #createStructField(String, DataType, boolean, java.util.Map) + * @see #createStructField(String, DataType, boolean, Metadata) */ public static StructField createStructField(String name, DataType dataType, boolean nullable) { - return createStructField(name, dataType, nullable, new HashMap()); + return createStructField(name, dataType, nullable, Metadata.empty()); } /** diff --git a/sql/core/src/main/java/org/apache/spark/sql/api/java/StructField.java b/sql/core/src/main/java/org/apache/spark/sql/api/java/StructField.java index 86b6b3ca4f93d..3d84cda879124 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/api/java/StructField.java +++ b/sql/core/src/main/java/org/apache/spark/sql/api/java/StructField.java @@ -17,6 +17,8 @@ package org.apache.spark.sql.api.java; +import org.apache.spark.sql.catalyst.util.Metadata; + import java.util.Map; /** @@ -37,13 +39,13 @@ public class StructField { private String name; private DataType dataType; private boolean nullable; - private Map metadata; + private Metadata metadata; protected StructField( String name, DataType dataType, boolean nullable, - Map metadata) { + Metadata metadata) { this.name = name; this.dataType = dataType; this.nullable = nullable; @@ -62,7 +64,7 @@ public boolean isNullable() { return nullable; } - public Map getMetadata() { + public Metadata getMetadata() { return metadata; } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/types/util/DataTypeConversions.scala b/sql/core/src/main/scala/org/apache/spark/sql/types/util/DataTypeConversions.scala index 37703abb62463..4b1fb6868e2c1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/types/util/DataTypeConversions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/types/util/DataTypeConversions.scala @@ -32,7 +32,7 @@ protected[sql] object DataTypeConversions { scalaStructField.name, asJavaDataType(scalaStructField.dataType), scalaStructField.nullable, - scalaStructField.metadata.asJava.asInstanceOf[java.util.Map[String, Object]]) + scalaStructField.metadata) } /** @@ -69,7 +69,7 @@ protected[sql] object DataTypeConversions { javaStructField.getName, asScalaDataType(javaStructField.getDataType), javaStructField.isNullable, - javaStructField.getMetadata.asScala.toMap) + javaStructField.getMetadata) } /** diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataTypeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataTypeSuite.scala index a44e50af6a8cb..f874d9d279098 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataTypeSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataTypeSuite.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql +import org.apache.spark.sql.catalyst.util.MetadataBuilder import org.scalatest.FunSuite import org.apache.spark.sql.catalyst.types.DataType @@ -79,9 +80,12 @@ class DataTypeSuite extends FunSuite { checkDataTypeJsonRepr(ArrayType(StringType, false)) checkDataTypeJsonRepr(MapType(IntegerType, StringType, true)) checkDataTypeJsonRepr(MapType(IntegerType, ArrayType(DoubleType), false)) + val metadata = new MetadataBuilder() + .putString("name", "age") + .build() checkDataTypeJsonRepr( StructType(Seq( StructField("a", IntegerType, nullable = true), StructField("b", ArrayType(DoubleType), nullable = false), - StructField("c", DoubleType, nullable = false, metadata = Map("name" -> "age"))))) + StructField("c", DoubleType, nullable = false, metadata)))) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 785a8376679df..d9e35b8650901 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -17,16 +17,15 @@ package org.apache.spark.sql -import org.apache.spark.sql.catalyst.errors.TreeNodeException -import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.execution.joins.BroadcastHashJoin -import org.apache.spark.sql.test._ -import org.scalatest.BeforeAndAfterAll import java.util.TimeZone -/* Implicits */ -import TestSQLContext._ -import TestData._ +import org.scalatest.BeforeAndAfterAll + +import org.apache.spark.sql.TestData._ +import org.apache.spark.sql.catalyst.errors.TreeNodeException +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.util.MetadataBuilder +import org.apache.spark.sql.test.TestSQLContext._ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll { // Make sure the tables are loaded. @@ -684,11 +683,14 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll { val schema = person.schema val docKey = "doc" val docValue = "first name" + val metadata = new MetadataBuilder() + .putString(docKey, docValue) + .build() val schemaWithMeta = new StructType(Seq( - schema("id"), schema("name").copy(metadata = Map(docKey -> docValue)), schema("age"))) + schema("id"), schema("name").copy(metadata = metadata), schema("age"))) val personWithMeta = applySchema(person, schemaWithMeta) def validateMetadata(rdd: SchemaRDD): Unit = { - assert(rdd.schema("name").metadata(docKey) === docValue) + assert(rdd.schema("name").metadata.getString(docKey) == docValue) } personWithMeta.registerTempTable("personWithMeta") validateMetadata(personWithMeta.select('name)) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/api/java/ScalaSideDataTypeConversionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/api/java/ScalaSideDataTypeConversionSuite.scala index 212f06a22c10f..688d9ff724dfa 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/api/java/ScalaSideDataTypeConversionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/api/java/ScalaSideDataTypeConversionSuite.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.api.java +import org.apache.spark.sql.catalyst.util.MetadataBuilder import org.apache.spark.sql.types.util.DataTypeConversions import org.scalatest.FunSuite @@ -66,12 +67,15 @@ class ScalaSideDataTypeConversionSuite extends FunSuite { checkDataType(simpleScalaStructType) // Complex StructType. + val metadata = new MetadataBuilder() + .putString("name", "age") + .build() val complexScalaStructType = SStructType( SStructField("simpleArray", simpleScalaArrayType, true) :: SStructField("simpleMap", simpleScalaMapType, true) :: SStructField("simpleStruct", simpleScalaStructType, true) :: SStructField("boolean", org.apache.spark.sql.BooleanType, false) :: - SStructField("withMeta", org.apache.spark.sql.DoubleType, false, Map("name" -> "age")) :: Nil) + SStructField("withMeta", org.apache.spark.sql.DoubleType, false, metadata) :: Nil) checkDataType(complexScalaStructType) // Complex ArrayType. 
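At this point in the series the metadata API is typed end to end: MetadataBuilder produces Metadata, StructField carries it, and serialization goes through the json4s AST instead of untyped maps. The following is a minimal sketch of how the pieces fit together after PATCH 12; the object name and sample keys are illustrative only, while the builder methods, Metadata.fromJson, the json method, and the four-argument StructField all come from the diffs above:

    import org.apache.spark.sql.catalyst.types.{DoubleType, StructField}
    import org.apache.spark.sql.catalyst.util.{Metadata, MetadataBuilder}

    object MetadataRoundTrip {
      def main(args: Array[String]): Unit = {
        // Build metadata with the typed put* methods; on a key collision the
        // latest put wins, per MetadataBuilder's documented behavior.
        val meta = new MetadataBuilder()
          .putString("name", "age")
          .putLong("index", 1L)
          .putStringArray("tags", Array("demographic", "numeric"))
          .build()

        // StructField now carries Metadata instead of Map[String, Any].
        val field = StructField("age", DoubleType, nullable = false, metadata = meta)
        assert(field.metadata.getString("name") == "age")

        // Round-trip through JSON; equality is structural, including arrays,
        // thanks to the overridden equals/hashCode on Metadata.
        val restored = Metadata.fromJson(meta.json)
        assert(restored == meta)
        assert(restored.getLong("index") == 1L)
      }
    }
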
From c9d7301b5e395b8e51030f1be17e35c05de23b7a Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Tue, 14 Oct 2014 14:10:54 -0700 Subject: [PATCH 13/18] organize imports --- .../org/apache/spark/sql/catalyst/types/dataTypes.scala | 9 +++------ .../java/org/apache/spark/sql/api/java/DataType.java | 4 ++-- .../java/org/apache/spark/sql/api/java/StructField.java | 9 ++++----- .../test/scala/org/apache/spark/sql/DataTypeSuite.scala | 2 +- .../sql/api/java/ScalaSideDataTypeConversionSuite.scala | 9 ++++----- 5 files changed, 14 insertions(+), 19 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala index 970d98626fb7a..15a2d599450e2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala @@ -19,23 +19,21 @@ package org.apache.spark.sql.catalyst.types import java.sql.Timestamp -import org.apache.spark.sql.catalyst.util.Metadata - import scala.math.Numeric.{BigDecimalAsIfIntegral, DoubleAsIfIntegral, FloatAsIfIntegral} import scala.reflect.ClassTag import scala.reflect.runtime.universe.{TypeTag, runtimeMirror, typeTag} import scala.util.parsing.combinator.RegexParsers -import org.json4s.JsonAST.JValue import org.json4s._ +import org.json4s.JsonAST.JValue import org.json4s.JsonDSL._ import org.json4s.jackson.JsonMethods._ import org.apache.spark.sql.catalyst.ScalaReflectionLock import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression} +import org.apache.spark.sql.catalyst.util.Metadata import org.apache.spark.util.Utils - object DataType { def fromJson(json: String): DataType = parseDataType(parse(json)) @@ -380,8 +378,7 @@ case class ArrayType(elementType: DataType, containsNull: Boolean) extends DataT * @param name The name of this field. * @param dataType The data type of this field. * @param nullable Indicates if values of this field can be `null` values. - * @param metadata The metadata of this field, which is a map from string to simple type that can be - * serialized to JSON automatically. The metadata should be preserved during + * @param metadata The metadata of this field. The metadata should be preserved during * transformation if the content of the column is not modified, e.g., in selection. */ case class StructField( diff --git a/sql/core/src/main/java/org/apache/spark/sql/api/java/DataType.java b/sql/core/src/main/java/org/apache/spark/sql/api/java/DataType.java index cbb9b5a80d5b5..66eac38937b5e 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/api/java/DataType.java +++ b/sql/core/src/main/java/org/apache/spark/sql/api/java/DataType.java @@ -17,10 +17,10 @@ package org.apache.spark.sql.api.java; -import org.apache.spark.sql.catalyst.util.Metadata; - import java.util.*; +import org.apache.spark.sql.catalyst.util.Metadata; + /** * The base type of all Spark SQL data types.
* diff --git a/sql/core/src/main/java/org/apache/spark/sql/api/java/StructField.java b/sql/core/src/main/java/org/apache/spark/sql/api/java/StructField.java index 3d84cda879124..264464f90fdf3 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/api/java/StructField.java +++ b/sql/core/src/main/java/org/apache/spark/sql/api/java/StructField.java @@ -17,10 +17,10 @@ package org.apache.spark.sql.api.java; -import org.apache.spark.sql.catalyst.util.Metadata; - import java.util.Map; +import org.apache.spark.sql.catalyst.util.Metadata; + /** * A StructField object represents a field in a StructType object. * A StructField object comprises three fields, {@code String name}, {@code DataType dataType}, @@ -28,11 +28,10 @@ * The field of {@code dataType} specifies the data type of a StructField. * The field of {@code nullable} specifies if values of a StructField can contain {@code null} * values. - * The field of {@code metadata} provides extra information of the StructField, which is a map from - * string to simple type that can be serialized to JSON automatically + * The field of {@code metadata} provides extra information about the StructField. * * To create a {@link StructField}, - * {@link DataType#createStructField(String, DataType, boolean, Map)} + * {@link DataType#createStructField(String, DataType, boolean, Metadata)} * should be used. */ public class StructField { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataTypeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataTypeSuite.scala index f874d9d279098..a1d5907b357b0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataTypeSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataTypeSuite.scala @@ -17,10 +17,10 @@ package org.apache.spark.sql -import org.apache.spark.sql.catalyst.util.MetadataBuilder import org.scalatest.FunSuite import org.apache.spark.sql.catalyst.types.DataType +import org.apache.spark.sql.catalyst.util.MetadataBuilder class DataTypeSuite extends FunSuite { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/api/java/ScalaSideDataTypeConversionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/api/java/ScalaSideDataTypeConversionSuite.scala index 688d9ff724dfa..91bbc00be7090 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/api/java/ScalaSideDataTypeConversionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/api/java/ScalaSideDataTypeConversionSuite.scala @@ -17,13 +17,12 @@ package org.apache.spark.sql.api.java -import org.apache.spark.sql.catalyst.util.MetadataBuilder -import org.apache.spark.sql.types.util.DataTypeConversions import org.scalatest.FunSuite -import org.apache.spark.sql.{DataType => SDataType, StructField => SStructField} -import org.apache.spark.sql.{StructType => SStructType} -import DataTypeConversions._ +import org.apache.spark.sql.{DataType => SDataType, StructField => SStructField, + StructType => SStructType} +import org.apache.spark.sql.catalyst.util.MetadataBuilder +import org.apache.spark.sql.types.util.DataTypeConversions._ class ScalaSideDataTypeConversionSuite extends FunSuite { From 3f49aab1342fd2d877749c7cce5cfa6bc8ac1fa7 Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Tue, 14 Oct 2014 22:41:46 -0700 Subject: [PATCH 14/18] remove StructField.toString --- .../org/apache/spark/sql/catalyst/types/dataTypes.scala | 5 ----- .../scala/org/apache/spark/sql/catalyst/util/Metadata.scala | 2 +- 2 files changed, 1 insertion(+), 6 deletions(-) diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala index ec12f09bc6ffa..56bb980b20946 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala @@ -401,11 +401,6 @@ case class StructField( DataType.buildFormattedString(dataType, s"$prefix |", builder) } - override def toString: String = { - // Do not add metadata to be consistent with CaseClassStringParser. - s"StructField($name,$dataType,$nullable)" - } - private[sql] def jsonValue: JValue = { ("name" -> name) ~ ("type" -> dataType.jsonValue) ~ diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/Metadata.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/Metadata.scala index e16edd94fb24d..469a429eccc82 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/Metadata.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/Metadata.scala @@ -153,7 +153,7 @@ object Metadata { private def toJsonValue(obj: Any): JValue = { obj match { case map: Map[_, _] => - val fields = map.toList.map { case (k: String, v) => (k, toJsonValue(v))} + val fields = map.toList.map { case (k: String, v) => (k, toJsonValue(v)) } JObject(fields) case arr: Array[_] => val values = arr.toList.map(toJsonValue) From 4266f4dd4df4b006d3a54144558cb92bf46003a7 Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Wed, 15 Oct 2014 12:52:14 -0700 Subject: [PATCH 15/18] add StructField.toString back for backward compatibility --- .../org/apache/spark/sql/catalyst/types/dataTypes.scala | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala index 56bb980b20946..bad1ddb7baf92 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala @@ -77,7 +77,7 @@ object DataType { StructField(name, parseDataType(dataType), nullable, Metadata.fromJObject(metadata)) } - @deprecated("Use DataType.fromJson instead") + @deprecated("Use DataType.fromJson instead", "1.2.0") def fromCaseClassString(string: String): DataType = CaseClassStringParser(string) private object CaseClassStringParser extends RegexParsers { @@ -401,6 +401,9 @@ case class StructField( DataType.buildFormattedString(dataType, s"$prefix |", builder) } + // override the default toString to be compatible with legacy parquet files. 
+ override def toString: String = s"StructField($name,$dataType,$nullable)" + private[sql] def jsonValue: JValue = { ("name" -> name) ~ ("type" -> dataType.jsonValue) ~ From 611d3c20cf4aed9927b596d89b9ac96b2cbbcdec Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Mon, 20 Oct 2014 22:31:50 -0700 Subject: [PATCH 16/18] move metadata from Expr to NamedExpr --- .../spark/sql/catalyst/expressions/Expression.scala | 3 --- .../sql/catalyst/expressions/namedExpressions.scala | 12 ++++++++++-- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala index 9776f51a1c0ec..39b120e8de485 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala @@ -44,9 +44,6 @@ abstract class Expression extends TreeNode[Expression] { def nullable: Boolean def references: AttributeSet = AttributeSet(children.flatMap(_.references.iterator)) - /** Returns the metadata when an expression is a reference to another expression with metadata. */ - def metadata: Metadata = Metadata.empty - /** Returns the result of evaluating this expression on a given input Row */ def eval(input: Row = null): EvaluatedType diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala index 38649ae22bd92..07669298faba9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala @@ -44,6 +44,9 @@ abstract class NamedExpression extends Expression { def toAttribute: Attribute + /** Returns the metadata when an expression is a reference to another expression with metadata. */ + def metadata: Metadata = Metadata.empty + protected def typeSuffix = if (resolved) { dataType match { @@ -89,11 +92,16 @@ case class Alias(child: Expression, name: String) override def dataType = child.dataType override def nullable = child.nullable - override def metadata: Metadata = child.metadata + override def metadata: Metadata = { + child match { + case named: NamedExpression => named.metadata + case _ => Metadata.empty + } + } override def toAttribute = { if (resolved) { - AttributeReference(name, child.dataType, child.nullable, child.metadata)(exprId, qualifiers) + AttributeReference(name, child.dataType, child.nullable, metadata)(exprId, qualifiers) } else { UnresolvedAttribute(name) } From 1e2abcf66ff9b0f17280f083315905b122e9a584 Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Thu, 23 Oct 2014 16:55:44 -0700 Subject: [PATCH 17/18] change default value of metadata to None in python --- python/pyspark/sql.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/pyspark/sql.py b/python/pyspark/sql.py index 18e73da154968..33647c30e0b73 100644 --- a/python/pyspark/sql.py +++ b/python/pyspark/sql.py @@ -305,7 +305,7 @@ class StructField(DataType): """ - def __init__(self, name, dataType, nullable, metadata={}): + def __init__(self, name, dataType, nullable, metadata=None): """Creates a StructField :param name: the name of this field. :param dataType: the data type of this field. 
@@ -325,7 +325,7 @@ def __init__(self, name, dataType, nullable, metadata={}): self.name = name self.dataType = dataType self.nullable = nullable - self.metadata = metadata + self.metadata = metadata or {} def __repr__(self): return "StructField(%s,%s,%s)" % (self.name, self.dataType, From 886b85cb18a3635f9ae1f81e7d070605b5bec895 Mon Sep 17 00:00:00 2001 From: Michael Armbrust Date: Tue, 28 Oct 2014 18:47:52 -0700 Subject: [PATCH 18/18] Expose Metadata and MetadataBuilder through the public scala and java packages. --- .../spark/sql/catalyst/util/Metadata.scala | 3 ++ .../apache/spark/sql/api/java/DataType.java | 5 +-- .../apache/spark/sql/api/java/Metadata.java | 31 +++++++++++++++++++ .../spark/sql/api/java/MetadataBuilder.java | 28 +++++++++++++++++ .../spark/sql/api/java/StructField.java | 2 -- .../org/apache/spark/sql/SQLContext.scala | 2 -- .../scala/org/apache/spark/sql/package.scala | 23 ++++++++++++++ .../sql/types/util/DataTypeConversions.scala | 4 +-- .../org/apache/spark/sql/DataTypeSuite.scala | 3 -- .../org/apache/spark/sql/SQLQuerySuite.scala | 3 +- .../ScalaSideDataTypeConversionSuite.scala | 4 +-- 11 files changed, 90 insertions(+), 18 deletions(-) create mode 100644 sql/core/src/main/java/org/apache/spark/sql/api/java/Metadata.java create mode 100644 sql/core/src/main/java/org/apache/spark/sql/api/java/MetadataBuilder.java diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/Metadata.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/Metadata.scala index 469a429eccc82..2f2082fa3c863 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/Metadata.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/Metadata.scala @@ -204,6 +204,9 @@ class MetadataBuilder { private val map: mutable.Map[String, Any] = mutable.Map.empty + /** Returns the immutable version of this map. Used for java interop. */ + protected def getMap = map.toMap + /** Include the content of an existing [[Metadata]] instance. */ def withMetadata(metadata: Metadata): this.type = { map ++= metadata.map diff --git a/sql/core/src/main/java/org/apache/spark/sql/api/java/DataType.java b/sql/core/src/main/java/org/apache/spark/sql/api/java/DataType.java index b69fa0074fbb2..0c85cdc0aa640 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/api/java/DataType.java +++ b/sql/core/src/main/java/org/apache/spark/sql/api/java/DataType.java @@ -19,8 +19,6 @@ import java.util.*; -import org.apache.spark.sql.catalyst.util.Metadata; - /** * The base type of all Spark SQL data types. * @@ -175,7 +173,7 @@ public static StructField createStructField( * @see #createStructField(String, DataType, boolean, Metadata) */ public static StructField createStructField(String name, DataType dataType, boolean nullable) { - return createStructField(name, dataType, nullable, Metadata.empty()); + return createStructField(name, dataType, nullable, (new MetadataBuilder()).build()); } /** @@ -207,5 +205,4 @@ public static StructType createStructType(StructField[] fields) { return new StructType(fields); } - } diff --git a/sql/core/src/main/java/org/apache/spark/sql/api/java/Metadata.java b/sql/core/src/main/java/org/apache/spark/sql/api/java/Metadata.java new file mode 100644 index 0000000000000..0f819fb01a76a --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/api/java/Metadata.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.api.java; + +/** + * Metadata is a wrapper over Map[String, Any] that limits the value type to simple ones: Boolean, + * Long, Double, String, Metadata, Array[Boolean], Array[Long], Array[Double], Array[String], and + * Array[Metadata]. JSON is used for serialization. + * + * The default constructor is private. Users should use {@link MetadataBuilder}. + */ +class Metadata extends org.apache.spark.sql.catalyst.util.Metadata { + Metadata(scala.collection.immutable.Map map) { + super(map); + } +} diff --git a/sql/core/src/main/java/org/apache/spark/sql/api/java/MetadataBuilder.java b/sql/core/src/main/java/org/apache/spark/sql/api/java/MetadataBuilder.java new file mode 100644 index 0000000000000..6e6b12f0722c5 --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/api/java/MetadataBuilder.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.api.java; + +/** + * Builder for {@link Metadata}. If there is a key collision, the latter will overwrite the former. + */ +public class MetadataBuilder extends org.apache.spark.sql.catalyst.util.MetadataBuilder { + @Override + public Metadata build() { + return new Metadata(getMap()); + } +} diff --git a/sql/core/src/main/java/org/apache/spark/sql/api/java/StructField.java b/sql/core/src/main/java/org/apache/spark/sql/api/java/StructField.java index 264464f90fdf3..7c60d492bcdf0 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/api/java/StructField.java +++ b/sql/core/src/main/java/org/apache/spark/sql/api/java/StructField.java @@ -19,8 +19,6 @@ import java.util.Map; -import org.apache.spark.sql.catalyst.util.Metadata; - /** * A StructField object represents a field in a StructType object.
* A StructField object comprises three fields, {@code String name}, {@code DataType dataType}, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index 0e4a9ca60b00d..c4da060ee6bdc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -32,8 +32,6 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.optimizer.{Optimizer, DefaultOptimizer} import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.rules.RuleExecutor -import org.apache.spark.sql.catalyst.types.DataType -import org.apache.spark.sql.columnar.InMemoryRelation import org.apache.spark.sql.execution.{SparkStrategies, _} import org.apache.spark.sql.json._ import org.apache.spark.sql.parquet.ParquetRelation diff --git a/sql/core/src/main/scala/org/apache/spark/sql/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/package.scala index e98d151286818..f0e57e2a7447b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/package.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/package.scala @@ -125,6 +125,9 @@ package object sql { @DeveloperApi type DataType = catalyst.types.DataType + @DeveloperApi + val DataType = catalyst.types.DataType + /** * :: DeveloperApi :: * @@ -414,4 +417,24 @@ package object sql { */ @DeveloperApi val StructField = catalyst.types.StructField + + /** + * :: DeveloperApi :: + * + * Metadata is a wrapper over Map[String, Any] that limits the value type to simple ones: Boolean, + * Long, Double, String, Metadata, Array[Boolean], Array[Long], Array[Double], Array[String], and + * Array[Metadata]. JSON is used for serialization. + * + * The default constructor is private. Users should use either [[MetadataBuilder]] or + * [[Metadata$#fromJson]] to create Metadata instances. + * + * @param map an immutable map that stores the data + */ + @DeveloperApi + type Metadata = catalyst.util.Metadata + + /** + * Builder for [[Metadata]]. If there is a key collision, the latter will overwrite the former.
+ */ + type MetadataBuilder = catalyst.util.MetadataBuilder } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/types/util/DataTypeConversions.scala b/sql/core/src/main/scala/org/apache/spark/sql/types/util/DataTypeConversions.scala index 793cc8e2e0dbf..aa3f76412b9b6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/types/util/DataTypeConversions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/types/util/DataTypeConversions.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.types.util import org.apache.spark.sql._ -import org.apache.spark.sql.api.java.{DataType => JDataType, StructField => JStructField} +import org.apache.spark.sql.api.java.{DataType => JDataType, StructField => JStructField, MetadataBuilder => JMetaDataBuilder} import scala.collection.JavaConverters._ @@ -32,7 +32,7 @@ protected[sql] object DataTypeConversions { scalaStructField.name, asJavaDataType(scalaStructField.dataType), scalaStructField.nullable, - scalaStructField.metadata) + (new JMetaDataBuilder).withMetadata(scalaStructField.metadata).build()) } /** diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataTypeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataTypeSuite.scala index a1d5907b357b0..6c9db639c0f6c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataTypeSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataTypeSuite.scala @@ -19,9 +19,6 @@ package org.apache.spark.sql import org.scalatest.FunSuite -import org.apache.spark.sql.catalyst.types.DataType -import org.apache.spark.sql.catalyst.util.MetadataBuilder - class DataTypeSuite extends FunSuite { test("construct an ArrayType") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 9f59d665dd28e..f995c7c9553cb 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -25,8 +25,7 @@ import org.apache.spark.sql.TestData._ import org.apache.spark.sql.catalyst.errors.TreeNodeException import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan -import org.apache.spark.sql.execution.joins.BroadcastHashJoin -import org.apache.spark.sql.catalyst.util.MetadataBuilder + import org.apache.spark.sql.test.TestSQLContext._ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/api/java/ScalaSideDataTypeConversionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/api/java/ScalaSideDataTypeConversionSuite.scala index 91bbc00be7090..93c74fa903896 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/api/java/ScalaSideDataTypeConversionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/api/java/ScalaSideDataTypeConversionSuite.scala @@ -19,9 +19,7 @@ package org.apache.spark.sql.api.java import org.scalatest.FunSuite -import org.apache.spark.sql.{DataType => SDataType, StructField => SStructField, - StructType => SStructType} -import org.apache.spark.sql.catalyst.util.MetadataBuilder +import org.apache.spark.sql.{DataType => SDataType, StructField => SStructField, StructType => SStructType} import org.apache.spark.sql.types.util.DataTypeConversions._ class ScalaSideDataTypeConversionSuite extends FunSuite {
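With PATCH 18 applied, Metadata and MetadataBuilder are reachable from the public org.apache.spark.sql package (as type aliases) and from org.apache.spark.sql.api.java (as thin wrappers), so user code no longer imports from catalyst.util. A short sketch against the public Scala surface; the object name is illustrative, StringType and the StructType/StructField exports are assumed to be the package object's existing aliases, and the metadata accessors are the ones exercised in SQLQuerySuite above:

    import org.apache.spark.sql.{Metadata, MetadataBuilder, StringType, StructField, StructType}

    object PublicMetadataApi {
      def main(args: Array[String]): Unit = {
        // Build column-level documentation through the public alias.
        val doc: Metadata = new MetadataBuilder().putString("doc", "first name").build()

        val schema = new StructType(Seq(
          StructField("id", StringType, nullable = false),
          StructField("name", StringType, nullable = true, metadata = doc)))

        // StructField is a case class, so copy() preserves metadata; this is
        // the same pattern SQLQuerySuite uses to attach metadata to a field.
        val renamed = schema("name").copy(name = "firstName")
        assert(renamed.metadata.getString("doc") == "first name")
      }
    }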