Skip to content

Conversation

@gengliangwang
Copy link
Member

@gengliangwang gengliangwang commented Mar 26, 2024

What changes were proposed in this pull request?

Introduce Structured Logging Framework as per SPIP: Structured Logging Framework for Apache Spark .

  • The default logging output format will be json lines. For example
{
   "ts":"2023-03-12T12:02:46.661-0700",
   "level":"ERROR",
   "msg":"Cannot determine whether executor 289 is alive or not",
   "context":{
       "executor_id":"289"
   },
   "exception":{
      "class":"org.apache.spark.SparkException",
      "msg":"Exception thrown in awaitResult",
      "stackTrace":"..."
   },
   "source":"BlockManagerMasterEndpoint"
} 
  • Introduce a new configuration spark.log.structuredLogging.enabled to set the default log4j configuration. It is true by default. Users can disable it to get plain text log outputs.
  • The change will start with the logError method. Example changes on the API:
    from
logError(s"Cannot determine whether executor $executorId is alive or not.", e)

to

logError(log"Cannot determine whether executor ${MDC(EXECUTOR_ID, executorId)} is alive or not.", e)

Why are the changes needed?

To enhance Apache Spark's logging system by implementing structured logging. This transition will change the format of the default log output from plain text to JSON lines, making it more analyzable.

Does this PR introduce any user-facing change?

Yes, the default log output format will be json lines instead of plain text. User can restore the default plain text output when disabling configuration spark.log.structuredLogging.enabled.
If a user is a customized log4j configuration, there is no changes in the log output.

How was this patch tested?

New Unit tests

Was this patch authored or co-authored using generative AI tooling?

Yes, some of the code comments are from github copilot

@gengliangwang
Copy link
Member Author

cc @steveloughran @dtenedor @@bart-samwel as well

Copy link
Contributor

@dtenedor dtenedor left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Super exciting start! Left some comments below.

// Mapped Diagnostic Context (MDC) that will be used in log messages.
// The values of the MDC will be inline in the log message, while the key-value pairs will be
// part of the ThreadContext.
case class MDC(key: LogKey.Value, value: String)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would probably be more readable fully spelled-out (MappedDiagnosticContext) rather than an acronym, no?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And we are putting the class name into the log message. I am trying to avoid a long class name here.

lazy val map = new java.util.HashMap[String, String]()

args.foreach { arg =>
arg match {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can skip this line and the arg => on the end of the previous line and just list the cases directly instead

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is another code block at the end of the loop:

        if (processedParts.hasNext) {
          sb.append(processedParts.next())
        }

I am trying to build both the map and the string in one loop.

val logOutput = outContent.toString.split("\n").filter(_.contains(msg)).head
assert(logOutput.nonEmpty)
// scalastyle:off line.size.limit
val pattern = s"""\\{"ts":"[^"]+","level":"ERROR","msg":"This is a log message","logger":"$className"}""".r
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for adding this test suite. It looks like it will become the main means of exercising that the structured logging framework will work as intended as we develop it. I know we want to regex out spurious text in the result here, but the set of expected test cases might be easier to read if we keep whitespace formatting in each expected result. Then when we compare expected results against actual results, we can strip whitespace from both. For example, this becomes something like:

  val x =
    s"""
      |{
      |  "ts": [^"]+,
      |  "level": "ERROR",
      |  "msg": "This is a log message",
      |  "logger": $className
      |}
      |""".stripMargin

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The output is always one line of json. https://logging.apache.org/log4j/2.x/manual/json-template-layout.html doesn't support pretty print. Also, parsing multiple-line json is slower than single line.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gengliangwang I know, I was talking about in the unit test case, not the production JSON output itself. It is fine that the production JSON record is on one line. I am suggesting that we make the JSON in the unit tests formatted in a more readable way, and then we can ignore whitespace when comparing the expected JSON record against the expected one in each test case.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dtenedor @gengliangwang
Considering the readability of JSON format in UT, I submitted a PR to improve it
#45784

// scalastyle:off line.size.limit
val pattern = s"""\\{"ts":"[^"]+","level":"ERROR","msg":"This is a log message","logger":"$className"}""".r
// scalastyle:on
assert(pattern.matches(logOutput))
Copy link
Contributor

@amaliujia amaliujia Mar 27, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Regarding to the test case readability, I am wondering if we at last put the value of the logOutput as a comment here with newlines and whitespaces inserted to have better readability, so we can read the pattern and then quickly read the value in comment to understand what this test case does?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please ignore if the value is not stable thus whenever we put it in the comment, it becomes stable very soon.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I got the idea. Please check the latest test

@github-actions github-actions bot added the DOCS label Mar 28, 2024
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKey.EXECUTOR_ID

abstract class LoggingSuiteBase extends AnyFunSuite // scalastyle:ignore funsuite
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why can't we use SparkFunSuite?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test suite is under common/utils module and can't import SparkFunSuite

val childClasspath = new ArrayBuffer[String]()
val sparkConf = args.toSparkConf()
if (sparkConf.contains("spark.local.connect")) sparkConf.remove("spark.remote")
if (sparkConf.getBoolean(STRUCTURED_LOGGING_ENABLED.key, defaultValue = true)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does this work for spark-shell as well? And thriftserver?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes

@gengliangwang
Copy link
Member Author

@amaliujia @dtenedor @beliefer @HyukjinKwon @cloud-fan Thanks for the reviews!
The PR has been open for 3 days. I am merging this one to master and moving forward.

.createWithDefault(0.1)

private[spark] val STRUCTURED_LOGGING_ENABLED =
ConfigBuilder("spark.log.structuredLogging.enabled")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this config needs to be documented on configuration.md

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, I just created https://issues.apache.org/jira/browse/SPARK-47671 for this.

@gengliangwang gengliangwang changed the title [SPARK-47574][INFRA] Introduce Structured Logging Framework [SPARK-47574][CORE] Introduce Structured Logging Framework Apr 3, 2024
@pan3793
Copy link
Member

pan3793 commented Apr 4, 2024

@gengliangwang I see the SPIP docs say

Spark identifiers (e.g., query ID, executor ID, task ID) will be tagged using ThreadContext, e.g., ThreadContext.set(EXECUTOR_ID, executorId).

Seems it does not get implemented in this PR, in the migration PRs, we still inject the Spark identifiers like APP_ID manually in each message. Another question is, as we use the enum LogKey to track all known MDC keys, is it possible to inject custom keys? For example, users may have custom labels on the Spark nodes, and they also want to aggregate logs by those custom labels.

@gengliangwang
Copy link
Member Author

Seems it does not get implemented in this PR, in the migration PRs, we still inject the Spark identifiers like APP_ID manually in each message.

This will be done.
The current migration is necessary, for example, there are logs about executor/worker on driver.

Another question is, as we use the enum LogKey to track all known MDC keys, is it possible to inject custom keys?

It is possible, developers can either use ThreadContext from Log4j, or create a customized MessageWithContext entry

case class MessageWithContext(message: String, context: java.util.HashMap[String, String])

@pan3793
Copy link
Member

pan3793 commented Apr 4, 2024

@gengliangwang thanks for the clarification, that makes sense.

Copy link
Contributor

@steveloughran steveloughran left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

bit late, but added my comments to match my doc proposal

key: what happens with nested exceptions?

I'd also propose having a real parser of the exceptions which can then be used to take a file of logs and actually read it as input to an rdd.

given the goal is "generate logs processable by code", include the code to process it.

logError(log"Error in executor ${MDC(EXECUTOR_ID, "1")}.", exception))
assert(logOutput.nonEmpty)
// scalastyle:off line.size.limit
val pattern = s"""\\{"ts":"[^"]+","level":"ERROR","msg":"Error in executor 1.","context":\\{"executor_id":"1"},"exception":\\{"class":"java.lang.RuntimeException","msg":"OOM","stacktrace":.*},"logger":"$className"}\n""".r
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. you should be deserializing this rather than just doing pattern matching, then validate the contents
  2. and verify that on nested exceptions, you get both messages and stack traces

HyukjinKwon pushed a commit that referenced this pull request Jul 18, 2024
…mproved structured logging for PySpark

### What changes were proposed in this pull request?

This PR introduces the `pyspark.logger` module to facilitate structured client-side logging for PySpark users.

This module includes a `PySparkLogger` class that provides several methods for logging messages at different levels in a structured JSON format:
- `PySparkLogger.info`
- `PySparkLogger.warning`
- `PySparkLogger.error`

The logger can be easily configured to write logs to either the console or a specified file.

## DataFrame error log improvement

This PR also improves the DataFrame API error logs by leveraging this new logging framework:

### **Before**

We introduced structured logging from #45729, but PySpark log is still hard to figure out in the current structured log, because it is hidden and mixed within bunch of complex JVM stacktraces and it's also not very Python-friendly:

```json
{
  "ts": "2024-06-28T10:53:48.528Z",
  "level": "ERROR",
  "msg": "Exception in task 7.0 in stage 0.0 (TID 7)",
  "context": {
    "task_name": "task 7.0 in stage 0.0 (TID 7)"
  },
  "exception": {
    "class": "org.apache.spark.SparkArithmeticException",
    "msg": "[DIVIDE_BY_ZERO] Division by zero. Use `try_divide` to tolerate divisor being 0 and return NULL instead. If necessary set \"spark.sql.ansi.enabled\" to \"false\" to bypass this error. SQLSTATE: 22012\n== DataFrame ==\n\"__truediv__\" was called from\n/.../spark/python/test_error_context.py:17\n",
    "stacktrace": [
      {
        "class": "org.apache.spark.sql.errors.QueryExecutionErrors$",
        "method": "divideByZeroError",
        "file": "QueryExecutionErrors.scala",
        "line": 203
      },
      {
        "class": "org.apache.spark.sql.errors.QueryExecutionErrors",
        "method": "divideByZeroError",
        "file": "QueryExecutionErrors.scala",
        "line": -1
      },
      {
        "class": "org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1",
        "method": "project_doConsume_0$",
        "file": null,
        "line": -1
      },
      {
        "class": "org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1",
        "method": "processNext",
        "file": null,
        "line": -1
      },
      {
        "class": "org.apache.spark.sql.execution.BufferedRowIterator",
        "method": "hasNext",
        "file": "BufferedRowIterator.java",
        "line": 43
      },
      {
        "class": "org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1",
        "method": "hasNext",
        "file": "WholeStageCodegenEvaluatorFactory.scala",
        "line": 50
      },
      {
        "class": "org.apache.spark.sql.execution.SparkPlan",
        "method": "$anonfun$getByteArrayRdd$1",
        "file": "SparkPlan.scala",
        "line": 388
      },
      {
        "class": "org.apache.spark.rdd.RDD",
        "method": "$anonfun$mapPartitionsInternal$2",
        "file": "RDD.scala",
        "line": 896
      },
      {
        "class": "org.apache.spark.rdd.RDD",
        "method": "$anonfun$mapPartitionsInternal$2$adapted",
        "file": "RDD.scala",
        "line": 896
      },
      {
        "class": "org.apache.spark.rdd.MapPartitionsRDD",
        "method": "compute",
        "file": "MapPartitionsRDD.scala",
        "line": 52
      },
      {
        "class": "org.apache.spark.rdd.RDD",
        "method": "computeOrReadCheckpoint",
        "file": "RDD.scala",
        "line": 369
      },
      {
        "class": "org.apache.spark.rdd.RDD",
        "method": "iterator",
        "file": "RDD.scala",
        "line": 333
      },
      {
        "class": "org.apache.spark.scheduler.ResultTask",
        "method": "runTask",
        "file": "ResultTask.scala",
        "line": 93
      },
      {
        "class": "org.apache.spark.TaskContext",
        "method": "runTaskWithListeners",
        "file": "TaskContext.scala",
        "line": 171
      },
      {
        "class": "org.apache.spark.scheduler.Task",
        "method": "run",
        "file": "Task.scala",
        "line": 146
      },
      {
        "class": "org.apache.spark.executor.Executor$TaskRunner",
        "method": "$anonfun$run$5",
        "file": "Executor.scala",
        "line": 644
      },
      {
        "class": "org.apache.spark.util.SparkErrorUtils",
        "method": "tryWithSafeFinally",
        "file": "SparkErrorUtils.scala",
        "line": 64
      },
      {
        "class": "org.apache.spark.util.SparkErrorUtils",
        "method": "tryWithSafeFinally$",
        "file": "SparkErrorUtils.scala",
        "line": 61
      },
      {
        "class": "org.apache.spark.util.Utils$",
        "method": "tryWithSafeFinally",
        "file": "Utils.scala",
        "line": 99
      },
      {
        "class": "org.apache.spark.executor.Executor$TaskRunner",
        "method": "run",
        "file": "Executor.scala",
        "line": 647
      },
      {
        "class": "java.util.concurrent.ThreadPoolExecutor",
        "method": "runWorker",
        "file": "ThreadPoolExecutor.java",
        "line": 1136
      },
      {
        "class": "java.util.concurrent.ThreadPoolExecutor$Worker",
        "method": "run",
        "file": "ThreadPoolExecutor.java",
        "line": 635
      },
      {
        "class": "java.lang.Thread",
        "method": "run",
        "file": "Thread.java",
        "line": 840
      }
    ]
  },
  "logger": "Executor"
}

```

### **After**

Now we can get a improved, simplified and also Python-friendly error log for DataFrame errors:

```json
{
  "ts": "2024-06-28 19:53:48,563",
  "level": "ERROR",
  "logger": "DataFrameQueryContextLogger",
  "msg": "[DIVIDE_BY_ZERO] Division by zero. Use `try_divide` to tolerate divisor being 0 and return NULL instead. If necessary set \"spark.sql.ansi.enabled\" to \"false\" to bypass this error. SQLSTATE: 22012\n== DataFrame ==\n\"__truediv__\" was called from\n/.../spark/python/test_error_context.py:17\n",
  "context": {
    "file": "/.../spark/python/test_error_context.py",
    "line_no": "17",
    "fragment": "__truediv__"
    "error_class": "DIVIDE_BY_ZERO"
  },
  "exception": {
    "class": "Py4JJavaError",
    "msg": "An error occurred while calling o52.showString.\n: org.apache.spark.SparkArithmeticException: [DIVIDE_BY_ZERO] Division by zero. Use `try_divide` to tolerate divisor being 0 and return NULL instead. If necessary set \"spark.sql.ansi.enabled\" to \"false\" to bypass this error. SQLSTATE: 22012\n== DataFrame ==\n\"__truediv__\" was called from\n/Users/haejoon.lee/Desktop/git_repos/spark/python/test_error_context.py:22\n\n\tat org.apache.spark.sql.errors.QueryExecutionErrors$.divideByZeroError(QueryExecutionErrors.scala:203)\n\tat org.apache.spark.sql.errors.QueryExecutionErrors.divideByZeroError(QueryExecutionErrors.scala)\n\tat org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.project_doConsume_0$(Unknown Source)\n\tat org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)\n\tat org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)\n\tat org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:50)\n\tat org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)\n\tat org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:896)\n\tat org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:896)\n\tat org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)\n\tat org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:369)\n\tat org.apache.spark.rdd.RDD.iterator(RDD.scala:333)\n\tat org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)\n\tat org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:171)\n\tat org.apache.spark.scheduler.Task.run(Task.scala:146)\n\tat org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$5(Executor.scala:644)\n\tat org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)\n\tat org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)\n\tat org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:99)\n\tat org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:647)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)\n\tat java.base/java.lang.Thread.run(Thread.java:840)\n\tat org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1007)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:2458)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:2479)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:2498)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:2523)\n\tat org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1052)\n\tat org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)\n\tat org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)\n\tat org.apache.spark.rdd.RDD.withScope(RDD.scala:412)\n\tat org.apache.spark.rdd.RDD.collect(RDD.scala:1051)\n\tat org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:448)\n\tat org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:4449)\n\tat org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:3393)\n\tat org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4439)\n\tat org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:599)\n\tat org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4437)\n\tat org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$6(SQLExecution.scala:154)\n\tat org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:263)\n\tat org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$1(SQLExecution.scala:118)\n\tat org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:923)\n\tat org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId0(SQLExecution.scala:74)\n\tat org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:218)\n\tat org.apache.spark.sql.Dataset.withAction(Dataset.scala:4437)\n\tat org.apache.spark.sql.Dataset.head(Dataset.scala:3393)\n\tat org.apache.spark.sql.Dataset.take(Dataset.scala:3626)\n\tat org.apache.spark.sql.Dataset.getRows(Dataset.scala:294)\n\tat org.apache.spark.sql.Dataset.showString(Dataset.scala:330)\n\tat java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\n\tat java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)\n\tat java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\n\tat java.base/java.lang.reflect.Method.invoke(Method.java:568)\n\tat py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)\n\tat py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)\n\tat py4j.Gateway.invoke(Gateway.java:282)\n\tat py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)\n\tat py4j.commands.CallCommand.execute(CallCommand.java:79)\n\tat py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)\n\tat py4j.ClientServerConnection.run(ClientServerConnection.java:106)\n\tat java.base/java.lang.Thread.run(Thread.java:840)\n",
    "stacktrace": ["Traceback (most recent call last):", "  File \"/Users/haejoon.lee/Desktop/git_repos/spark/python/pyspark/errors/exceptions/captured.py\", line 272, in deco", "    return f(*a, **kw)", "  File \"/Users/haejoon.lee/anaconda3/envs/pyspark-dev-env/lib/python3.9/site-packages/py4j/protocol.py\", line 326, in get_return_value", "    raise Py4JJavaError(", "py4j.protocol.Py4JJavaError: An error occurred while calling o52.showString.", ": org.apache.spark.SparkArithmeticException: [DIVIDE_BY_ZERO] Division by zero. Use `try_divide` to tolerate divisor being 0 and return NULL instead. If necessary set \"spark.sql.ansi.enabled\" to \"false\" to bypass this error. SQLSTATE: 22012", "== DataFrame ==", "\"__truediv__\" was called from", "/Users/haejoon.lee/Desktop/git_repos/spark/python/test_error_context.py:22", "", "\tat org.apache.spark.sql.errors.QueryExecutionErrors$.divideByZeroError(QueryExecutionErrors.scala:203)", "\tat org.apache.spark.sql.errors.QueryExecutionErrors.divideByZeroError(QueryExecutionErrors.scala)", "\tat org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.project_doConsume_0$(Unknown Source)", "\tat org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)", "\tat org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)", "\tat org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:50)", "\tat org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)", "\tat org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:896)", "\tat org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:896)", "\tat org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)", "\tat org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:369)", "\tat org.apache.spark.rdd.RDD.iterator(RDD.scala:333)", "\tat org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)", "\tat org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:171)", "\tat org.apache.spark.scheduler.Task.run(Task.scala:146)", "\tat org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$5(Executor.scala:644)", "\tat org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)", "\tat org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)", "\tat org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:99)", "\tat org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:647)", "\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)", "\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)", "\tat java.base/java.lang.Thread.run(Thread.java:840)", "\tat org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1007)", "\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:2458)", "\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:2479)", "\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:2498)", "\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:2523)", "\tat org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1052)", "\tat org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)", "\tat org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)", "\tat org.apache.spark.rdd.RDD.withScope(RDD.scala:412)", "\tat org.apache.spark.rdd.RDD.collect(RDD.scala:1051)", "\tat org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:448)", "\tat org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:4449)", "\tat org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:3393)", "\tat org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4439)", "\tat org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:599)", "\tat org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4437)", "\tat org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$6(SQLExecution.scala:154)", "\tat org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:263)", "\tat org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$1(SQLExecution.scala:118)", "\tat org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:923)", "\tat org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId0(SQLExecution.scala:74)", "\tat org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:218)", "\tat org.apache.spark.sql.Dataset.withAction(Dataset.scala:4437)", "\tat org.apache.spark.sql.Dataset.head(Dataset.scala:3393)", "\tat org.apache.spark.sql.Dataset.take(Dataset.scala:3626)", "\tat org.apache.spark.sql.Dataset.getRows(Dataset.scala:294)", "\tat org.apache.spark.sql.Dataset.showString(Dataset.scala:330)", "\tat java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)", "\tat java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)", "\tat java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)", "\tat java.base/java.lang.reflect.Method.invoke(Method.java:568)", "\tat py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)", "\tat py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)", "\tat py4j.Gateway.invoke(Gateway.java:282)", "\tat py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)", "\tat py4j.commands.CallCommand.execute(CallCommand.java:79)", "\tat py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)", "\tat py4j.ClientServerConnection.run(ClientServerConnection.java:106)", "\tat java.base/java.lang.Thread.run(Thread.java:840)"]
  },
}
```

### Why are the changes needed?

**Before**

Currently we don't have PySpark dedicated logging module so we have to manually set up and customize the Python logging module, for example:

```python
logger = logging.getLogger("TestLogger")
user = "test_user"
action = "test_action"
logger.info(f"User {user} takes an {action}")
```

This logs an information just in a following simple string:

```
INFO:TestLogger:User test_user takes an test_action
```

This is not very actionable, and it is hard to analyze not since it is not well-structured.

Or we can use Log4j from JVM which resulting in excessively detailed logs as described in the above example, and this way even cannot be applied to Spark Connect.

**After**

We can simply import and use `PySparkLogger` with minimal setup:

```python
from pyspark.logger import PySparkLogger
logger = PySparkLogger.getLogger("TestLogger")
user = "test_user"
action = "test_action"
logger.info(f"User {user} takes an {action}", user=user, action=action)
```

This logs an information in a following JSON format:

```json
{
  "ts": "2024-06-28 19:44:19,030",
  "level": "WARNING",
  "logger": "TestLogger",
  "msg": "User test_user takes an test_action",
  "context": {
    "user": "test_user",
    "action": "test_action"
  },
}
```

**NOTE:** we can add as many keyword arguments as we want for each logging methods. These keyword arguments, such as `user` and `action` in the example, are included within the `"context"` field of the JSON log. This structure makes it easy to track and analyze the log.

### Does this PR introduce _any_ user-facing change?

No API changes, but the PySpark client-side logging is improved.

Also added user-facing documentation "Logging in PySpark":

<img width="1395" alt="Screenshot 2024-07-16 at 5 40 41 PM" src="https://github.com/user-attachments/assets/c77236aa-1c6f-4b5b-ad14-26ccdc474f59">

Also added API reference:

<img width="1417" alt="Screenshot 2024-07-16 at 5 40 58 PM" src="https://github.com/user-attachments/assets/6bb3fb23-6847-4086-8f4b-bcf9f4242724">

### How was this patch tested?

Added UTs.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #47145 from itholic/pyspark_logger.

Authored-by: Haejoon Lee <haejoon.lee@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
jingz-db pushed a commit to jingz-db/spark that referenced this pull request Jul 22, 2024
…mproved structured logging for PySpark

### What changes were proposed in this pull request?

This PR introduces the `pyspark.logger` module to facilitate structured client-side logging for PySpark users.

This module includes a `PySparkLogger` class that provides several methods for logging messages at different levels in a structured JSON format:
- `PySparkLogger.info`
- `PySparkLogger.warning`
- `PySparkLogger.error`

The logger can be easily configured to write logs to either the console or a specified file.

## DataFrame error log improvement

This PR also improves the DataFrame API error logs by leveraging this new logging framework:

### **Before**

We introduced structured logging from apache#45729, but PySpark log is still hard to figure out in the current structured log, because it is hidden and mixed within bunch of complex JVM stacktraces and it's also not very Python-friendly:

```json
{
  "ts": "2024-06-28T10:53:48.528Z",
  "level": "ERROR",
  "msg": "Exception in task 7.0 in stage 0.0 (TID 7)",
  "context": {
    "task_name": "task 7.0 in stage 0.0 (TID 7)"
  },
  "exception": {
    "class": "org.apache.spark.SparkArithmeticException",
    "msg": "[DIVIDE_BY_ZERO] Division by zero. Use `try_divide` to tolerate divisor being 0 and return NULL instead. If necessary set \"spark.sql.ansi.enabled\" to \"false\" to bypass this error. SQLSTATE: 22012\n== DataFrame ==\n\"__truediv__\" was called from\n/.../spark/python/test_error_context.py:17\n",
    "stacktrace": [
      {
        "class": "org.apache.spark.sql.errors.QueryExecutionErrors$",
        "method": "divideByZeroError",
        "file": "QueryExecutionErrors.scala",
        "line": 203
      },
      {
        "class": "org.apache.spark.sql.errors.QueryExecutionErrors",
        "method": "divideByZeroError",
        "file": "QueryExecutionErrors.scala",
        "line": -1
      },
      {
        "class": "org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1",
        "method": "project_doConsume_0$",
        "file": null,
        "line": -1
      },
      {
        "class": "org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1",
        "method": "processNext",
        "file": null,
        "line": -1
      },
      {
        "class": "org.apache.spark.sql.execution.BufferedRowIterator",
        "method": "hasNext",
        "file": "BufferedRowIterator.java",
        "line": 43
      },
      {
        "class": "org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1",
        "method": "hasNext",
        "file": "WholeStageCodegenEvaluatorFactory.scala",
        "line": 50
      },
      {
        "class": "org.apache.spark.sql.execution.SparkPlan",
        "method": "$anonfun$getByteArrayRdd$1",
        "file": "SparkPlan.scala",
        "line": 388
      },
      {
        "class": "org.apache.spark.rdd.RDD",
        "method": "$anonfun$mapPartitionsInternal$2",
        "file": "RDD.scala",
        "line": 896
      },
      {
        "class": "org.apache.spark.rdd.RDD",
        "method": "$anonfun$mapPartitionsInternal$2$adapted",
        "file": "RDD.scala",
        "line": 896
      },
      {
        "class": "org.apache.spark.rdd.MapPartitionsRDD",
        "method": "compute",
        "file": "MapPartitionsRDD.scala",
        "line": 52
      },
      {
        "class": "org.apache.spark.rdd.RDD",
        "method": "computeOrReadCheckpoint",
        "file": "RDD.scala",
        "line": 369
      },
      {
        "class": "org.apache.spark.rdd.RDD",
        "method": "iterator",
        "file": "RDD.scala",
        "line": 333
      },
      {
        "class": "org.apache.spark.scheduler.ResultTask",
        "method": "runTask",
        "file": "ResultTask.scala",
        "line": 93
      },
      {
        "class": "org.apache.spark.TaskContext",
        "method": "runTaskWithListeners",
        "file": "TaskContext.scala",
        "line": 171
      },
      {
        "class": "org.apache.spark.scheduler.Task",
        "method": "run",
        "file": "Task.scala",
        "line": 146
      },
      {
        "class": "org.apache.spark.executor.Executor$TaskRunner",
        "method": "$anonfun$run$5",
        "file": "Executor.scala",
        "line": 644
      },
      {
        "class": "org.apache.spark.util.SparkErrorUtils",
        "method": "tryWithSafeFinally",
        "file": "SparkErrorUtils.scala",
        "line": 64
      },
      {
        "class": "org.apache.spark.util.SparkErrorUtils",
        "method": "tryWithSafeFinally$",
        "file": "SparkErrorUtils.scala",
        "line": 61
      },
      {
        "class": "org.apache.spark.util.Utils$",
        "method": "tryWithSafeFinally",
        "file": "Utils.scala",
        "line": 99
      },
      {
        "class": "org.apache.spark.executor.Executor$TaskRunner",
        "method": "run",
        "file": "Executor.scala",
        "line": 647
      },
      {
        "class": "java.util.concurrent.ThreadPoolExecutor",
        "method": "runWorker",
        "file": "ThreadPoolExecutor.java",
        "line": 1136
      },
      {
        "class": "java.util.concurrent.ThreadPoolExecutor$Worker",
        "method": "run",
        "file": "ThreadPoolExecutor.java",
        "line": 635
      },
      {
        "class": "java.lang.Thread",
        "method": "run",
        "file": "Thread.java",
        "line": 840
      }
    ]
  },
  "logger": "Executor"
}

```

### **After**

Now we can get a improved, simplified and also Python-friendly error log for DataFrame errors:

```json
{
  "ts": "2024-06-28 19:53:48,563",
  "level": "ERROR",
  "logger": "DataFrameQueryContextLogger",
  "msg": "[DIVIDE_BY_ZERO] Division by zero. Use `try_divide` to tolerate divisor being 0 and return NULL instead. If necessary set \"spark.sql.ansi.enabled\" to \"false\" to bypass this error. SQLSTATE: 22012\n== DataFrame ==\n\"__truediv__\" was called from\n/.../spark/python/test_error_context.py:17\n",
  "context": {
    "file": "/.../spark/python/test_error_context.py",
    "line_no": "17",
    "fragment": "__truediv__"
    "error_class": "DIVIDE_BY_ZERO"
  },
  "exception": {
    "class": "Py4JJavaError",
    "msg": "An error occurred while calling o52.showString.\n: org.apache.spark.SparkArithmeticException: [DIVIDE_BY_ZERO] Division by zero. Use `try_divide` to tolerate divisor being 0 and return NULL instead. If necessary set \"spark.sql.ansi.enabled\" to \"false\" to bypass this error. SQLSTATE: 22012\n== DataFrame ==\n\"__truediv__\" was called from\n/Users/haejoon.lee/Desktop/git_repos/spark/python/test_error_context.py:22\n\n\tat org.apache.spark.sql.errors.QueryExecutionErrors$.divideByZeroError(QueryExecutionErrors.scala:203)\n\tat org.apache.spark.sql.errors.QueryExecutionErrors.divideByZeroError(QueryExecutionErrors.scala)\n\tat org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.project_doConsume_0$(Unknown Source)\n\tat org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)\n\tat org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)\n\tat org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:50)\n\tat org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)\n\tat org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:896)\n\tat org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:896)\n\tat org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)\n\tat org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:369)\n\tat org.apache.spark.rdd.RDD.iterator(RDD.scala:333)\n\tat org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)\n\tat org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:171)\n\tat org.apache.spark.scheduler.Task.run(Task.scala:146)\n\tat org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$5(Executor.scala:644)\n\tat org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)\n\tat org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)\n\tat org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:99)\n\tat org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:647)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)\n\tat java.base/java.lang.Thread.run(Thread.java:840)\n\tat org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1007)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:2458)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:2479)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:2498)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:2523)\n\tat org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1052)\n\tat org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)\n\tat org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)\n\tat org.apache.spark.rdd.RDD.withScope(RDD.scala:412)\n\tat org.apache.spark.rdd.RDD.collect(RDD.scala:1051)\n\tat org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:448)\n\tat org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:4449)\n\tat org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:3393)\n\tat org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4439)\n\tat org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:599)\n\tat org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4437)\n\tat org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$6(SQLExecution.scala:154)\n\tat org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:263)\n\tat org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$1(SQLExecution.scala:118)\n\tat org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:923)\n\tat org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId0(SQLExecution.scala:74)\n\tat org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:218)\n\tat org.apache.spark.sql.Dataset.withAction(Dataset.scala:4437)\n\tat org.apache.spark.sql.Dataset.head(Dataset.scala:3393)\n\tat org.apache.spark.sql.Dataset.take(Dataset.scala:3626)\n\tat org.apache.spark.sql.Dataset.getRows(Dataset.scala:294)\n\tat org.apache.spark.sql.Dataset.showString(Dataset.scala:330)\n\tat java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\n\tat java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)\n\tat java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\n\tat java.base/java.lang.reflect.Method.invoke(Method.java:568)\n\tat py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)\n\tat py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)\n\tat py4j.Gateway.invoke(Gateway.java:282)\n\tat py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)\n\tat py4j.commands.CallCommand.execute(CallCommand.java:79)\n\tat py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)\n\tat py4j.ClientServerConnection.run(ClientServerConnection.java:106)\n\tat java.base/java.lang.Thread.run(Thread.java:840)\n",
    "stacktrace": ["Traceback (most recent call last):", "  File \"/Users/haejoon.lee/Desktop/git_repos/spark/python/pyspark/errors/exceptions/captured.py\", line 272, in deco", "    return f(*a, **kw)", "  File \"/Users/haejoon.lee/anaconda3/envs/pyspark-dev-env/lib/python3.9/site-packages/py4j/protocol.py\", line 326, in get_return_value", "    raise Py4JJavaError(", "py4j.protocol.Py4JJavaError: An error occurred while calling o52.showString.", ": org.apache.spark.SparkArithmeticException: [DIVIDE_BY_ZERO] Division by zero. Use `try_divide` to tolerate divisor being 0 and return NULL instead. If necessary set \"spark.sql.ansi.enabled\" to \"false\" to bypass this error. SQLSTATE: 22012", "== DataFrame ==", "\"__truediv__\" was called from", "/Users/haejoon.lee/Desktop/git_repos/spark/python/test_error_context.py:22", "", "\tat org.apache.spark.sql.errors.QueryExecutionErrors$.divideByZeroError(QueryExecutionErrors.scala:203)", "\tat org.apache.spark.sql.errors.QueryExecutionErrors.divideByZeroError(QueryExecutionErrors.scala)", "\tat org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.project_doConsume_0$(Unknown Source)", "\tat org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)", "\tat org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)", "\tat org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:50)", "\tat org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)", "\tat org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:896)", "\tat org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:896)", "\tat org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)", "\tat org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:369)", "\tat org.apache.spark.rdd.RDD.iterator(RDD.scala:333)", "\tat org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)", "\tat org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:171)", "\tat org.apache.spark.scheduler.Task.run(Task.scala:146)", "\tat org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$5(Executor.scala:644)", "\tat org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)", "\tat org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)", "\tat org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:99)", "\tat org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:647)", "\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)", "\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)", "\tat java.base/java.lang.Thread.run(Thread.java:840)", "\tat org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1007)", "\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:2458)", "\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:2479)", "\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:2498)", "\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:2523)", "\tat org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1052)", "\tat org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)", "\tat org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)", "\tat org.apache.spark.rdd.RDD.withScope(RDD.scala:412)", "\tat org.apache.spark.rdd.RDD.collect(RDD.scala:1051)", "\tat org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:448)", "\tat org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:4449)", "\tat org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:3393)", "\tat org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4439)", "\tat org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:599)", "\tat org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4437)", "\tat org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$6(SQLExecution.scala:154)", "\tat org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:263)", "\tat org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$1(SQLExecution.scala:118)", "\tat org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:923)", "\tat org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId0(SQLExecution.scala:74)", "\tat org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:218)", "\tat org.apache.spark.sql.Dataset.withAction(Dataset.scala:4437)", "\tat org.apache.spark.sql.Dataset.head(Dataset.scala:3393)", "\tat org.apache.spark.sql.Dataset.take(Dataset.scala:3626)", "\tat org.apache.spark.sql.Dataset.getRows(Dataset.scala:294)", "\tat org.apache.spark.sql.Dataset.showString(Dataset.scala:330)", "\tat java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)", "\tat java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)", "\tat java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)", "\tat java.base/java.lang.reflect.Method.invoke(Method.java:568)", "\tat py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)", "\tat py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)", "\tat py4j.Gateway.invoke(Gateway.java:282)", "\tat py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)", "\tat py4j.commands.CallCommand.execute(CallCommand.java:79)", "\tat py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)", "\tat py4j.ClientServerConnection.run(ClientServerConnection.java:106)", "\tat java.base/java.lang.Thread.run(Thread.java:840)"]
  },
}
```

### Why are the changes needed?

**Before**

Currently we don't have PySpark dedicated logging module so we have to manually set up and customize the Python logging module, for example:

```python
logger = logging.getLogger("TestLogger")
user = "test_user"
action = "test_action"
logger.info(f"User {user} takes an {action}")
```

This logs an information just in a following simple string:

```
INFO:TestLogger:User test_user takes an test_action
```

This is not very actionable, and it is hard to analyze not since it is not well-structured.

Or we can use Log4j from JVM which resulting in excessively detailed logs as described in the above example, and this way even cannot be applied to Spark Connect.

**After**

We can simply import and use `PySparkLogger` with minimal setup:

```python
from pyspark.logger import PySparkLogger
logger = PySparkLogger.getLogger("TestLogger")
user = "test_user"
action = "test_action"
logger.info(f"User {user} takes an {action}", user=user, action=action)
```

This logs an information in a following JSON format:

```json
{
  "ts": "2024-06-28 19:44:19,030",
  "level": "WARNING",
  "logger": "TestLogger",
  "msg": "User test_user takes an test_action",
  "context": {
    "user": "test_user",
    "action": "test_action"
  },
}
```

**NOTE:** we can add as many keyword arguments as we want for each logging methods. These keyword arguments, such as `user` and `action` in the example, are included within the `"context"` field of the JSON log. This structure makes it easy to track and analyze the log.

### Does this PR introduce _any_ user-facing change?

No API changes, but the PySpark client-side logging is improved.

Also added user-facing documentation "Logging in PySpark":

<img width="1395" alt="Screenshot 2024-07-16 at 5 40 41 PM" src="https://github.com/user-attachments/assets/c77236aa-1c6f-4b5b-ad14-26ccdc474f59">

Also added API reference:

<img width="1417" alt="Screenshot 2024-07-16 at 5 40 58 PM" src="https://github.com/user-attachments/assets/6bb3fb23-6847-4086-8f4b-bcf9f4242724">

### How was this patch tested?

Added UTs.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#47145 from itholic/pyspark_logger.

Authored-by: Haejoon Lee <haejoon.lee@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

10 participants