@@ -22,6 +22,8 @@ import javax.servlet.http.HttpServletRequest

import scala.xml.{Node, Unparsed}

import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}

import org.apache.spark.internal.Logging
import org.apache.spark.ui.{UIUtils, WebUIPage}
import org.apache.spark.util.Utils
@@ -138,7 +140,8 @@ private[ui] class LogPage(parent: WorkerWebUI) extends WebUIPage("logPage") with
val files = RollingFileAppender.getSortedRolledOverFiles(logDirectory, logType)
logDebug(s"Sorted log files of type $logType in $logDirectory:\n${files.mkString("\n")}")

val totalLength = files.map { _.length }.sum
val fileLengths: Seq[Long] = files.map(Utils.getFileLength(_, worker.conf))
val totalLength = fileLengths.sum
val offset = offsetOption.getOrElse(totalLength - byteLength)
val startIndex = {
if (offset < 0) {
@@ -151,7 +154,7 @@ private[ui] class LogPage(parent: WorkerWebUI) extends WebUIPage("logPage") with
}
val endIndex = math.min(startIndex + byteLength, totalLength)
logDebug(s"Getting log from $startIndex to $endIndex")
val logText = Utils.offsetBytes(files, startIndex, endIndex)
val logText = Utils.offsetBytes(files, fileLengths, startIndex, endIndex)
logDebug(s"Got log of length ${logText.length} bytes")
(logText, startIndex, endIndex, totalLength)
} catch {
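The LogPage change above only swaps files.map(_.length) for Utils.getFileLength, so compressed rolled-over files report their uncompressed size; the paging arithmetic itself is unchanged. A minimal, self-contained sketch of that arithmetic with made-up lengths (the clamping branch is partly collapsed in the diff, so it is reconstructed here and may differ in detail from the real method):

```scala
// Standalone sketch, not Spark code: the byte-range selection done by the worker log page.
object LogOffsetSketch {
  def main(args: Array[String]): Unit = {
    // Pretend three rolled files whose lengths were resolved via Utils.getFileLength.
    val fileLengths: Seq[Long] = Seq(100L, 50L, 30L)
    val totalLength = fileLengths.sum          // 180
    val byteLength = 40L                       // how many bytes the page wants to show
    val offsetOption: Option[Long] = None      // no explicit offset in the request

    // Default to the tail of the log, then clamp into [0, totalLength].
    val offset = offsetOption.getOrElse(totalLength - byteLength)   // 140
    val startIndex = math.min(math.max(offset, 0L), totalLength)
    val endIndex = math.min(startIndex + byteLength, totalLength)   // 180

    println(s"would read bytes [$startIndex, $endIndex) of $totalLength")
  }
}
```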
80 changes: 71 additions & 9 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -27,6 +27,7 @@ import java.nio.file.Files
import java.util.{Locale, Properties, Random, UUID}
import java.util.concurrent._
import java.util.concurrent.atomic.AtomicBoolean
import java.util.zip.GZIPInputStream
import javax.net.ssl.HttpsURLConnection

import scala.annotation.tailrec
@@ -38,8 +39,10 @@ import scala.reflect.ClassTag
import scala.util.Try
import scala.util.control.{ControlThrowable, NonFatal}

import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
import com.google.common.io.{ByteStreams, Files => GFiles}
import com.google.common.net.InetAddresses
import org.apache.commons.io.IOUtils
import org.apache.commons.lang3.SystemUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
@@ -55,6 +58,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.{DYN_ALLOCATION_INITIAL_EXECUTORS, DYN_ALLOCATION_MIN_EXECUTORS, EXECUTOR_INSTANCES}
import org.apache.spark.network.util.JavaUtils
import org.apache.spark.serializer.{DeserializationStream, SerializationStream, SerializerInstance}
import org.apache.spark.util.logging.RollingFileAppender

/** CallSite represents a place in user code. It can have a short and a long form. */
private[spark] case class CallSite(shortForm: String, longForm: String)
@@ -1448,14 +1452,72 @@ private[spark] object Utils extends Logging {
CallSite(shortForm, longForm)
}

private val UNCOMPRESSED_LOG_FILE_LENGTH_CACHE_SIZE_CONF =
"spark.worker.ui.compressedLogFileLengthCacheSize"
private val DEFAULT_UNCOMPRESSED_LOG_FILE_LENGTH_CACHE_SIZE = 100
private var compressedLogFileLengthCache: LoadingCache[String, java.lang.Long] = null
private def getCompressedLogFileLengthCache(
sparkConf: SparkConf): LoadingCache[String, java.lang.Long] = this.synchronized {
if (compressedLogFileLengthCache == null) {
val compressedLogFileLengthCacheSize = sparkConf.getInt(
UNCOMPRESSED_LOG_FILE_LENGTH_CACHE_SIZE_CONF,
DEFAULT_UNCOMPRESSED_LOG_FILE_LENGTH_CACHE_SIZE)
compressedLogFileLengthCache = CacheBuilder.newBuilder()
.maximumSize(compressedLogFileLengthCacheSize)
.build[String, java.lang.Long](new CacheLoader[String, java.lang.Long]() {
override def load(path: String): java.lang.Long = {
Utils.getCompressedFileLength(new File(path))
}
})
}
compressedLogFileLengthCache
}

/**
* Return the file length, if the file is compressed it returns the uncompressed file length.
* It also caches the uncompressed file size to avoid repeated decompression. The cache size is
* read from workerConf.
*/
def getFileLength(file: File, workConf: SparkConf): Long = {
if (file.getName.endsWith(".gz")) {
getCompressedLogFileLengthCache(workConf).get(file.getAbsolutePath)
} else {
file.length
}
}

/** Return uncompressed file length of a compressed file. */
private def getCompressedFileLength(file: File): Long = {
try {
// Uncompress .gz file to determine file size.
var fileSize = 0L
val gzInputStream = new GZIPInputStream(new FileInputStream(file))
val bufSize = 1024
val buf = new Array[Byte](bufSize)
var numBytes = IOUtils.read(gzInputStream, buf)
while (numBytes > 0) {
fileSize += numBytes
numBytes = IOUtils.read(gzInputStream, buf)
}
fileSize
} catch {
case e: Throwable =>
logError(s"Cannot get file length of ${file}", e)
throw e
}
}
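
Roughly how these helpers are meant to be used (a sketch only: Utils is private[spark], so the hypothetical caller below sits in an org.apache.spark sub-package, as LogPage does; the file names are made up, and the cache-size key and its default of 100 come from this diff):

```scala
package org.apache.spark.sketch   // hypothetical package, needed to see private[spark] Utils

import java.io.File

import org.apache.spark.SparkConf
import org.apache.spark.util.Utils

object FileLengthSketch {
  def main(args: Array[String]): Unit = {
    // Optionally bound the cache of decompressed lengths (defaults to 100 entries).
    val conf = new SparkConf().set("spark.worker.ui.compressedLogFileLengthCacheSize", "200")

    // "*.gz" files are decompressed once to measure them and the result is cached;
    // plain files just report File.length. Assumes these files actually exist.
    val files = Seq(new File("stdout--2016-11-01--10-00.gz"), new File("stdout"))
    val lengths = files.map(f => Utils.getFileLength(f, conf))
    println(s"total displayable bytes: ${lengths.sum}")
  }
}
```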

/** Return a string containing part of a file from byte 'start' to 'end'. */
def offsetBytes(path: String, start: Long, end: Long): String = {
def offsetBytes(path: String, length: Long, start: Long, end: Long): String = {
val file = new File(path)
val length = file.length()
val effectiveEnd = math.min(length, end)
val effectiveStart = math.max(0, start)
val buff = new Array[Byte]((effectiveEnd-effectiveStart).toInt)
val stream = new FileInputStream(file)
val stream = if (path.endsWith(".gz")) {
Contributor: Should we use GZIP_LOG_SUFFIX?

Contributor Author: yes

Contributor Author: oh, Utils.scala doesn't depend on FileAppender. It does use string literal in other places. https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/util/Utils.scala#L463

new GZIPInputStream(new FileInputStream(file))
} else {
new FileInputStream(file)
}

try {
ByteStreams.skipFully(stream, effectiveStart)
@@ -1471,16 +1533,16 @@
* and `endIndex` is based on the cumulative size of all the files take in
* the given order. See figure below for more details.
*/
def offsetBytes(files: Seq[File], start: Long, end: Long): String = {
val fileLengths = files.map { _.length }
def offsetBytes(files: Seq[File], fileLengths: Seq[Long], start: Long, end: Long): String = {
assert(files.length == fileLengths.length)
val startIndex = math.max(start, 0)
val endIndex = math.min(end, fileLengths.sum)
val fileToLength = files.zip(fileLengths).toMap
logDebug("Log files: \n" + fileToLength.mkString("\n"))

val stringBuffer = new StringBuffer((endIndex - startIndex).toInt)
var sum = 0L
for (file <- files) {
files.zip(fileLengths).foreach { case (file, fileLength) =>
Contributor: Missed this. We should assert that the number of files and lengths are the same. Zip can silently drop stuff if they don't match.

val startIndexOfFile = sum
val endIndexOfFile = sum + fileToLength(file)
logDebug(s"Processing file $file, " +
@@ -1499,19 +1561,19 @@

if (startIndex <= startIndexOfFile && endIndex >= endIndexOfFile) {
// Case C: read the whole file
stringBuffer.append(offsetBytes(file.getAbsolutePath, 0, fileToLength(file)))
stringBuffer.append(offsetBytes(file.getAbsolutePath, fileLength, 0, fileToLength(file)))
} else if (startIndex > startIndexOfFile && startIndex < endIndexOfFile) {
// Case A and B: read from [start of required range] to [end of file / end of range]
val effectiveStartIndex = startIndex - startIndexOfFile
val effectiveEndIndex = math.min(endIndex - startIndexOfFile, fileToLength(file))
stringBuffer.append(Utils.offsetBytes(
file.getAbsolutePath, effectiveStartIndex, effectiveEndIndex))
file.getAbsolutePath, fileLength, effectiveStartIndex, effectiveEndIndex))
} else if (endIndex > startIndexOfFile && endIndex < endIndexOfFile) {
// Case D: read from [start of file] to [end of require range]
val effectiveStartIndex = math.max(startIndex - startIndexOfFile, 0)
val effectiveEndIndex = endIndex - startIndexOfFile
stringBuffer.append(Utils.offsetBytes(
file.getAbsolutePath, effectiveStartIndex, effectiveEndIndex))
file.getAbsolutePath, fileLength, effectiveStartIndex, effectiveEndIndex))
}
sum += fileToLength(file)
logDebug(s"After processing file $file, string built is ${stringBuffer.toString}")
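On the reviewer's zip remark: Scala's zip truncates to the shorter of the two collections, so a files/fileLengths mismatch would silently drop trailing files rather than fail. A tiny illustration (not Spark code) of why the added assert in offsetBytes matters:

```scala
// Illustration only: zip truncates silently, an assert surfaces the mismatch instead.
object ZipAssertSketch {
  def main(args: Array[String]): Unit = {
    val files = Seq("log.1", "log.2", "log.3")
    val lengths = Seq(100L, 50L)               // one length missing by mistake

    // zip quietly drops "log.3"; downstream code would just render a shorter log.
    println(files.zip(lengths))                // List((log.1,100), (log.2,50))

    // Failing fast instead: this assert throws because the sequences are mismatched.
    assert(files.length == lengths.length, "files and lengths must have the same size")
  }
}
```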
@@ -17,9 +17,11 @@

package org.apache.spark.util.logging

import java.io.{File, FileFilter, InputStream}
import java.io._
import java.util.zip.GZIPOutputStream

import com.google.common.io.Files
import org.apache.commons.io.IOUtils

import org.apache.spark.SparkConf

@@ -45,6 +47,7 @@ private[spark] class RollingFileAppender(
import RollingFileAppender._

private val maxRetainedFiles = conf.getInt(RETAINED_FILES_PROPERTY, -1)
private val enableCompression = conf.getBoolean(ENABLE_COMPRESSION, false)
Contributor: Should we enable this by default?

Contributor Author: I don't want to change existing behavior.

/** Stop the appender */
override def stop() {
@@ -76,15 +79,42 @@
}
}

// Roll the log file and compress if enableCompression is true.
private def rotateFile(activeFile: File, rolloverFile: File): Unit = {
if (enableCompression) {
val gzFile = new File(rolloverFile.getAbsolutePath + GZIP_LOG_SUFFIX)
var gzOutputStream: GZIPOutputStream = null
var inputStream: InputStream = null
try {
inputStream = new FileInputStream(activeFile)
gzOutputStream = new GZIPOutputStream(new FileOutputStream(gzFile))
IOUtils.copy(inputStream, gzOutputStream)
inputStream.close()
gzOutputStream.close()
activeFile.delete()
Contributor: Are you sure this is a good idea to delete the activeFile before closing the inputStream? I am not sure this is the right thing to do.

Contributor: In fact, the docs of IOUtils.closeQuietly say that it should not be used as a replacement for normal closing.
See https://commons.apache.org/proper/commons-io/javadocs/api-2.5/org/apache/commons/io/IOUtils.html#closeQuietly(java.io.Closeable...)
So this is not right.

} finally {
IOUtils.closeQuietly(inputStream)
IOUtils.closeQuietly(gzOutputStream)
Contributor: What kinds of errors do we expect? If there is an exception, we will lose log data, right? Seems we should at least have a log.

Contributor Author: It may throw some kind of IOException, which will be logged in the rollover() method.

}
} else {
Files.move(activeFile, rolloverFile)
}
}
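
The two review concerns above (deleting activeFile before its stream is closed, and leaning on closeQuietly as the normal close path) point at the usual close-then-delete ordering. A sketch of that shape, assuming the same imports and Logging mixin as the surrounding class; it is an alternative illustration, not the code as merged:

```scala
// Sketch only: close both streams first, keep the quiet close for the error path,
// and delete the source only after the copy is known to have succeeded.
private def rotateFileSketch(activeFile: File, rolloverFile: File): Unit = {
  val gzFile = new File(rolloverFile.getAbsolutePath + GZIP_LOG_SUFFIX)
  val inputStream = new FileInputStream(activeFile)
  try {
    val gzOutputStream = new GZIPOutputStream(new FileOutputStream(gzFile))
    try {
      IOUtils.copy(inputStream, gzOutputStream)
    } finally {
      gzOutputStream.close()   // normal close; an IOException propagates and is logged by rollover()
    }
  } finally {
    IOUtils.closeQuietly(inputStream)
  }
  if (!activeFile.delete()) {
    logWarning(s"Could not delete $activeFile after compressing it to $gzFile")
  }
}
```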

// Check if the rollover file already exists.
private def rolloverFileExist(file: File): Boolean = {
file.exists || new File(file.getAbsolutePath + GZIP_LOG_SUFFIX).exists
}

/** Move the active log file to a new rollover file */
private def moveFile() {
val rolloverSuffix = rollingPolicy.generateRolledOverFileSuffix()
val rolloverFile = new File(
activeFile.getParentFile, activeFile.getName + rolloverSuffix).getAbsoluteFile
logDebug(s"Attempting to rollover file $activeFile to file $rolloverFile")
if (activeFile.exists) {
if (!rolloverFile.exists) {
Files.move(activeFile, rolloverFile)
if (!rolloverFileExist(rolloverFile)) {
rotateFile(activeFile, rolloverFile)
logInfo(s"Rolled over $activeFile to $rolloverFile")
} else {
// In case the rollover file name clashes, make a unique file name.
@@ -97,11 +127,11 @@ private[spark] class RollingFileAppender(
altRolloverFile = new File(activeFile.getParent,
s"${activeFile.getName}$rolloverSuffix--$i").getAbsoluteFile
Contributor: Want to double check: if we enable compression, the suffix will still be .gz even if we use the alternative file name, right?

Contributor Author: Yes, it will be whatever we had before + .gz.

i += 1
} while (i < 10000 && altRolloverFile.exists)
} while (i < 10000 && rolloverFileExist(altRolloverFile))

logWarning(s"Rollover file $rolloverFile already exists, " +
s"rolled over $activeFile to file $altRolloverFile")
Files.move(activeFile, altRolloverFile)
rotateFile(activeFile, altRolloverFile)
}
} else {
logWarning(s"File $activeFile does not exist")
@@ -142,6 +172,9 @@ private[spark] object RollingFileAppender {
val SIZE_DEFAULT = (1024 * 1024).toString
val RETAINED_FILES_PROPERTY = "spark.executor.logs.rolling.maxRetainedFiles"
val DEFAULT_BUFFER_SIZE = 8192
val ENABLE_COMPRESSION = "spark.executor.logs.rolling.enableCompression"
Contributor: Shouldn't we document this in the Spark docs?

val GZIP_LOG_SUFFIX = ".gz"

/**
* Get the sorted list of rolled over files. This assumes that the all the rolled
@@ -158,6 +191,6 @@ private[spark] object RollingFileAppender {
val file = new File(directory, activeFileName).getAbsoluteFile
if (file.exists) Some(file) else None
}
rolledOverFiles ++ activeFile
rolledOverFiles.sortBy(_.getName.stripSuffix(GZIP_LOG_SUFFIX)) ++ activeFile
}
}
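
Until the new key is covered in the configuration docs (the open question above), the constants in this object are the only reference for it. Roughly how a user would switch it on next to the existing rolling settings; the first three keys are the standard executor-log rolling options, the last one is added by this patch and stays off by default:

```scala
// Sketch: enabling compressed rollover for executor logs.
import org.apache.spark.SparkConf

object CompressedRollingConfSketch {
  val conf: SparkConf = new SparkConf()
    .set("spark.executor.logs.rolling.strategy", "size")
    .set("spark.executor.logs.rolling.maxSize", (1024 * 1024).toString)
    .set("spark.executor.logs.rolling.maxRetainedFiles", "5")
    .set("spark.executor.logs.rolling.enableCompression", "true")  // new in this patch
}
```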
@@ -22,16 +22,20 @@ import java.io.{File, FileWriter}
import org.mockito.Mockito.{mock, when}
import org.scalatest.PrivateMethodTester

import org.apache.spark.SparkFunSuite
import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.deploy.worker.Worker

class LogPageSuite extends SparkFunSuite with PrivateMethodTester {

test("get logs simple") {
val webui = mock(classOf[WorkerWebUI])
val worker = mock(classOf[Worker])
val tmpDir = new File(sys.props("java.io.tmpdir"))
val workDir = new File(tmpDir, "work-dir")
workDir.mkdir()
when(webui.workDir).thenReturn(workDir)
when(webui.worker).thenReturn(worker)
when(worker.conf).thenReturn(new SparkConf())
val logPage = new LogPage(webui)

// Prepare some fake log files to read later
60 changes: 57 additions & 3 deletions core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala
@@ -20,11 +20,13 @@ package org.apache.spark.util
import java.io._
import java.nio.charset.StandardCharsets
import java.util.concurrent.CountDownLatch
import java.util.zip.GZIPInputStream

import scala.collection.mutable.HashSet
import scala.reflect._

import com.google.common.io.Files
import org.apache.commons.io.IOUtils
import org.apache.log4j.{Appender, Level, Logger}
import org.apache.log4j.spi.LoggingEvent
import org.mockito.ArgumentCaptor
@@ -72,6 +74,25 @@ class FileAppenderSuite extends SparkFunSuite with BeforeAndAfter with Logging {
testRolling(appender, testOutputStream, textToAppend, rolloverIntervalMillis)
}

test("rolling file appender - time-based rolling (compressed)") {
// setup input stream and appender
val testOutputStream = new PipedOutputStream()
val testInputStream = new PipedInputStream(testOutputStream, 100 * 1000)
val rolloverIntervalMillis = 100
val durationMillis = 1000
val numRollovers = durationMillis / rolloverIntervalMillis
val textToAppend = (1 to numRollovers).map( _.toString * 10 )

val sparkConf = new SparkConf()
sparkConf.set("spark.executor.logs.rolling.enableCompression", "true")
val appender = new RollingFileAppender(testInputStream, testFile,
new TimeBasedRollingPolicy(rolloverIntervalMillis, s"--HH-mm-ss-SSSS", false),
sparkConf, 10)

testRolling(
appender, testOutputStream, textToAppend, rolloverIntervalMillis, isCompressed = true)
}

test("rolling file appender - size-based rolling") {
// setup input stream and appender
val testOutputStream = new PipedOutputStream()
@@ -89,6 +110,25 @@ class FileAppenderSuite extends SparkFunSuite with BeforeAndAfter with Logging {
}
}

test("rolling file appender - size-based rolling (compressed)") {
// setup input stream and appender
val testOutputStream = new PipedOutputStream()
val testInputStream = new PipedInputStream(testOutputStream, 100 * 1000)
val rolloverSize = 1000
val textToAppend = (1 to 3).map( _.toString * 1000 )

val sparkConf = new SparkConf()
sparkConf.set("spark.executor.logs.rolling.enableCompression", "true")
val appender = new RollingFileAppender(testInputStream, testFile,
new SizeBasedRollingPolicy(rolloverSize, false), sparkConf, 99)

val files = testRolling(appender, testOutputStream, textToAppend, 0, isCompressed = true)
files.foreach { file =>
logInfo(file.toString + ": " + file.length + " bytes")
assert(file.length < rolloverSize)
}
}

test("rolling file appender - cleaning") {
// setup input stream and appender
val testOutputStream = new PipedOutputStream()
@@ -273,7 +313,8 @@ class FileAppenderSuite extends SparkFunSuite with BeforeAndAfter with Logging {
appender: FileAppender,
outputStream: OutputStream,
textToAppend: Seq[String],
sleepTimeBetweenTexts: Long
sleepTimeBetweenTexts: Long,
isCompressed: Boolean = false
): Seq[File] = {
// send data to appender through the input stream, and wait for the data to be written
val expectedText = textToAppend.mkString("")
@@ -290,10 +331,23 @@ class FileAppenderSuite extends SparkFunSuite with BeforeAndAfter with Logging {
// verify whether all the data written to rolled over files is same as expected
val generatedFiles = RollingFileAppender.getSortedRolledOverFiles(
testFile.getParentFile.toString, testFile.getName)
logInfo("Filtered files: \n" + generatedFiles.mkString("\n"))
logInfo("Generate files: \n" + generatedFiles.mkString("\n"))
assert(generatedFiles.size > 1)
if (isCompressed) {
assert(
generatedFiles.filter(_.getName.endsWith(RollingFileAppender.GZIP_LOG_SUFFIX)).size > 0)
}
val allText = generatedFiles.map { file =>
Files.toString(file, StandardCharsets.UTF_8)
if (file.getName.endsWith(RollingFileAppender.GZIP_LOG_SUFFIX)) {
val inputStream = new GZIPInputStream(new FileInputStream(file))
try {
IOUtils.toString(inputStream, StandardCharsets.UTF_8)
} finally {
IOUtils.closeQuietly(inputStream)
}
} else {
Files.toString(file, StandardCharsets.UTF_8)
}
}.mkString("")
assert(allText === expectedText)
generatedFiles