Skip to content

Commit

Permalink
pyarrow fix (#65)
Browse files Browse the repository at this point in the history
pyarrow path fix for executor
  • Loading branch information
svittoz authored May 28, 2024
1 parent faa5a74 commit 09b7f50
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 12 deletions.
3 changes: 3 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

## Unreleased

### Fixed
- pyarrow fix did not work on spark executors

## v0.1.7 (2024-04-12)
### Changed
- Support for pyarrow > 0.17.0
Expand Down
14 changes: 3 additions & 11 deletions eds_scikit/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,33 +12,25 @@
import importlib
import os
import pathlib
import sys
import time
from packaging import version
from typing import List, Tuple
from pathlib import Path

import pandas as pd
import pyarrow
import pyarrow.ipc
import pyspark
from loguru import logger
from pyspark import SparkContext
from pyspark.sql import SparkSession

import eds_scikit.biology # noqa: F401 --> To register functions
from eds_scikit.io import improve_performances

pyarrow.open_stream = pyarrow.ipc.open_stream

sys.path.insert(
0, (pathlib.Path(__file__).parent / "package-override").absolute().as_posix()
)
os.environ["PYTHONPATH"] = ":".join(sys.path)
from eds_scikit.io import improve_performances, pyarrow_fix

# Remove SettingWithCopyWarning
pd.options.mode.chained_assignment = None

pyarrow_fix()

logger.warning(
"""To improve performances when using Spark and Koalas, please call `eds_scikit.improve_performances()`
This function optimally configures Spark. Use it as:
Expand Down
8 changes: 7 additions & 1 deletion eds_scikit/io/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,13 @@
from .files import PandasData
from .hive import HiveData
from .postgres import PostgresData
from .improve_performance import improve_performances, koalas_options, load_koalas
from .improve_performance import (
improve_performances,
koalas_options,
load_koalas,
pyarrow_fix,
)


__all__ = [
"BaseData",
Expand Down
31 changes: 31 additions & 0 deletions eds_scikit/io/improve_performance.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from typing import List, Tuple

import pyarrow
import pyarrow.ipc
import pyspark
from packaging import version
from pyspark import SparkContext
Expand Down Expand Up @@ -50,6 +51,36 @@ def set_env_variables() -> None:
os.environ["PYARROW_IGNORE_TIMEZONE"] = "0"


def pyarrow_fix():
"""
Fixing error 'pyarrow has no attributes open_stream' due to PySpark 2 incompatibility with pyarrow > 0.17
"""

# Setting path to our patched pyarrow module
pyarrow.open_stream = pyarrow.ipc.open_stream

sys.path.insert(
0, (Path(__file__).parent / "package-override").absolute().as_posix()
)
os.environ["PYTHONPATH"] = ":".join(sys.path)

# Setting this path for Pyspark executors
global spark, sc, sql

spark = SparkSession.builder.getOrCreate()

conf = spark.sparkContext.getConf()
conf.set(
"spark.executorEnv.PYTHONPATH",
f"{Path(__file__).parent.parent}/package-override:{conf.get('spark.executorEnv.PYTHONPATH')}",
)
spark = SparkSession.builder.enableHiveSupport().config(conf=conf).getOrCreate()

sc = spark.sparkContext

sql = spark.sql


def improve_performances(
to_add_conf: List[Tuple[str, str]] = [],
quiet_spark: bool = True,
Expand Down

0 comments on commit 09b7f50

Please sign in to comment.