From 0c8955872ac46138dbd7ef7088e63a9eb1c4e8d1 Mon Sep 17 00:00:00 2001 From: RJ Nowling Date: Tue, 18 Nov 2014 15:55:09 -0500 Subject: [PATCH 01/12] Add example that reads a local file, writes to a DFS path provided by the user, reads the file back from the DFS, and compares word counts on the local and DFS versions. Useful for verifying DFS correctness. --- .../spark/examples/DFSReadWriteTest.scala | 135 ++++++++++++++++++ 1 file changed, 135 insertions(+) create mode 100644 examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala diff --git a/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala b/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala new file mode 100644 index 0000000000000..1b13ce7a0f561 --- /dev/null +++ b/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.examples + +import org.apache.spark.{SparkContext, SparkConf} +import org.apache.spark.SparkContext._ +import scala.io.Source._ +import java.io.File + +object DFSReadWriteTest { + + private var localFilePath: File = new File(".") + private var dfsDirPath: File = new File(".") + + private val NPARAMS = 2 + + private def readFile(filename: String): List[String] = { + val lineIter: Iterator[String] = fromFile(filename).getLines() + val lineList: List[String] = lineIter.toList + lineList + } + + private def printUsage() { + val usage: String = "DFS Read-Write Test\n" + + "\n" + + "Usage: localFile dfsDir\n" + + "\n" + + "localFile - (string) local file to use in test\n" + + "dfsDir - (string) DFS directory for read/write tests\n" + + println(usage) + } + + private def parseArgs(args: Array[String]) { + if(args.length != NPARAMS) { + printUsage() + System.exit(1) + } + + var i = 0 + + localFilePath = new File(args(i)) + if(! localFilePath.exists()) { + System.err.println("Given path (" + args(i) + ") does not exist.\n") + printUsage() + System.exit(1) + } + + if(! localFilePath.isFile()) { + System.err.println("Given path (" + args(i) + ") is not a file.\n") + printUsage() + System.exit(1) + } + + i += 1 + dfsDirPath = new File(args(i)) + if(! dfsDirPath.exists()) { + System.err.println("Given path (" + args(i) + ") does not exist.\n") + printUsage() + System.exit(1) + } + + if(! dfsDirPath.isDirectory()) { + System.err.println("Given path (" + args(i) + ") is not a directory.\n") + printUsage() + System.exit(1) + } + } + + def runLocalWordCount(fileContents: List[String]) : Int = { + fileContents.flatMap { + ln => ln.split(" ") }.flatMap { + ln => ln.split("\t") }.filter { + ln => ln.size > 0 }.groupBy { + w => w }.mapValues { + lst => lst.size }.values.sum + } + + def main(args: Array[String]) { + parseArgs(args) + + println("Performing local word count") + val fileContents = readFile(localFilePath.toString()) + val localWordCount = runLocalWordCount(fileContents) + + println("Creating SparkConf") + val conf = new SparkConf().setAppName("DFS Read Write Test") + + println("Creating SparkContext") + val sc = new SparkContext(conf) + + println("Writing local file to DFS") + val dfsFilename = dfsDirPath.toString() + "/dfs_read_write_test" + val fileRDD = sc.parallelize(fileContents) + fileRDD.saveAsTextFile(dfsFilename) + + println("Reading file from DFS and running Word Count") + val readFileRDD = sc.textFile(dfsFilename) + + val dfsWordCount = readFileRDD.flatMap { + ln => ln.split(" ") }.flatMap { + ln => ln.split("\t") }.filter { + ln => ln.size > 0 }.map { + w => (w, 1) }.countByKey().values.sum + + sc.stop() + + if(localWordCount == dfsWordCount) + { + println("Success! Local Word Count (" + localWordCount + + ") and DFS Word Count (" + dfsWordCount + ") agree.") + } + else + { + println("Failure! Local Word Count (" + localWordCount + + ") and DFS Word Count (" + dfsWordCount + ") disagree.") + } + + } +} From a931d70f80dd4e672e55212712dfb19bc2c3f151 Mon Sep 17 00:00:00 2001 From: RJ Nowling Date: Thu, 20 Nov 2014 09:11:37 -0500 Subject: [PATCH 02/12] Fix import order --- .../scala/org/apache/spark/examples/DFSReadWriteTest.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala b/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala index 1b13ce7a0f561..577387f715b6f 100644 --- a/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala +++ b/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala @@ -17,10 +17,10 @@ package org.apache.spark.examples +import java.io.File +import scala.io.Source._ import org.apache.spark.{SparkContext, SparkConf} import org.apache.spark.SparkContext._ -import scala.io.Source._ -import java.io.File object DFSReadWriteTest { From 1b314f0fefab3f5a3383997ebec047419c7b2542 Mon Sep 17 00:00:00 2001 From: RJ Nowling Date: Thu, 20 Nov 2014 09:14:49 -0500 Subject: [PATCH 03/12] Add scaladoc --- .../org/apache/spark/examples/DFSReadWriteTest.scala | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala b/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala index 577387f715b6f..16c54b8ae9f2b 100644 --- a/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala +++ b/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala @@ -22,6 +22,17 @@ import scala.io.Source._ import org.apache.spark.{SparkContext, SparkConf} import org.apache.spark.SparkContext._ +/** + * Simple test for reading and writing to a distributed + * file system. This example does the following: + * + * 1. Reads local file + * 2. Computes word count on local file + * 3. Writes local file to a DFS + * 4. Reads the file back from the DFS + * 5. Computes word count on the file using Spark + * 6. Compares the word count results + */ object DFSReadWriteTest { private var localFilePath: File = new File(".") From df59b6597b4ca953dbb685e69c5f1f569bdef71b Mon Sep 17 00:00:00 2001 From: RJ Nowling Date: Thu, 20 Nov 2014 09:15:55 -0500 Subject: [PATCH 04/12] Fix if statements --- .../apache/spark/examples/DFSReadWriteTest.scala | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala b/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala index 16c54b8ae9f2b..9c9a71a7e5c9a 100644 --- a/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala +++ b/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala @@ -66,13 +66,13 @@ object DFSReadWriteTest { var i = 0 localFilePath = new File(args(i)) - if(! localFilePath.exists()) { + if(!localFilePath.exists) { System.err.println("Given path (" + args(i) + ") does not exist.\n") printUsage() System.exit(1) } - if(! localFilePath.isFile()) { + if(!localFilePath.isFile) { System.err.println("Given path (" + args(i) + ") is not a file.\n") printUsage() System.exit(1) @@ -80,13 +80,13 @@ object DFSReadWriteTest { i += 1 dfsDirPath = new File(args(i)) - if(! dfsDirPath.exists()) { + if(!dfsDirPath.exists) { System.err.println("Given path (" + args(i) + ") does not exist.\n") printUsage() System.exit(1) } - if(! dfsDirPath.isDirectory()) { + if(dfsDirPath.isDirectory) { System.err.println("Given path (" + args(i) + ") is not a directory.\n") printUsage() System.exit(1) @@ -131,13 +131,11 @@ object DFSReadWriteTest { sc.stop() - if(localWordCount == dfsWordCount) - { + if(localWordCount == dfsWordCount) { println("Success! Local Word Count (" + localWordCount + ") and DFS Word Count (" + dfsWordCount + ") agree.") } - else - { + else { println("Failure! Local Word Count (" + localWordCount + ") and DFS Word Count (" + dfsWordCount + ") disagree.") } From 94a469136b9c0f42d2e5c305b17721e2bc332ea3 Mon Sep 17 00:00:00 2001 From: RJ Nowling Date: Thu, 20 Nov 2014 09:16:17 -0500 Subject: [PATCH 05/12] Fix space --- .../main/scala/org/apache/spark/examples/DFSReadWriteTest.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala b/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala index 9c9a71a7e5c9a..e80ccd10633c5 100644 --- a/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala +++ b/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala @@ -93,7 +93,7 @@ object DFSReadWriteTest { } } - def runLocalWordCount(fileContents: List[String]) : Int = { + def runLocalWordCount(fileContents: List[String]): Int = { fileContents.flatMap { ln => ln.split(" ") }.flatMap { ln => ln.split("\t") }.filter { From 44415b9bf07850de019a7d1e2d1bc553acd7c88e Mon Sep 17 00:00:00 2001 From: RJ Nowling Date: Thu, 20 Nov 2014 09:28:47 -0500 Subject: [PATCH 06/12] Fix local wc style --- .../apache/spark/examples/DFSReadWriteTest.scala | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala b/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala index e80ccd10633c5..9bc9c597486d4 100644 --- a/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala +++ b/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala @@ -94,12 +94,13 @@ object DFSReadWriteTest { } def runLocalWordCount(fileContents: List[String]): Int = { - fileContents.flatMap { - ln => ln.split(" ") }.flatMap { - ln => ln.split("\t") }.filter { - ln => ln.size > 0 }.groupBy { - w => w }.mapValues { - lst => lst.size }.values.sum + fileContents.flatMap(_.split(" ")) + .flatMap(_.split("\t")) + .filter(_.size > 0) + .groupBy(w => w) + .mapValues(_.size) + .values + .sum } def main(args: Array[String]) { From b9edf124eb3ba9b17d70e77e610bedd56376317b Mon Sep 17 00:00:00 2001 From: RJ Nowling Date: Thu, 20 Nov 2014 09:32:15 -0500 Subject: [PATCH 07/12] Fix spark wc style --- .../apache/spark/examples/DFSReadWriteTest.scala | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala b/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala index 9bc9c597486d4..a43560325285c 100644 --- a/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala +++ b/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala @@ -124,11 +124,14 @@ object DFSReadWriteTest { println("Reading file from DFS and running Word Count") val readFileRDD = sc.textFile(dfsFilename) - val dfsWordCount = readFileRDD.flatMap { - ln => ln.split(" ") }.flatMap { - ln => ln.split("\t") }.filter { - ln => ln.size > 0 }.map { - w => (w, 1) }.countByKey().values.sum + val dfsWordCount = readFileRDD + .flatMap(_.split(" ")) + .flatMap(_.split("\t")) + .filter(_.size > 0) + .map(w => (w, 1)) + .countByKey() + .values + .sum sc.stop() From f74c1603e8260f6c29e338f530e65291e7a5a4ff Mon Sep 17 00:00:00 2001 From: RJ Nowling Date: Thu, 20 Nov 2014 09:32:35 -0500 Subject: [PATCH 08/12] Fix else statement style --- .../scala/org/apache/spark/examples/DFSReadWriteTest.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala b/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala index a43560325285c..322213e1a48a7 100644 --- a/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala +++ b/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala @@ -138,8 +138,7 @@ object DFSReadWriteTest { if(localWordCount == dfsWordCount) { println("Success! Local Word Count (" + localWordCount + ") and DFS Word Count (" + dfsWordCount + ") agree.") - } - else { + } else { println("Failure! Local Word Count (" + localWordCount + ") and DFS Word Count (" + dfsWordCount + ") disagree.") } From 7d9a8df689c1f31e7de85171c6ca68db83ebfee6 Mon Sep 17 00:00:00 2001 From: RJ Nowling Date: Thu, 20 Nov 2014 09:34:07 -0500 Subject: [PATCH 09/12] Fix string style --- .../scala/org/apache/spark/examples/DFSReadWriteTest.scala | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala b/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala index 322213e1a48a7..57fe93f4d2493 100644 --- a/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala +++ b/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala @@ -136,11 +136,9 @@ object DFSReadWriteTest { sc.stop() if(localWordCount == dfsWordCount) { - println("Success! Local Word Count (" + localWordCount + - ") and DFS Word Count (" + dfsWordCount + ") agree.") + println(s"Success! Local Word Count ($localWordCount) and DFS Word Count ($dfsWordCount) agree.") } else { - println("Failure! Local Word Count (" + localWordCount + - ") and DFS Word Count (" + dfsWordCount + ") disagree.") + println(s"Failure! Local Word Count ($localWordCount) and DFS Word Count ($dfsWordCount) disagree.") } } From 07c613245d4c7614001569eea514585fb92bdab3 Mon Sep 17 00:00:00 2001 From: RJ Nowling Date: Thu, 20 Nov 2014 09:39:49 -0500 Subject: [PATCH 10/12] Fix string style --- .../scala/org/apache/spark/examples/DFSReadWriteTest.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala b/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala index 57fe93f4d2493..8c143a8e2d760 100644 --- a/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala +++ b/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala @@ -136,9 +136,11 @@ object DFSReadWriteTest { sc.stop() if(localWordCount == dfsWordCount) { - println(s"Success! Local Word Count ($localWordCount) and DFS Word Count ($dfsWordCount) agree.") + println(s"Success! Local Word Count ($localWordCount) " + + "and DFS Word Count ($dfsWordCount) agree.") } else { - println(s"Failure! Local Word Count ($localWordCount) and DFS Word Count ($dfsWordCount) disagree.") + println(s"Failure! Local Word Count ($localWordCount) " + + "and DFS Word Count ($dfsWordCount) disagree.") } } From b0ef9ea387e031deddbe1ffda833d98eb5f42e08 Mon Sep 17 00:00:00 2001 From: RJ Nowling Date: Thu, 20 Nov 2014 09:54:44 -0500 Subject: [PATCH 11/12] Fix string style --- .../apache/spark/examples/DFSReadWriteTest.scala | 15 ++------------- 1 file changed, 2 insertions(+), 13 deletions(-) diff --git a/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala b/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala index 8c143a8e2d760..1ee4fad0685c9 100644 --- a/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala +++ b/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala @@ -80,17 +80,6 @@ object DFSReadWriteTest { i += 1 dfsDirPath = new File(args(i)) - if(!dfsDirPath.exists) { - System.err.println("Given path (" + args(i) + ") does not exist.\n") - printUsage() - System.exit(1) - } - - if(dfsDirPath.isDirectory) { - System.err.println("Given path (" + args(i) + ") is not a directory.\n") - printUsage() - System.exit(1) - } } def runLocalWordCount(fileContents: List[String]): Int = { @@ -137,10 +126,10 @@ object DFSReadWriteTest { if(localWordCount == dfsWordCount) { println(s"Success! Local Word Count ($localWordCount) " + - "and DFS Word Count ($dfsWordCount) agree.") + s"and DFS Word Count ($dfsWordCount) agree.") } else { println(s"Failure! Local Word Count ($localWordCount) " + - "and DFS Word Count ($dfsWordCount) disagree.") + s"and DFS Word Count ($dfsWordCount) disagree.") } } From af8ccb785098f3c3a99f9a9d9675abbe989401a0 Mon Sep 17 00:00:00 2001 From: RJ Nowling Date: Thu, 20 Nov 2014 13:02:48 -0500 Subject: [PATCH 12/12] Don't use java.io.File since DFS may not be POSIX-compatible --- .../scala/org/apache/spark/examples/DFSReadWriteTest.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala b/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala index 1ee4fad0685c9..b693fe0c96ecc 100644 --- a/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala +++ b/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala @@ -36,7 +36,7 @@ import org.apache.spark.SparkContext._ object DFSReadWriteTest { private var localFilePath: File = new File(".") - private var dfsDirPath: File = new File(".") + private var dfsDirPath: String = "" private val NPARAMS = 2 @@ -79,7 +79,7 @@ object DFSReadWriteTest { } i += 1 - dfsDirPath = new File(args(i)) + dfsDirPath = args(i) } def runLocalWordCount(fileContents: List[String]): Int = { @@ -106,7 +106,7 @@ object DFSReadWriteTest { val sc = new SparkContext(conf) println("Writing local file to DFS") - val dfsFilename = dfsDirPath.toString() + "/dfs_read_write_test" + val dfsFilename = dfsDirPath + "/dfs_read_write_test" val fileRDD = sc.parallelize(fileContents) fileRDD.saveAsTextFile(dfsFilename)