
Commit 394d8cb

andrewor14 authored and pwendell committed
Add tests for FileLogger, EventLoggingListener, and ReplayListenerBus
Modifications to Spark core are limited to exposing functionality to test files + minor style fixes. (728 / 769 lines are from tests)

Author: Andrew Or <andrewor14@gmail.com>

Closes #591 from andrewor14/event-log-tests and squashes the following commits:

2883837 [Andrew Or] Merge branch 'master' of github.com:apache/spark into event-log-tests
c3afcea [Andrew Or] Compromise
2d5daf8 [Andrew Or] Use temp directory provided by the OS rather than /tmp
2b52151 [Andrew Or] Remove unnecessary file delete + add a comment
62010fd [Andrew Or] More cleanup (renaming variables, updating comments etc)
ad2beff [Andrew Or] Clean up EventLoggingListenerSuite + modify a few comments
862e752 [Andrew Or] Merge branch 'master' of github.com:apache/spark into event-log-tests
e0ba2f8 [Andrew Or] Fix test failures caused by race condition in processing/mutating events
b990453 [Andrew Or] ReplayListenerBus suite - tests do not all pass yet
ab66a84 [Andrew Or] Tests for FileLogger + delete file after tests
187bb25 [Andrew Or] Formatting and renaming variables
769336f [Andrew Or] Merge branch 'master' of github.com:apache/spark into event-log-tests
5d38ffe [Andrew Or] Clean up EventLoggingListenerSuite + add comments
e12f4b1 [Andrew Or] Preliminary tests for EventLoggingListener (need major cleanup)
1 parent 40cf6d3 commit 394d8cb

File tree

8 files changed: +791, -36 lines changed


core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala

Lines changed: 25 additions & 15 deletions
@@ -18,13 +18,16 @@
 package org.apache.spark.scheduler
 
 import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.fs.permission.FsPermission
+import org.json4s.JsonAST.JValue
 import org.json4s.jackson.JsonMethods._
 
 import org.apache.spark.{Logging, SparkConf, SparkContext}
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.util.{FileLogger, JsonProtocol}
 
@@ -40,31 +43,36 @@ import org.apache.spark.util.{FileLogger, JsonProtocol}
  */
 private[spark] class EventLoggingListener(
     appName: String,
-    conf: SparkConf,
-    hadoopConfiguration: Configuration)
+    sparkConf: SparkConf,
+    hadoopConf: Configuration = SparkHadoopUtil.get.newConfiguration())
   extends SparkListener with Logging {
 
   import EventLoggingListener._
 
-  private val shouldCompress = conf.getBoolean("spark.eventLog.compress", false)
-  private val shouldOverwrite = conf.getBoolean("spark.eventLog.overwrite", false)
-  private val outputBufferSize = conf.getInt("spark.eventLog.buffer.kb", 100) * 1024
-  private val logBaseDir = conf.get("spark.eventLog.dir", "/tmp/spark-events").stripSuffix("/")
+  private val shouldCompress = sparkConf.getBoolean("spark.eventLog.compress", false)
+  private val shouldOverwrite = sparkConf.getBoolean("spark.eventLog.overwrite", false)
+  private val testing = sparkConf.getBoolean("spark.eventLog.testing", false)
+  private val outputBufferSize = sparkConf.getInt("spark.eventLog.buffer.kb", 100) * 1024
+  private val logBaseDir = sparkConf.get("spark.eventLog.dir", DEFAULT_LOG_DIR).stripSuffix("/")
   private val name = appName.replaceAll("[ :/]", "-").toLowerCase + "-" + System.currentTimeMillis
   val logDir = logBaseDir + "/" + name
 
-  private val logger =
-    new FileLogger(logDir, conf, hadoopConfiguration, outputBufferSize, shouldCompress,
-      shouldOverwrite, Some(LOG_FILE_PERMISSIONS))
+  protected val logger = new FileLogger(logDir, sparkConf, hadoopConf, outputBufferSize,
+    shouldCompress, shouldOverwrite, Some(LOG_FILE_PERMISSIONS))
+
+  // For testing. Keep track of all JSON serialized events that have been logged.
+  private[scheduler] val loggedEvents = new ArrayBuffer[JValue]
 
   /**
    * Begin logging events.
    * If compression is used, log a file that indicates which compression library is used.
    */
   def start() {
+    logger.start()
     logInfo("Logging events to %s".format(logDir))
     if (shouldCompress) {
-      val codec = conf.get("spark.io.compression.codec", CompressionCodec.DEFAULT_COMPRESSION_CODEC)
+      val codec =
+        sparkConf.get("spark.io.compression.codec", CompressionCodec.DEFAULT_COMPRESSION_CODEC)
       logger.newFile(COMPRESSION_CODEC_PREFIX + codec)
     }
     logger.newFile(SPARK_VERSION_PREFIX + SparkContext.SPARK_VERSION)
@@ -73,11 +81,14 @@ private[spark] class EventLoggingListener(
 
   /** Log the event as JSON. */
   private def logEvent(event: SparkListenerEvent, flushLogger: Boolean = false) {
-    val eventJson = compact(render(JsonProtocol.sparkEventToJson(event)))
-    logger.logLine(eventJson)
+    val eventJson = JsonProtocol.sparkEventToJson(event)
+    logger.logLine(compact(render(eventJson)))
     if (flushLogger) {
       logger.flush()
     }
+    if (testing) {
+      loggedEvents += eventJson
+    }
   }
 
   // Events that do not trigger a flush
@@ -121,13 +132,12 @@
 }
 
 private[spark] object EventLoggingListener extends Logging {
+  val DEFAULT_LOG_DIR = "/tmp/spark-events"
   val LOG_PREFIX = "EVENT_LOG_"
   val SPARK_VERSION_PREFIX = "SPARK_VERSION_"
   val COMPRESSION_CODEC_PREFIX = "COMPRESSION_CODEC_"
   val APPLICATION_COMPLETE = "APPLICATION_COMPLETE"
-  val LOG_FILE_PERMISSIONS: FsPermission =
-    FsPermission.createImmutable(Integer.parseInt("770", 8).toShort)
-
+  val LOG_FILE_PERMISSIONS = FsPermission.createImmutable(Integer.parseInt("770", 8).toShort)
 
   // A cache for compression codecs to avoid creating the same codec many times
   private val codecMap = new mutable.HashMap[String, CompressionCodec]
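
The new spark.eventLog.testing flag and loggedEvents buffer exist so the suites added in this commit can inspect serialized events in memory instead of re-reading log files. Below is a minimal sketch of how a test might use them; the "test-app" name and the temp log directory are illustrative, the actual EventLoggingListenerSuite is not shown in this excerpt, and the snippet assumes it runs from the org.apache.spark.scheduler package (loggedEvents is private[scheduler]).

// Sketch only: exercise the in-memory event capture added above.
import org.apache.spark.SparkConf
import org.apache.spark.scheduler.{EventLoggingListener, SparkListenerApplicationEnd}
import org.apache.spark.util.JsonProtocol

val conf = new SparkConf()
  .set("spark.eventLog.testing", "true")                            // populate loggedEvents
  .set("spark.eventLog.dir", System.getProperty("java.io.tmpdir"))  // illustrative test log dir

val listener = new EventLoggingListener("test-app", conf)           // hadoopConf now has a default
listener.start()

// Post an event and check that its JSON form was captured in memory.
val appEnd = SparkListenerApplicationEnd(System.currentTimeMillis)
listener.onApplicationEnd(appEnd)
assert(listener.loggedEvents.nonEmpty)
assert(JsonProtocol.sparkEventFromJson(listener.loggedEvents.last) == appEnd)
// (Cleanup of the created log directory is omitted in this sketch.)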

core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala

Lines changed: 1 addition & 1 deletion
@@ -37,7 +37,7 @@ private[spark] trait SparkListenerBus {
    * Post an event to all attached listeners. This does nothing if the event is
    * SparkListenerShutdown.
    */
-  protected def postToAll(event: SparkListenerEvent) {
+  def postToAll(event: SparkListenerEvent) {
     event match {
       case stageSubmitted: SparkListenerStageSubmitted =>
         sparkListeners.foreach(_.onStageSubmitted(stageSubmitted))
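
Widening postToAll from protected to public is what lets the new ReplayListenerBus suite (and other tests) push events through a bus directly rather than going through a running SparkContext. A rough sketch of that usage follows; it assumes test code living under org.apache.spark (these classes are private[spark]) and uses a made-up counting listener.

// Sketch only: post an event straight to a bus and observe it on an attached listener.
import org.apache.spark.scheduler._

class JobEndCounter extends SparkListener {
  var count = 0
  override def onJobEnd(jobEnd: SparkListenerJobEnd) { count += 1 }
}

val bus = new LiveListenerBus          // any SparkListenerBus implementation would do
val counter = new JobEndCounter
bus.addListener(counter)
bus.postToAll(SparkListenerJobEnd(0, JobSucceeded))  // now callable directly from tests
assert(counter.count == 1)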

core/src/main/scala/org/apache/spark/util/FileLogger.scala

Lines changed: 17 additions & 11 deletions
@@ -17,7 +17,7 @@
 
 package org.apache.spark.util
 
-import java.io.{FileOutputStream, BufferedOutputStream, PrintWriter, IOException}
+import java.io.{BufferedOutputStream, FileOutputStream, IOException, PrintWriter}
 import java.net.URI
 import java.text.SimpleDateFormat
 import java.util.Date
@@ -27,6 +27,7 @@ import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path}
 import org.apache.hadoop.fs.permission.FsPermission
 
 import org.apache.spark.{Logging, SparkConf}
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.io.CompressionCodec
 
 /**
@@ -39,8 +40,8 @@ import org.apache.spark.io.CompressionCodec
  */
 private[spark] class FileLogger(
     logDir: String,
-    conf: SparkConf,
-    hadoopConfiguration: Configuration,
+    sparkConf: SparkConf,
+    hadoopConf: Configuration = SparkHadoopUtil.get.newConfiguration(),
     outputBufferSize: Int = 8 * 1024, // 8 KB
     compress: Boolean = false,
     overwrite: Boolean = true,
@@ -55,14 +56,19 @@ private[spark] class FileLogger(
   var fileIndex = 0
 
   // Only used if compression is enabled
-  private lazy val compressionCodec = CompressionCodec.createCodec(conf)
+  private lazy val compressionCodec = CompressionCodec.createCodec(sparkConf)
 
   // Only defined if the file system scheme is not local
   private var hadoopDataStream: Option[FSDataOutputStream] = None
 
   private var writer: Option[PrintWriter] = None
 
-  createLogDir()
+  /**
+   * Start this logger by creating the logging directory.
+   */
+  def start() {
+    createLogDir()
+  }
 
   /**
    * Create a logging directory with the given path.
@@ -83,7 +89,7 @@
     }
     if (dirPermissions.isDefined) {
       val fsStatus = fileSystem.getFileStatus(path)
-      if (fsStatus.getPermission().toShort() != dirPermissions.get.toShort) {
+      if (fsStatus.getPermission.toShort != dirPermissions.get.toShort) {
         fileSystem.setPermission(path, dirPermissions.get)
       }
     }
@@ -92,14 +98,14 @@
   /**
    * Create a new writer for the file identified by the given path.
    * If the permissions are not passed in, it will default to use the permissions
-   * (dirpermissions) used when class was instantiated.
+   * (dirPermissions) used when class was instantiated.
    */
   private def createWriter(fileName: String, perms: Option[FsPermission] = None): PrintWriter = {
     val logPath = logDir + "/" + fileName
     val uri = new URI(logPath)
-    val defaultFs = FileSystem.getDefaultUri(hadoopConfiguration).getScheme
-    val isDefaultLocal = (defaultFs == null || defaultFs == "file")
     val path = new Path(logPath)
+    val defaultFs = FileSystem.getDefaultUri(hadoopConf).getScheme
+    val isDefaultLocal = defaultFs == null || defaultFs == "file"
 
     /* The Hadoop LocalFileSystem (r1.0.4) has known issues with syncing (HADOOP-7844).
      * Therefore, for local files, use FileOutputStream instead. */
@@ -112,7 +118,7 @@
       hadoopDataStream.get
     }
 
-    perms.orElse(dirPermissions).foreach {p => fileSystem.setPermission(path, p)}
+    perms.orElse(dirPermissions).foreach { p => fileSystem.setPermission(path, p) }
     val bstream = new BufferedOutputStream(dstream, outputBufferSize)
     val cstream = if (compress) compressionCodec.compressedOutputStream(bstream) else bstream
     new PrintWriter(cstream)
@@ -127,7 +133,7 @@
     val writeInfo = if (!withTime) {
       msg
     } else {
-      val date = new Date(System.currentTimeMillis())
+      val date = new Date(System.currentTimeMillis)
       dateFormat.get.format(date) + ": " + msg
     }
     writer.foreach(_.print(writeInfo))
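
With start() split out of the constructor, creating a FileLogger no longer touches the file system, so a test can construct one and control exactly when the directory appears. Below is a rough sketch of the kind of lifecycle the new FileLoggerSuite exercises (per the commit message, against a temp directory provided by the OS); the directory and file names are illustrative, and close() is assumed to behave as it did before this change.

// Sketch only: drive FileLogger through its basic lifecycle.
import java.io.File
import org.apache.spark.SparkConf
import org.apache.spark.util.FileLogger

val logDir = new File(System.getProperty("java.io.tmpdir"), "file-logger-test").getAbsolutePath
val logger = new FileLogger(logDir, new SparkConf)   // hadoopConf now defaults via SparkHadoopUtil

logger.start()                       // creates the log directory (no longer a constructor side effect)
logger.newFile("EVENT_LOG_0")        // open a file within the directory
logger.logLine("some event json")
logger.flush()
logger.close()

assert(new File(logDir, "EVENT_LOG_0").exists())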

core/src/main/scala/org/apache/spark/util/JsonProtocol.scala

Lines changed: 5 additions & 5 deletions
@@ -646,11 +646,11 @@ private[spark] object JsonProtocol {
   }
 
   def propertiesFromJson(json: JValue): Properties = {
-    val properties = new Properties()
-    if (json != JNothing) {
-      mapFromJson(json).map { case (k, v) => properties.setProperty(k, v) }
-    }
-    properties
+    Utils.jsonOption(json).map { value =>
+      val properties = new Properties
+      mapFromJson(json).foreach { case (k, v) => properties.setProperty(k, v) }
+      properties
+    }.getOrElse(null)
   }
 
   def UUIDFromJson(json: JValue): UUID = {
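
The rewrite routes the JNothing check through Utils.jsonOption, and as a side effect an absent value now maps back to null rather than an empty Properties (presumably mirroring propertiesToJson on the write side). A small round-trip sketch of the intended behavior, assuming the snippet lives under org.apache.spark as the suites do:

// Sketch only: round-trip Properties through JSON and check the absent (JNothing) case.
import java.util.Properties
import org.json4s.JsonAST.JNothing
import org.apache.spark.util.JsonProtocol

val props = new Properties
props.setProperty("spark.locality.wait", "100")

val json = JsonProtocol.propertiesToJson(props)
assert(JsonProtocol.propertiesFromJson(json).getProperty("spark.locality.wait") == "100")
assert(JsonProtocol.propertiesFromJson(JNothing) == null)   // absent properties come back as null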

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 14 additions & 4 deletions
@@ -1062,15 +1062,25 @@
   }
 
   /**
-   * return true if this is Windows.
+   * Return the absolute path of a file in the given directory.
    */
-  def isWindows = Option(System.getProperty("os.name")).
-    map(_.startsWith("Windows")).getOrElse(false)
+  def getFilePath(dir: File, fileName: String): Path = {
+    assert(dir.isDirectory)
+    val path = new File(dir, fileName).getAbsolutePath
+    new Path(path)
+  }
+
+  /**
+   * Return true if this is Windows.
+   */
+  def isWindows = {
+    Option(System.getProperty("os.name")).exists(_.startsWith("Windows"))
+  }
 
   /**
    * Indicates whether Spark is currently running unit tests.
    */
-  private[spark] def isTesting = {
+  def isTesting = {
     sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing")
   }
 }
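
The new Utils.getFilePath helper simply joins a directory and a file name into a Hadoop Path, which is convenient for tests that point a ReplayListenerBus or FileLogger at a file inside a temp directory. A brief usage sketch with an illustrative file name:

// Sketch only: build a Hadoop Path to a file inside an existing directory.
import java.io.File
import org.apache.spark.util.Utils

val tmpDir = new File(System.getProperty("java.io.tmpdir"))
val eventLogPath = Utils.getFilePath(tmpDir, "events.log")   // an org.apache.hadoop.fs.Path
assert(eventLogPath.toUri.getPath.endsWith("events.log"))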
