diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index 490c1ce8a7cc..69fca3b6be5a 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -581,6 +581,114 @@ Starting from Spark 2.1, persistent datasource tables have per-partition metadat
 Note that partition information is not gathered by default when creating external datasource tables (those with a `path` option). To sync the partition information in the metastore, you can invoke `MSCK REPAIR TABLE`.
 
+### Bucketing, Sorting and Partitioning
+
+For file-based data sources, it is also possible to bucket and sort or partition the output.
+Bucketing and sorting are applicable only to persistent tables:
+
+<div class="codetabs">
+
+<div data-lang="scala"  markdown="1">
+{% include_example write_sorting_and_bucketing scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala %}
+</div>
+
+<div data-lang="java"  markdown="1">
+{% include_example write_sorting_and_bucketing java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java %}
+</div>
+
+<div data-lang="python"  markdown="1">
+{% include_example write_sorting_and_bucketing python/sql/datasource.py %}
+</div>
+
+<div data-lang="sql"  markdown="1">
+
+{% highlight sql %}
+
+CREATE TABLE users_bucketed_by_name(
+  name STRING,
+  favorite_color STRING,
+  favorite_numbers array<integer>
+) USING parquet
+CLUSTERED BY(name) INTO 42 BUCKETS;
+
+{% endhighlight %}
+
+</div>
+
+</div>
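+
+Note that `bucketBy` writes table metadata, so it only works together with `saveAsTable`. The
+sketch below (reusing the `peopleDF` from the examples above) is illustrative and is not part of
+the included examples:
+
+{% highlight scala %}
+// Bucket metadata is stored in the metastore, so saveAsTable is required:
+peopleDF.write.bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed")
+
+// Writing bucketed output to a bare path is not supported and fails with an
+// AnalysisException:
+// peopleDF.write.bucketBy(42, "name").save("people.parquet")
+{% endhighlight %}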
+
+Partitioning, in contrast, can be used with both `save` and `saveAsTable` when using the Dataset
+APIs:
+
+<div class="codetabs">
+
+<div data-lang="scala"  markdown="1">
+{% include_example write_partitioning scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala %}
+</div>
+
+<div data-lang="java"  markdown="1">
+{% include_example write_partitioning java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java %}
+</div>
+
+<div data-lang="python"  markdown="1">
+{% include_example write_partitioning python/sql/datasource.py %}
+</div>
+
+<div data-lang="sql"  markdown="1">
+
+{% highlight sql %}
+
+CREATE TABLE users_by_favorite_color(
+  name STRING,
+  favorite_color STRING,
+  favorite_numbers array<integer>
+) USING csv PARTITIONED BY(favorite_color);
+
+{% endhighlight %}
+
+</div>
+
+</div>
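+
+For reference, the `partitionBy` call above produces one directory per distinct value of the
+partition column. The listing and file names below are illustrative only; the exact part-file
+names will differ:
+
+{% highlight scala %}
+// namesPartByColor.parquet/
+//   favorite_color=red/part-00000-....parquet
+//   favorite_color=blue/part-00000-....parquet
+
+// A filter on the partition column scans only the matching directories
+// (partition pruning):
+val reds = spark.read.parquet("namesPartByColor.parquet")
+  .where("favorite_color = 'red'")
+{% endhighlight %}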
+
+It is possible to use both partitioning and bucketing for a single table:
+
+<div class="codetabs">
+
+<div data-lang="scala"  markdown="1">
+{% include_example write_partition_and_bucket scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala %}
+</div>
+
+<div data-lang="java"  markdown="1">
+{% include_example write_partition_and_bucket java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java %}
+</div>
+
+<div data-lang="python"  markdown="1">
+{% include_example write_partition_and_bucket python/sql/datasource.py %}
+</div>
+
+<div data-lang="sql"  markdown="1">
+
+{% highlight sql %}
+
+CREATE TABLE users_bucketed_and_partitioned(
+  name STRING,
+  favorite_color STRING,
+  favorite_numbers array<integer>
+) USING parquet
+PARTITIONED BY (favorite_color)
+CLUSTERED BY(name) SORTED BY (favorite_numbers) INTO 42 BUCKETS;
+
+{% endhighlight %}
+
+</div>
+
+</div>
+
+`partitionBy` creates a directory structure as described in the
+[Partition Discovery](#partition-discovery) section. Thus, it has limited applicability to
+columns with high cardinality. In contrast, `bucketBy` distributes data across a fixed number of
+buckets and can be used when the number of unique values is unbounded.
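+
+One practical consequence of bucketing is that two tables bucketed by the same column into the
+same number of buckets can typically be joined on that column without a shuffle. The sketch
+below is illustrative only; it reuses the tables created in the examples above and relies on
+`spark.sql.sources.bucketing.enabled`, which is true by default:
+
+{% highlight scala %}
+// Both tables are bucketed by "name" into 42 buckets, so the join on "name"
+// can reuse the existing bucketing instead of exchanging data:
+val joined = spark.table("people_bucketed")
+  .join(spark.table("users_partitioned_bucketed"), "name")
+joined.explain()  // typically no Exchange below the SortMergeJoin
+{% endhighlight %}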
+
 ## Parquet Files
 
 [Parquet](http://parquet.io) is a columnar format that is supported by many other data processing systems.
diff --git a/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java b/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java
index b66abaed6600..706856b5215e 100644
--- a/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java
@@ -120,6 +120,22 @@ private static void runBasicDataSourceExample(SparkSession spark) {
     Dataset<Row> sqlDF =
       spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`");
     // $example off:direct_sql$
+    // $example on:write_sorting_and_bucketing$
+    peopleDF.write().bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed");
+    // $example off:write_sorting_and_bucketing$
+    // $example on:write_partitioning$
+    usersDF.write().partitionBy("favorite_color").format("parquet").save("namesPartByColor.parquet");
+    // $example off:write_partitioning$
+    // $example on:write_partition_and_bucket$
+    usersDF
+      .write()
+      .partitionBy("favorite_color")
+      .bucketBy(42, "name")
+      .saveAsTable("users_partitioned_bucketed");
+    // $example off:write_partition_and_bucket$
+
+    spark.sql("DROP TABLE IF EXISTS people_bucketed");
+    spark.sql("DROP TABLE IF EXISTS users_partitioned_bucketed");
   }
 
   private static void runBasicParquetExample(SparkSession spark) {
diff --git a/examples/src/main/python/sql/datasource.py b/examples/src/main/python/sql/datasource.py
index e4abb0933345..8777cca66bfe 100644
--- a/examples/src/main/python/sql/datasource.py
+++ b/examples/src/main/python/sql/datasource.py
@@ -35,15 +35,35 @@ def basic_datasource_example(spark):
     df.select("name", "favorite_color").write.save("namesAndFavColors.parquet")
     # $example off:generic_load_save_functions$
 
+    # $example on:write_partitioning$
+    df.write.partitionBy("favorite_color").format("parquet").save("namesPartByColor.parquet")
+    # $example off:write_partitioning$
+
+    # $example on:write_partition_and_bucket$
+    df = spark.read.parquet("examples/src/main/resources/users.parquet")
+    (df
+        .write
+        .partitionBy("favorite_color")
+        .bucketBy(42, "name")
+        .saveAsTable("users_partitioned_bucketed"))
+    # $example off:write_partition_and_bucket$
+
     # $example on:manual_load_options$
     df = spark.read.load("examples/src/main/resources/people.json", format="json")
     df.select("name", "age").write.save("namesAndAges.parquet", format="parquet")
     # $example off:manual_load_options$
 
+    # $example on:write_sorting_and_bucketing$
+    df.write.bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed")
+    # $example off:write_sorting_and_bucketing$
+
     # $example on:direct_sql$
     df = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")
     # $example off:direct_sql$
 
+    spark.sql("DROP TABLE IF EXISTS people_bucketed")
+    spark.sql("DROP TABLE IF EXISTS users_partitioned_bucketed")
+
 
 def parquet_example(spark):
     # $example on:basic_parquet_example$
diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala b/examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala
index ad74da72bd5e..6ff03bdb2212 100644
--- a/examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala
@@ -52,6 +52,22 @@ object SQLDataSourceExample {
     // $example on:direct_sql$
     val sqlDF = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")
     // $example off:direct_sql$
+    // $example on:write_sorting_and_bucketing$
+    peopleDF.write.bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed")
+    // $example off:write_sorting_and_bucketing$
+    // $example on:write_partitioning$
+    usersDF.write.partitionBy("favorite_color").format("parquet").save("namesPartByColor.parquet")
+    // $example off:write_partitioning$
+    // $example on:write_partition_and_bucket$
+    usersDF
+      .write
+      .partitionBy("favorite_color")
+      .bucketBy(42, "name")
+      .saveAsTable("users_partitioned_bucketed")
+    // $example off:write_partition_and_bucket$
+
+    spark.sql("DROP TABLE IF EXISTS people_bucketed")
+    spark.sql("DROP TABLE IF EXISTS users_partitioned_bucketed")
   }
 
   private def runBasicParquetExample(spark: SparkSession): Unit = {