Skip to content

Commit

Permalink
Add df.etl.insert_columns
Browse files Browse the repository at this point in the history
Other: automatically shut down the processes created by joblib and loky
  • Loading branch information
GianlucaFicarelli committed Apr 4, 2024
1 parent 27969ea commit 62b565d
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 10 deletions.
14 changes: 14 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
@@ -1,6 +1,20 @@
Changelog
=========

Version 0.2.0
-------------

New Features
~~~~~~~~~~~~

- Add ``df.etl.insert_columns()`` to simplify the insertion of multiple columns at once in a DataFrame.

Improvements
~~~~~~~~~~~~

- In ``run_parallel()``, automatically shut down the processes created by joblib and loky.


Version 0.1.8
-------------

Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ classifiers = [
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
"Topic :: Scientific/Engineering :: Bio-Informatics",
]
dependencies = [
Expand Down
7 changes: 7 additions & 0 deletions src/blueetl_core/etl.py
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,13 @@ def iterdict(self) -> Iterator[tuple[dict, dict]]:
):
yield named_index, dict(zip(columns, value))

def insert_columns(self, loc: int, columns: list, values: list) -> None:
"""Insert multiple columns, similar to repeatedly calling DataFrame.insert()."""
if len(columns) != len(values):
raise ValueError("columns and values must have the same length")
for col, val in zip(reversed(columns), reversed(values)):
self._obj.insert(loc, col, val)

def _query_list(self, query_list: list[dict[str, Any]]) -> pd.DataFrame:
"""Given a list of query dicts, return the DataFrame filtered by columns and index."""
return query_frame(self._obj, query_list)
Expand Down
28 changes: 18 additions & 10 deletions src/blueetl_core/parallel.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import numpy as np
from joblib import Parallel, delayed
from joblib.externals.loky import get_reusable_executor

from blueetl_core.constants import (
BLUEETL_JOBLIB_BACKEND,
Expand Down Expand Up @@ -73,6 +74,7 @@ def run_parallel(
backend: Optional[str] = None,
verbose: Optional[int] = None,
base_seed: Optional[int] = None,
shutdown_executor: bool = True,
) -> list[Any]:
"""Run tasks in parallel.
Expand All @@ -87,6 +89,7 @@ def run_parallel(
verbose: verbosity of joblib. If not specified, use the BLUEETL_JOBLIB_VERBOSE.
base_seed: initial base seed. If specified, a different seed is added to the task context,
and passed to each callable object.
shutdown_executor: if True and using loky, shutdown the subprocesses before returning.
Returns:
list of objects returned by the callable objects, in the same order.
Expand All @@ -100,15 +103,20 @@ def run_parallel(
jobs = int(jobs_env) if jobs_env else max((os.cpu_count() or 1) // 2, 1)
if not backend:
backend = os.getenv(BLUEETL_JOBLIB_BACKEND)
parallel = Parallel(n_jobs=jobs, backend=backend, verbose=verbose)
return parallel(
delayed(task)(
ctx=TaskContext(
task_id=i,
loglevel=loglevel,
seed=None if base_seed is None else base_seed + i,
ppid=os.getpid(),
try:
parallel = Parallel(n_jobs=jobs, backend=backend, verbose=verbose)
return parallel(
delayed(task)(
ctx=TaskContext(
task_id=i,
loglevel=loglevel,
seed=None if base_seed is None else base_seed + i,
ppid=os.getpid(),
)
)
for i, task in enumerate(tasks)
)
for i, task in enumerate(tasks)
)
finally:
if shutdown_executor and (not backend or backend == "loky"):
# shutdown the pool of processes used by loky
get_reusable_executor().shutdown(wait=True)
21 changes: 21 additions & 0 deletions tests/test_etl_dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -472,6 +472,27 @@ def test_iterdict(dataframe1):
assert value == {"v0": 0, "v1": 4}


def test_insert_columns(dataframe1):
    """insert_columns must prepend the new columns, keeping their given order."""
    new_columns = ["pre0", "pre1", "pre2"]
    new_values = [111, [200, 201, 202, 203], np.nan]
    expected_columns = new_columns + list(dataframe1.columns)

    dataframe1.etl.insert_columns(loc=0, columns=new_columns, values=new_values)

    n = len(dataframe1)
    assert_array_equal(dataframe1.columns, expected_columns)
    # scalar value is broadcast to every row
    assert_array_equal(dataframe1["pre0"], [111] * n)
    # array-like value is assigned element-wise
    assert_array_equal(dataframe1["pre1"], [200, 201, 202, 203])
    # NaN scalar is broadcast as well
    assert_array_equal(dataframe1["pre2"], [np.nan] * n)


def test_insert_columns_raises(dataframe1):
    """A length mismatch between columns and values must raise ValueError."""
    mismatched_values = [111]  # 1 value for 3 columns

    with pytest.raises(ValueError, match="columns and values must have the same length"):
        dataframe1.etl.insert_columns(
            loc=0, columns=["pre0", "pre1", "pre2"], values=mismatched_values
        )


@pytest.mark.parametrize(
"params, expected_key, expected_df",
[
Expand Down

0 comments on commit 62b565d

Please sign in to comment.