feat: Adding feature type shared parameter capability for hyperopt (#2133)

* Added method to track eligible input features for shared parameters

* Passing shared params features into _run_experiment

* Refactor

* Almost working shared params

* Completely working shared params

* Aligning variable names for consistency

* Refactor logic for updating shared params to be more readable

* Added docstrings

* formatting

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Removing unnecessary import of unittest.mock.DEFAULT from constants.py

* Added integration test for shared params hyperopt

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Adding type hints to function definitions

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Added optional types

* Adding better exception and using constants

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Using tempfile to avoid manual cleanup

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Added support for output features

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* removing nested function

* Resolving merge conflict

* Fix docstring typo

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Adding empty lists as defaults for input/output feature config

* Added support in case default encoders/decoders are specified in config

* Fix keyword arg in execute

* Adding conditional check for tests that call execute instead of hyperopt directly

* Fixed failing test because of Ray 1.13

* Fixing key type issue based on Ray version

* Simplifying and improving readability

* Renaming tests for clarity

* Added type hints to substitute_parameters

* Reorder guard clauses for correct sequential ordering of checks

* Refactoring tests to split into two to test each act separately

* additional checks to short circuit more quickly

* More refactoring

* Changing hyperopt search space to use user-level parameters

* Adding markers

* Resolving parameter names to be non-private attributes in tests

* Force push to fix failing tests

* Add vocab_size back as a parameter for tests and use smaller values for sample options

* Refactoring tests to run more efficiently

* Changing conditional ray check

* Update to apply shared defaults to both input and output features without distinction

* Fix typo causing tests to fail

* Fix typo causing test to fail

* Better value error message

* Simplifying functions

* Fixed type hints

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
arnavgarg1 and pre-commit-ci[bot] authored Jun 29, 2022
1 parent 76055a2 commit 236cf54
Showing 6 changed files with 410 additions and 53 deletions.
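In short, the commits above let a hyperopt search space define parameters once per feature type, shared by every feature of that type that keeps its default encoder/decoder, alongside the existing per-feature parameters. A rough sketch of such a search space as a Python dict — the feature name (`title`) and parameters (`embedding_size`, `num_fc_layers`) are hypothetical, and the dotted `defaults.<feature_type>.<parameter>` form is inferred from `substitute_parameters` in the diff below:

```python
# Hypothetical hyperopt parameter section of a Ludwig config.
# "defaults.<feature_type>.<param>" is shared across all features of that type
# that keep their default encoder/decoder; "<column>.<param>" stays
# feature-specific and takes precedence over the shared value.
hyperopt_parameters = {
    "trainer.learning_rate": {"space": "loguniform", "lower": 1e-4, "upper": 1e-2},
    "defaults.text.embedding_size": {"space": "choice", "categories": [32, 64, 128]},
    "title.num_fc_layers": {"space": "randint", "lower": 1, "upper": 4},
}
```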
8 changes: 8 additions & 0 deletions ludwig/constants.py
@@ -13,6 +13,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
INPUT_FEATURES = "input_features"
OUTPUT_FEATURES = "output_features"

BINARY = "binary"
CATEGORY = "category"
INT = "int"
@@ -150,6 +153,11 @@

COMBINER = "combiner"

ENCODER = "encoder"
DECODER = "decoder"

DEFAULTS = "defaults"

BALANCE_PERCENTAGE_TOLERANCE = 0.03
IMBALANCE_DETECTION_RATIO = 0.05

2 changes: 1 addition & 1 deletion ludwig/features/sequence_feature.py
@@ -276,7 +276,7 @@ def add_feature_data(


class SequenceInputFeature(SequenceFeatureMixin, InputFeature):
encoder = "embed"
encoder = "parallel_cnn"
max_sequence_length = None

def __init__(self, feature, encoder_obj=None):
169 changes: 146 additions & 23 deletions ludwig/hyperopt/execution.py
@@ -12,7 +12,7 @@
from functools import lru_cache
from inspect import signature
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Tuple, Union
from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union

import ray
from packaging import version
@@ -27,7 +27,20 @@
from ludwig.api import LudwigModel
from ludwig.backend import initialize_backend, RAY
from ludwig.callbacks import Callback
from ludwig.constants import COLUMN, MAXIMIZE, TEST, TRAINER, TRAINING, TYPE, VALIDATION
from ludwig.constants import (
COLUMN,
COMBINER,
DEFAULTS,
INPUT_FEATURES,
MAXIMIZE,
OUTPUT_FEATURES,
PREPROCESSING,
TEST,
TRAINER,
TRAINING,
TYPE,
VALIDATION,
)
from ludwig.hyperopt.results import RayTuneResults, TrialResults
from ludwig.hyperopt.search_algos import get_search_algorithm
from ludwig.hyperopt.utils import load_json_values
@@ -418,19 +431,30 @@ def _evaluate_best_model(
f"{traceback.format_exc()}"
)

def _run_experiment(self, config, checkpoint_dir, hyperopt_dict, decode_ctx, is_using_ray_backend=False):
def _run_experiment(
self,
config,
checkpoint_dir,
hyperopt_dict,
decode_ctx,
features_eligible_for_shared_params,
is_using_ray_backend=False,
):
for gpu_id in ray.get_gpu_ids():
# Previous trial may not have freed its memory yet, so wait to avoid OOM
wait_for_gpu(gpu_id)

# Some config values may be JSON encoded as strings, so decode them here
config = self.decode_values(config, decode_ctx)

trial_id = tune.get_trial_id()
modified_config = substitute_parameters(copy.deepcopy(hyperopt_dict["config"]), config)

trial_dir = Path(tune.get_trial_dir())
driver_trial_location = ray.util.get_node_ip_address()

modified_config = substitute_parameters(
copy.deepcopy(hyperopt_dict["config"]), config, features_eligible_for_shared_params
)

hyperopt_dict["config"] = modified_config
hyperopt_dict["experiment_name "] = f'{hyperopt_dict["experiment_name"]}_{trial_id}'
hyperopt_dict["output_directory"] = str(trial_dir)
@@ -622,6 +646,7 @@ def execute(
random_seed=default_random_seed,
debug=False,
hyperopt_log_verbosity=3,
features_eligible_for_shared_params=None,
**kwargs,
) -> RayTuneResults:
if isinstance(dataset, str) and not has_remote_protocol(dataset) and not os.path.isabs(dataset):
@@ -713,7 +738,12 @@

def run_experiment_trial(config, local_hyperopt_dict, checkpoint_dir=None):
return self._run_experiment(
config, checkpoint_dir, local_hyperopt_dict, self.decode_ctx, _is_ray_backend(backend)
config,
checkpoint_dir,
local_hyperopt_dict,
self.decode_ctx,
features_eligible_for_shared_params,
_is_ray_backend(backend),
)

tune_config = {}
@@ -862,15 +892,86 @@ def get_build_hyperopt_executor(executor_type):
executor_registry = {"ray": RayTuneExecutor}


def set_values(model_dict, name, parameters_dict):
if name in parameters_dict:
params = parameters_dict[name]
for key, value in params.items():
if isinstance(value, dict):
for sub_key, sub_value in value.items():
model_dict[key][sub_key] = sub_value
else:
model_dict[key] = value
def set_values(params: Dict[str, Any], model_dict: Dict[str, Any]):
for key, value in params.items():
if isinstance(value, dict):
for sub_key, sub_value in value.items():
if key not in model_dict:
model_dict[key] = dict()
model_dict[key][sub_key] = sub_value
else:
model_dict[key] = value
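For illustration, a minimal sketch of how the rewritten `set_values` behaves (not part of the diff; names and values are hypothetical). Nested dicts are merged per sub-key rather than replaced wholesale:

```python
feature_config = {"name": "title", "type": "text", "preprocessing": {"lowercase": True}}
sampled = {"embedding_size": 64, "preprocessing": {"max_sequence_length": 10}}

set_values(sampled, feature_config)
# feature_config == {
#     "name": "title", "type": "text",
#     "preprocessing": {"lowercase": True, "max_sequence_length": 10},
#     "embedding_size": 64,
# }
```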


def update_features_with_shared_params(
section_dict: Dict[str, Any],
trial_parameters_dict: Dict[str, Dict[str, Any]],
config_feature_group: Optional[str] = None,
features_eligible_for_shared_params: Optional[Dict[str, Dict[str, Set]]] = None,
):
"""Updates the parameters of feature_name in section_dict based on hyperopt parameters sampled.
:param section_dict: Underlying config for the specific input/output feature populated with potentially a mix of
default and feature-specific parameters. This may be updated with values from the hyperopt search space.
:type section_dict: dict[str, any]
:param trial_parameters_dict: Config produced by the hyperopt sampler based on the parameter search space. It maps
the name of the feature to the sampled parameters for that feature. For default parameters, it creates
nested dictionaries for each feature type.
:type trial_parameters_dict: dict[str, dict[str, any]]
:param config_feature_group: Indicates whether the feature is an input feature or output feature (can be either of
`input_features` or `output_features`).
:type config_feature_group: str
:param features_eligible_for_shared_params: Collection of names of features that are eligible for using shared
parameters, keyed by `input_features` or `output_features` and then by feature type.
:type features_eligible_for_shared_params: dict[str, dict[str, set]]
"""

feature_name = section_dict.get(COLUMN)
feature_type = section_dict.get(TYPE)

# No default parameters specified in hyperopt parameter search space
if DEFAULTS not in trial_parameters_dict:
return

# No sampled value from the default parameters for this feature type
if feature_type not in trial_parameters_dict.get(DEFAULTS):
return

# All features in Ludwig config use non-default encoders or decoders
if not features_eligible_for_shared_params:
logger.warning(
"""
Default parameters specified in the hyperopt parameter search space are not being used because no
feature in the Ludwig config uses a default encoder or decoder. Consider either switching features to
their default encoders or decoders, or specifying feature-specific encoder parameters instead of
defaults in the parameter search space.
"""
)
return

features_eligible_for_shared_params = features_eligible_for_shared_params.get(config_feature_group)

# No feature of this feature type uses a default encoder/decoder in the config
if feature_type not in features_eligible_for_shared_params:
return

# This feature does not use a default encoder/decoder, so it does not receive shared params
if feature_name not in features_eligible_for_shared_params.get(feature_type):
return

sampled_default_shared_params = trial_parameters_dict.get(DEFAULTS).get(feature_type)
set_values(sampled_default_shared_params, section_dict)
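A sketch of the happy path through the guard clauses above, with hypothetical names and values; `get_parameters_dict` (below) has already expanded a sampled `defaults.text.embedding_size` into nested form:

```python
trial_parameters_dict = {"defaults": {"text": {"embedding_size": 64}}}
eligible = {"input_features": {"text": {"title", "summary"}}}

# "title" is a text input feature that uses its default encoder, so it is eligible
section_dict = {"name": "title", "column": "title", "type": "text"}
update_features_with_shared_params(
    section_dict,
    trial_parameters_dict,
    config_feature_group="input_features",
    features_eligible_for_shared_params=eligible,
)
# section_dict now includes "embedding_size": 64
```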


def update_section_dict(
section_dict: Dict[str, Any], parameter_name: str, trial_parameters_dict: Dict[str, Dict[str, Any]]
):
"""Update a parameter in section config with sampled value from hyperopt."""
if parameter_name not in trial_parameters_dict:
return

params = trial_parameters_dict[parameter_name]
set_values(params, section_dict)
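Continuing the sketch: `substitute_parameters` below calls `update_section_dict` after the shared-params update, so a feature-specific sampled value overrides a shared default:

```python
# Hypothetical per-feature sample, keyed by the feature's COLUMN name.
update_section_dict(section_dict, "title", {"title": {"embedding_size": 128}})
# section_dict["embedding_size"] is now 128, overriding the shared 64.
```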


def get_parameters_dict(parameters):
@@ -888,15 +989,36 @@ def get_parameters_dict(parameters):
return parameters_dict
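The body of `get_parameters_dict` is folded out of this hunk; from how its result is consumed below, it expands dotted search-space names into nested dicts, roughly:

```python
parameters = {"trainer.learning_rate": 0.001, "defaults.text.embedding_size": 64}
# get_parameters_dict(parameters) ==
# {"trainer": {"learning_rate": 0.001},
#  "defaults": {"text": {"embedding_size": 64}}}
```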


def substitute_parameters(config, parameters):
def substitute_parameters(
config: Dict[str, Any],
parameters: Dict[str, Any],
features_eligible_for_shared_params: Optional[Dict[str, Dict[str, Set]]] = None,
):
"""Update Ludwig config with parameters sampled from the Hyperopt sampler."""
parameters_dict = get_parameters_dict(parameters)
for input_feature in config["input_features"]:
set_values(input_feature, input_feature[COLUMN], parameters_dict)
for output_feature in config["output_features"]:
set_values(output_feature, output_feature[COLUMN], parameters_dict)
set_values(config["combiner"], "combiner", parameters_dict)
set_values(config[TRAINER], TRAINER, parameters_dict)
set_values(config["preprocessing"], "preprocessing", parameters_dict)
for input_feature in config[INPUT_FEATURES]:
# Update shared params
update_features_with_shared_params(
input_feature,
parameters_dict,
config_feature_group=INPUT_FEATURES,
features_eligible_for_shared_params=features_eligible_for_shared_params,
)
# Update or overwrite any feature specific hyperopt params
update_section_dict(input_feature, input_feature[COLUMN], parameters_dict)
for output_feature in config[OUTPUT_FEATURES]:
# Update shared params
update_features_with_shared_params(
output_feature,
parameters_dict,
config_feature_group=OUTPUT_FEATURES,
features_eligible_for_shared_params=features_eligible_for_shared_params,
)
# Update or overwrite any feature specific hyperopt params
update_section_dict(output_feature, output_feature[COLUMN], parameters_dict)
update_section_dict(config[COMBINER], COMBINER, parameters_dict)
update_section_dict(config[TRAINER], TRAINER, parameters_dict)
update_section_dict(config[PREPROCESSING], PREPROCESSING, parameters_dict)
return config
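An end-to-end sketch of the rewritten `substitute_parameters` with a hypothetical config and sample: shared defaults are applied first, then feature-specific samples, then the combiner, trainer, and preprocessing sections:

```python
config = {
    "input_features": [{"name": "title", "column": "title", "type": "text"}],
    "output_features": [{"name": "label", "column": "label", "type": "category"}],
    "combiner": {"type": "concat"},
    "trainer": {},
    "preprocessing": {},
}
sampled = {"defaults.text.embedding_size": 64, "trainer.learning_rate": 0.001}
eligible = {"input_features": {"text": {"title"}}, "output_features": {}}

config = substitute_parameters(config, sampled, eligible)
# config["input_features"][0]["embedding_size"] == 64  (shared default)
# config["trainer"]["learning_rate"] == 0.001
```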


@@ -945,6 +1067,7 @@ def run_experiment(
allow_parallel_threads=allow_parallel_threads,
callbacks=callbacks,
)

eval_stats, train_stats, _, _ = model.experiment(
dataset=dataset,
training_set=training_set,