From 9cd4177b1d91e8a976491f41417f32a9129c6c1f Mon Sep 17 00:00:00 2001
From: mbaak
Date: Sat, 20 Apr 2024 08:52:21 +0200
Subject: [PATCH] FIX: get rid of np.array_split on pd series future
 deprecation warning

https://github.com/ing-bank/EntityMatchingModel/issues/9
---
 emm/indexing/pandas_normalized_tfidf.py  | 7 ++++---
 emm/preprocessing/pandas_preprocessor.py | 6 +++---
 2 files changed, 7 insertions(+), 6 deletions(-)

diff --git a/emm/indexing/pandas_normalized_tfidf.py b/emm/indexing/pandas_normalized_tfidf.py
index 082e4e5..ed22bfc 100644
--- a/emm/indexing/pandas_normalized_tfidf.py
+++ b/emm/indexing/pandas_normalized_tfidf.py
@@ -162,9 +162,10 @@ def transform_parallel(self, X: pd.Series | pd.DataFrame, n_jobs: int = -1) -> s
         if effective_n_jobs(n_jobs) == 1:
             return self.transform(X=X)
 
+        chunk_size = int(np.ceil(len(X) / effective_n_jobs(n_jobs)))
+        X_chunks = [X.iloc[i : i + chunk_size] for i in range(0, len(X), chunk_size)]
+
         transform_splits = Parallel(n_jobs=n_jobs, backend="threading")(
-            delayed(self.transform)(X_split)
-            for X_split in np.array_split(X, effective_n_jobs(n_jobs))
-            if len(X_split) > 0
+            delayed(self.transform)(X_split) for X_split in X_chunks if len(X_split) > 0
         )
         return sp.vstack(transform_splits)
diff --git a/emm/preprocessing/pandas_preprocessor.py b/emm/preprocessing/pandas_preprocessor.py
index 12eaef1..3ab04ab 100644
--- a/emm/preprocessing/pandas_preprocessor.py
+++ b/emm/preprocessing/pandas_preprocessor.py
@@ -22,7 +22,6 @@
 from functools import partial
 from typing import Any, Callable, Mapping
 
-import numpy as np
 import pandas as pd
 from sklearn.base import TransformerMixin
 
@@ -109,10 +108,11 @@ def _spark_apply_steps(
     ) -> pd.Series:
         # Remark: 'chunk_size' is not the same as 'partition_size'
        # because here we just do name preprocessing and that can be done with much larger partitions
-        # than 'partition_size' that is designed to handle the fact that cosine similarity creates 10 times more data after the candidate generation
+        # than 'partition_size' that is designed to handle the fact that cosine similarity creates 10 times more data
+        # after the candidate generation
         with Timer("PandasPreprocessor._spark_apply_steps") as timer:
-            X_chunks = np.array_split(series, (len(series) + chunk_size - 1) // chunk_size)
+            X_chunks = [series.iloc[i : i + chunk_size] for i in range(0, len(series), chunk_size)]
             sc = self.spark_session.sparkContext
             rdd = sc.parallelize(X_chunks, len(X_chunks))
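
Note (not part of the patch): below is a minimal, standalone sketch of the chunking idiom the patch switches to. The helper name split_series and the sample data are illustrative only, not taken from the EMM codebase. Per the linked issue, np.array_split on a pandas Series emits a FutureWarning on recent pandas versions (it routes through a deprecated pandas method), whereas slicing with .iloc produces the same contiguous chunks while staying entirely inside the supported pandas API.

    import numpy as np
    import pandas as pd

    def split_series(s: pd.Series, n_chunks: int) -> list[pd.Series]:
        """Split a Series into at most n_chunks contiguous chunks via positional slicing."""
        # max(1, ...) guards against a zero range step when the Series is empty
        chunk_size = max(1, int(np.ceil(len(s) / n_chunks)))
        return [s.iloc[i : i + chunk_size] for i in range(0, len(s), chunk_size)]

    s = pd.Series(list("abcdefghij"))
    chunks = split_series(s, 4)  # 4 chunks of sizes 3, 3, 3, 1
    assert sum(len(c) for c in chunks) == len(s)
    assert pd.concat(chunks).equals(s)  # order and index are preserved

One behavioural difference worth noting: np.array_split balances chunk sizes (3/3/2/2 for 10 rows in 4 parts), while this idiom makes every chunk except possibly the last exactly chunk_size long. For distributing work over joblib workers or Spark partitions, as both patched call sites do, the difference is negligible.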