[SPARK-48182][SQL] SQL (java side): Migrate error/warn/info with variables to structured logging framework

### What changes were proposed in this pull request?
This PR aims to:
1. Migrate `error/warn/info` logging calls with variables in the `SQL` module (Java side) to the `structured logging framework` (a minimal sketch of the migration pattern follows this list).
2. Convert all dependencies on `org.slf4j.Logger` & `org.slf4j.LoggerFactory` to `org.apache.spark.internal.Logger` & `org.apache.spark.internal.LoggerFactory`, so that importing `org.slf4j.Logger` & `org.slf4j.LoggerFactory` in Java code can later be prohibited entirely.
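
A minimal before/after sketch of the pattern (the wrapper class below is hypothetical and exists only for illustration; the `Logger`, `LogKeys`, and `MDC` calls mirror the changed lines in the diff):

```java
import org.apache.spark.internal.Logger;
import org.apache.spark.internal.LoggerFactory;
import org.apache.spark.internal.LogKeys;
import org.apache.spark.internal.MDC;

// Hypothetical class, used only to show the before/after shape of a call site.
public class MigrationSketch {
  private static final Logger logger = LoggerFactory.getLogger(MigrationSketch.class);

  void warnAllocationFailure(long requiredSize) {
    // Before: slf4j-style interpolation; the variable is flattened into the
    // message text and is not available as a queryable field.
    //   logger.warn("Failed to allocate page ({} bytes).", requiredSize);

    // After: the variable is wrapped in an MDC tagged with a LogKey, so the
    // structured logging framework can emit it as a named field.
    logger.warn("Failed to allocate page ({} bytes).",
      MDC.of(LogKeys.PAGE_SIZE$.MODULE$, requiredSize));
  }
}
```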

### Why are the changes needed?
To enhance Apache Spark's logging system by implementing structured logging.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
- Pass GA (GitHub Actions).

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes apache#46450 from panbingkun/sql_java_sl.

Authored-by: panbingkun <panbingkun@baidu.com>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
panbingkun authored and gengliangwang committed May 9, 2024
1 parent 5891b20 commit 85a6e35
Showing 24 changed files with 222 additions and 132 deletions.
@@ -193,4 +193,8 @@ static MessageThrowable of(String message, Throwable throwable) {
       return new MessageThrowable(message, throwable);
     }
   }
+
+  public org.slf4j.Logger getSlf4jLogger() {
+    return slf4jLogger;
+  }
 }
13 changes: 13 additions & 0 deletions common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
@@ -230,6 +230,7 @@ object LogKeys {
   case object FROM_TIME extends LogKey
   case object FUNCTION_NAME extends LogKey
   case object FUNCTION_PARAMETER extends LogKey
+  case object GLOBAL_INIT_FILE extends LogKey
   case object GLOBAL_WATERMARK extends LogKey
   case object GROUP_BY_EXPRS extends LogKey
   case object GROUP_ID extends LogKey
@@ -275,6 +276,7 @@
   case object KAFKA_RECORDS_PULLED_COUNT extends LogKey
   case object KEY extends LogKey
   case object KEYTAB extends LogKey
+  case object KEYTAB_FILE extends LogKey
   case object LABEL_COLUMN extends LogKey
   case object LARGEST_CLUSTER_INDEX extends LogKey
   case object LAST_ACCESS_TIME extends LogKey
@@ -290,6 +292,7 @@
   case object LOADED_VERSION extends LogKey
   case object LOAD_FACTOR extends LogKey
   case object LOAD_TIME extends LogKey
+  case object LOCAL_SCRATCH_DIR extends LogKey
   case object LOCATION extends LogKey
   case object LOGICAL_PLAN_COLUMNS extends LogKey
   case object LOGICAL_PLAN_LEAVES extends LogKey
@@ -411,6 +414,8 @@
   case object OLD_GENERATION_GC extends LogKey
   case object OLD_VALUE extends LogKey
   case object OPEN_COST_IN_BYTES extends LogKey
+  case object OPERATION_HANDLE extends LogKey
+  case object OPERATION_HANDLE_IDENTIFIER extends LogKey
   case object OPTIMIZED_PLAN_COLUMNS extends LogKey
   case object OPTIMIZER_CLASS_NAME extends LogKey
   case object OPTIONS extends LogKey
@@ -458,6 +463,7 @@
   case object PROCESSING_TIME extends LogKey
   case object PRODUCER_ID extends LogKey
   case object PROPERTY_NAME extends LogKey
+  case object PROTOCOL_VERSION extends LogKey
   case object PROVIDER extends LogKey
   case object PUSHED_FILTERS extends LogKey
   case object PVC_METADATA_NAME extends LogKey
@@ -523,9 +529,11 @@
   case object SERVER_NAME extends LogKey
   case object SERVICE_NAME extends LogKey
   case object SERVLET_CONTEXT_HANDLER_PATH extends LogKey
+  case object SESSION_HANDLE extends LogKey
   case object SESSION_HOLD_INFO extends LogKey
   case object SESSION_ID extends LogKey
   case object SESSION_KEY extends LogKey
+  case object SET_CLIENT_INFO_REQUEST extends LogKey
   case object SHARD_ID extends LogKey
   case object SHELL_COMMAND extends LogKey
   case object SHUFFLE_BLOCK_INFO extends LogKey
@@ -578,6 +586,7 @@
   case object SUBSAMPLING_RATE extends LogKey
   case object SUB_QUERY extends LogKey
   case object TABLE_NAME extends LogKey
+  case object TABLE_TYPE extends LogKey
   case object TABLE_TYPES extends LogKey
   case object TARGET_NUM_EXECUTOR extends LogKey
   case object TARGET_NUM_EXECUTOR_DELTA extends LogKey
@@ -595,13 +604,17 @@
   case object THREAD extends LogKey
   case object THREAD_ID extends LogKey
   case object THREAD_NAME extends LogKey
+  case object THREAD_POOL_KEEPALIVE_TIME extends LogKey
+  case object THREAD_POOL_SIZE extends LogKey
+  case object THREAD_POOL_WAIT_QUEUE_SIZE extends LogKey
   case object TID extends LogKey
   case object TIME extends LogKey
   case object TIMEOUT extends LogKey
   case object TIMER extends LogKey
   case object TIMESTAMP extends LogKey
   case object TIME_UNITS extends LogKey
   case object TIP extends LogKey
+  case object TOKEN extends LogKey
   case object TOKEN_KIND extends LogKey
   case object TOKEN_REGEX extends LogKey
   case object TOPIC extends LogKey
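
These keys are Scala `case object`s, which is why the Java call sites in the diffs below spell them as `LogKeys.X$.MODULE$`: scalac compiles each `case object X` to a class `X$` holding its singleton in a static `MODULE$` field. A minimal sketch of one such call site (the holder class is hypothetical; the call pattern is taken straight from the changes below):

```java
import org.apache.spark.internal.Logger;
import org.apache.spark.internal.LoggerFactory;
import org.apache.spark.internal.LogKeys;
import org.apache.spark.internal.MDC;

// Hypothetical holder class; only the LogKeys.X$.MODULE$ access pattern matters.
class LogKeyInteropSketch {
  private static final Logger LOG = LoggerFactory.getLogger(LogKeyInteropSketch.class);

  void logServiceStarted(String serviceName) {
    // SERVICE_NAME$.MODULE$ is the Java spelling of the Scala singleton
    // `case object SERVICE_NAME extends LogKey` declared in LogKey.scala.
    LOG.info("Service:{} is started.",
      MDC.of(LogKeys.SERVICE_NAME$.MODULE$, serviceName));
  }
}
```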
@@ -19,16 +19,16 @@
 import java.io.Closeable;
 import java.io.IOException;
 
+import org.apache.spark.internal.Logger;
+import org.apache.spark.internal.LoggerFactory;
+import org.apache.spark.internal.LogKeys;
+import org.apache.spark.internal.MDC;
 import org.apache.spark.memory.MemoryConsumer;
 import org.apache.spark.memory.SparkOutOfMemoryError;
 import org.apache.spark.memory.TaskMemoryManager;
 import org.apache.spark.sql.types.*;
 import org.apache.spark.unsafe.memory.MemoryBlock;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
 /**
  * RowBasedKeyValueBatch stores key value pairs in contiguous memory region.
  *
@@ -127,7 +127,8 @@ private boolean acquirePage(long requiredSize) {
     try {
       page = allocatePage(requiredSize);
     } catch (SparkOutOfMemoryError e) {
-      logger.warn("Failed to allocate page ({} bytes).", requiredSize);
+      logger.warn("Failed to allocate page ({} bytes).",
+        MDC.of(LogKeys.PAGE_SIZE$.MODULE$, requiredSize));
       return false;
     }
     base = page.getBaseObject();
@@ -17,12 +17,6 @@
 
 package org.apache.spark.sql.util;
 
-import org.apache.spark.SparkIllegalArgumentException;
-import org.apache.spark.SparkUnsupportedOperationException;
-import org.apache.spark.annotation.Experimental;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -31,6 +25,14 @@
 import java.util.Objects;
 import java.util.Set;
 
+import org.apache.spark.annotation.Experimental;
+import org.apache.spark.internal.Logger;
+import org.apache.spark.internal.LoggerFactory;
+import org.apache.spark.internal.LogKeys;
+import org.apache.spark.internal.MDC;
+import org.apache.spark.SparkIllegalArgumentException;
+import org.apache.spark.SparkUnsupportedOperationException;
+
/**
* Case-insensitive map of string keys to string values.
* <p>
@@ -59,8 +61,8 @@ public CaseInsensitiveStringMap(Map<String, String> originalMap) {
     for (Map.Entry<String, String> entry : originalMap.entrySet()) {
       String key = toLowerCase(entry.getKey());
       if (delegate.containsKey(key)) {
-        logger.warn("Converting duplicated key " + entry.getKey() +
-          " into CaseInsensitiveStringMap.");
+        logger.warn("Converting duplicated key {} into CaseInsensitiveStringMap.",
+          MDC.of(LogKeys.KEY$.MODULE$, entry.getKey()));
       }
       delegate.put(key, entry.getValue());
     }
@@ -21,8 +21,11 @@
 import java.util.List;
 
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
+import org.apache.spark.internal.Logger;
+import org.apache.spark.internal.LoggerFactory;
+import org.apache.spark.internal.LogKeys;
+import org.apache.spark.internal.MDC;
 
 /**
  * AbstractService.
@@ -85,7 +88,7 @@ public synchronized void init(HiveConf hiveConf) {
     ensureCurrentState(STATE.NOTINITED);
     this.hiveConf = hiveConf;
     changeState(STATE.INITED);
-    LOG.info("Service:" + getName() + " is inited.");
+    LOG.info("Service:{} is inited.", MDC.of(LogKeys.SERVICE_NAME$.MODULE$, getName()));
   }
 
   /**
@@ -100,7 +103,7 @@ public synchronized void start() {
     startTime = System.currentTimeMillis();
     ensureCurrentState(STATE.INITED);
     changeState(STATE.STARTED);
-    LOG.info("Service:" + getName() + " is started.");
+    LOG.info("Service:{} is started.", MDC.of(LogKeys.SERVICE_NAME$.MODULE$, getName()));
   }
 
   /**
@@ -121,7 +124,7 @@ public synchronized void stop() {
     }
     ensureCurrentState(STATE.STARTED);
     changeState(STATE.STOPPED);
-    LOG.info("Service:" + getName() + " is stopped.");
+    LOG.info("Service:{} is stopped.", MDC.of(LogKeys.SERVICE_NAME$.MODULE$, getName()));
   }
 
   @Override
@@ -23,8 +23,11 @@
 import java.util.List;
 
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
+import org.apache.spark.internal.Logger;
+import org.apache.spark.internal.LoggerFactory;
+import org.apache.spark.internal.LogKeys;
+import org.apache.spark.internal.MDC;
 
 /**
  * CompositeService.
@@ -70,7 +73,7 @@ public synchronized void start() {
       }
       super.start();
     } catch (Throwable e) {
-      LOG.error("Error starting services " + getName(), e);
+      LOG.error("Error starting services {}", e, MDC.of(LogKeys.SERVICE_NAME$.MODULE$, getName()));
       // Note that the state of the failed service is still INITED and not
       // STARTED. Even though the last service is not started completely, still
       // call stop() on all services including failed service to make sure cleanup
@@ -100,7 +103,7 @@ private synchronized void stop(int numOfServicesStarted) {
       try {
         service.stop();
       } catch (Throwable t) {
-        LOG.info("Error stopping " + service.getName(), t);
+        LOG.info("Error stopping {}", t, MDC.of(LogKeys.SERVICE_NAME$.MODULE$, service.getName()));
       }
     }
   }
@@ -123,7 +126,8 @@ public void run() {
         // Stop the Composite Service
         compositeService.stop();
       } catch (Throwable t) {
-        LOG.info("Error stopping " + compositeService.getName(), t);
+        LOG.info("Error stopping {}", t,
+          MDC.of(LogKeys.SERVICE_NAME$.MODULE$, compositeService.getName()));
       }
     }
   }
@@ -21,8 +21,9 @@
 import java.security.NoSuchAlgorithmException;
 
 import org.apache.commons.codec.binary.Base64;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
+import org.apache.spark.internal.Logger;
+import org.apache.spark.internal.LoggerFactory;
 
 /**
  * The cookie signer generates a signature based on SHA digest
@@ -18,8 +18,11 @@
 package org.apache.hive.service;
 
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
+import org.apache.spark.internal.Logger;
+import org.apache.spark.internal.LoggerFactory;
+import org.apache.spark.internal.LogKeys;
+import org.apache.spark.internal.MDC;
 
 /**
  * ServiceOperations.
@@ -129,9 +132,8 @@ public static Exception stopQuietly(Service service) {
     try {
       stop(service);
     } catch (Exception e) {
-      LOG.warn("When stopping the service " + service.getName()
-          + " : " + e,
-          e);
+      LOG.warn("When stopping the service {}", e,
+        MDC.of(LogKeys.SERVICE_NAME$.MODULE$, service.getName()));
      return e;
     }
     return null;
@@ -18,7 +18,7 @@
 
 import java.io.IOException;
 
-import org.slf4j.Logger;
+import org.apache.spark.internal.Logger;
 
 public class ServiceUtils {
 
@@ -42,16 +42,19 @@
 import org.apache.thrift.TProcessorFactory;
 import org.apache.thrift.transport.TTransportException;
 import org.apache.thrift.transport.TTransportFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
+import org.apache.spark.internal.Logger;
+import org.apache.spark.internal.LoggerFactory;
+import org.apache.spark.internal.LogKeys;
+import org.apache.spark.internal.MDC;
 
 /**
  * This class helps in some aspects of authentication. It creates the proper Thrift classes for the
  * given configuration as well as helps with authenticating requests.
  */
 public class HiveAuthFactory {
-  private static final Logger LOG = LoggerFactory.getLogger(HiveAuthFactory.class);
+  private static final Logger LOG = LoggerFactory.getLogger(HiveAuthFactory.class);
 
public enum AuthTypes {
NOSASL("NOSASL"),
@@ -285,9 +288,9 @@ public String verifyDelegationToken(String delegationToken) throws HiveSQLException {
     try {
       return delegationTokenManager.verifyDelegationToken(delegationToken);
     } catch (IOException e) {
-      String msg = "Error verifying delegation token " + delegationToken;
-      LOG.error(msg, e);
-      throw new HiveSQLException(msg, "08S01", e);
+      String msg = "Error verifying delegation token";
+      LOG.error(msg + " {}", e, MDC.of(LogKeys.TOKEN$.MODULE$, delegationToken));
+      throw new HiveSQLException(msg + delegationToken, "08S01", e);
     }
   }
 
@@ -39,8 +39,11 @@
 import org.ietf.jgss.GSSManager;
 import org.ietf.jgss.GSSName;
 import org.ietf.jgss.Oid;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
+import org.apache.spark.internal.Logger;
+import org.apache.spark.internal.LoggerFactory;
+import org.apache.spark.internal.LogKeys;
+import org.apache.spark.internal.MDC;
 
 /**
  * Utility functions for HTTP mode authentication.
@@ -109,7 +112,8 @@ public static String getUserNameFromCookieToken(String tokenStr) {
     Map<String, String> map = splitCookieToken(tokenStr);
 
     if (!map.keySet().equals(COOKIE_ATTRIBUTES)) {
-      LOG.error("Invalid token with missing attributes " + tokenStr);
+      LOG.error("Invalid token with missing attributes {}",
+        MDC.of(LogKeys.TOKEN$.MODULE$, tokenStr));
       return null;
     }
     return map.get(COOKIE_CLIENT_USER_NAME);
@@ -129,7 +133,7 @@ private static Map<String, String> splitCookieToken(String tokenStr) {
       String part = st.nextToken();
       int separator = part.indexOf(COOKIE_KEY_VALUE_SEPARATOR);
       if (separator == -1) {
-        LOG.error("Invalid token string " + tokenStr);
+        LOG.error("Invalid token string {}", MDC.of(LogKeys.TOKEN$.MODULE$, tokenStr));
         return null;
       }
       String key = part.substring(0, separator);
@@ -25,8 +25,9 @@
 import org.apache.thrift.transport.TSaslServerTransport;
 import org.apache.thrift.transport.TSocket;
 import org.apache.thrift.transport.TTransport;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
+import org.apache.spark.internal.Logger;
+import org.apache.spark.internal.LoggerFactory;
 
 /**
  * This class is responsible for setting the ipAddress for operations executed via HiveServer2.
@@ -38,7 +39,7 @@
  */
 public class TSetIpAddressProcessor<I extends Iface> extends TCLIService.Processor<Iface> {
 
-  private static final Logger LOGGER = LoggerFactory.getLogger(TSetIpAddressProcessor.class.getName());
+  private static final Logger LOGGER = LoggerFactory.getLogger(TSetIpAddressProcessor.class);
 
   public TSetIpAddressProcessor(Iface iface) {
     super(iface);
(Diff truncated: the remaining changed files are not shown.)
