diff --git a/src/facet/crossfit/_crossfit.py b/src/facet/crossfit/_crossfit.py
index c7e3dd838..042ea6a02 100644
--- a/src/facet/crossfit/_crossfit.py
+++ b/src/facet/crossfit/_crossfit.py
@@ -6,9 +6,12 @@
 from abc import ABCMeta
 from copy import copy
 from typing import (
+    Any,
     Callable,
     Container,
+    Dict,
     Generic,
+    Iterable,
     Iterator,
     List,
     NamedTuple,
@@ -29,7 +32,7 @@

 from pytools.api import AllTracker, inheritdoc
 from pytools.fit import FittableMixin
-from pytools.parallelization import ParallelizableMixin
+from pytools.parallelization import Job, JobQueue, JobRunner, ParallelizableMixin
 from sklearndf import LearnerDF, TransformerDF
 from sklearndf.pipeline import (
     ClassifierPipelineDF,
@@ -41,7 +44,7 @@

 log = logging.getLogger(__name__)

-__all__ = ["LearnerCrossfit", "Scorer"]
+__all__ = ["Scorer", "FitResult", "LearnerCrossfit"]

 #
 # Type variables
@@ -79,13 +82,15 @@
     float,
 ]

+FitResult = Tuple[Optional[LearnerPipelineDF], Optional[float]]
+
 #
 # Class definitions
 #


 class _FitScoreParameters(NamedTuple):
-    pipeline: T_LearnerPipelineDF
+    pipeline: LearnerPipelineDF

     # fit parameters
     train_features: Optional[pd.DataFrame]
@@ -215,7 +220,7 @@ def fit(self: T_Self, sample: Sample, **fit_params) -> T_Self:
         # un-fit this instance so we have a defined state in case of an exception
         self._reset_fit()

-        self._fit_score(_sample=sample, **fit_params)
+        self._run(self._fit_score_queue(_sample=sample, **fit_params))

         return self

@@ -238,7 +243,9 @@ def score(
         :return: the resulting scores as a 1d numpy array
         """

-        return self._fit_score(_scoring=scoring, _train_scores=train_scores)
+        return self._run(
+            self._fit_score_queue(_scoring=scoring, _train_scores=train_scores)
+        )

     def fit_score(
         self,
@@ -267,8 +274,45 @@
         # un-fit this instance so we have a defined state in case of an exception
         self._reset_fit()

-        return self._fit_score(
-            _sample=sample, _scoring=scoring, _train_scores=train_scores, **fit_params
+        return self._run(
+            self._fit_score_queue(
+                _sample=sample,
+                _scoring=scoring,
+                _train_scores=train_scores,
+                **fit_params,
+            )
+        )
+
+    def fit_score_queue(
+        self,
+        sample: Sample,
+        scoring: Union[str, Callable[[float, float], float], None] = None,
+        train_scores: bool = False,
+        **fit_params,
+    ) -> JobQueue[FitResult, Optional[np.ndarray]]:
+        """
+        Create a :class:`pytools.parallelization.JobQueue` that fits then scores this
+        crossfit.
+
+        See :meth:`.fit` and :meth:`.score` for details on fitting and scoring.
+
+        :param sample: the sample to fit the estimators to; if the sample
+            defines weights, these are passed on to the learner and scoring
+            function as keyword argument ``sample_weight``
+        :param fit_params: optional fit parameters, to be passed on to the fit method
+            of the learner
+        :param scoring: scoring function to use to score the models
+            (see :func:`~sklearn.metrics.check_scoring` for details)
+        :param train_scores: if ``True``, calculate train scores instead of test
+            scores (default: ``False``)
+        :return: the job queue
+        """
+
+        return self._fit_score_queue(
+            _sample=sample,
+            _scoring=scoring,
+            _train_scores=train_scores,
+            **fit_params,
         )

     def resize(self: T_Self, n_splits: int) -> T_Self:
@@ -318,14 +362,14 @@ def models(self) -> Iterator[T_LearnerPipelineDF]:
         return iter(self._model_by_split)

     # noinspection PyPep8Naming
-    def _fit_score(
+    def _fit_score_queue(
         self,
         _sample: Optional[Sample] = None,
         _scoring: Union[str, Callable[[float, float], float], None] = __NO_SCORING,
         _train_scores: bool = False,
         sample_weight: pd.Series = None,
         **fit_params,
-    ) -> Optional[np.ndarray]:
+    ) -> JobQueue[FitResult, Optional[np.ndarray]]:

         if sample_weight is not None:
             raise ValueError(
@@ -423,35 +467,69 @@ def _generate_parameters() -> Iterator[_FitScoreParameters]:
                 ),
             )

-        with self._parallel() as parallel:
-            model_and_score_by_split: List[
-                Tuple[T_LearnerPipelineDF, Optional[float]]
-            ] = parallel(
-                self._delayed(LearnerCrossfit._fit_and_score_model_for_split)(
-                    parameters, **fit_params
+        crossfit = self
+
+        @inheritdoc(match="""[see superclass]""")
+        class _FitScoreQueue(JobQueue[FitResult, Optional[np.ndarray]]):
+            def jobs(self) -> Iterable[Job[FitResult]]:
+                """[see superclass]"""
+                return (
+                    _FitAndScoreModelForSplit(parameters, fit_params)
+                    for parameters in _generate_parameters()
                 )
-                for parameters in _generate_parameters()
-            )

-        model_by_split, scores = zip(*model_and_score_by_split)
+            def start(self) -> None:
+                """
+                Un-fit the crossfit associated with this queue, if we are fitting.
+                """
+                if do_fit:
+                    crossfit._reset_fit()

-        if do_fit:
-            self._splits = splits
-            self._model_by_split = model_by_split
-            self._sample = _sample
+            def collate(self, job_results: List[FitResult]) -> Optional[np.ndarray]:
+                """[see superclass]"""
+                model_by_split, scores = zip(*job_results)
+
+                if do_fit:
+                    crossfit._splits = splits
+                    crossfit._model_by_split = model_by_split
+                    crossfit._sample = _sample
+
+                return np.array(scores) if do_score else None
+
+            def __len__(self) -> int:
+                return len(splits)

-        return np.array(scores) if do_score else None
+        return _FitScoreQueue()
+
+    def _run(
+        self, queue: JobQueue[FitResult, Optional[np.ndarray]]
+    ) -> Optional[np.ndarray]:
+        return JobRunner.from_parallelizable(self).run_queue(queue)

     def _reset_fit(self) -> None:
         self._sample = None
         self._splits = None
         self._model_by_split = None

-    # noinspection PyPep8Naming
-    @staticmethod
-    def _fit_and_score_model_for_split(
-        parameters: _FitScoreParameters, **fit_params
-    ) -> Tuple[Optional[T_LearnerPipelineDF], Optional[float]]:
+    def __len__(self) -> int:
+        return self.n_splits_
+
+
+class _FitAndScoreModelForSplit(Job[FitResult]):
+    def __init__(
+        self, parameters: _FitScoreParameters, fit_params: Dict[str, Any]
+    ) -> None:
+        self.parameters = parameters
+        self.fit_params = fit_params
+
+    def run(self) -> FitResult:
+        """
+        Fit and/or score a learner pipeline.
+
+        :return: a tuple with the fitted pipeline and the score
+        """
+        parameters = self.parameters
+
         do_fit = parameters.train_target is not None
         do_score = parameters.scorer is not None

@@ -463,7 +541,7 @@ def _fit_and_score_model_for_split(
                 y=parameters.train_target,
                 feature_sequence=parameters.train_feature_sequence,
                 sample_weight=parameters.train_weight,
-                **fit_params,
+                **self.fit_params,
             )

         else:
@@ -496,8 +574,5 @@ def _fit_and_score_model_for_split(
         return pipeline if do_fit else None, score

-    def __len__(self) -> int:
-        return self.n_splits_
-


 __tracker.validate()
diff --git a/src/facet/inspection/_shap.py b/src/facet/inspection/_shap.py
index e77e0e491..b23de4804 100644
--- a/src/facet/inspection/_shap.py
+++ b/src/facet/inspection/_shap.py
@@ -11,7 +11,7 @@

 from pytools.api import AllTracker, inheritdoc
 from pytools.fit import FittableMixin
-from pytools.parallelization import ParallelizableMixin
+from pytools.parallelization import Job, JobRunner, ParallelizableMixin
 from sklearndf.pipeline import (
     ClassifierPipelineDF,
     LearnerPipelineDF,
@@ -214,9 +214,11 @@ def _get_shap_all_splits(
         else:
             background_dataset = None

-        with self._parallel() as parallel:
-            shap_df_per_split: List[pd.DataFrame] = parallel(
-                self._delayed(self._get_shap_for_split)(
+        shap_df_per_split: List[pd.DataFrame] = JobRunner.from_parallelizable(
+            self
+        ).run_jobs(
+            *(
+                Job.delayed(self._get_shap_for_split)(
                     model,
                     sample,
                     self._explainer_factory.make_explainer(
@@ -253,6 +255,7 @@
                     ),
                 )
             )
+        )

         return self._concatenate_splits(shap_df_per_split=shap_df_per_split)

diff --git a/src/facet/selection/_selection.py b/src/facet/selection/_selection.py
index a60402fbf..c4f1f1d2f 100644
--- a/src/facet/selection/_selection.py
+++ b/src/facet/selection/_selection.py
@@ -32,7 +32,7 @@

 from pytools.api import AllTracker, inheritdoc, to_tuple
 from pytools.fit import FittableMixin
-from pytools.parallelization import ParallelizableMixin
+from pytools.parallelization import JobRunner, ParallelizableMixin
 from sklearndf.pipeline import (
     ClassifierPipelineDF,
     LearnerPipelineDF,
@@ -479,15 +479,19 @@
     ) -> List[LearnerEvaluation[T_LearnerPipelineDF]]:
         ranking_scorer = self.ranking_scorer

-        configurations: Iterable[Tuple[T_LearnerPipelineDF, Dict[str, Any]]] = (
-            (
-                cast(T_LearnerPipelineDF, grid.pipeline.clone()).set_params(
-                    **parameters
-                ),
-                parameters,
+        pipelines: Iterable[T_LearnerPipelineDF]
+        pipelines_parameters: Iterable[Dict[str, Any]]
+        pipelines, pipelines_parameters = zip(
+            *(
+                (
+                    cast(T_LearnerPipelineDF, grid.pipeline.clone()).set_params(
+                        **parameters
+                    ),
+                    parameters,
+                )
+                for grid in self.grids
+                for parameters in grid
             )
-            for grid in self.grids
-            for parameters in grid
         )

         ranking: List[LearnerEvaluation[T_LearnerPipelineDF]] = []
@@ -496,8 +500,8 @@

         scoring_name = self.scoring_name

-        for pipeline, parameters in configurations:
-            crossfit = LearnerCrossfit(
+        crossfits = [
+            LearnerCrossfit(
                 pipeline=pipeline,
                 cv=self.cv,
                 shuffle_features=self.shuffle_features,
@@ -507,17 +511,29 @@
                 pre_dispatch=self.pre_dispatch,
                 verbose=self.verbose,
             )
+            for pipeline in pipelines
+        ]

-            pipeline_scoring: np.ndarray = crossfit.fit_score(
-                sample=sample, scoring=self.scoring, **fit_params
-            )
+        queues = (
+            crossfit.fit_score_queue(sample=sample, scoring=self.scoring, **fit_params)
+            for crossfit in crossfits
+        )

-            ranking_score = ranking_scorer(pipeline_scoring)
+        pipeline_scorings: List[np.ndarray] = list(
+            JobRunner.from_parallelizable(self).run_queues(*queues)
+        )
+        for crossfit, pipeline_parameters, pipeline_scoring in zip(
+            crossfits, pipelines_parameters, pipeline_scorings
+        ):
+
+            ranking_score = ranking_scorer(pipeline_scoring)
+            crossfit_pipeline = crossfit.pipeline
+            assert crossfit_pipeline.is_fitted

             ranking.append(
                 LearnerEvaluation(
-                    pipeline=pipeline,
-                    parameters=parameters,
+                    pipeline=crossfit_pipeline,
+                    parameters=pipeline_parameters,
                     scoring_name=scoring_name,
                     scores=pipeline_scoring,
                     ranking_score=ranking_score,
@@ -532,9 +548,7 @@
         return ranking


-def _learner_type(
-    pipeline: T_LearnerPipelineDF,
-) -> Type[Union[RegressorPipelineDF, ClassifierPipelineDF]]:
+def _learner_type(pipeline: T_LearnerPipelineDF) -> Type[T_LearnerPipelineDF]:
     # determine whether a learner pipeline fits a regressor or a classifier
     for learner_type in [RegressorPipelineDF, ClassifierPipelineDF]:
         if isinstance(pipeline, learner_type):
diff --git a/src/facet/simulation/_simulation.py b/src/facet/simulation/_simulation.py
index 57e02eba7..7f48fa89d 100644
--- a/src/facet/simulation/_simulation.py
+++ b/src/facet/simulation/_simulation.py
@@ -20,7 +20,7 @@
 import pandas as pd

 from pytools.api import AllTracker, inheritdoc
-from pytools.parallelization import ParallelizableMixin
+from pytools.parallelization import Job, JobRunner, ParallelizableMixin
 from sklearndf import LearnerDF
 from sklearndf.pipeline import (
     ClassifierPipelineDF,
@@ -341,9 +341,9 @@ def simulate_actuals(self) -> pd.Series:
         sample = self.crossfit.sample_
         y_mean = self.expected_output()

-        with self._parallel() as parallel:
-            result: List[float] = parallel(
-                self._delayed(self._simulate_actuals)(
+        result: List[float] = JobRunner.from_parallelizable(self).run_jobs(
+            *(
+                Job.delayed(self._simulate_actuals)(
                     model, subsample.features, y_mean, self._simulate
                 )
                 for (model, (_, test_indices)) in zip(
@@ -351,6 +351,7 @@
                     self.crossfit.models(), self.crossfit.splits()
                 )
                 for subsample in (sample.subsample(iloc=test_indices),)
             )
+        )

         return pd.Series(data=result, name=COL_OUTPUT).rename_axis(index=IDX_SPLIT)
@@ -410,9 +411,11 @@
         if feature_name not in sample.features.columns:
             raise ValueError(f"Feature '{feature_name}' not in sample")

-        with self._parallel() as parallel:
-            simulation_results_per_split: List[np.ndarray] = parallel(
-                self._delayed(UnivariateUpliftSimulator._simulate_values_for_split)(
+        simulation_results_per_split: List[np.ndarray] = JobRunner.from_parallelizable(
+            self
+        ).run_jobs(
+            *(
+                Job.delayed(UnivariateUpliftSimulator._simulate_values_for_split)(
                     model=model,
                     subsample=sample.subsample(iloc=test_indices),
                     feature_name=feature_name,
@@ -423,6 +426,7 @@
                     self.crossfit.models(), self.crossfit.splits()
                 )
             )
+        )

         return pd.DataFrame(
             simulation_results_per_split, columns=simulation_values
diff --git a/test/test/facet/test_inspection.py b/test/test/facet/test_inspection.py
index 41f167068..679c86d19 100644
--- a/test/test/facet/test_inspection.py
+++ b/test/test/facet/test_inspection.py
@@ -141,7 +141,7 @@ def test_model_inspection(
     assert shap_values_mean.columns.names == [Sample.IDX_FEATURE]
     assert shap_values_std.index.names == [Sample.IDX_OBSERVATION]
     assert shap_values_std.columns.names == [Sample.IDX_FEATURE]
-    assert shap_values_raw.index.names == (["split", "observation"])
+    assert shap_values_raw.index.names == ["split", "observation"]
     assert shap_values_raw.columns.names == [Sample.IDX_FEATURE]

     # column index
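
For review context, a minimal usage sketch of the queue-based API introduced by this patch; it is not part of the diff, and `my_pipeline`, `my_cv`, and `sample` are illustrative placeholder objects, not names from this change:

    # Sketch only. Assumes `sample` is a facet Sample, `my_pipeline` is a
    # RegressorPipelineDF, and `my_cv` is a scikit-learn cross-validator.
    from pytools.parallelization import JobRunner
    from facet.crossfit import LearnerCrossfit

    crossfit = LearnerCrossfit(pipeline=my_pipeline, cv=my_cv, n_jobs=-1)

    # one-shot fit and score, as before; internally this now builds a job queue
    # and runs it through a JobRunner
    scores = crossfit.fit_score(sample=sample, scoring="r2")

    # equivalent, with the queue created explicitly; _rank_learners uses this
    # form to run the queues of many crossfits through a single JobRunner
    queue = crossfit.fit_score_queue(sample=sample, scoring="r2")
    scores = JobRunner.from_parallelizable(crossfit).run_queue(queue)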