Changes from all commits

38 commits
f31065f
A query plan or a SchemaRDD can print out its schema.
yhuai Jun 6, 2014
f45583b
Infer the schema of a JSON dataset (a text file with one JSON object …
yhuai Jun 6, 2014
af91b23
Merge remote-tracking branch 'upstream/master' into newJson
yhuai Jun 6, 2014
0576406
Add Apache license header.
yhuai Jun 6, 2014
f3ce176
After type conflict resolution, if a NullType is found, StringType is…
yhuai Jun 6, 2014
a2313a6
Address PR comments.
yhuai Jun 10, 2014
666b957
Merge remote-tracking branch 'upstream/master' into newJson
yhuai Jun 10, 2014
0387523
Address PR comments.
yhuai Jun 10, 2014
52a2275
Merge remote-tracking branch 'upstream/master' into newJson
yhuai Jun 10, 2014
8846af5
API doc.
yhuai Jun 10, 2014
65b87f0
Fix sampling...
yhuai Jun 10, 2014
4325475
If a sampled dataset is used for schema inferring, update the schema …
yhuai Jun 11, 2014
a5a4b52
Merge remote-tracking branch 'upstream/master' into newJson
yhuai Jun 11, 2014
8ffed79
Update the example.
yhuai Jun 11, 2014
66f9e76
Update docs and use the entire dataset to infer the schema.
yhuai Jun 13, 2014
8347f2e
Merge remote-tracking branch 'upstream/master' into newJson
yhuai Jun 13, 2014
6df0891
Apache header.
yhuai Jun 13, 2014
ab810b0
Make JsonRDD private.
yhuai Jun 13, 2014
d0bd412
Merge remote-tracking branch 'upstream/master' into newJson
yhuai Jun 13, 2014
cff84cc
Use a SchemaRDD for a JSON dataset.
yhuai Jun 16, 2014
7027634
Java API.
yhuai Jun 16, 2014
9df8c5a
Python API.
yhuai Jun 16, 2014
4fbddf0
Programming guide.
yhuai Jun 16, 2014
6d20b85
Merge remote-tracking branch 'upstream/master' into newJson
yhuai Jun 16, 2014
e7a6c19
SchemaRDD.javaToPython should convert a field with the StructType to …
yhuai Jun 16, 2014
83013fb
Update Java Example.
yhuai Jun 16, 2014
6a5f5ef
Merge remote-tracking branch 'upstream/master' into newJson
yhuai Jun 16, 2014
7ea750e
marmbrus's comments.
yhuai Jun 16, 2014
d7a005c
Merge remote-tracking branch 'upstream/master' into newJson
yhuai Jun 16, 2014
1f908ce
Remove extra line.
yhuai Jun 16, 2014
5428451
Newline
yhuai Jun 16, 2014
79ea9ba
Fix typos.
yhuai Jun 17, 2014
e2773a6
Merge remote-tracking branch 'upstream/master' into newJson
yhuai Jun 17, 2014
ce31c81
Merge remote-tracking branch 'upstream/master' into newJson
yhuai Jun 17, 2014
94ffdaa
Remove "get" from method names.
yhuai Jun 17, 2014
bc9ac51
Merge remote-tracking branch 'upstream/master' into newJson
yhuai Jun 17, 2014
ce8eedd
rxin's comments.
yhuai Jun 18, 2014
227e89e
Merge remote-tracking branch 'upstream/master' into newJson
yhuai Jun 18, 2014
1 change: 1 addition & 0 deletions .rat-excludes
@@ -22,6 +22,7 @@ spark-env.sh.template
log4j-defaults.properties
sorttable.js
.*txt
.*json
.*data
.*log
cloudpickle.py
290 changes: 222 additions & 68 deletions docs/sql-programming-guide.md

Large diffs are not rendered by default.
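Since the guide's diff is not rendered here, the following is a minimal sketch of the Scala usage the updated guide covers, mirroring the Java and Python examples further down. It assumes sc is an existing SparkContext and uses the SQLContext.jsonFile and jsonRDD methods this PR introduces; it is illustrative only and not part of the diff.

// Sketch only: Scala equivalents of the Java/Python examples below.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// Create a SchemaRDD from the file(s) pointed to by a path; the schema is inferred
// by scanning the dataset once.
val people = sqlContext.jsonFile("examples/src/main/resources/people.json")
people.printSchema()
people.registerAsTable("people")
val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

// Alternatively, build a SchemaRDD from an RDD[String] holding one JSON object per string.
val anotherPeopleRDD = sc.parallelize(
  """{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)
val anotherPeople = sqlContext.jsonRDD(anotherPeopleRDD)
anotherPeople.printSchema()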

examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQL.java
@@ -18,6 +18,7 @@
package org.apache.spark.examples.sql;

import java.io.Serializable;
import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
@@ -56,6 +57,7 @@ public static void main(String[] args) throws Exception {
JavaSparkContext ctx = new JavaSparkContext(sparkConf);
JavaSQLContext sqlCtx = new JavaSQLContext(ctx);

System.out.println("=== Data source: RDD ===");
// Load a text file and convert each line to a Java Bean.
JavaRDD<Person> people = ctx.textFile("examples/src/main/resources/people.txt").map(
new Function<String, Person>() {
@@ -84,16 +86,88 @@ public String call(Row row) {
return "Name: " + row.getString(0);
}
}).collect();
for (String name: teenagerNames) {
System.out.println(name);
}

System.out.println("=== Data source: Parquet File ===");
// JavaSchemaRDDs can be saved as parquet files, maintaining the schema information.
schemaPeople.saveAsParquetFile("people.parquet");

// Read in the parquet file created above. Parquet files are self-describing so the schema is preserved.
// Read in the parquet file created above.
// Parquet files are self-describing so the schema is preserved.
// The result of loading a parquet file is also a JavaSchemaRDD.
JavaSchemaRDD parquetFile = sqlCtx.parquetFile("people.parquet");

//Parquet files can also be registered as tables and then used in SQL statements.
parquetFile.registerAsTable("parquetFile");
JavaSchemaRDD teenagers2 = sqlCtx.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19");
JavaSchemaRDD teenagers2 =
sqlCtx.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19");
teenagerNames = teenagers2.map(new Function<Row, String>() {
public String call(Row row) {
return "Name: " + row.getString(0);
}
}).collect();
for (String name: teenagerNames) {
System.out.println(name);
}

System.out.println("=== Data source: JSON Dataset ===");
// A JSON dataset is pointed to by path.
// The path can be either a single text file or a directory storing text files.
String path = "examples/src/main/resources/people.json";
// Create a JavaSchemaRDD from the file(s) pointed by path
JavaSchemaRDD peopleFromJsonFile = sqlCtx.jsonFile(path);

// Because the schema of a JSON dataset is automatically inferred, it is best to
// take a look at the schema before writing queries.
peopleFromJsonFile.printSchema();
// The schema of people is ...
// root
// |-- age: IntegerType
// |-- name: StringType

// Register this JavaSchemaRDD as a table.
peopleFromJsonFile.registerAsTable("people");

// SQL statements can be run by using the sql methods provided by sqlCtx.
JavaSchemaRDD teenagers3 = sqlCtx.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19");

// The results of SQL queries are JavaSchemaRDDs and support all the normal RDD operations.
// The columns of a row in the result can be accessed by ordinal.
teenagerNames = teenagers3.map(new Function<Row, String>() {
public String call(Row row) { return "Name: " + row.getString(0); }
}).collect();
for (String name: teenagerNames) {
System.out.println(name);
}

// Alternatively, a JavaSchemaRDD can be created for a JSON dataset represented by
// an RDD[String] storing one JSON object per string.
List<String> jsonData = Arrays.asList(
"{\"name\":\"Yin\",\"address\":{\"city\":\"Columbus\",\"state\":\"Ohio\"}}");
JavaRDD<String> anotherPeopleRDD = ctx.parallelize(jsonData);
JavaSchemaRDD peopleFromJsonRDD = sqlCtx.jsonRDD(anotherPeopleRDD);

// Take a look at the schema of this new JavaSchemaRDD.
peopleFromJsonRDD.printSchema();
// The schema of anotherPeople is ...
// root
// |-- address: StructType
// | |-- city: StringType
// | |-- state: StringType
// |-- name: StringType

peopleFromJsonRDD.registerAsTable("people2");

JavaSchemaRDD peopleWithCity = sqlCtx.sql("SELECT name, address.city FROM people2");
List<String> nameAndCity = peopleWithCity.map(new Function<Row, String>() {
public String call(Row row) {
return "Name: " + row.getString(0) + ", City: " + row.getString(1);
}
}).collect();
for (String name: nameAndCity) {
System.out.println(name);
}
}
}
3 changes: 3 additions & 0 deletions examples/src/main/resources/people.json
@@ -0,0 +1,3 @@
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}
22 changes: 18 additions & 4 deletions project/SparkBuild.scala
@@ -76,7 +76,7 @@ object SparkBuild extends Build {

lazy val catalyst = Project("catalyst", file("sql/catalyst"), settings = catalystSettings) dependsOn(core)

lazy val sql = Project("sql", file("sql/core"), settings = sqlCoreSettings) dependsOn(core, catalyst)
lazy val sql = Project("sql", file("sql/core"), settings = sqlCoreSettings) dependsOn(core) dependsOn(catalyst % "compile->compile;test->test")

lazy val hive = Project("hive", file("sql/hive"), settings = hiveSettings) dependsOn(sql)

@@ -501,9 +501,23 @@ object SparkBuild extends Build {
def sqlCoreSettings = sharedSettings ++ Seq(
name := "spark-sql",
libraryDependencies ++= Seq(
"com.twitter" % "parquet-column" % parquetVersion,
"com.twitter" % "parquet-hadoop" % parquetVersion
)
"com.twitter" % "parquet-column" % parquetVersion,
"com.twitter" % "parquet-hadoop" % parquetVersion,
"com.fasterxml.jackson.core" % "jackson-databind" % "2.3.0" // json4s-jackson 3.2.6 requires jackson-databind 2.3.0.
),
initialCommands in console :=
"""
|import org.apache.spark.sql.catalyst.analysis._
Review comment (Contributor): do we need to import all of these?

Reply (Contributor): If the console is primarily for developers then I find it pretty useful to have all the sorts of things I'd want for debugging in scope. This is how hive/console is already.

|import org.apache.spark.sql.catalyst.dsl._
|import org.apache.spark.sql.catalyst.errors._
|import org.apache.spark.sql.catalyst.expressions._
|import org.apache.spark.sql.catalyst.plans.logical._
|import org.apache.spark.sql.catalyst.rules._
|import org.apache.spark.sql.catalyst.types._
|import org.apache.spark.sql.catalyst.util._
|import org.apache.spark.sql.execution
|import org.apache.spark.sql.test.TestSQLContext._
|import org.apache.spark.sql.parquet.ParquetTestData""".stripMargin
)

// Since we don't include hive in the main assembly this project also acts as an alternative
64 changes: 62 additions & 2 deletions python/pyspark/sql.py
@@ -15,7 +15,7 @@
# limitations under the License.
#

from pyspark.rdd import RDD
from pyspark.rdd import RDD, PipelinedRDD
from pyspark.serializers import BatchedSerializer, PickleSerializer

from py4j.protocol import Py4JError
@@ -137,6 +137,53 @@ def parquetFile(self, path):
jschema_rdd = self._ssql_ctx.parquetFile(path)
return SchemaRDD(jschema_rdd, self)


def jsonFile(self, path):
"""Loads a text file storing one JSON object per line,
returning the result as a L{SchemaRDD}.
It goes through the entire dataset once to determine the schema.

>>> import tempfile, shutil
>>> jsonFile = tempfile.mkdtemp()
>>> shutil.rmtree(jsonFile)
>>> ofn = open(jsonFile, 'w')
>>> for json in jsonStrings:
... print>>ofn, json
>>> ofn.close()
>>> srdd = sqlCtx.jsonFile(jsonFile)
>>> sqlCtx.registerRDDAsTable(srdd, "table1")
>>> srdd2 = sqlCtx.sql("SELECT field1 AS f1, field2 as f2, field3 as f3 from table1")
>>> srdd2.collect() == [{"f1": 1, "f2": "row1", "f3":{"field4":11}},
... {"f1": 2, "f2": "row2", "f3":{"field4":22}},
... {"f1": 3, "f2": "row3", "f3":{"field4":33}}]
True
"""
jschema_rdd = self._ssql_ctx.jsonFile(path)
return SchemaRDD(jschema_rdd, self)

def jsonRDD(self, rdd):
"""Loads an RDD storing one JSON object per string, returning the result as a L{SchemaRDD}.
It goes through the entire dataset once to determine the schema.

>>> srdd = sqlCtx.jsonRDD(json)
>>> sqlCtx.registerRDDAsTable(srdd, "table1")
>>> srdd2 = sqlCtx.sql("SELECT field1 AS f1, field2 as f2, field3 as f3 from table1")
>>> srdd2.collect() == [{"f1": 1, "f2": "row1", "f3":{"field4":11}},
... {"f1": 2, "f2": "row2", "f3":{"field4":22}},
... {"f1": 3, "f2": "row3", "f3":{"field4":33}}]
True
"""
def func(split, iterator):
for x in iterator:
if not isinstance(x, basestring):
x = unicode(x)
yield x.encode("utf-8")
keyed = PipelinedRDD(rdd, func)
keyed._bypass_serializer = True
jrdd = keyed._jrdd.map(self._jvm.BytesToString())
jschema_rdd = self._ssql_ctx.jsonRDD(jrdd.rdd())
return SchemaRDD(jschema_rdd, self)

def sql(self, sqlQuery):
"""Return a L{SchemaRDD} representing the result of the given query.

@@ -265,7 +312,7 @@ class SchemaRDD(RDD):

For normal L{pyspark.rdd.RDD} operations (map, count, etc.) the
L{SchemaRDD} is not operated on directly, as it's underlying
implementation is a RDD composed of Java objects. Instead it is
implementation is an RDD composed of Java objects. Instead it is
converted to a PythonRDD in the JVM, on which Python operations can
be done.
"""
@@ -337,6 +384,14 @@ def saveAsTable(self, tableName):
"""Creates a new table with the contents of this SchemaRDD."""
self._jschema_rdd.saveAsTable(tableName)

def schemaString(self):
"""Returns the output schema in the tree format."""
return self._jschema_rdd.schemaString()

def printSchema(self):
"""Prints out the schema in the tree format."""
print self.schemaString()

def count(self):
"""Return the number of elements in this RDD.

@@ -436,6 +491,11 @@ def _test():
globs['sqlCtx'] = SQLContext(sc)
globs['rdd'] = sc.parallelize([{"field1" : 1, "field2" : "row1"},
{"field1" : 2, "field2": "row2"}, {"field1" : 3, "field2": "row3"}])
jsonStrings = ['{"field1": 1, "field2": "row1", "field3":{"field4":11}}',
'{"field1" : 2, "field2": "row2", "field3":{"field4":22}}',
'{"field1" : 3, "field2": "row3", "field3":{"field4":33}}']
globs['jsonStrings'] = jsonStrings
globs['json'] = sc.parallelize(jsonStrings)
globs['nestedRdd1'] = sc.parallelize([
{"f1" : array('i', [1, 2]), "f2" : {"row1" : 1.0}},
{"f1" : array('i', [2, 3]), "f2" : {"row2" : 2.0}}])
28 changes: 28 additions & 0 deletions sql/catalyst/pom.xml
@@ -66,6 +66,34 @@
<groupId>org.scalatest</groupId>
<artifactId>scalatest-maven-plugin</artifactId>
</plugin>

<!--
This plugin forces the generation of jar containing catalyst test classes,
so that the test classes of external modules can use them. The two execution profiles
are necessary - first one for 'mvn package', second one for 'mvn compile'. Ideally,
'mvn compile' should not compile test classes and therefore should not need this.
However, an open Maven bug (http://jira.codehaus.org/browse/MNG-3559)
causes the compilation to fail if catalyst test-jar is not generated. Hence, the
second execution profile for 'mvn compile'.
-->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>test-jar</goal>
</goals>
</execution>
<execution>
<id>test-jar-on-compile</id>
<phase>compile</phase>
<goals>
<goal>test-jar</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala
@@ -22,6 +22,16 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Union}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.types._

object HiveTypeCoercion {
// See https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Types.
// The conversions for integral and floating point types have a linear widening hierarchy:
val numericPrecedence =
Seq(NullType, ByteType, ShortType, IntegerType, LongType, FloatType, DoubleType, DecimalType)
// Boolean is only wider than Void
val booleanPrecedence = Seq(NullType, BooleanType)
val allPromotions: Seq[Seq[DataType]] = numericPrecedence :: booleanPrecedence :: Nil
}

/**
* A collection of [[catalyst.rules.Rule Rules]] that can be used to coerce differing types that
* participate in operations into compatible ones. Most of these rules are based on Hive semantics,
@@ -116,19 +126,18 @@ trait HiveTypeCoercion {
*
* Additionally, all types when UNION-ed with strings will be promoted to strings.
* Other string conversions are handled by PromoteStrings.
*
* Widening types might result in loss of precision in the following cases:
* - IntegerType to FloatType
* - LongType to FloatType
* - LongType to DoubleType
*/
object WidenTypes extends Rule[LogicalPlan] {
// See https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Types.
// The conversion for integral and floating point types have a linear widening hierarchy:
val numericPrecedence =
Seq(NullType, ByteType, ShortType, IntegerType, LongType, FloatType, DoubleType, DecimalType)
// Boolean is only wider than Void
val booleanPrecedence = Seq(NullType, BooleanType)
val allPromotions: Seq[Seq[DataType]] = numericPrecedence :: booleanPrecedence :: Nil

def findTightestCommonType(t1: DataType, t2: DataType): Option[DataType] = {
// Try and find a promotion rule that contains both types in question.
val applicableConversion = allPromotions.find(p => p.contains(t1) && p.contains(t2))
val applicableConversion =
HiveTypeCoercion.allPromotions.find(p => p.contains(t1) && p.contains(t2))

// If found return the widest common type, otherwise None
applicableConversion.map(_.filter(t => t == t1 || t == t2).last)
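To make the widening rules above concrete, here is a small sketch of the lookup findTightestCommonType performs over the precedence lists now hosted in the HiveTypeCoercion companion object. It is not part of the diff and assumes the catalyst classes are on the classpath (for example in the sql console configured above):

// Illustrative only: the same two steps as findTightestCommonType above.
import org.apache.spark.sql.catalyst.analysis.HiveTypeCoercion
import org.apache.spark.sql.catalyst.types._

def tightestCommonType(t1: DataType, t2: DataType): Option[DataType] =
  HiveTypeCoercion.allPromotions
    .find(p => p.contains(t1) && p.contains(t2))   // a precedence list containing both types
    .map(_.filter(t => t == t1 || t == t2).last)   // keep the later, i.e. wider, of the two

// tightestCommonType(IntegerType, DoubleType)  == Some(DoubleType)
// tightestCommonType(NullType, BooleanType)    == Some(BooleanType)
// tightestCommonType(IntegerType, BooleanType) == None  (no single list contains both)
// As the scaladoc above warns, some promotions lose precision: for example, widening
// LongType to FloatType cannot represent every Long value exactly.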
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
@@ -18,7 +18,9 @@
package org.apache.spark.sql.catalyst.plans

import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.catalyst.plans
import org.apache.spark.sql.catalyst.trees.TreeNode
import org.apache.spark.sql.catalyst.types.{ArrayType, DataType, StructField, StructType}

abstract class QueryPlan[PlanType <: TreeNode[PlanType]] extends TreeNode[PlanType] {
self: PlanType with Product =>
@@ -123,4 +125,53 @@ abstract class QueryPlan[PlanType <: TreeNode[PlanType]] extends TreeNode[PlanTy
case other => Nil
}.toSeq
}

protected def generateSchemaString(schema: Seq[Attribute]): String = {
val builder = new StringBuilder
builder.append("root\n")
val prefix = " |"
schema.foreach { attribute =>
val name = attribute.name
val dataType = attribute.dataType
dataType match {
case fields: StructType =>
builder.append(s"$prefix-- $name: $StructType\n")
generateSchemaString(fields, s"$prefix |", builder)
case ArrayType(fields: StructType) =>
builder.append(s"$prefix-- $name: $ArrayType[$StructType]\n")
generateSchemaString(fields, s"$prefix |", builder)
case ArrayType(elementType: DataType) =>
builder.append(s"$prefix-- $name: $ArrayType[$elementType]\n")
case _ => builder.append(s"$prefix-- $name: $dataType\n")
}
}

builder.toString()
}

protected def generateSchemaString(
schema: StructType,
prefix: String,
builder: StringBuilder): StringBuilder = {
schema.fields.foreach {
case StructField(name, fields: StructType, _) =>
builder.append(s"$prefix-- $name: $StructType\n")
generateSchemaString(fields, s"$prefix |", builder)
case StructField(name, ArrayType(fields: StructType), _) =>
builder.append(s"$prefix-- $name: $ArrayType[$StructType]\n")
generateSchemaString(fields, s"$prefix |", builder)
case StructField(name, ArrayType(elementType: DataType), _) =>
builder.append(s"$prefix-- $name: $ArrayType[$elementType]\n")
case StructField(name, fieldType: DataType, _) =>
builder.append(s"$prefix-- $name: $fieldType\n")
}

builder
}

/** Returns the output schema in the tree format. */
def schemaString: String = generateSchemaString(output)

/** Prints out the schema in the tree format */
def printSchema(): Unit = println(schemaString)
}