FIX: get rid of np.array_split on pd series future deprecation warning
mbaak committed Apr 20, 2024
1 parent e43e6ed commit 9cd4177
Showing 2 changed files with 7 additions and 6 deletions.
7 changes: 4 additions & 3 deletions emm/indexing/pandas_normalized_tfidf.py
@@ -162,9 +162,10 @@ def transform_parallel(self, X: pd.Series | pd.DataFrame, n_jobs: int = -1) -> s
         if effective_n_jobs(n_jobs) == 1:
             return self.transform(X=X)
 
+        chunk_size = int(np.ceil(len(X) / effective_n_jobs(n_jobs)))
+        X_chunks = [X.iloc[i : i + chunk_size] for i in range(0, len(X), chunk_size)]
+
         transform_splits = Parallel(n_jobs=n_jobs, backend="threading")(
-            delayed(self.transform)(X_split)
-            for X_split in np.array_split(X, effective_n_jobs(n_jobs))
-            if len(X_split) > 0
+            delayed(self.transform)(X_split) for X_split in X_chunks if len(X_split) > 0
         )
         return sp.vstack(transform_splits)
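
For illustration, a minimal self-contained sketch of the new chunking pattern, using hypothetical input data and a stand-in transform function (the real self.transform returns a sparse matrix that is later stacked with sp.vstack):

import numpy as np
import pandas as pd
from joblib import Parallel, delayed, effective_n_jobs

def transform(chunk: pd.Series) -> pd.Series:
    # stand-in for self.transform, which produces TF-IDF vectors per chunk
    return chunk.str.lower()

X = pd.Series(["Acme Corp", "Globex", "Initech", "Umbrella", "Hooli"])  # hypothetical data
n_jobs = 2

# Positional .iloc slicing replaces np.array_split(X, effective_n_jobs(n_jobs)),
# which can trigger a FutureWarning when applied to a pandas Series.
chunk_size = int(np.ceil(len(X) / effective_n_jobs(n_jobs)))
X_chunks = [X.iloc[i : i + chunk_size] for i in range(0, len(X), chunk_size)]

parts = Parallel(n_jobs=n_jobs, backend="threading")(
    delayed(transform)(X_split) for X_split in X_chunks if len(X_split) > 0
)
result = pd.concat(parts)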
6 changes: 3 additions & 3 deletions emm/preprocessing/pandas_preprocessor.py
@@ -22,7 +22,6 @@
 from functools import partial
 from typing import Any, Callable, Mapping
 
-import numpy as np
 import pandas as pd
 from sklearn.base import TransformerMixin

@@ -109,10 +108,11 @@ def _spark_apply_steps(
     ) -> pd.Series:
         # Remark: 'chunk_size' is not the same as 'partition_size'
         # because here we just do name preprocessing and that can be done with much larger partitions
-        # than 'partition_size' that is designed to handle the fact that cosine similarity creates 10 times more data after the candidate generation
+        # than 'partition_size' that is designed to handle the fact that cosine similarity creates 10 times more data
+        # after the candidate generation
 
         with Timer("PandasPreprocessor._spark_apply_steps") as timer:
-            X_chunks = np.array_split(series, (len(series) + chunk_size - 1) // chunk_size)
+            X_chunks = [series.iloc[i : i + chunk_size] for i in range(0, len(series), chunk_size)]
             sc = self.spark_session.sparkContext
             rdd = sc.parallelize(X_chunks, len(X_chunks))

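As a small consistency check (made-up numbers, not from the repository), the list comprehension yields the same number of chunks as the old np.array_split call with ceil division, so sc.parallelize(X_chunks, len(X_chunks)) still creates one partition per chunk; only the size distribution changes (fixed-size chunks with a smaller remainder instead of near-equal splits):

import pandas as pd

series = pd.Series(range(10))  # made-up data
chunk_size = 3

# old: np.array_split(series, (len(series) + chunk_size - 1) // chunk_size) -> 4 chunks
n_chunks_old = (len(series) + chunk_size - 1) // chunk_size  # ceil(10 / 3) = 4

# new: positional slices of at most chunk_size rows each -> also 4 chunks
X_chunks = [series.iloc[i : i + chunk_size] for i in range(0, len(series), chunk_size)]

assert len(X_chunks) == n_chunks_old
assert sum(len(chunk) for chunk in X_chunks) == len(series)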
