Commit
Extended hyperopt to support nested configuration block parameters (#…
tgaddair authored Sep 6, 2022
1 parent 1c0e607 commit c9a7b77
Showing 9 changed files with 281 additions and 342 deletions.
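
For context: this change lets hyperopt search spaces target parameters inside nested configuration blocks by naming them with dot-delimited paths, rather than relying on the special-cased shared-parameter machinery removed below. A minimal sketch of such a search space follows; the parameter names and search-space fields are illustrative assumptions, not values taken from this commit.

    # Illustrative only: dot-delimited names address nested config blocks directly.
    hyperopt_parameters = {
        "trainer.learning_rate": {"space": "loguniform", "lower": 1e-4, "upper": 1e-1},
        "combiner.num_fc_layers": {"space": "randint", "lower": 1, "upper": 4},
    }
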
168 changes: 10 additions & 158 deletions ludwig/hyperopt/execution.py
@@ -12,7 +12,7 @@
from functools import lru_cache
from inspect import signature
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union
from typing import Any, Callable, Dict, List, Optional, Tuple, Union

import ray
from packaging import version
@@ -28,29 +28,14 @@
from ludwig.backend import initialize_backend, RAY
from ludwig.backend.ray import initialize_ray
from ludwig.callbacks import Callback
from ludwig.constants import (
COLUMN,
COMBINER,
DECODER,
DEFAULTS,
ENCODER,
INPUT_FEATURES,
MAXIMIZE,
OUTPUT_FEATURES,
PREPROCESSING,
TEST,
TRAINER,
TRAINING,
TYPE,
VALIDATION,
)
from ludwig.hyperopt.results import RayTuneResults, TrialResults
from ludwig.constants import MAXIMIZE, TEST, TRAINER, TRAINING, TYPE, VALIDATION
from ludwig.hyperopt.results import HyperoptResults, TrialResults
from ludwig.hyperopt.search_algos import get_search_algorithm
from ludwig.hyperopt.utils import load_json_values
from ludwig.hyperopt.utils import load_json_values, substitute_parameters
from ludwig.modules.metric_modules import get_best_function
from ludwig.utils import metric_utils
from ludwig.utils.data_utils import hash_dict, NumpyEncoder
from ludwig.utils.defaults import default_random_seed
from ludwig.utils.defaults import default_random_seed, merge_with_defaults
from ludwig.utils.fs_utils import has_remote_protocol
from ludwig.utils.misc_utils import get_from_registry

@@ -435,7 +420,6 @@ def _run_experiment(
checkpoint_dir,
hyperopt_dict,
decode_ctx,
features_eligible_for_shared_params,
is_using_ray_backend=False,
):
for gpu_id in ray.get_gpu_ids():
@@ -453,9 +437,9 @@
trial_dir = Path(tune.get_trial_dir())
driver_trial_location = ray.util.get_node_ip_address()

modified_config = substitute_parameters(
copy.deepcopy(hyperopt_dict["config"]), config, features_eligible_for_shared_params
)
modified_config = substitute_parameters(copy.deepcopy(hyperopt_dict["config"]), config)

modified_config = merge_with_defaults(modified_config)

hyperopt_dict["config"] = modified_config
hyperopt_dict["experiment_name "] = f'{hyperopt_dict["experiment_name"]}_{trial_id}'
@@ -648,9 +632,8 @@ def execute(
random_seed=default_random_seed,
debug=False,
hyperopt_log_verbosity=3,
features_eligible_for_shared_params=None,
**kwargs,
) -> RayTuneResults:
) -> HyperoptResults:
if isinstance(dataset, str) and not has_remote_protocol(dataset) and not os.path.isabs(dataset):
dataset = os.path.abspath(dataset)

@@ -744,7 +727,6 @@ def run_experiment_trial(config, local_hyperopt_dict, checkpoint_dir=None):
checkpoint_dir,
local_hyperopt_dict,
self.decode_ctx,
features_eligible_for_shared_params,
_is_ray_backend(backend),
)

@@ -868,7 +850,7 @@ def run_experiment_trial(config, local_hyperopt_dict, checkpoint_dir=None):
logger.warning("No trials reported results; check if time budget lower than epoch latency")
ordered_trials = []

return RayTuneResults(ordered_trials=ordered_trials, experiment_analysis=analysis)
return HyperoptResults(ordered_trials=ordered_trials, experiment_analysis=analysis)


class CallbackStopper(Stopper):
@@ -905,136 +887,6 @@ def set_values(params: Dict[str, Any], model_dict: Dict[str, Any]):
model_dict[key] = value


def update_features_with_shared_params(
section_dict: Dict[str, Any],
trial_parameters_dict: Dict[str, Dict[str, Any]],
config_feature_group: str = None,
features_eligible_for_shared_params: Dict[str, Dict[str, Set]] = None,
):
"""Updates the parameters of feature_name in section_dict based on hyperopt parameters sampled.
:param section_dict: Underlying config for the specific input/output feature populated with potentially a mix of
default and feature-specific parameters. This may be updated with values from the hyperopt search space.
:type section_dict: dict[str, any]
:param trial_parameters_dict: Config produced by the hyperopt sampler based on the parameter search space. It maps
the name of the feature to the sampled parameters for that feature. For default parameters, it creates
nested dictionaries for each feature type.
:type trial_parameters_dict: dict[str, dict[str, any]]
:param config_feature_group: Indicates whether the feature is an input feature or output feature (can be either of
`input_features` or `output_features`).
:type config_feature_group: str
:param features_eligible_for_shared_params: Collection of names of features that are eligible for using shared
parameters, keyed by `input_features` or `output_features` and then by feature type.
:type features_eligible_for_shared_params: dict[str, dict[str, set]]
"""

feature_name = section_dict.get(COLUMN)
feature_type = section_dict.get(TYPE)

# No default parameters specified in hyperopt parameter search space
if DEFAULTS not in trial_parameters_dict:
return

# This feature type should have a sampled value from the default parameters passed in
if feature_type not in trial_parameters_dict.get(DEFAULTS):
return

# All features in Ludwig config use non-default encoders or decoders
if not features_eligible_for_shared_params:
logger.warning(
"""
Default parameters specified in the hyperopt parameter search space are not being used since features
in Ludwig config are not using default encoders or decoders. You may consider either setting features to
their default encoders or decoders, or specifying feature with encoder specific parameters instead of
defaults in the parameter search space.
"""
)
return

features_eligible_for_shared_params = features_eligible_for_shared_params.get(config_feature_group)

# At least one of this feature's feature type must use non-default encoders/decoders in the config
if feature_type not in features_eligible_for_shared_params:
return

# This feature must use a default encoder/decoder
if feature_name not in features_eligible_for_shared_params.get(feature_type):
return

sampled_default_shared_params = trial_parameters_dict.get(DEFAULTS).get(feature_type)
shared_params_copy = copy.deepcopy(sampled_default_shared_params)

# Remove encoder/decoder from output/input features
if config_feature_group == INPUT_FEATURES:
if DECODER in sampled_default_shared_params:
del shared_params_copy[DECODER]
else:
if ENCODER in sampled_default_shared_params:
del shared_params_copy[ENCODER]
sampled_default_shared_params = shared_params_copy

set_values(sampled_default_shared_params, section_dict)


def update_section_dict(
section_dict: Dict[str, Any], parameter_name: str, trial_parameters_dict: Dict[str, Dict[str, Any]]
):
"""Update a parameter in section config with sampled value from hyperopt."""
if parameter_name not in trial_parameters_dict:
return

params = trial_parameters_dict[parameter_name]
set_values(params, section_dict)


def get_parameters_dict(parameters):
parameters_dict = {}
for name, value in parameters.items():
curr_dict = parameters_dict
name_list = name.split(".")
for i, name_elem in enumerate(name_list):
if i == len(name_list) - 1:
curr_dict[name_elem] = value
else:
name_dict = curr_dict.get(name_elem, {})
curr_dict[name_elem] = name_dict
curr_dict = name_dict
return parameters_dict


def substitute_parameters(
config: Dict[str, Any],
parameters: Dict[str, Any],
features_eligible_for_shared_params: Dict[str, Dict[str, Set]] = None,
):
"""Update Ludwig config with parameters sampled from the Hyperopt sampler."""
parameters_dict = get_parameters_dict(parameters)
for input_feature in config[INPUT_FEATURES]:
# Update shared params
update_features_with_shared_params(
input_feature,
parameters_dict,
config_feature_group=INPUT_FEATURES,
features_eligible_for_shared_params=features_eligible_for_shared_params,
)
# Update or overwrite any feature specific hyperopt params
update_section_dict(input_feature, input_feature[COLUMN], parameters_dict)
for output_feature in config[OUTPUT_FEATURES]:
# Update shared params
update_features_with_shared_params(
output_feature,
parameters_dict,
config_feature_group=OUTPUT_FEATURES,
features_eligible_for_shared_params=features_eligible_for_shared_params,
)
# Update or overwrite any feature specific hyperopt params
update_section_dict(output_feature, output_feature[COLUMN], parameters_dict)
update_section_dict(config[COMBINER], COMBINER, parameters_dict)
update_section_dict(config[TRAINER], TRAINER, parameters_dict)
update_section_dict(config[PREPROCESSING], PREPROCESSING, parameters_dict)
return config


def run_experiment(
config,
parameters=None,
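The substitution entry point removed from this file now lives in ludwig/hyperopt/utils.py (see the import added above; that file's diff is not among the files loaded here). The core step visible in the removed get_parameters_dict is worth keeping in mind while reading the rest of the diff: dot-delimited parameter names are expanded into nested dictionaries before being written into the config. A runnable sketch of that expansion, with made-up sample values:

    # Sketch of the expansion performed by the removed get_parameters_dict();
    # the sampled values below are invented for illustration.
    def expand_dotted(parameters):
        expanded = {}
        for name, value in parameters.items():
            node = expanded
            parts = name.split(".")
            for part in parts[:-1]:
                node = node.setdefault(part, {})
            node[parts[-1]] = value
        return expanded

    sampled = {"trainer.learning_rate": 0.001, "combiner.num_fc_layers": 2}
    print(expand_dotted(sampled))
    # {'trainer': {'learning_rate': 0.001}, 'combiner': {'num_fc_layers': 2}}
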
4 changes: 0 additions & 4 deletions ludwig/hyperopt/results.py
@@ -23,8 +23,4 @@ class TrialResults:
@dataclass
class HyperoptResults:
ordered_trials: List[TrialResults]


@dataclass
class RayTuneResults(HyperoptResults):
experiment_analysis: ExperimentAnalysis
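
Since this file reports 0 additions and 4 deletions, the ExperimentAnalysis field is retained; only the RayTuneResults subclass and its decorator are dropped, folding that field into the base class. The consolidated dataclass is effectively the following (a sketch reusing the imports already present in results.py):

    @dataclass
    class HyperoptResults:
        ordered_trials: List[TrialResults]
        experiment_analysis: ExperimentAnalysis
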
75 changes: 7 additions & 68 deletions ludwig/hyperopt/run.py
@@ -1,8 +1,8 @@
import copy
import logging
import os
from collections import defaultdict
from pprint import pformat
from typing import Any, Dict, List, Optional, Set, Union
from typing import List, Optional, Union

import pandas as pd
import yaml
@@ -12,11 +12,8 @@
from ludwig.callbacks import Callback
from ludwig.constants import (
COMBINED,
DECODER,
ENCODER,
EXECUTOR,
HYPEROPT,
INPUT_FEATURES,
LOSS,
MINIMIZE,
NAME,
@@ -28,7 +25,7 @@
VALIDATION,
)
from ludwig.data.split import get_splitter
from ludwig.features.feature_registries import input_type_registry, output_type_registry
from ludwig.features.feature_registries import output_type_registry
from ludwig.hyperopt.results import HyperoptResults
from ludwig.hyperopt.utils import print_hyperopt_results, save_hyperopt_stats, should_tune_preprocessing
from ludwig.utils.backward_compatibility import upgrade_to_latest_version
@@ -188,15 +185,12 @@ def hyperopt(
else:
config_dict = config

# Get mapping of input/output features that don't have an encoder for shared parameters
features_eligible_for_shared_params = {
INPUT_FEATURES: get_features_eligible_for_shared_params(config_dict, INPUT_FEATURES),
OUTPUT_FEATURES: get_features_eligible_for_shared_params(config_dict, OUTPUT_FEATURES),
}

# backwards compatibility
config = upgrade_to_latest_version(config_dict)

# Retain pre-merged config for hyperopt schema generation
premerged_config = copy.deepcopy(config)

# merge config with defaults
config = merge_with_defaults(config)

@@ -212,12 +206,6 @@
logging.info(pformat(hyperopt_config, indent=4))
logging.info("\n")

logging.info(
"Features that may be updated in hyperopt trials if default parameters are specified in the search space"
)
logging.info(pformat(dict(features_eligible_for_shared_params), indent=4))
logging.info("\n")

search_alg = hyperopt_config["search_alg"]
executor = hyperopt_config[EXECUTOR]
parameters = hyperopt_config["parameters"]
@@ -339,7 +327,7 @@ def hyperopt(
callback.on_hyperopt_start(experiment_name)

hyperopt_results = hyperopt_executor.execute(
config,
premerged_config,
dataset=dataset,
training_set=training_set,
validation_set=validation_set,
@@ -366,7 +354,6 @@
backend=backend,
random_seed=random_seed,
hyperopt_log_verbosity=hyperopt_log_verbosity,
features_eligible_for_shared_params=features_eligible_for_shared_params,
**kwargs,
)

@@ -410,51 +397,3 @@ def update_hyperopt_params_with_defaults(hyperopt_params):
hyperopt_params[EXECUTOR],
executor_defaults,
)


def get_features_eligible_for_shared_params(
config_dict: Dict[str, Any], config_feature_type: str
) -> Dict[str, Dict[str, Set]]:
"""Generates a mapping of feature type to the corresponding set of features without an encoder or one using the
default encoder for that feature type.
These features may be considered for potential shared parameter search spaces depending on the parameter space
defined later within the hyperopt config. This applies to both config_feature_types (input_features and
output_features). The shared parameters for both config_feature_types must be specified separately.
Note that shared default parameter search spaces are not applied to features with non-default encoders or
non-default decoders, since shared default parameter values should only apply to default modules.
Returns:
Dict of feature type -> set of feature names with that type that are eligible for shared parameters (they use
the default encoder or default decoder).
TODO(#2167): Make sure each feature has a type defined in the JSONSchema for Hyperopt
"""

if config_feature_type not in config_dict:
raise ValueError(f"{config_feature_type} must be defined in Ludwig config.")

features_eligible_for_shared_params = defaultdict(set)

features = config_dict.get(config_feature_type)
feature_registry = input_type_registry if config_feature_type == INPUT_FEATURES else output_type_registry

for feature in features:
if TYPE not in feature:
raise ValueError("Ludwig expects feature types to be defined for each feature within the config.")

feature_schema = get_from_registry(feature.get(TYPE), feature_registry).get_schema_cls()

if config_feature_type == INPUT_FEATURES:
default_encoder = feature_schema().encoder.type
if feature.get(ENCODER, None) and feature.get(ENCODER).get(TYPE, None) != default_encoder:
continue
else:
default_decoder = feature_schema().decoder.type
if feature.get(DECODER, None) and feature.get(DECODER).get(TYPE, None) != default_decoder:
continue

features_eligible_for_shared_params[feature[TYPE]].add(feature[NAME])

return features_eligible_for_shared_params
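
Read together with the execution.py hunks above, the per-trial flow after this commit is: the pre-merged user config retained here (premerged_config) is handed to the executor, each trial substitutes its sampled parameters into a deep copy of that config, and only then are the defaults merged in. A schematic of that ordering (user_config and sampled_params are placeholder names, not identifiers from the commit):

    import copy

    from ludwig.hyperopt.utils import substitute_parameters  # imported in execution.py above
    from ludwig.utils.defaults import merge_with_defaults     # imported in execution.py above

    # user_config: the pre-merged Ludwig config (run.py's premerged_config)
    # sampled_params: one draw from the search space, e.g. {"trainer.learning_rate": 0.001}
    trial_config = substitute_parameters(copy.deepcopy(user_config), sampled_params)
    trial_config = merge_with_defaults(trial_config)  # defaults applied after substitution
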
0 comments on commit c9a7b77