1616 */
1717package org .apache .spark .sql .execution .datasources .csv
1818
19- import org .apache .spark .SparkConf
2019import org .apache .spark .benchmark .Benchmark
21- import org .apache .spark .sql .{Column , Row , SparkSession }
22- import org .apache .spark .sql .catalyst . plans . SQLHelper
20+ import org .apache .spark .sql .{Column , Row }
21+ import org .apache .spark .sql .execution . benchmark . SqlBasedBenchmark
2322import org .apache .spark .sql .functions .lit
2423import org .apache .spark .sql .types ._
2524
2625/**
2726 * Benchmark to measure CSV read/write performance.
28- * To run this:
29- * spark-submit --class <this class> --jars <spark sql test jar>
27+ * To run this benchmark:
28+ * {{{
29+ * 1. without sbt:
30+ * bin/spark-submit --class <this class> --jars <spark core test jar>,
31+ * <spark catalyst test jar> <spark sql test jar>
32+ * 2. build/sbt "sql/test:runMain <this class>"
33+ * 3. generate result:
34+ * SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql/test:runMain <this class>"
35+ * Results will be written to "benchmarks/CSVBenchmark-results.txt".
36+ * }}}
3037 */
31- object CSVBenchmarks extends SQLHelper {
32- val conf = new SparkConf ()
33-
34- val spark = SparkSession .builder
35- .master(" local[1]" )
36- .appName(" benchmark-csv-datasource" )
37- .config(conf)
38- .getOrCreate()
38+
39+ object CSVBenchmark extends SqlBasedBenchmark {
3940 import spark .implicits ._
4041
4142 def quotedValuesBenchmark (rowsNum : Int , numIters : Int ): Unit = {
42- val benchmark = new Benchmark (s " Parsing quoted values " , rowsNum)
43+ val benchmark = new Benchmark (s " Parsing quoted values " , rowsNum, output = output )
4344
4445 withTempPath { path =>
4546 val str = (0 until 10000 ).map(i => s """ " $i" """ ).mkString(" ," )
@@ -56,20 +57,13 @@ object CSVBenchmarks extends SQLHelper {
5657 ds.filter((_ : Row ) => true ).count()
5758 }
5859
59- /*
60- Intel(R) Core(TM) i7-7920HQ CPU @ 3.10GHz
61-
62- Parsing quoted values: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
63- --------------------------------------------------------------------------------------------
64- One quoted string 30273 / 30549 0.0 605451.2 1.0X
65- */
6660 benchmark.run()
6761 }
6862 }
6963
7064 def multiColumnsBenchmark (rowsNum : Int ): Unit = {
7165 val colsNum = 1000
72- val benchmark = new Benchmark (s " Wide rows with $colsNum columns " , rowsNum)
66+ val benchmark = new Benchmark (s " Wide rows with $colsNum columns " , rowsNum, output = output )
7367
7468 withTempPath { path =>
7569 val fields = Seq .tabulate(colsNum)(i => StructField (s " col $i" , IntegerType ))
@@ -98,23 +92,14 @@ object CSVBenchmarks extends SQLHelper {
9892 ds.count()
9993 }
10094
101- /*
102- Intel(R) Core(TM) i7-7920HQ CPU @ 3.10GHz
103-
104- Wide rows with 1000 columns: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
105- --------------------------------------------------------------------------------------------
106- Select 1000 columns 81091 / 81692 0.0 81090.7 1.0X
107- Select 100 columns 30003 / 34448 0.0 30003.0 2.7X
108- Select one column 24792 / 24855 0.0 24792.0 3.3X
109- count() 24344 / 24642 0.0 24343.8 3.3X
110- */
11195 benchmark.run()
11296 }
11397 }
11498
11599 def countBenchmark (rowsNum : Int ): Unit = {
116100 val colsNum = 10
117- val benchmark = new Benchmark (s " Count a dataset with $colsNum columns " , rowsNum)
101+ val benchmark =
102+ new Benchmark (s " Count a dataset with $colsNum columns " , rowsNum, output = output)
118103
119104 withTempPath { path =>
120105 val fields = Seq .tabulate(colsNum)(i => StructField (s " col $i" , IntegerType ))
@@ -137,22 +122,15 @@ object CSVBenchmarks extends SQLHelper {
137122 ds.count()
138123 }
139124
140- /*
141- Intel(R) Core(TM) i7-7700HQ CPU @ 2.80GHz
142-
143- Count a dataset with 10 columns: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
144- ---------------------------------------------------------------------------------------------
145- Select 10 columns + count() 12598 / 12740 0.8 1259.8 1.0X
146- Select 1 column + count() 7960 / 8175 1.3 796.0 1.6X
147- count() 2332 / 2386 4.3 233.2 5.4X
148- */
149125 benchmark.run()
150126 }
151127 }
152128
153- def main (args : Array [String ]): Unit = {
154- quotedValuesBenchmark(rowsNum = 50 * 1000 , numIters = 3 )
155- multiColumnsBenchmark(rowsNum = 1000 * 1000 )
156- countBenchmark(10 * 1000 * 1000 )
/**
 * Runs the three CSV benchmarks defined in this object as one group, wrapped in a
 * single `runBenchmark` call titled "Benchmark to measure CSV read/write performance".
 *
 * The benchmarks run in this order:
 *  - `quotedValuesBenchmark` with 50 * 1000 rows and 3 iterations,
 *  - `multiColumnsBenchmark` with 1000 * 1000 rows,
 *  - `countBenchmark` with 10 * 1000 * 1000 rows.
 *
 * This overrides a member of the parent benchmark type.
 * NOTE(review): presumably the inherited `main` invokes it - confirm in the base class.
 *
 * @param mainArgs arguments passed to the suite; this body does not read them
 */
129+ override def runBenchmarkSuite (mainArgs : Array [String ]): Unit = {
130+ runBenchmark(" Benchmark to measure CSV read/write performance" ) {
131+ quotedValuesBenchmark(rowsNum = 50 * 1000 , numIters = 3 )
132+ multiColumnsBenchmark(rowsNum = 1000 * 1000 )
133+ countBenchmark(10 * 1000 * 1000 )
134+ }
157135 }
158136}
0 commit comments