package sql
import java.io.File
import org.apache.spark.sql.SparkSession
//
// A common practice for organizing large amounts of data is to build a
// directory hierarchy that represents the overall structure of the data, and
// shard the data into files at the leaf nodes of this hierarchy. This is
// most commonly done with dates, where one level of the hierarchy is divided
// by year, the next by month, then by day, etc. It is also commonly done with
// locations and categories, and of course these can be combined into a single
// hierarchy.
//
// This practice is so common in big data that it is incorporated into various
// Hadoop and Spark features, to make it easy to create such hierarchies and
// also to efficiently and easily query them. It is now very commonly used
// to create Parquet files, but can easily be used for other file types.
//
// This example illustrates how to create and update partitioned tables
// through Spark SQL, using static or dynamic partitioning or a mix of both.
//
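// On disk, Spark lays out one subdirectory per partition value using the
// key=value naming convention, so a table partitioned by year and month ends
// up with paths roughly like <table root>/year=2001/month=3/part-*.parquet.
//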
object PartitionedTable {

  case class Fact(year: Integer, month: Integer, id: Integer, cat: Integer)

  def main(args: Array[String]): Unit = {

    // output goes here
    val exampleRoot = "/tmp/LearningSpark"
    utils.PartitionedTableHierarchy.deleteRecursively(new File(exampleRoot))
    val tableRoot = exampleRoot + "/Table"

    val spark =
      SparkSession.builder()
        .appName("SQL-PartitionedTable")
        .master("local[4]")
        .getOrCreate()

    import spark.implicits._
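    // the import above brings in the implicit conversions that make toDF()
    // available on the RDD of Fact instances created below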

    // create some sample data
    val ids = 1 to 1200
    val facts = ids.map(id => {
      val month = id % 12 + 1
      val year = 2000 + (month % 12)
      val cat = id % 4
      Fact(year, month, id, cat)
    })
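    // since month and year are both derived from id % 12, the 1200 rows fall into
    // 12 distinct (year, month) pairs covering 2000 through 2011, 100 rows each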

    // make it an RDD and convert to a DataFrame
    val factsDF = spark.sparkContext.parallelize(facts, 4).toDF()

    println("*** Here is some of the sample data")
    factsDF.show(20)

    //
    // Register with a table name for SQL queries
    //
    factsDF.createOrReplaceTempView("original")

    spark.sql(
      s"""
        | DROP TABLE IF EXISTS partitioned
      """.stripMargin)

    //
    // Create the partitioned table, specifying the columns, the file format (Parquet),
    // the partitioning scheme, and where the directory hierarchy and files
    // will be located in the file system. Notice that Spark SQL allows you to
    // specify the partition columns twice, and doesn't require you to make them
    // the last columns.
    //
    spark.sql(
      s"""
        | CREATE TABLE partitioned
        | (year INTEGER, month INTEGER, id INTEGER, cat INTEGER)
        | USING PARQUET
        | PARTITIONED BY (year, month)
        | LOCATION "$tableRoot"
      """.stripMargin)

    //
    // Now insert the sample data into the partitioned table, relying on
    // dynamic partitioning. Important: notice that the partition columns
    // must be provided last!
    //
    spark.sql(
      s"""
        | INSERT INTO TABLE partitioned
        | SELECT id, cat, year, month FROM original
      """.stripMargin)

    // print the resulting directory hierarchy with the Parquet files
    println("*** partitioned table in the file system, after the initial insert")
    utils.PartitionedTableHierarchy.printRecursively(new File(tableRoot))
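    // at this point the hierarchy should contain a year=... directory for each
    // of the twelve years, each holding a single month=... subdirectory with
    // the Parquet part files for that partition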

    // now we can query the partitioned table
    println("*** query summary of partitioned table")
    val fromPartitioned = spark.sql(
      s"""
        | SELECT year, COUNT(*) as count
        | FROM partitioned
        | GROUP BY year
        | ORDER BY year
      """.stripMargin)
    fromPartitioned.show()
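    // note that a query filtering on the partition columns (say WHERE year = 2001)
    // lets Spark prune partitions and scan only the matching subdirectories,
    // which is the main payoff of this directory layout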

    // dynamic partition insert -- no PARTITION clause
    spark.sql(
      s"""
        | INSERT INTO TABLE partitioned
        | VALUES
        | (1400, 1, 2016, 1),
        | (1401, 2, 2017, 3)
      """.stripMargin)

    // dynamic partition insert -- equivalent form with PARTITION clause
    spark.sql(
      s"""
        | INSERT INTO TABLE partitioned
        | PARTITION (year, month)
        | VALUES
        | (1500, 1, 2016, 2),
        | (1501, 2, 2017, 4)
      """.stripMargin)

    // static partition insert -- fully specify the partition using the
    // PARTITION clause
    spark.sql(
      s"""
        | INSERT INTO TABLE partitioned
        | PARTITION (year = 2017, month = 7)
        | VALUES
        | (1600, 1),
        | (1601, 2)
      """.stripMargin)

    // now for the mixed case -- in the PARTITION clause, 'year' is specified
    // statically and 'month' is dynamic
    spark.sql(
      s"""
        | INSERT INTO TABLE partitioned
        | PARTITION (year = 2017, month)
        | VALUES
        | (1700, 1, 9),
        | (1701, 2, 10)
      """.stripMargin)

    // check that all these inserts produced the data we expected
    println("*** the additional rows that were inserted")
    val afterInserts = spark.sql(
      s"""
        | SELECT year, month, id, cat
        | FROM partitioned
        | WHERE year > 2011
        | ORDER BY year, month
      """.stripMargin)
    afterInserts.show()

    println("*** partitioned table in the file system, after all additional inserts")
    utils.PartitionedTableHierarchy.printRecursively(new File(tableRoot))

    println("*** query summary of partitioned table, after additional inserts")
    val finalCheck = spark.sql(
      s"""
        | SELECT year, COUNT(*) as count
        | FROM partitioned
        | GROUP BY year
        | ORDER BY year
      """.stripMargin)
    finalCheck.show()
  }
}