
Better SparkSession settings for localhost #143

Open · MrPowers opened this issue Oct 19, 2023 · 10 comments

@MrPowers (Collaborator)

Users need to configure their SparkSession for localhost development so that computations run fast and they don't run out of memory.

Here are some examples I ran on my local machine (64 GB of RAM) against the 1e9 h2o groupby dataset (1 billion rows of data).

Here's the "better config":

import pyspark
from delta import configure_spark_with_delta_pip

builder = (
    pyspark.sql.SparkSession.builder.appName("MyApp")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.executor.memory", "10G")
    .config("spark.driver.memory", "25G")
    .config(
        "spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    )
    .config("spark.sql.shuffle.partitions", "2")
)

spark = configure_spark_with_delta_pip(builder).getOrCreate()

Here's the default config:


builder = (
    pyspark.sql.SparkSession.builder.appName("MyApp")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config(
        "spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    )
)

spark = configure_spark_with_delta_pip(builder).getOrCreate()

groupby query

This query takes 104 seconds with the "better config":

from pathlib import Path

import delta
from pyspark.sql import functions as F

delta_table = delta.DeltaTable.forPath(
    spark, f"{Path.home()}/data/deltalake/G1_1e9_1e2_0_0"
)

delta_table.toDF().groupby("id3").agg(F.sum("v1"), F.mean("v3")).limit(10).collect()

This same query errors out with the default config.

join query

This query takes 69 seconds with the "better config", but 111 seconds with the default config:

x = spark.read.format("delta").load(f"{Path.home()}/data/deltalake/J1_1e9_1e9_0_0")
small = spark.read.format("parquet").load(f"{Path.home()}/data/J1_1e9_1e3_0_0.parquet")

# Register the DataFrames as temp views so they can be referenced in SQL
x.createOrReplaceTempView("x")
small.createOrReplaceTempView("small")

spark.sql('select x.id2, sum(small.v2) from x join small using (id1) group by x.id2').show()

Conclusion

SparkSession configurations significantly impact the localhost Spark runtime experience.

How can we make it easy for Spark users to get optimal configurations for localhost development?

@lucazanna

Good job @MrPowers for getting this conversation started.

I think we have 3 ways to do this (ordered from least likely to most likely to have an impact):

1- Recommend the better configuration in the Spark docs, getting started guide, etc.
This will help some users, but most users just use the standard config, so this will likely have a small impact.

2- Change Spark so it automatically uses the ‘better’ configuration above.
However, the ‘better’ configuration will depend on the machine and the dataset; a better configuration in one case is not necessarily the best configuration in every case.

3- Change Spark so that it ‘guesses’ and uses a configuration based on the available RAM, the number of CPU cores, etc.
I believe this is what Polars does. This config should be changeable by the user; see the sketch below.
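
A minimal sketch of what option 3 could look like as a helper (illustrative only: the local_session name, its defaults, and the "leave some RAM for the OS" rule are assumptions, and os.sysconf is Unix-only):

import multiprocessing
import os

from pyspark.sql import SparkSession


def local_session(app_name="MyApp", overrides=None):
    """Guess sensible local-mode settings, but let the caller override any of them."""
    cores = multiprocessing.cpu_count()
    # Total physical RAM in GB (Unix-only)
    mem_gb = int(os.sysconf("SC_PAGE_SIZE") * os.sysconf("SC_PHYS_PAGES") / 1024**3)
    guessed = {
        "spark.driver.memory": f"{max(mem_gb - 2, 1)}g",  # leave ~2 GB for the OS
        "spark.sql.shuffle.partitions": str(cores),       # far fewer than the 200 default
    }
    guessed.update(overrides or {})  # user-supplied values win over the guesses
    builder = SparkSession.builder.master(f"local[{cores}]").appName(app_name)
    for key, value in guessed.items():
        builder = builder.config(key, value)
    return builder.getOrCreate()


# Accept the guesses, or pin any setting explicitly:
spark = local_session(overrides={"spark.sql.shuffle.partitions": "2"})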

On top of improving Spark performance (which is important), I wonder what the Spark positioning should be: should Spark be the engine for big data, or the engine for data of any size?
If the positioning for Spark is the engine for big data, then I would not mind if it's slower than DuckDB on small data, because that's not what it's built for.

Your thoughts?

@MrPowers (Collaborator Author)

@lucazanna - I think we should first figure out the true capabilities of Spark locally and then figure out the best messaging. Here are the results for one of the h2o queries:

[Screenshot: results for one of the h2o queries, 2023-10-20]

I think the current benchmarks are really misleading...

@jeffbrennan (Collaborator)

I'm in favor of the automatic configuration (leaning towards higher memory consumption) with configurable parameters that the user can change if needed.

I think these are good configurable parameters:

  • executor memory
  • driver memory
  • shuffle partitions

For executor and driver memory we could use a percentage of available system memory. It doesn't look like there's a good cross-platform way to do this with Python's standard library, but psutil (https://pypi.org/project/psutil/) looks like a popular way to get this info; see the sketch below.
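
A hedged sketch of that idea using psutil (the 40%/20% split is an arbitrary placeholder, not a tested recommendation):

import psutil  # third-party: pip install psutil
import pyspark

total_gb = psutil.virtual_memory().total / 1024**3

# Hypothetical split: ~40% of system RAM for the driver, ~20% for the executor,
# with a 1 GB floor so small machines still get a valid value.
driver_mem = f"{max(int(total_gb * 0.4), 1)}g"
executor_mem = f"{max(int(total_gb * 0.2), 1)}g"

builder = (
    pyspark.sql.SparkSession.builder.appName("MyApp")
    .config("spark.driver.memory", driver_mem)
    .config("spark.executor.memory", executor_mem)
    .config("spark.sql.shuffle.partitions", "2")
)
spark = builder.getOrCreate()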

@MrPowers (Collaborator Author)

@jeffbrennan - figuring out how to programmatically set the best settings is a great goal.

The first step is to get everyone with the same datasets on their local machines so we can tinker and find what settings work best. There are so many Spark configuration options and I'm not even sure which knobs need to be turned (let alone how to optimally turn them)!

@MrPowers (Collaborator Author)

Here are some other suggestions that might be useful: https://luminousmen.com/post/how-to-speed-up-spark-jobs-on-small-test-datasets

@bjornjorgensen (Contributor)

I use this one:

# Standard library imports
import json
import multiprocessing
import os
import re
import sys
import random
import time

# Third-party imports
import numpy as np
import pandas as pd
import pyarrow

# Pyspark imports
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, functions as F, Window
from pyspark.sql.functions import (
    col, concat, concat_ws, expr, lit, trim, udf
)
from pyspark.sql.types import (
    IntegerType, StringType, StructField, StructType,
    DoubleType, TimestampType
)
from pyspark import pandas as ps

os.environ["PYARROW_IGNORE_TIMEZONE"] = "1"

number_cores = int(multiprocessing.cpu_count())

mem_bytes = os.sysconf("SC_PAGE_SIZE") * os.sysconf("SC_PHYS_PAGES")  # total physical RAM in bytes (Unix-only)
memory_gb = int(mem_bytes / (1024.0**3))  # e.g. 3 on a 4 GB machine


def get_spark_session(app_name: str, conf: SparkConf):
    conf.setMaster("local[{}]".format(number_cores))
    conf.set("spark.driver.memory", "{}g".format(memory_gb))
    conf.set("spark.sql.repl.eagerEval.enabled", "True")
    conf.set("spark.sql.repl.eagerEval.maxNumRows", "10000")
    conf.set("spark.sql.adaptive.enabled", "True")
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    # Note: "sc.setLogLevel" is not a Spark config key; the log level is set
    # on the SparkContext after the session is created (see below).

    return SparkSession.builder.appName(app_name).config(conf=conf).getOrCreate()


spark = get_spark_session("My app", SparkConf())
spark.sparkContext.setLogLevel("ERROR")

With this, users get the maximum memory and all CPU cores in local mode.

@MrPowers (Collaborator Author)

@geoHeil commented Nov 16, 2023

Do you absolutely need Spark? What about Polars or DuckDB in case you only target single-node deployments?

@SemyonSinchenko (Collaborator)

> Do you absolutely need Spark? What about Polars or DuckDB in case you only target single-node deployments?

The topic is about running unit tests of Spark routines. These tests run on a single node (locally). Maybe I'm missing something, but how would Polars/DuckDB help here?

@geoHeil commented Nov 16, 2023

No, for these purposes it will not help.
