Skip to content

Commit

Permalink
REFACTOR: update parallelization code to use new Job classes (#214)
Browse files Browse the repository at this point in the history
  • Loading branch information
j-ittner authored Jan 27, 2021
1 parent d726cb1 commit 8f75c78
Show file tree
Hide file tree
Showing 5 changed files with 160 additions and 64 deletions.
139 changes: 107 additions & 32 deletions src/facet/crossfit/_crossfit.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,12 @@
from abc import ABCMeta
from copy import copy
from typing import (
Any,
Callable,
Container,
Dict,
Generic,
Iterable,
Iterator,
List,
NamedTuple,
Expand All @@ -29,7 +32,7 @@

from pytools.api import AllTracker, inheritdoc
from pytools.fit import FittableMixin
from pytools.parallelization import ParallelizableMixin
from pytools.parallelization import Job, JobQueue, JobRunner, ParallelizableMixin
from sklearndf import LearnerDF, TransformerDF
from sklearndf.pipeline import (
ClassifierPipelineDF,
Expand All @@ -41,7 +44,7 @@

log = logging.getLogger(__name__)

__all__ = ["LearnerCrossfit", "Scorer"]
__all__ = ["Scorer", "FitResult", "LearnerCrossfit"]

#
# Type variables
Expand Down Expand Up @@ -79,13 +82,15 @@
float,
]

FitResult = Tuple[Optional[LearnerPipelineDF], Optional[float]]

#
# Class definitions
#


class _FitScoreParameters(NamedTuple):
pipeline: T_LearnerPipelineDF
pipeline: LearnerPipelineDF

# fit parameters
train_features: Optional[pd.DataFrame]
Expand Down Expand Up @@ -215,7 +220,7 @@ def fit(self: T_Self, sample: Sample, **fit_params) -> T_Self:
# un-fit this instance so we have a defined state in case of an exception
self._reset_fit()

self._fit_score(_sample=sample, **fit_params)
self._run(self._fit_score_queue(_sample=sample, **fit_params))

return self

Expand All @@ -238,7 +243,9 @@ def score(
:return: the resulting scores as a 1d numpy array
"""

return self._fit_score(_scoring=scoring, _train_scores=train_scores)
return self._run(
self._fit_score_queue(_scoring=scoring, _train_scores=train_scores)
)

def fit_score(
self,
Expand Down Expand Up @@ -267,8 +274,45 @@ def fit_score(
# un-fit this instance so we have a defined state in case of an exception
self._reset_fit()

return self._fit_score(
_sample=sample, _scoring=scoring, _train_scores=train_scores, **fit_params
return self._run(
self._fit_score_queue(
_sample=sample,
_scoring=scoring,
_train_scores=train_scores,
**fit_params,
)
)

def fit_score_queue(
    self,
    sample: Sample,
    scoring: Union[str, Callable[[float, float], float], None] = None,
    train_scores: bool = False,
    **fit_params,
) -> JobQueue[FitResult, Optional[np.ndarray]]:
    """
    Create a :class:`pytools.parallelization.JobQueue` that fits, then scores,
    this crossfit.

    See :meth:`.fit` and :meth:`.score` for details on fitting and scoring.

    :param sample: the sample to fit the estimators to; if the sample contains
        weights, these are passed on to the learner and scoring function as
        keyword argument ``sample_weight``
    :param scoring: scoring function to use to score the models
        (see :func:`~sklearn.metrics.check_scoring` for details)
    :param train_scores: if ``True``, calculate train scores instead of test
        scores (default: ``False``)
    :param fit_params: optional fit parameters, to be passed on to the fit method
        of the learner
    :return: the job queue
    """

    # delegate to the private implementation shared with fit(), score(),
    # and fit_score()
    return self._fit_score_queue(
        _sample=sample,
        _scoring=scoring,
        _train_scores=train_scores,
        **fit_params,
    )

def resize(self: T_Self, n_splits: int) -> T_Self:
Expand Down Expand Up @@ -318,14 +362,14 @@ def models(self) -> Iterator[T_LearnerPipelineDF]:
return iter(self._model_by_split)

# noinspection PyPep8Naming
def _fit_score(
def _fit_score_queue(
self,
_sample: Optional[Sample] = None,
_scoring: Union[str, Callable[[float, float], float], None] = __NO_SCORING,
_train_scores: bool = False,
sample_weight: pd.Series = None,
**fit_params,
) -> Optional[np.ndarray]:
) -> JobQueue[FitResult, Optional[np.ndarray]]:

if sample_weight is not None:
raise ValueError(
Expand Down Expand Up @@ -423,35 +467,69 @@ def _generate_parameters() -> Iterator[_FitScoreParameters]:
),
)

with self._parallel() as parallel:
model_and_score_by_split: List[
Tuple[T_LearnerPipelineDF, Optional[float]]
] = parallel(
self._delayed(LearnerCrossfit._fit_and_score_model_for_split)(
parameters, **fit_params
crossfit = self

@inheritdoc(match="""[see superclass]""")
class _FitScoreQueue(JobQueue[FitResult, Optional[np.ndarray]]):
def jobs(self) -> Iterable[Job[FitResult]]:
"""[see superclass]"""
return (
_FitAndScoreModelForSplit(parameters, fit_params)
for parameters in _generate_parameters()
)
for parameters in _generate_parameters()
)

model_by_split, scores = zip(*model_and_score_by_split)
def start(self) -> None:
"""
Un-fit the crossfit associated with this queue, if we are fitting.
"""
if do_fit:
crossfit._reset_fit()

if do_fit:
self._splits = splits
self._model_by_split = model_by_split
self._sample = _sample
def collate(self, job_results: List[FitResult]) -> Optional[np.ndarray]:
"""[see superclass]"""
model_by_split, scores = zip(*job_results)

if do_fit:
crossfit._splits = splits
crossfit._model_by_split = model_by_split
crossfit._sample = _sample

return np.array(scores) if do_score else None

def __len__(self) -> int:
return len(splits)

return np.array(scores) if do_score else None
return _FitScoreQueue()

def _run(
    self, queue: JobQueue[FitResult, Optional[np.ndarray]]
) -> Optional[np.ndarray]:
    # Execute the given job queue with this crossfit's own parallelization
    # settings (n_jobs, shared_memory, …), returning the queue's collated result.
    runner = JobRunner.from_parallelizable(self)
    return runner.run_queue(queue)

def _reset_fit(self) -> None:
    # Discard all fitted state so this crossfit reports as not fitted;
    # called before (re-)fitting to guarantee a defined state on exceptions.
    self._sample = self._splits = self._model_by_split = None

# noinspection PyPep8Naming
@staticmethod
def _fit_and_score_model_for_split(
parameters: _FitScoreParameters, **fit_params
) -> Tuple[Optional[T_LearnerPipelineDF], Optional[float]]:
def __len__(self) -> int:
    """The length of a crossfit is its number of fit/score splits."""
    return self.n_splits_


class _FitAndScoreModelForSplit(Job[FitResult]):
def __init__(
    self, parameters: _FitScoreParameters, fit_params: Dict[str, Any]
) -> None:
    """
    :param parameters: the fit/score parameters for one split
    :param fit_params: additional parameters passed on to the learner's
        fit method when the job runs
    """
    # stored for deferred execution in run()
    self.parameters = parameters
    self.fit_params = fit_params

def run(self) -> FitResult:
"""
Fit and/or score a learner pipeline.
:return: a tuple with the fitted pipeline and the score
"""
parameters = self.parameters

do_fit = parameters.train_target is not None
do_score = parameters.scorer is not None

Expand All @@ -463,7 +541,7 @@ def _fit_and_score_model_for_split(
y=parameters.train_target,
feature_sequence=parameters.train_feature_sequence,
sample_weight=parameters.train_weight,
**fit_params,
**self.fit_params,
)

else:
Expand Down Expand Up @@ -496,8 +574,5 @@ def _fit_and_score_model_for_split(

return pipeline if do_fit else None, score

def __len__(self) -> int:
return self.n_splits_


__tracker.validate()
11 changes: 7 additions & 4 deletions src/facet/inspection/_shap.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

from pytools.api import AllTracker, inheritdoc
from pytools.fit import FittableMixin
from pytools.parallelization import ParallelizableMixin
from pytools.parallelization import Job, JobRunner, ParallelizableMixin
from sklearndf.pipeline import (
ClassifierPipelineDF,
LearnerPipelineDF,
Expand Down Expand Up @@ -214,9 +214,11 @@ def _get_shap_all_splits(
else:
background_dataset = None

with self._parallel() as parallel:
shap_df_per_split: List[pd.DataFrame] = parallel(
self._delayed(self._get_shap_for_split)(
shap_df_per_split: List[pd.DataFrame] = JobRunner.from_parallelizable(
self
).run_jobs(
*(
Job.delayed(self._get_shap_for_split)(
model,
sample,
self._explainer_factory.make_explainer(
Expand Down Expand Up @@ -253,6 +255,7 @@ def _get_shap_all_splits(
),
)
)
)

return self._concatenate_splits(shap_df_per_split=shap_df_per_split)

Expand Down
54 changes: 34 additions & 20 deletions src/facet/selection/_selection.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@

from pytools.api import AllTracker, inheritdoc, to_tuple
from pytools.fit import FittableMixin
from pytools.parallelization import ParallelizableMixin
from pytools.parallelization import JobRunner, ParallelizableMixin
from sklearndf.pipeline import (
ClassifierPipelineDF,
LearnerPipelineDF,
Expand Down Expand Up @@ -479,15 +479,19 @@ def _rank_learners(
) -> List[LearnerEvaluation[T_LearnerPipelineDF]]:
ranking_scorer = self.ranking_scorer

configurations: Iterable[Tuple[T_LearnerPipelineDF, Dict[str, Any]]] = (
(
cast(T_LearnerPipelineDF, grid.pipeline.clone()).set_params(
**parameters
),
parameters,
pipelines: Iterable[T_LearnerPipelineDF]
pipelines_parameters: Iterable[Dict[str, Any]]
pipelines, pipelines_parameters = zip(
*(
(
cast(T_LearnerPipelineDF, grid.pipeline.clone()).set_params(
**parameters
),
parameters,
)
for grid in self.grids
for parameters in grid
)
for grid in self.grids
for parameters in grid
)

ranking: List[LearnerEvaluation[T_LearnerPipelineDF]] = []
Expand All @@ -496,8 +500,8 @@ def _rank_learners(

scoring_name = self.scoring_name

for pipeline, parameters in configurations:
crossfit = LearnerCrossfit(
crossfits = [
LearnerCrossfit(
pipeline=pipeline,
cv=self.cv,
shuffle_features=self.shuffle_features,
Expand All @@ -507,17 +511,29 @@ def _rank_learners(
pre_dispatch=self.pre_dispatch,
verbose=self.verbose,
)
for pipeline in pipelines
]

pipeline_scoring: np.ndarray = crossfit.fit_score(
sample=sample, scoring=self.scoring, **fit_params
)
queues = (
crossfit.fit_score_queue(sample=sample, scoring=self.scoring, **fit_params)
for crossfit in crossfits
)

ranking_score = ranking_scorer(pipeline_scoring)
pipeline_scorings: List[np.ndarray] = list(
JobRunner.from_parallelizable(self).run_queues(*queues)
)

for crossfit, pipeline_parameters, pipeline_scoring in zip(
crossfits, pipelines_parameters, pipeline_scorings
):

ranking_score = ranking_scorer(pipeline_scoring)
crossfit_pipeline = crossfit.pipeline
assert crossfit_pipeline.is_fitted
ranking.append(
LearnerEvaluation(
pipeline=pipeline,
parameters=parameters,
pipeline=crossfit_pipeline,
parameters=pipeline_parameters,
scoring_name=scoring_name,
scores=pipeline_scoring,
ranking_score=ranking_score,
Expand All @@ -532,9 +548,7 @@ def _rank_learners(
return ranking


def _learner_type(
pipeline: T_LearnerPipelineDF,
) -> Type[Union[RegressorPipelineDF, ClassifierPipelineDF]]:
def _learner_type(pipeline: T_LearnerPipelineDF) -> Type[T_LearnerPipelineDF]:
# determine whether a learner pipeline fits a regressor or a classifier
for learner_type in [RegressorPipelineDF, ClassifierPipelineDF]:
if isinstance(pipeline, learner_type):
Expand Down
Loading

0 comments on commit 8f75c78

Please sign in to comment.