From 32505505ca2428614840cd61cfe7e890b0be851e Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Thu, 30 Apr 2015 17:06:47 -0700 Subject: [PATCH 1/8] Changes to debug flaky streaming tests. --- dev/run-tests | 14 +++++++------- python/run-tests | 20 ++++++++++---------- 2 files changed, 17 insertions(+), 17 deletions(-) diff --git a/dev/run-tests b/dev/run-tests index 861d1671182c2..f8622c71dc2a0 100755 --- a/dev/run-tests +++ b/dev/run-tests @@ -113,7 +113,7 @@ echo "=========================================================================" CURRENT_BLOCK=$BLOCK_RAT -./dev/check-license +# ./dev/check-license echo "" echo "=========================================================================" @@ -122,7 +122,7 @@ echo "=========================================================================" CURRENT_BLOCK=$BLOCK_SCALA_STYLE -./dev/lint-scala +# ./dev/lint-scala echo "" echo "=========================================================================" @@ -131,7 +131,7 @@ echo "=========================================================================" CURRENT_BLOCK=$BLOCK_PYTHON_STYLE -./dev/lint-python +# ./dev/lint-python echo "" echo "=========================================================================" @@ -185,7 +185,7 @@ echo "=========================================================================" CURRENT_BLOCK=$BLOCK_MIMA -./dev/mima +# ./dev/mima echo "" echo "=========================================================================" @@ -223,7 +223,7 @@ CURRENT_BLOCK=$BLOCK_SPARK_UNIT_TESTS # QUESTION: Why doesn't 'yes "q"' work? # QUESTION: Why doesn't 'grep -v -e "^\[info\] Resolving"' work? echo -e "q\n" \ - | build/sbt $SBT_MAVEN_PROFILES_ARGS "${SBT_MAVEN_TEST_ARGS[@]}" \ + | build/sbt $SBT_MAVEN_PROFILES_ARGS "test-only/streaming *WriteAheadLog*" "test-only/streaming *JobGenerator*" \ | grep -v -e "info.*Resolving" -e "warn.*Merging" -e "info.*Including" fi } @@ -247,8 +247,8 @@ echo "=========================================================================" CURRENT_BLOCK=$BLOCK_SPARKR_UNIT_TESTS if [ $(command -v R) ]; then - ./R/install-dev.sh - ./R/run-tests.sh + #./R/install-dev.sh + #./R/run-tests.sh else echo "Ignoring SparkR tests as R was not found in PATH" fi diff --git a/python/run-tests b/python/run-tests index 88b63b84fdc27..9d372356e0844 100755 --- a/python/run-tests +++ b/python/run-tests @@ -134,10 +134,10 @@ fi echo "Testing with Python version:" $PYSPARK_PYTHON --version -run_core_tests -run_sql_tests -run_mllib_tests -run_ml_tests +# run_core_tests +# run_sql_tests +# run_mllib_tests +# run_ml_tests run_streaming_tests # Try to test with Python 3 @@ -146,10 +146,10 @@ if [ $(which python3.4) ]; then echo "Testing with Python3.4 version:" $PYSPARK_PYTHON --version - run_core_tests - run_sql_tests - run_mllib_tests - run_ml_tests + # run_core_tests + # run_sql_tests + # run_mllib_tests + # run_ml_tests run_streaming_tests fi @@ -159,8 +159,8 @@ if [ $(which pypy) ]; then echo "Testing with PyPy version:" $PYSPARK_PYTHON --version - run_core_tests - run_sql_tests + # run_core_tests + # run_sql_tests run_streaming_tests fi From b29047c63df12fbb5da0378f864e25fa96b68bed Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Thu, 30 Apr 2015 19:19:23 -0700 Subject: [PATCH 2/8] tweak --- dev/run-tests | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/dev/run-tests b/dev/run-tests index f8622c71dc2a0..bba6a3d87395d 100755 --- a/dev/run-tests +++ b/dev/run-tests @@ -173,7 +173,7 @@ CURRENT_BLOCK=$BLOCK_BUILD build/mvn $HIVE_BUILD_ARGS clean package -DskipTests else echo -e "q\n" \ - | build/sbt $HIVE_BUILD_ARGS package assembly/assembly streaming-kafka-assembly/assembly \ + | build/sbt package assembly/assembly streaming-kafka-assembly/assembly \ | grep -v -e "info.*Resolving" -e "warn.*Merging" -e "info.*Including" fi } @@ -222,8 +222,7 @@ CURRENT_BLOCK=$BLOCK_SPARK_UNIT_TESTS # "${SBT_MAVEN_TEST_ARGS[@]}" is cool because it's an array. # QUESTION: Why doesn't 'yes "q"' work? # QUESTION: Why doesn't 'grep -v -e "^\[info\] Resolving"' work? - echo -e "q\n" \ - | build/sbt $SBT_MAVEN_PROFILES_ARGS "test-only/streaming *WriteAheadLog*" "test-only/streaming *JobGenerator*" \ + build/sbt "test-only/streaming *WriteAheadLog*" "test-only/streaming *JobGenerator*" \ | grep -v -e "info.*Resolving" -e "warn.*Merging" -e "info.*Including" fi } From af817b7a19de5f339b0f32c4bf4bb5024b1a3c3e Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Thu, 30 Apr 2015 20:10:32 -0700 Subject: [PATCH 3/8] brain-fart fixed. --- dev/run-tests | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/dev/run-tests b/dev/run-tests index bba6a3d87395d..ce70a46f2ab78 100755 --- a/dev/run-tests +++ b/dev/run-tests @@ -222,8 +222,10 @@ CURRENT_BLOCK=$BLOCK_SPARK_UNIT_TESTS # "${SBT_MAVEN_TEST_ARGS[@]}" is cool because it's an array. # QUESTION: Why doesn't 'yes "q"' work? # QUESTION: Why doesn't 'grep -v -e "^\[info\] Resolving"' work? - build/sbt "test-only/streaming *WriteAheadLog*" "test-only/streaming *JobGenerator*" \ - | grep -v -e "info.*Resolving" -e "warn.*Merging" -e "info.*Including" + + echo "Running this" + + build/sbt "streaming/test-only *WriteAheadLog*" "streaming/test-only *JobGenerator*" fi } From c387f4c383061077368f90147c13e6ce367372af Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Thu, 30 Apr 2015 20:52:16 -0700 Subject: [PATCH 4/8] Small fix --- dev/run-tests | 1 + 1 file changed, 1 insertion(+) diff --git a/dev/run-tests b/dev/run-tests index ce70a46f2ab78..c97ddeba41f54 100755 --- a/dev/run-tests +++ b/dev/run-tests @@ -250,6 +250,7 @@ CURRENT_BLOCK=$BLOCK_SPARKR_UNIT_TESTS if [ $(command -v R) ]; then #./R/install-dev.sh #./R/run-tests.sh + echo "" else echo "Ignoring SparkR tests as R was not found in PATH" fi From 0758b34e64c06ca6c6b9dbde97139c21d56e1787 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Fri, 1 May 2015 12:14:41 -0700 Subject: [PATCH 5/8] only hadoop 1 profile. --- dev/run-tests | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dev/run-tests b/dev/run-tests index c97ddeba41f54..1b88cf40b158b 100755 --- a/dev/run-tests +++ b/dev/run-tests @@ -173,7 +173,7 @@ CURRENT_BLOCK=$BLOCK_BUILD build/mvn $HIVE_BUILD_ARGS clean package -DskipTests else echo -e "q\n" \ - | build/sbt package assembly/assembly streaming-kafka-assembly/assembly \ + | build/sbt -Dhadoop.version=1.0.4 package assembly/assembly streaming-kafka-assembly/assembly \ | grep -v -e "info.*Resolving" -e "warn.*Merging" -e "info.*Including" fi } @@ -225,7 +225,7 @@ CURRENT_BLOCK=$BLOCK_SPARK_UNIT_TESTS echo "Running this" - build/sbt "streaming/test-only *WriteAheadLog*" "streaming/test-only *JobGenerator*" + build/sbt -Dhadoop.version=1.0.4 "streaming/test-only *WriteAheadLog*" "streaming/test-only *JobGenerator*" fi } From 8ca59f2e9b4e2b9238ef105d9247d5322ed3a327 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Fri, 1 May 2015 12:49:45 -0700 Subject: [PATCH 6/8] debug statements. --- .../spark/streaming/util/FileBasedWriteAheadLog.scala | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala index 9985fedc35141..314b7bd4c3228 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala @@ -200,9 +200,14 @@ private[streaming] class FileBasedWriteAheadLog( private def initializeOrRecover(): Unit = synchronized { val logDirectoryPath = new Path(logDirectory) val fileSystem = HdfsUtils.getFileSystemForPath(logDirectoryPath, hadoopConf) - + println(s"Path $logDirectoryPath exists = " + fileSystem.exists(logDirectoryPath)) + if (fileSystem.exists(logDirectoryPath)) { + println(s"Path $logDirectoryPath is dire = " + fileSystem.getFileStatus(logDirectoryPath).isDir) + } if (fileSystem.exists(logDirectoryPath) && fileSystem.getFileStatus(logDirectoryPath).isDir) { + println("Inside condition") val logFileInfo = logFilesTologInfo(fileSystem.listStatus(logDirectoryPath).map { _.getPath }) + println("BEfore clear") pastLogs.clear() pastLogs ++= logFileInfo logInfo(s"Recovered ${logFileInfo.size} write ahead log files from $logDirectory") From 6e397f3d7e6578cf25fa9ee132702d4328fa54c6 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Fri, 1 May 2015 13:18:01 -0700 Subject: [PATCH 7/8] More debugging. --- .../apache/spark/streaming/util/FileBasedWriteAheadLog.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala index 314b7bd4c3228..6ce3f5e8996c7 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala @@ -16,6 +16,7 @@ */ package org.apache.spark.streaming.util +import java.io.File import java.nio.ByteBuffer import java.util.{Iterator => JIterator} @@ -200,9 +201,9 @@ private[streaming] class FileBasedWriteAheadLog( private def initializeOrRecover(): Unit = synchronized { val logDirectoryPath = new Path(logDirectory) val fileSystem = HdfsUtils.getFileSystemForPath(logDirectoryPath, hadoopConf) - println(s"Path $logDirectoryPath exists = " + fileSystem.exists(logDirectoryPath)) + println(s"Path $logDirectoryPath exists = " + fileSystem.exists(logDirectoryPath) + ", " + new File(logDirectory).exists()) if (fileSystem.exists(logDirectoryPath)) { - println(s"Path $logDirectoryPath is dire = " + fileSystem.getFileStatus(logDirectoryPath).isDir) + println(s"Path $logDirectoryPath is dire = " + fileSystem.getFileStatus(logDirectoryPath).isDir + ", " + new File(logDirectory).isDirectory) } if (fileSystem.exists(logDirectoryPath) && fileSystem.getFileStatus(logDirectoryPath).isDir) { println("Inside condition") From 22875eb2490d4d835d0437331e11f03501064b90 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Fri, 1 May 2015 15:45:12 -0700 Subject: [PATCH 8/8] Test fix --- .../streaming/rdd/WriteAheadLogBackedBlockRDD.scala | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala index ebdf418f4ab6a..f3eebeddcc311 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala @@ -16,7 +16,9 @@ */ package org.apache.spark.streaming.rdd +import java.io.File import java.nio.ByteBuffer +import java.util.UUID import scala.reflect.ClassTag import scala.util.control.NonFatal @@ -27,6 +29,7 @@ import org.apache.spark._ import org.apache.spark.rdd.BlockRDD import org.apache.spark.storage.{BlockId, StorageLevel} import org.apache.spark.streaming.util._ +import org.apache.spark.util.Utils /** * Partition class for [[org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD]]. @@ -107,10 +110,13 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag]( // WriteAheadLog object as the default FileBasedWriteAheadLog needs a directory for // writing log data. However, the directory is not needed if data needs to be read, hence // a dummy path is provided to satisfy the method parameter requirements. - // FileBasedWriteAheadLog will not create any file or directory at that path. - val dummyDirectory = FileUtils.getTempDirectoryPath() + // FileBasedWriteAheadLog will not create any file or directory at that path. Also, + // this dummy directory should not already exist otherwise the WAL will try to recover + // past events from the directory and throw errors. + val nonExistentDirectory = new File( + FileUtils.getTempDirectory(), UUID.randomUUID().toString).getAbsolutePath writeAheadLog = WriteAheadLogUtils.createLogForReceiver( - SparkEnv.get.conf, dummyDirectory, hadoopConf) + SparkEnv.get.conf, nonExistentDirectory, hadoopConf) dataRead = writeAheadLog.read(partition.walRecordHandle) } catch { case NonFatal(e) =>