From 386bc4354cd2b8a7a96edd0a9c05c81d0e1766d8 Mon Sep 17 00:00:00 2001 From: Li Jiang Date: Thu, 29 Jun 2023 10:15:31 +0800 Subject: [PATCH 1/7] Add support to joblib>=1.3.0 --- joblibspark/backend.py | 21 +++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/joblibspark/backend.py b/joblibspark/backend.py index dc7af28..0d99437 100644 --- a/joblibspark/backend.py +++ b/joblibspark/backend.py @@ -25,7 +25,12 @@ from joblib.parallel \ import AutoBatchingMixin, ParallelBackendBase, register_parallel_backend, SequentialBackend -from joblib._parallel_backends import SafeFunction + +try: + from joblib._parallel_backends import SafeFunction +except ImportError: + # joblib >= 1.3.0 + SafeFunction = None from py4j.clientserver import ClientServer @@ -201,11 +206,15 @@ def mapper_fn(_): inheritable_thread_target(run_on_worker_and_fetch_result) except ImportError: pass - - return self._get_pool().apply_async( - SafeFunction(run_on_worker_and_fetch_result), - callback=callback - ) + + if SafeFunction is not None: + return self._get_pool().apply_async( + SafeFunction(run_on_worker_and_fetch_result), callback=callback + ) + else: + return self._get_pool().apply_async( + run_on_worker_and_fetch_result, callback=callback + ) def get_nested_backend(self): """Backend instance to be used by nested Parallel calls. From 19d76d9c2d18394c8876aba8d3f29de92c57c51e Mon Sep 17 00:00:00 2001 From: Li Jiang Date: Thu, 29 Jun 2023 10:29:23 +0800 Subject: [PATCH 2/7] Fix workflow --- .github/workflows/main.yml | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 19c7947..a3525ff 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -25,11 +25,7 @@ jobs: - name: Install python packages run: | pip install joblib>=0.14.0 scikit-learn>=0.23.1 pytest pylint - if [ "${{ matrix.PYSPARK_VERSION }}" = "3.4.0" ]; then - pip install https://dist.apache.org/repos/dist/dev/spark/v3.4.0-rc1-bin/pyspark-3.4.0.tar.gz - else - pip install pyspark==${{ matrix.PYSPARK_VERSION }} - fi; + pip install pyspark==${{ matrix.PYSPARK_VERSION }} - name: Run pylint run: | ./run-pylint.sh From 38a292081e6008d762a1311e639fd5dfa5c46b42 Mon Sep 17 00:00:00 2001 From: Li Jiang Date: Thu, 29 Jun 2023 10:37:34 +0800 Subject: [PATCH 3/7] Fix pylint --- joblibspark/backend.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/joblibspark/backend.py b/joblibspark/backend.py index 0d99437..ac6bee4 100644 --- a/joblibspark/backend.py +++ b/joblibspark/backend.py @@ -206,16 +206,17 @@ def mapper_fn(_): inheritable_thread_target(run_on_worker_and_fetch_result) except ImportError: pass - - if SafeFunction is not None: - return self._get_pool().apply_async( - SafeFunction(run_on_worker_and_fetch_result), callback=callback - ) - else: + + if SafeFunction is None: return self._get_pool().apply_async( run_on_worker_and_fetch_result, callback=callback ) + return self._get_pool().apply_async( + SafeFunction(run_on_worker_and_fetch_result), callback=callback + ) + + def get_nested_backend(self): """Backend instance to be used by nested Parallel calls. For nested backend, always use `SequentialBackend` From a9fc38993b8ee5ee74c97f037497ef70291b8b84 Mon Sep 17 00:00:00 2001 From: Li Jiang Date: Thu, 29 Jun 2023 10:40:04 +0800 Subject: [PATCH 4/7] Update version --- joblibspark/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/joblibspark/__init__.py b/joblibspark/__init__.py index 7553d85..ab9872b 100644 --- a/joblibspark/__init__.py +++ b/joblibspark/__init__.py @@ -18,7 +18,7 @@ """ Joblib spark backend is a extension for joblib, which make joblib running on spark parallelly. """ -__version__ = '0.5.1' +__version__ = '0.5.2' def register_spark(): From ab09b90f31c5b1bf8da49bc12395e2a38e9c5403 Mon Sep 17 00:00:00 2001 From: Li Jiang Date: Thu, 29 Jun 2023 14:08:07 +0800 Subject: [PATCH 5/7] Use _wrap_func_call when SafeFunction is None --- joblibspark/backend.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/joblibspark/backend.py b/joblibspark/backend.py index ac6bee4..e761c28 100644 --- a/joblibspark/backend.py +++ b/joblibspark/backend.py @@ -30,6 +30,7 @@ from joblib._parallel_backends import SafeFunction except ImportError: # joblib >= 1.3.0 + from joblib._parallel_backends import PoolManagerMixin SafeFunction = None from py4j.clientserver import ClientServer @@ -209,7 +210,8 @@ def mapper_fn(_): if SafeFunction is None: return self._get_pool().apply_async( - run_on_worker_and_fetch_result, callback=callback + PoolManagerMixin._wrap_func_call, (run_on_worker_and_fetch_result,), + callback=callback, error_callback=callback ) return self._get_pool().apply_async( From 0291514d527b428d668a81ce4985670da27fc18c Mon Sep 17 00:00:00 2001 From: Li Jiang Date: Thu, 29 Jun 2023 14:15:20 +0800 Subject: [PATCH 6/7] Add test different joblib versions --- .github/workflows/main.yml | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index a3525ff..47e21ed 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -6,7 +6,8 @@ jobs: strategy: fail-fast: false matrix: - PYTHON_VERSION: ["3.7", "3.8", "3.9", "3.10"] + PYTHON_VERSION: ["3.8", "3.10"] + JOBLIB_VERSION: ["1.2.0", "1.3.0"] PIN_MODE: [false, true] PYSPARK_VERSION: ["3.0.3", "3.1.3", "3.2.3", "3.3.2", "3.4.0"] exclude: @@ -14,7 +15,7 @@ jobs: PIN_MODE: true - PYSPARK_VERSION: "3.1.3" PIN_MODE: true - name: Run test on pyspark ${{ matrix.PYSPARK_VERSION }}, pin_mode ${{ matrix.PIN_MODE }}, python ${{ matrix.PYTHON_VERSION }} + name: Run test on pyspark ${{ matrix.PYSPARK_VERSION }}, pin_mode ${{ matrix.PIN_MODE }}, python ${{ matrix.PYTHON_VERSION }}, joblib ${{ matrix.JOBLIB_VERSION }} steps: - uses: actions/checkout@v3 - name: Setup python ${{ matrix.PYTHON_VERSION }} @@ -24,7 +25,7 @@ jobs: architecture: x64 - name: Install python packages run: | - pip install joblib>=0.14.0 scikit-learn>=0.23.1 pytest pylint + pip install joblib==${{ matrix.JOBLIB_VERSION }} scikit-learn>=0.23.1 pytest pylint pip install pyspark==${{ matrix.PYSPARK_VERSION }} - name: Run pylint run: | From 7ebff1a7925a3eaec99053edec3fc25a5434480d Mon Sep 17 00:00:00 2001 From: Li Jiang Date: Thu, 29 Jun 2023 14:49:07 +0800 Subject: [PATCH 7/7] Fix pylint --- .github/workflows/main.yml | 3 +-- joblibspark/backend.py | 1 + 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 47e21ed..0db9cb5 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -25,8 +25,7 @@ jobs: architecture: x64 - name: Install python packages run: | - pip install joblib==${{ matrix.JOBLIB_VERSION }} scikit-learn>=0.23.1 pytest pylint - pip install pyspark==${{ matrix.PYSPARK_VERSION }} + pip install joblib==${{ matrix.JOBLIB_VERSION }} scikit-learn>=0.23.1 pytest pylint pyspark==${{ matrix.PYSPARK_VERSION }} - name: Run pylint run: | ./run-pylint.sh diff --git a/joblibspark/backend.py b/joblibspark/backend.py index e761c28..87dbfb8 100644 --- a/joblibspark/backend.py +++ b/joblibspark/backend.py @@ -209,6 +209,7 @@ def mapper_fn(_): pass if SafeFunction is None: + # pylint: disable=protected-access,used-before-assignment return self._get_pool().apply_async( PoolManagerMixin._wrap_func_call, (run_on_worker_and_fetch_result,), callback=callback, error_callback=callback