Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

[CI] Ensure that mlflow callback cleans up background-saving threads on trainer teardown. #3683

Merged
merged 13 commits into from
Oct 2, 2023
35 changes: 28 additions & 7 deletions ludwig/contribs/mlflow/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,15 +161,26 @@ def on_trainer_train_setup(self, trainer, save_path, is_coordinator):
def on_eval_end(self, trainer, progress_tracker, save_path):
    """Log metrics for the current eval step, at most once per step.

    Enqueues a tuple for the save function. The trailing ``True`` tells the
    background saving loop (when save_in_background is enabled) that it
    should continue running after processing this entry.
    """
    if progress_tracker.steps not in self.logged_steps:
        # Dedupe: eval can fire more than once for the same step count.
        self.logged_steps.add(progress_tracker.steps)
        self.save_fn((progress_tracker.log_metrics(), progress_tracker.steps, save_path, True))

def on_trainer_train_teardown(self, trainer, progress_tracker, save_path, is_coordinator):
    """Flush final metrics and shut down the background saving thread.

    Only the coordinator logs. The trailing ``False`` in each enqueued tuple
    signals the background saving loop to break, and the explicit
    ``(None, None, None, False)`` sentinel guarantees the loop exits even if
    the final step was already logged by an earlier eval, so the subsequent
    ``join()`` cannot hang.
    """
    if is_coordinator:
        if progress_tracker.steps not in self.logged_steps:
            self.logged_steps.add(progress_tracker.steps)
            # False tells the background saving loop to break after this entry.
            self.save_fn((progress_tracker.log_metrics(), progress_tracker.steps, save_path, False))
        # Sentinel ensuring the background saving loop breaks.
        # TODO(Justin): This should probably live in on_ludwig_end, once that's implemented.
        self.save_fn((None, None, None, False))

        # Close the save_thread so no background work outlives the trainer.
        if self.save_thread is not None:
            self.save_thread.join()

def on_visualize_figure(self, fig):
# TODO: need to also include a filename for this figure
Expand Down Expand Up @@ -205,10 +216,15 @@ def __setstate__(self, d):


def _log_mlflow_loop(q: queue.Queue, log_artifacts: bool = True):
"""The save_fn for the background thread that logs to MLFlow when save_in_background is True."""
should_continue = True
while should_continue:
elem = q.get()
log_metrics, steps, save_path, should_continue = elem
if log_metrics is None:
# Break out of the loop if we're not going to log anything.
break

mlflow.log_metrics(log_metrics, step=steps)

if not q.empty():
Expand All @@ -221,9 +237,14 @@ def _log_mlflow_loop(q: queue.Queue, log_artifacts: bool = True):


def _log_mlflow(log_metrics, steps, save_path, should_continue, log_artifacts: bool = True):
mlflow.log_metrics(log_metrics, step=steps)
if log_artifacts:
_log_model(save_path)
"""The save_fn for the MlflowCallback.

This is used when save_in_background is False.
"""
if log_metrics is not None:
mlflow.log_metrics(log_metrics, step=steps)
if log_artifacts:
_log_model(save_path)


def _log_artifacts(output_directory):
Expand Down
1 change: 1 addition & 0 deletions tests/integration_tests/test_contrib_aim.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
TEST_SCRIPT = os.path.join(os.path.dirname(__file__), "scripts", "run_train_aim.py")


@pytest.mark.skip(reason="Aim integration not compatible with Aim 4.0.")
@pytest.mark.distributed
def test_contrib_experiment(csv_filename, tmpdir):
aim_test_path = os.path.join(tmpdir, "results")
Expand Down
4 changes: 2 additions & 2 deletions tests/integration_tests/test_torchscript.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

from ludwig.api import LudwigModel
from ludwig.backend import RAY
from ludwig.constants import BATCH_SIZE, COMBINER, LOGITS, NAME, PREDICTIONS, PROBABILITIES, TRAINER
from ludwig.constants import BATCH_SIZE, COMBINER, EVAL_BATCH_SIZE, LOGITS, NAME, PREDICTIONS, PROBABILITIES, TRAINER
from ludwig.data.preprocessing import preprocess_for_prediction
from ludwig.features.number_feature import numeric_transformation_registry
from ludwig.globals import TRAIN_SET_METADATA_FILE_NAME
Expand Down Expand Up @@ -415,7 +415,7 @@ def test_torchscript_e2e_text_hf_tokenizer(tmpdir, csv_filename):
config = {
"input_features": input_features,
"output_features": output_features,
TRAINER: {"epochs": 2, BATCH_SIZE: 128},
TRAINER: {"epochs": 2, BATCH_SIZE: 128, EVAL_BATCH_SIZE: 128},
}
training_data_csv_path = generate_data(input_features, output_features, data_csv_path)

Expand Down