Skip to content

Commit

Permalink
feat(pipe): Add PIPE and PipeConfig classes with apply and pipe methods.
Browse files Browse the repository at this point in the history
  • Loading branch information
entelecheia committed Jun 21, 2023
1 parent 9d5f6e7 commit df2c032
Showing 1 changed file with 81 additions and 0 deletions.
81 changes: 81 additions & 0 deletions src/hyfi/pipe/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
"""
A class to apply a pipe to a dataframe or a dictionary of dataframes.
"""
from typing import Any, Callable, Dict, List, Mapping, Optional, Sequence, Union

import pandas as pd
from pydantic import BaseModel
from tqdm.auto import tqdm

from hyfi.composer import SpecialKeys
from hyfi.composer.extended import XC
from hyfi.joblib import JobLibConfig
from hyfi.joblib.batch.apply import decorator_apply
from hyfi.utils.logging import Logging

logger = Logging.getLogger(__name__)


class PipeConfig(BaseModel):
    """Pipe Configuration

    Declarative settings for a single pipe step. Unknown keys are accepted
    and kept on the model (``extra = "allow"``), so a config may carry extra
    arguments beyond the attributes declared here.
    """

    # NOTE(review): pydantic does not treat names with a leading underscore as
    # ordinary model fields (v1 skips them, v2 makes them private attributes).
    # Confirm that `_func_` / `_method_` are actually populated from input data
    # and serialized the way callers expect.
    # Dotted import path of the callable that implements the pipe.
    _func_: str = "hyfi.pipe.funcs.data_class_methods"
    # Presumably the name of the method the pipe function should invoke;
    # it is not read anywhere in this module -- TODO confirm against the callee.
    _method_: str = ""
    # Optional selector (one name or a list of names). `Optional` is spelled out
    # because the default is None, which the bare Union did not admit.
    apply_to: Optional[Union[str, List[str]]] = None
    # Free-form parameter mapping; its schema is not defined in this module.
    rcParams: dict = {}
    # Verbosity flag; not read anywhere in this module.
    verbose: bool = False

    class Config:
        # Allow field types pydantic has no built-in validator for.
        arbitrary_types_allowed = True
        # Keep unknown keys instead of rejecting them.
        extra = "allow"


class PIPE:
    """
    A class to apply a pipe to a dataframe or a dictionary of dataframes.

    Both methods are static; the class only serves as a namespace.
    """

    @staticmethod
    def pipe(data: Any, pipe_config: Dict):
        """
        Resolve the callable named in ``pipe_config`` and run it on ``data``.

        Args:
            data: Object passed to the resolved callable as its first argument.
            pipe_config: Pipe settings. The value stored under
                ``SpecialKeys.FUNC`` is handed to ``XC.partial`` to obtain the
                callable; the whole mapping is then forwarded to that callable
                as its second argument.

        Returns:
            Whatever the resolved callable returns.
        """
        # NOTE(review): a config without the FUNC key passes None to
        # XC.partial; how that is handled depends on XC -- confirm.
        _func_ = pipe_config.get(SpecialKeys.FUNC)
        _fn = XC.partial(_func_)
        logger.info("Applying pipe: %s", _fn)
        return _fn(data, pipe_config)

    @staticmethod
    def apply(
        func: Callable,
        series: Union[pd.Series, pd.DataFrame, Sequence, Mapping],
        description: Optional[str] = None,
        use_batcher: bool = True,
        minibatch_size: Optional[int] = None,
        num_workers: Optional[int] = None,
        **kwargs,
    ):
        """
        Apply ``func`` to ``series``, through the shared batcher when available.

        When ``use_batcher`` is true and ``JobLibConfig`` exposes a batcher
        instance, the work is delegated to ``decorator_apply``. Otherwise the
        function is applied with tqdm's pandas ``progress_apply``.

        Args:
            func: Callable to apply.
            series: Data to process. Inputs that do not offer
                ``progress_apply`` (plain sequences or mappings) are wrapped
                in a ``pd.Series`` on the non-batcher path.
            description: Progress-bar label.
            use_batcher: Set to False to force the ``progress_apply`` path.
            minibatch_size: Upper bound for the batcher's minibatch size;
                defaults to the batcher's current value. Only takes effect
                when the batcher runs more than one process.
            num_workers: If given, written to the batcher's ``procs``.
            **kwargs: Accepted for call-site compatibility; currently unused.

        Returns:
            The result of ``decorator_apply(...)(series)`` on the batcher
            path, otherwise the result of ``progress_apply``.
        """
        batcher_instance = JobLibConfig().__batcher_instance__
        if use_batcher and batcher_instance is not None:
            original_minibatch_size = batcher_instance.minibatch_size
            if minibatch_size is None:
                minibatch_size = original_minibatch_size
            # NOTE(review): this override is not undone afterwards, so it
            # persists on the shared batcher -- confirm that is intended.
            if num_workers is not None:
                batcher_instance.procs = int(num_workers)
            try:
                if batcher_instance.procs > 1:
                    # Spread the items over the workers, but never exceed the
                    # requested minibatch size.
                    batcher_instance.minibatch_size = min(
                        int(len(series) / batcher_instance.procs) + 1,
                        minibatch_size,
                    )
                    logger.info(
                        "Using batcher with minibatch size: %s",
                        batcher_instance.minibatch_size,
                    )
                return decorator_apply(
                    func,
                    batcher_instance,
                    description=description,  # type: ignore
                )(series)
            finally:
                # The batcher is shared state: put its minibatch size back
                # even when the apply call raises.
                batcher_instance.minibatch_size = original_minibatch_size

        if batcher_instance is None:
            logger.warning("Warning: batcher not initialized")
        # Registers `progress_apply` on pandas objects.
        tqdm.pandas(desc=description)
        if not hasattr(series, "progress_apply"):
            # Plain sequences/mappings have no progress_apply of their own.
            series = pd.Series(series)
        return series.progress_apply(func)

0 comments on commit df2c032

Please sign in to comment.