Closed
Changes from all commits
Commits
32 commits
5775a29
[SPARK-52518][TEST] Fix package path
xu20160924 Jun 24, 2025
ad17c2b
[SPARK-52518][TEST] Fix package path
xu20160924 Jun 24, 2025
c035246
Merge branch 'apache:master' into master
xu20160924 Jul 3, 2025
d447ab2
Merge branch 'apache:master' into master
xu20160924 Jul 6, 2025
28cc475
Merge branch 'apache:master' into master
xu20160924 Jul 7, 2025
f747d58
Merge branch 'apache:master' into master
xu20160924 Jul 14, 2025
d138b19
Merge branch 'apache:master' into master
xu20160924 Jul 19, 2025
f000cf1
Merge branch 'apache:master' into master
xu20160924 Aug 7, 2025
3531c3c
Merge branch 'apache:master' into master
xu20160924 Oct 22, 2025
59bcb65
Merge branch 'apache:master' into master
xu20160924 Oct 22, 2025
e4a3fac
Merge branch 'apache:master' into master
xu20160924 Oct 22, 2025
34bcf3f
Merge branch 'apache:master' into master
xu20160924 Oct 24, 2025
98e7be7
Merge branch 'apache:master' into master
xu20160924 Dec 7, 2025
f692772
[SPARK-54629][CONNECT][TEST] Supplement test coverage for getString w…
xu20160924 Dec 7, 2025
0f75c8c
[SPARK-54629][CONNECT][TEST] Supplement test coverage for getString w…
xu20160924 Dec 7, 2025
e7d0e89
[SPARK-54629][CONNECT][TEST] Supplement test coverage for getString w…
xu20160924 Dec 7, 2025
0b37ffe
[SPARK-54622][SQL] Promote `RequiresDistributionAndOrdering` and its …
dongjoon-hyun Dec 7, 2025
00c09db
[SPARK-54631][PYTHON] Add profiler support for Arrow Grouped Iter Agg…
Yicong-Huang Dec 7, 2025
58ba7db
[SPARK-53615][FOLLOWUP][PYTHON][TEST] Fix test case for arrow grouped…
Yicong-Huang Dec 7, 2025
ff86856
[SPARK-54628][PYTHON] Remove all unnecessary explicit super() arguments
gaogaotiantian Dec 7, 2025
9f89584
[SPARK-54627][PYTHON] Remove redundant initializations in serializers
Yicong-Huang Dec 7, 2025
2ad4040
[SPARK-54623][PYTHON][TEST] Add coverage test for UDTF null checker
gaogaotiantian Dec 7, 2025
22a8d1e
[SPARK-54615][PYTHON] Always pass runner_conf to python worker
gaogaotiantian Dec 7, 2025
ec27785
[SPARK-54068][PYTHON] Fix `to_feather` to support PyArrow 22.0.0
ashrithb Dec 8, 2025
dd289aa
[SPARK-54635][BUILD] Remove commons-lang3 from network-shuffle
pan3793 Dec 8, 2025
c4c2949
[SPARK-54087][CORE][FOLLOWUP] Spark Executor launch task failed retur…
AngersZhuuuu Dec 8, 2025
42159fe
[SPARK-54638][SQL][TEST] Reuse statement instances if possible in `Sp…
cloud-fan Dec 8, 2025
3e6baf6
[SPARK-54640][PYTHON] Replace select.select with select.poll on UNIX
gaogaotiantian Dec 8, 2025
5c2676d
[SPARK-54641][BUILD] Upgrade `ammonite` to 3.0.5
dongjoon-hyun Dec 8, 2025
cbce202
[SPARK-54595][SQL] Keep existing behavior of MERGE INTO without SCHEM…
szehon-ho Dec 8, 2025
0ecd39e
[SPARK-54642][BUILD] Upgrade AWS SDK v2 to 2.35.4
dongjoon-hyun Dec 9, 2025
c1d1625
[SPARK-54581][SQL] Making fetchsize option case-insensitive for Postg…
marko-sisovic-db Dec 9, 2025
5 changes: 0 additions & 5 deletions common/network-shuffle/pom.xml
@@ -42,11 +42,6 @@
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>

<dependency>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-core</artifactId>
14 changes: 14 additions & 0 deletions core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
@@ -212,6 +212,8 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
protected val hideTraceback: Boolean = false
protected val simplifiedTraceback: Boolean = false

protected val runnerConf: Map[String, String] = Map.empty

// All the Python functions should have the same exec, version and envvars.
protected val envVars: java.util.Map[String, String] = funcs.head.funcs.head.envVars
protected val pythonExec: String = funcs.head.funcs.head.pythonExec
@@ -403,6 +405,17 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
*/
protected def writeCommand(dataOut: DataOutputStream): Unit

/**
* Writes worker configuration to the stream connected to the Python worker.
*/
protected def writeRunnerConf(dataOut: DataOutputStream): Unit = {
dataOut.writeInt(runnerConf.size)
for ((k, v) <- runnerConf) {
PythonWorkerUtils.writeUTF(k, dataOut)
PythonWorkerUtils.writeUTF(v, dataOut)
}
}

/**
* Writes input data to the stream connected to the Python worker.
* Returns true if any data was written to the stream, false if the input is exhausted.
@@ -532,6 +545,7 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
PythonWorkerUtils.writeBroadcasts(broadcastVars, worker, env, dataOut)

dataOut.writeInt(evalType)
writeRunnerConf(dataOut)
writeCommand(dataOut)

dataOut.flush()
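For context, a minimal sketch of how a Python worker could consume the framing written by writeRunnerConf above (an int count followed by UTF-8 key/value pairs). read_int and UTF8Deserializer are existing helpers in pyspark.serializers; the function name and surrounding wiring are illustrative, not part of this diff.

# Illustrative only: reads the runner conf framing produced by writeRunnerConf.
from pyspark.serializers import read_int, UTF8Deserializer

utf8_deserializer = UTF8Deserializer()

def read_runner_conf(infile):
    runner_conf = {}
    num_conf = read_int(infile)  # matches dataOut.writeInt(runnerConf.size)
    for _ in range(num_conf):
        key = utf8_deserializer.loads(infile)    # PythonWorkerUtils.writeUTF(k, dataOut)
        value = utf8_deserializer.loads(infile)  # PythonWorkerUtils.writeUTF(v, dataOut)
        runner_conf[key] = value
    return runner_conf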
4 changes: 0 additions & 4 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -407,10 +407,6 @@ private[spark] class Executor(
TaskState.FAILED,
env.closureSerializer.newInstance().serialize(new ExceptionFailure(t, Seq.empty)))
} catch {
case oom: OutOfMemoryError =>
logError(log"Executor update launching task ${MDC(TASK_NAME, taskDescription.name)} " +
log"failed status failed, reason: ${MDC(REASON, oom.getMessage)}")
System.exit(SparkExitCode.OOM)
case t: Throwable =>
logError(log"Executor update launching task ${MDC(TASK_NAME, taskDescription.name)} " +
log"failed status failed, reason: ${MDC(REASON, t.getMessage)}")
2 changes: 1 addition & 1 deletion dev/deps/spark-deps-hadoop-3-hive-2.3
@@ -30,7 +30,7 @@ azure-storage/7.0.1//azure-storage-7.0.1.jar
blas/3.0.4//blas-3.0.4.jar
breeze-macros_2.13/2.1.0//breeze-macros_2.13-2.1.0.jar
breeze_2.13/2.1.0//breeze_2.13-2.1.0.jar
bundle/2.29.52//bundle-2.29.52.jar
bundle/2.35.4//bundle-2.35.4.jar
cats-kernel_2.13/2.8.0//cats-kernel_2.13-2.8.0.jar
chill-java/0.10.0//chill-java-0.10.0.jar
chill_2.13/0.10.0//chill_2.13-0.10.0.jar
4 changes: 2 additions & 2 deletions pom.xml
@@ -162,7 +162,7 @@
<aws.kinesis.client.version>1.15.3</aws.kinesis.client.version>
<!-- Should be consistent with Kinesis client dependency -->
<aws.java.sdk.version>1.12.681</aws.java.sdk.version>
<aws.java.sdk.v2.version>2.29.52</aws.java.sdk.v2.version>
<aws.java.sdk.v2.version>2.35.4</aws.java.sdk.v2.version>
<!-- the producer is used in tests -->
<aws.kinesis.producer.version>1.0.5</aws.kinesis.producer.version>
<!-- Do not use 3.0.0: https://github.com/GoogleCloudDataproc/hadoop-connectors/issues/1114 -->
@@ -233,7 +233,7 @@
./python/packaging/client/setup.py, and ./python/packaging/connect/setup.py too.
-->
<arrow.version>18.3.0</arrow.version>
<ammonite.version>3.0.4</ammonite.version>
<ammonite.version>3.0.5</ammonite.version>
<jjwt.version>0.12.6</jjwt.version>

<!-- org.fusesource.leveldbjni will be used except on arm64 platform. -->
32 changes: 25 additions & 7 deletions python/pyspark/daemon.py
@@ -163,14 +163,26 @@ def handle_sigterm(*args):

# Initialization complete
try:
poller = None
if os.name == "posix":
# select.select has a known limit on the number of file descriptors
# it can handle. We use select.poll instead to avoid this limit.
poller = select.poll()
fd_reverse_map = {0: 0, listen_sock.fileno(): listen_sock}
poller.register(0, select.POLLIN)
poller.register(listen_sock, select.POLLIN)

while True:
try:
ready_fds = select.select([0, listen_sock], [], [], 1)[0]
except select.error as ex:
if ex[0] == EINTR:
continue
else:
raise
if poller is not None:
ready_fds = [fd_reverse_map[fd] for fd, _ in poller.poll(1000)]
else:
try:
ready_fds = select.select([0, listen_sock], [], [], 1)[0]
except select.error as ex:
if ex[0] == EINTR:
continue
else:
raise

if 0 in ready_fds:
try:
@@ -208,6 +220,9 @@ def handle_sigterm(*args):

if pid == 0:
# in child process
if poller is not None:
poller.unregister(0)
poller.unregister(listen_sock)
listen_sock.close()

# It should close the standard input in the child process so that
@@ -256,6 +271,9 @@ def handle_sigterm(*args):
sock.close()

finally:
if poller is not None:
poller.unregister(0)
poller.unregister(listen_sock)
shutdown(1)


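One note on the change above: select.select takes its timeout in seconds while poll.poll takes milliseconds, hence the 1 becoming 1000. A self-contained sketch of the same pattern, with a throwaway socket standing in for listen_sock (an assumption for illustration, not this daemon's actual setup):

# Illustrative only: select.poll avoids select.select's FD_SETSIZE limit
# (commonly 1024 descriptors) and takes its timeout in milliseconds.
import select
import socket

listen_sock = socket.socket()
listen_sock.bind(("127.0.0.1", 0))
listen_sock.listen()

poller = select.poll()
fd_reverse_map = {0: 0, listen_sock.fileno(): listen_sock}  # fd -> original object
poller.register(0, select.POLLIN)            # stdin
poller.register(listen_sock, select.POLLIN)  # register() accepts objects with fileno()

# poll(1000) waits up to 1000 ms, matching select.select(..., 1)'s one-second wait
ready_fds = [fd_reverse_map[fd] for fd, _ in poller.poll(1000)]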
42 changes: 21 additions & 21 deletions python/pyspark/ml/classification.py
@@ -622,7 +622,7 @@ class _LinearSVCParams(
)

def __init__(self, *args: Any) -> None:
super(_LinearSVCParams, self).__init__(*args)
super().__init__(*args)
self._setDefault(
maxIter=100,
regParam=0.0,
@@ -743,7 +743,7 @@ def __init__(
fitIntercept=True, standardization=True, threshold=0.0, weightCol=None, \
aggregationDepth=2, maxBlockSizeInMB=0.0):
"""
super(LinearSVC, self).__init__()
super().__init__()
self._java_obj = self._new_java_obj(
"org.apache.spark.ml.classification.LinearSVC", self.uid
)
@@ -1019,7 +1019,7 @@ class _LogisticRegressionParams(
)

def __init__(self, *args: Any):
super(_LogisticRegressionParams, self).__init__(*args)
super().__init__(*args)
self._setDefault(
maxIter=100, regParam=0.0, tol=1e-6, threshold=0.5, family="auto", maxBlockSizeInMB=0.0
)
@@ -1328,7 +1328,7 @@ def __init__(
maxBlockSizeInMB=0.0):
If the threshold and thresholds Params are both set, they must be equivalent.
"""
super(LogisticRegression, self).__init__()
super().__init__()
self._java_obj = self._new_java_obj(
"org.apache.spark.ml.classification.LogisticRegression", self.uid
)
@@ -1676,7 +1676,7 @@ class _DecisionTreeClassifierParams(_DecisionTreeParams, _TreeClassifierParams):
"""

def __init__(self, *args: Any):
super(_DecisionTreeClassifierParams, self).__init__(*args)
super().__init__(*args)
self._setDefault(
maxDepth=5,
maxBins=32,
@@ -1809,7 +1809,7 @@ def __init__(
maxMemoryInMB=256, cacheNodeIds=False, checkpointInterval=10, impurity="gini", \
seed=None, weightCol=None, leafCol="", minWeightFractionPerNode=0.0)
"""
super(DecisionTreeClassifier, self).__init__()
super().__init__()
self._java_obj = self._new_java_obj(
"org.apache.spark.ml.classification.DecisionTreeClassifier", self.uid
)
@@ -1970,7 +1970,7 @@ class _RandomForestClassifierParams(_RandomForestParams, _TreeClassifierParams):
"""

def __init__(self, *args: Any):
super(_RandomForestClassifierParams, self).__init__(*args)
super().__init__(*args)
self._setDefault(
maxDepth=5,
maxBins=32,
@@ -2106,7 +2106,7 @@ def __init__(
numTrees=20, featureSubsetStrategy="auto", seed=None, subsamplingRate=1.0, \
leafCol="", minWeightFractionPerNode=0.0, weightCol=None, bootstrap=True)
"""
super(RandomForestClassifier, self).__init__()
super().__init__()
self._java_obj = self._new_java_obj(
"org.apache.spark.ml.classification.RandomForestClassifier", self.uid
)
@@ -2400,7 +2400,7 @@ class _GBTClassifierParams(_GBTParams, _HasVarianceImpurity):
)

def __init__(self, *args: Any):
super(_GBTClassifierParams, self).__init__(*args)
super().__init__(*args)
self._setDefault(
maxDepth=5,
maxBins=32,
@@ -2577,7 +2577,7 @@ def __init__(
validationIndicatorCol=None, leafCol="", minWeightFractionPerNode=0.0, \
weightCol=None)
"""
super(GBTClassifier, self).__init__()
super().__init__()
self._java_obj = self._new_java_obj(
"org.apache.spark.ml.classification.GBTClassifier", self.uid
)
@@ -2823,7 +2823,7 @@ class _NaiveBayesParams(_PredictorParams, HasWeightCol):
)

def __init__(self, *args: Any):
super(_NaiveBayesParams, self).__init__(*args)
super().__init__(*args)
self._setDefault(smoothing=1.0, modelType="multinomial")

@since("1.5.0")
@@ -2964,7 +2964,7 @@ def __init__(
probabilityCol="probability", rawPredictionCol="rawPrediction", smoothing=1.0, \
modelType="multinomial", thresholds=None, weightCol=None)
"""
super(NaiveBayes, self).__init__()
super().__init__()
self._java_obj = self._new_java_obj(
"org.apache.spark.ml.classification.NaiveBayes", self.uid
)
@@ -3093,7 +3093,7 @@ class _MultilayerPerceptronParams(
)

def __init__(self, *args: Any):
super(_MultilayerPerceptronParams, self).__init__(*args)
super().__init__(*args)
self._setDefault(maxIter=100, tol=1e-6, blockSize=128, stepSize=0.03, solver="l-bfgs")

@since("1.6.0")
@@ -3219,7 +3219,7 @@ def __init__(
solver="l-bfgs", initialWeights=None, probabilityCol="probability", \
rawPredictionCol="rawPrediction")
"""
super(MultilayerPerceptronClassifier, self).__init__()
super().__init__()
self._java_obj = self._new_java_obj(
"org.apache.spark.ml.classification.MultilayerPerceptronClassifier", self.uid
)
@@ -3484,7 +3484,7 @@ def __init__(
__init__(self, \\*, featuresCol="features", labelCol="label", predictionCol="prediction", \
rawPredictionCol="rawPrediction", classifier=None, weightCol=None, parallelism=1):
"""
super(OneVsRest, self).__init__()
super().__init__()
self._setDefault(parallelism=1)
kwargs = self._input_kwargs
self._set(**kwargs)
@@ -3749,7 @@ def validateParams(instance: Union[OneVsRest, "OneVsRestModel"]) -> None:
@inherit_doc
class OneVsRestReader(MLReader[OneVsRest]):
def __init__(self, cls: Type[OneVsRest]) -> None:
super(OneVsRestReader, self).__init__()
super().__init__()
self.cls = cls

def load(self, path: str) -> OneVsRest:
@@ -3765,7 +3765,7 @@ def load(self, path: str) -> OneVsRest:
@inherit_doc
class OneVsRestWriter(MLWriter):
def __init__(self, instance: OneVsRest):
super(OneVsRestWriter, self).__init__()
super().__init__()
self.instance = instance

def saveImpl(self, path: str) -> None:
@@ -3807,7 +3807,7 @@ def setRawPredictionCol(self, value: str) -> "OneVsRestModel":
return self._set(rawPredictionCol=value)

def __init__(self, models: List[ClassificationModel]):
super(OneVsRestModel, self).__init__()
super().__init__()
self.models = models
if is_remote() or not isinstance(models[0], JavaMLWritable):
return
@@ -3980,7 +3980,7 @@ def write(self) -> MLWriter:
@inherit_doc
class OneVsRestModelReader(MLReader[OneVsRestModel]):
def __init__(self, cls: Type[OneVsRestModel]):
super(OneVsRestModelReader, self).__init__()
super().__init__()
self.cls = cls

def load(self, path: str) -> OneVsRestModel:
@@ -4002,7 +4002,7 @@ def load(self, path: str) -> OneVsRestModel:
@inherit_doc
class OneVsRestModelWriter(MLWriter):
def __init__(self, instance: OneVsRestModel):
super(OneVsRestModelWriter, self).__init__()
super().__init__()
self.instance = instance

def saveImpl(self, path: str) -> None:
@@ -4119,7 +4119,7 @@ def __init__(
miniBatchFraction=1.0, initStd=0.01, maxIter=100, stepSize=1.0, \
tol=1e-6, solver="adamW", thresholds=None, seed=None)
"""
super(FMClassifier, self).__init__()
super().__init__()
self._java_obj = self._new_java_obj(
"org.apache.spark.ml.classification.FMClassifier", self.uid
)
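The classification.py changes are mechanical: every super(SomeClass, self).__init__(...) call becomes the zero-argument super().__init__(...), which in Python 3 resolves to the same MRO entry. A trivial illustration (Base and Child are made-up names, not classes from this file):

# Illustrative only: zero-argument super() behaves like the explicit form in Python 3.
class Base:
    def __init__(self):
        self.ready = True

class Child(Base):
    def __init__(self):
        super().__init__()  # same behavior as super(Child, self).__init__()

assert Child().ready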