21 changes: 20 additions & 1 deletion common/utils/src/main/java/org/apache/spark/internal/Logger.java
@@ -34,6 +34,10 @@ public class Logger {
this.slf4jLogger = slf4jLogger;
}

public boolean isErrorEnabled() {
Contributor Author: It was found during the migration that some existing logic uses this method.

return slf4jLogger.isErrorEnabled();
}

public void error(String msg) {
slf4jLogger.error(msg);
}
@@ -58,6 +62,10 @@ public void error(String msg, Throwable throwable, MDC... mdcs) {
}
}

public boolean isWarnEnabled() {
return slf4jLogger.isWarnEnabled();
}

public void warn(String msg) {
slf4jLogger.warn(msg);
}
@@ -82,6 +90,10 @@ public void warn(String msg, Throwable throwable, MDC... mdcs) {
}
}

public boolean isInfoEnabled() {
return slf4jLogger.isInfoEnabled();
}

public void info(String msg) {
slf4jLogger.info(msg);
}
@@ -106,6 +118,10 @@ public void info(String msg, Throwable throwable, MDC... mdcs) {
}
}

public boolean isDebugEnabled() {
return slf4jLogger.isDebugEnabled();
}

public void debug(String msg) {
slf4jLogger.debug(msg);
}
@@ -126,6 +142,10 @@ public void debug(String msg, Throwable throwable) {
slf4jLogger.debug(msg, throwable);
}

public boolean isTraceEnabled() {
return slf4jLogger.isTraceEnabled();
}

public void trace(String msg) {
slf4jLogger.trace(msg);
}
@@ -146,7 +166,6 @@ public void trace(String msg, Throwable throwable) {
slf4jLogger.trace(msg, throwable);
}


private void withLogContext(
String pattern,
MDC[] mdcs,
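A note on the isErrorEnabled/isWarnEnabled/isInfoEnabled/isDebugEnabled/isTraceEnabled additions above: they let migrated call sites keep the usual level-check guard around expensive message construction. Below is a minimal sketch of that pattern; the caller class is hypothetical, and org.apache.spark.internal.LoggerFactory.getLogger is assumed to mirror the slf4j factory it replaces.

```java
import org.apache.spark.internal.Logger;
import org.apache.spark.internal.LoggerFactory;

class SpillDiagnostics {  // hypothetical caller, not part of this PR
  private static final Logger logger = LoggerFactory.getLogger(SpillDiagnostics.class);

  void logSpillSizes(long[] spillSizes) {
    // Guard the potentially expensive summary construction behind the new
    // level check, exactly as slf4j callers did before the migration.
    if (logger.isDebugEnabled()) {
      logger.debug("Spill sizes: " + java.util.Arrays.toString(spillSizes));
    }
  }
}
```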
@@ -168,6 +168,7 @@ object LogKeys {
case object EXCEPTION extends LogKey
case object EXECUTE_INFO extends LogKey
case object EXECUTE_KEY extends LogKey
case object EXECUTION_MEMORY_SIZE extends LogKey
case object EXECUTION_PLAN_LEAVES extends LogKey
case object EXECUTOR_BACKEND extends LogKey
case object EXECUTOR_DESIRED_COUNT extends LogKey
@@ -302,6 +303,7 @@ object LogKeys {
case object MAX_SLOTS extends LogKey
case object MAX_SPLIT_BYTES extends LogKey
case object MAX_TABLE_PARTITION_METADATA_SIZE extends LogKey
case object MEMORY_CONSUMER extends LogKey
case object MEMORY_POOL_NAME extends LogKey
case object MEMORY_SIZE extends LogKey
case object MERGE_DIR_NAME extends LogKey
@@ -342,6 +344,7 @@ object LogKeys {
case object NUM_CONCURRENT_WRITER extends LogKey
case object NUM_CORES extends LogKey
case object NUM_DROPPED_PARTITIONS extends LogKey
case object NUM_ELEMENTS_SPILL_THRESHOLD extends LogKey
case object NUM_EVENTS extends LogKey
case object NUM_EXAMPLES extends LogKey
case object NUM_EXECUTOR_CORES extends LogKey
@@ -375,6 +378,8 @@ object LogKeys {
case object NUM_RIGHT_PARTITION_VALUES extends LogKey
case object NUM_SEQUENCES extends LogKey
case object NUM_SLOTS extends LogKey
case object NUM_SPILL_INFOS extends LogKey
case object NUM_SPILL_WRITERS extends LogKey
case object NUM_TASKS extends LogKey
case object NUM_TASK_CPUS extends LogKey
case object NUM_VERSIONS_RETAIN extends LogKey
@@ -394,6 +399,7 @@ object LogKeys {
case object OP_TYPE extends LogKey
case object OUTPUT extends LogKey
case object OVERHEAD_MEMORY_SIZE extends LogKey
case object PAGE_SIZE extends LogKey
case object PARSE_MODE extends LogKey
case object PARTITIONED_FILE_READER extends LogKey
case object PARTITIONER extends LogKey
@@ -502,6 +508,7 @@ object LogKeys {
case object SOCKET_ADDRESS extends LogKey
case object SPARK_DATA_STREAM extends LogKey
case object SPARK_PLAN_ID extends LogKey
case object SPILL_TIMES extends LogKey
case object SQL_TEXT extends LogKey
case object SRC_PATH extends LogKey
case object STAGE_ATTEMPT extends LogKey
@@ -516,6 +523,7 @@ object LogKeys {
case object STORAGE_LEVEL extends LogKey
case object STORAGE_LEVEL_DESERIALIZED extends LogKey
case object STORAGE_LEVEL_REPLICATION extends LogKey
case object STORAGE_MEMORY_SIZE extends LogKey
case object STORE_ID extends LogKey
case object STREAMING_DATA_SOURCE_DESCRIPTION extends LogKey
case object STREAMING_DATA_SOURCE_NAME extends LogKey
@@ -543,6 +551,7 @@ object LogKeys {
case object TEMP_PATH extends LogKey
case object TEST_SIZE extends LogKey
case object THREAD extends LogKey
case object THREAD_ID extends LogKey
case object THREAD_NAME extends LogKey
case object TID extends LogKey
case object TIME extends LogKey
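The new case objects above (EXECUTION_MEMORY_SIZE, MEMORY_CONSUMER, NUM_ELEMENTS_SPILL_THRESHOLD, PAGE_SIZE, SPILL_TIMES, STORAGE_MEMORY_SIZE, THREAD_ID, ...) are the structured keys referenced from the Java changes below. Because a Scala case object compiles to a class whose singleton instance lives in a static MODULE$ field, the Java call sites spell a key as LogKeys.FOO$.MODULE$. A minimal sketch of such a call site (the class and variable are hypothetical; the call shape is copied from the TaskMemoryManager diff below, and LoggerFactory is assumed to mirror the slf4j factory):

```java
import org.apache.spark.internal.LogKeys;
import org.apache.spark.internal.Logger;
import org.apache.spark.internal.LoggerFactory;  // assumed factory, mirroring slf4j
import org.apache.spark.internal.MDC;

class PageAllocationSketch {  // hypothetical, for illustration only
  private static final Logger logger = LoggerFactory.getLogger(PageAllocationSketch.class);

  void onAllocationFailure(long acquired) {
    // LogKeys.PAGE_SIZE is a Scala `case object`; from Java it is reached through
    // the generated PAGE_SIZE$ class and its static singleton field MODULE$.
    logger.warn("Failed to allocate a page ({} bytes), try again.",
      MDC.of(LogKeys.PAGE_SIZE$.MODULE$, acquired));
  }
}
```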
19 changes: 11 additions & 8 deletions core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java
@@ -13,13 +13,6 @@
*/
package org.apache.spark.io;

import com.google.common.base.Preconditions;
Contributor Author: Taking this chance, let's also adjust the import order of the Java code.

import com.google.common.base.Throwables;
import org.apache.spark.util.ThreadUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.concurrent.GuardedBy;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
@@ -30,6 +23,16 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import javax.annotation.concurrent.GuardedBy;

import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;

import org.apache.spark.internal.Logger;
import org.apache.spark.internal.LoggerFactory;
import org.apache.spark.internal.LogKeys;
import org.apache.spark.internal.MDC;
import org.apache.spark.util.ThreadUtils;

/**
* {@link InputStream} implementation which asynchronously reads ahead from the underlying input
@@ -205,7 +208,7 @@ private void closeUnderlyingInputStreamIfNecessary() {
try {
underlyingInputStream.close();
} catch (IOException e) {
logger.warn(e.getMessage(), e);
logger.warn("{}", e, MDC.of(LogKeys.ERROR$.MODULE$, e.getMessage()));
}
}
}
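The warn change above is representative of the migration pattern in this PR: the throwable stays where slf4j expected it (the argument right after the pattern), while the human-readable message is attached under LogKeys.ERROR so the {} placeholder is filled from a named key rather than being the pattern itself. A sketch of the before/after call shape, using the warn(String, Throwable, MDC...) overload shown in the Logger.java diff above (the surrounding class and stream variable are hypothetical):

```java
import java.io.IOException;
import java.io.InputStream;

import org.apache.spark.internal.LogKeys;
import org.apache.spark.internal.Logger;
import org.apache.spark.internal.LoggerFactory;  // assumed factory, mirroring slf4j
import org.apache.spark.internal.MDC;

class CloseQuietlySketch {  // hypothetical, for illustration only
  private static final Logger logger = LoggerFactory.getLogger(CloseQuietlySketch.class);

  void closeQuietly(InputStream in) {
    try {
      in.close();
    } catch (IOException e) {
      // Before: logger.warn(e.getMessage(), e);        (slf4j-style)
      // After:  the throwable is still the second argument; the message is
      //         carried under an explicit key instead of being the pattern.
      logger.warn("{}", e, MDC.of(LogKeys.ERROR$.MODULE$, e.getMessage()));
    }
  }
}
```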
28 changes: 19 additions & 9 deletions core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
@@ -29,9 +29,11 @@
import java.util.TreeMap;

import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.spark.internal.Logger;
import org.apache.spark.internal.LoggerFactory;
import org.apache.spark.internal.LogKeys;
import org.apache.spark.internal.MDC;
import org.apache.spark.unsafe.memory.MemoryBlock;
import org.apache.spark.util.Utils;

@@ -244,10 +246,12 @@ private long trySpillAndAcquire(
}
} catch (ClosedByInterruptException e) {
// This called by user to kill a task (e.g: speculative task).
logger.error("error while calling spill() on " + consumerToSpill, e);
logger.error("error while calling spill() on {}", e,
MDC.of(LogKeys.MEMORY_CONSUMER$.MODULE$, consumerToSpill));
throw new RuntimeException(e.getMessage());
} catch (IOException e) {
logger.error("error while calling spill() on " + consumerToSpill, e);
logger.error("error while calling spill() on {}", e,
MDC.of(LogKeys.MEMORY_CONSUMER$.MODULE$, consumerToSpill));
// checkstyle.off: RegexpSinglelineJava
throw new SparkOutOfMemoryError("error while calling spill() on " + consumerToSpill + " : "
+ e.getMessage());
@@ -270,24 +274,29 @@ public void releaseExecutionMemory(long size, MemoryConsumer consumer) {
* Dump the memory usage of all consumers.
*/
public void showMemoryUsage() {
logger.info("Memory used in task " + taskAttemptId);
logger.info("Memory used in task {}",
MDC.of(LogKeys.TASK_ATTEMPT_ID$.MODULE$, taskAttemptId));
synchronized (this) {
long memoryAccountedForByConsumers = 0;
for (MemoryConsumer c: consumers) {
long totalMemUsage = c.getUsed();
memoryAccountedForByConsumers += totalMemUsage;
if (totalMemUsage > 0) {
logger.info("Acquired by " + c + ": " + Utils.bytesToString(totalMemUsage));
logger.info("Acquired by {}: {}",
MDC.of(LogKeys.MEMORY_CONSUMER$.MODULE$, c),
MDC.of(LogKeys.MEMORY_SIZE$.MODULE$, Utils.bytesToString(totalMemUsage)));
}
}
long memoryNotAccountedFor =
memoryManager.getExecutionMemoryUsageForTask(taskAttemptId) - memoryAccountedForByConsumers;
logger.info(
"{} bytes of memory were used by task {} but are not associated with specific consumers",
memoryNotAccountedFor, taskAttemptId);
MDC.of(LogKeys.MEMORY_SIZE$.MODULE$, memoryNotAccountedFor),
MDC.of(LogKeys.TASK_ATTEMPT_ID$.MODULE$, taskAttemptId));
logger.info(
"{} bytes of memory are used for execution and {} bytes of memory are used for storage",
memoryManager.executionMemoryUsed(), memoryManager.storageMemoryUsed());
MDC.of(LogKeys.EXECUTION_MEMORY_SIZE$.MODULE$, memoryManager.executionMemoryUsed()),
MDC.of(LogKeys.STORAGE_MEMORY_SIZE$.MODULE$, memoryManager.storageMemoryUsed()));
}
}

@@ -333,7 +342,8 @@ public MemoryBlock allocatePage(long size, MemoryConsumer consumer) {
try {
page = memoryManager.tungstenMemoryAllocator().allocate(acquired);
} catch (OutOfMemoryError e) {
logger.warn("Failed to allocate a page ({} bytes), try again.", acquired);
logger.warn("Failed to allocate a page ({} bytes), try again.",
MDC.of(LogKeys.PAGE_SIZE$.MODULE$, acquired));
// there is no enough memory actually, it means the actual free memory is smaller than
// MemoryManager thought, we should keep the acquired memory.
synchronized (this) {
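In the multi-placeholder calls above, each {} is filled positionally by the corresponding MDC, and the key names (EXECUTION_MEMORY_SIZE, STORAGE_MEMORY_SIZE, ...) are what make the values queryable once structured log output is enabled. A minimal sketch of that shape (the class, method, and arguments are hypothetical; key names and message are taken from the diff above):

```java
import org.apache.spark.internal.LogKeys;
import org.apache.spark.internal.Logger;
import org.apache.spark.internal.LoggerFactory;  // assumed factory, mirroring slf4j
import org.apache.spark.internal.MDC;

class MemoryUsageReportSketch {  // hypothetical, for illustration only
  private static final Logger logger = LoggerFactory.getLogger(MemoryUsageReportSketch.class);

  void report(long executionUsed, long storageUsed) {
    // Two placeholders, two MDCs: values are substituted in order, and each
    // value is also tagged with its LogKey for structured consumers.
    logger.info(
      "{} bytes of memory are used for execution and {} bytes of memory are used for storage",
      MDC.of(LogKeys.EXECUTION_MEMORY_SIZE$.MODULE$, executionUsed),
      MDC.of(LogKeys.STORAGE_MEMORY_SIZE$.MODULE$, storageUsed));
  }
}
```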
@@ -33,9 +33,11 @@
import scala.collection.Iterator;

import com.google.common.io.Closeables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.spark.internal.Logger;
import org.apache.spark.internal.LoggerFactory;
import org.apache.spark.internal.LogKeys;
import org.apache.spark.internal.MDC;
import org.apache.spark.Partitioner;
import org.apache.spark.ShuffleDependency;
import org.apache.spark.SparkConf;
@@ -223,7 +225,8 @@ private long[] writePartitionedData(ShuffleMapOutputWriter mapOutputWriter) thro
writePartitionedDataWithStream(file, writer);
}
if (!file.delete()) {
logger.error("Unable to delete file for partition {}", i);
logger.error("Unable to delete file for partition {}",
MDC.of(LogKeys.PARTITION_ID$.MODULE$, i));
}
}
}
@@ -23,17 +23,19 @@
import java.util.LinkedList;
import java.util.zip.Checksum;

import org.apache.spark.SparkException;
import scala.Tuple2;

import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.spark.SparkConf;
import org.apache.spark.SparkException;
import org.apache.spark.TaskContext;
import org.apache.spark.executor.ShuffleWriteMetrics;
import org.apache.spark.internal.config.package$;
import org.apache.spark.internal.Logger;
import org.apache.spark.internal.LoggerFactory;
import org.apache.spark.internal.LogKeys;
import org.apache.spark.internal.MDC;
import org.apache.spark.memory.MemoryConsumer;
import org.apache.spark.memory.SparkOutOfMemoryError;
import org.apache.spark.memory.TaskMemoryManager;
@@ -159,11 +161,11 @@ private void writeSortedFile(boolean isFinalFile) {
if (!isFinalFile) {
logger.info(
"Task {} on Thread {} spilling sort data of {} to disk ({} {} so far)",
taskContext.taskAttemptId(),
Thread.currentThread().getId(),
Utils.bytesToString(getMemoryUsage()),
spills.size(),
spills.size() != 1 ? " times" : " time");
MDC.of(LogKeys.TASK_ATTEMPT_ID$.MODULE$, taskContext.taskAttemptId()),
MDC.of(LogKeys.THREAD_ID$.MODULE$, Thread.currentThread().getId()),
MDC.of(LogKeys.MEMORY_SIZE$.MODULE$, Utils.bytesToString(getMemoryUsage())),
MDC.of(LogKeys.NUM_SPILL_INFOS$.MODULE$, spills.size()),
MDC.of(LogKeys.SPILL_TIMES$.MODULE$, spills.size() != 1 ? "times" : "time"));
}

// This call performs the actual sort.
@@ -349,7 +351,8 @@ public void cleanupResources() {
}
for (SpillInfo spill : spills) {
if (spill.file.exists() && !spill.file.delete()) {
logger.error("Unable to delete spill file {}", spill.file.getPath());
logger.error("Unable to delete spill file {}",
MDC.of(LogKeys.PATH$.MODULE$, spill.file.getPath()));
}
}
}
@@ -416,8 +419,8 @@ public void insertRecord(Object recordBase, long recordOffset, int length, int p
// for tests
assert(inMemSorter != null);
if (inMemSorter.numRecords() >= numElementsForSpillThreshold) {
logger.info("Spilling data because number of spilledRecords crossed the threshold " +
numElementsForSpillThreshold);
logger.info("Spilling data because number of spilledRecords crossed the threshold {}" +
Contributor: When re-implementing SPARK-27734, I found that there was something wrong with the log call here: the MDC is joined to the pattern with string concatenation ({}" + MDC.of) where it should be passed as a separate argument ({}", MDC.of).
MDC.of(LogKeys.NUM_ELEMENTS_SPILL_THRESHOLD$.MODULE$, numElementsForSpillThreshold));
spill();
}

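To make the review comment above concrete: with +, the whole expression collapses to a single String before the logger sees it, so the call resolves to the plain info(String) overload, the {} placeholder is left unfilled in the message, and no NUM_ELEMENTS_SPILL_THRESHOLD key is recorded; with a comma, the MDC reaches the varargs overload and is handled as a structured argument. A side-by-side sketch of the two call shapes, using the names from the diff above:

```java
// Wrong (as flagged in the review): string concatenation stringifies the MDC
// into the message, leaving the {} literal and recording no structured key.
logger.info("Spilling data because number of spilledRecords crossed the threshold {}" +
  MDC.of(LogKeys.NUM_ELEMENTS_SPILL_THRESHOLD$.MODULE$, numElementsForSpillThreshold));

// Intended: pass the MDC as a separate argument so the placeholder and the
// NUM_ELEMENTS_SPILL_THRESHOLD key are both populated.
logger.info("Spilling data because number of spilledRecords crossed the threshold {}",
  MDC.of(LogKeys.NUM_ELEMENTS_SPILL_THRESHOLD$.MODULE$, numElementsForSpillThreshold));
```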
@@ -35,12 +35,14 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.io.ByteStreams;
import com.google.common.io.Closeables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.spark.*;
import org.apache.spark.annotation.Private;
import org.apache.spark.internal.config.package$;
import org.apache.spark.internal.Logger;
import org.apache.spark.internal.LoggerFactory;
import org.apache.spark.internal.LogKeys;
import org.apache.spark.internal.MDC;
import org.apache.spark.io.CompressionCodec;
import org.apache.spark.io.CompressionCodec$;
import org.apache.spark.io.NioBufferedFileInputStream;
@@ -226,7 +228,8 @@ void closeAndWriteOutput() throws IOException {
sorter = null;
for (SpillInfo spill : spills) {
if (spill.file.exists() && !spill.file.delete()) {
logger.error("Error while deleting spill file {}", spill.file.getPath());
logger.error("Error while deleting spill file {}",
MDC.of(LogKeys.PATH$.MODULE$, spill.file.getPath()));
}
}
}
@@ -26,10 +26,11 @@
import java.nio.channels.WritableByteChannel;
import java.util.Optional;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.spark.SparkConf;
import org.apache.spark.internal.Logger;
import org.apache.spark.internal.LoggerFactory;
import org.apache.spark.internal.LogKeys;
import org.apache.spark.internal.MDC;
import org.apache.spark.shuffle.api.ShuffleMapOutputWriter;
import org.apache.spark.shuffle.api.ShufflePartitionWriter;
import org.apache.spark.shuffle.api.WritableByteChannelWrapper;
@@ -123,7 +124,8 @@ public MapOutputCommitMessage commitAllPartitions(long[] checksums) throws IOExc
public void abort(Throwable error) throws IOException {
cleanUp();
if (outputTempFile != null && outputTempFile.exists() && !outputTempFile.delete()) {
log.warn("Failed to delete temporary shuffle file at {}", outputTempFile.getAbsolutePath());
log.warn("Failed to delete temporary shuffle file at {}",
MDC.of(LogKeys.PATH$.MODULE$, outputTempFile.getAbsolutePath()));
}
}
