Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Updated schedule logic for placement groups for ray backend #2523

Merged
merged 9 commits into from
Sep 20, 2022
25 changes: 13 additions & 12 deletions ludwig/backend/ray.py
Original file line number Diff line number Diff line change
Expand Up @@ -812,17 +812,24 @@ def initialize(self):
# Disable placement groups on dask
dask.config.set(annotations={"ray_remote_args": {"placement_group": None}})

def generate_bundles(self, num_cpu):
    """Build the placement-group bundle list for preprocessing workers.

    Ray requires that each bundle be schedulable on a single node, so a
    single bundle of e.g. 320 CPUs would never get scheduled on a smaller
    node. As a simple heuristic we request one 1-CPU bundle per requested
    CPU, letting Ray spread them across the cluster.

    :param num_cpu: total number of CPUs to request; fractional values are
        truncated to an int (so 0 < num_cpu < 1 yields an empty list —
        presumably callers pass whole numbers; TODO confirm).
    :return: list of ``{"CPU": 1}`` bundle dicts of length ``int(num_cpu)``.
    """
    return [{"CPU": 1} for _ in range(int(num_cpu))]

@contextlib.contextmanager
def provision_preprocessing_workers(self):
if not self._preprocessor_kwargs.get("use_preprocessing_placement_group", False):
logger.warning(
"Backend config has use_preprocessing_placement_group set to False or did not set it at all."
" provision_preprocessing_workers() is a no-op in this case."
num_cpu = self._preprocessor_kwargs.get("num_cpu")
if not num_cpu:
logger.info(
"Backend config has num_cpu not set." " provision_preprocessing_workers() is a no-op in this case."
)
yield
else:
num_cpu = self._preprocessor_kwargs["num_cpu_workers"]
self._preprocessor_pg = placement_group([{"CPU": num_cpu}])
bundles = self.generate_bundles(num_cpu)
magdyksaleh marked this conversation as resolved.
Show resolved Hide resolved
logger.info("Requesting bundles of %s for preprocessing", bundles)
self._preprocessor_pg = placement_group(bundles)
ready = self._preprocessor_pg.wait(FIFTEEN_MINS_IN_S)

if not ready:
Expand All @@ -840,12 +847,6 @@ def provision_preprocessing_workers(self):
self._release_preprocessing_workers()

def _release_preprocessing_workers(self):
if not self._preprocessor_kwargs.get("use_preprocessing_placement_group", False):
logger.warning(
"Backend config has use_preprocessing_placement_group set to False or did not set it at all."
" _release_preprocessing_workers() is a no-op in this case."
)
return
if self._preprocessor_pg is not None:
remove_placement_group(self._preprocessor_pg)
self._preprocessor_pg = None
Expand Down
41 changes: 41 additions & 0 deletions tests/integration_tests/test_ray.py
Original file line number Diff line number Diff line change
Expand Up @@ -631,3 +631,44 @@ def test_ray_distributed_predict(tmpdir, ray_cluster_2cpu):
# compute the predictions
preds = preds.compute()
assert preds.iloc[1].name != preds.iloc[42].name


@pytest.mark.distributed
def test_ray_preprocessing_placement_group(tmpdir, ray_cluster_2cpu):
    """End-to-end train/predict with ``num_cpu`` set in preprocessor_kwargs,
    exercising the preprocessing placement-group provisioning path of the
    Ray backend. The test passes if training and prediction complete
    without raising.
    """
    preprocessing_params = {
        "audio_file_length_limit_in_s": 3.0,
        "missing_value_strategy": BFILL,
        "in_memory": True,
        "padding_value": 0,
        "norm": "per_file",
        "type": "fbank",
        "window_length_in_s": 0.04,
        "window_shift_in_s": 0.02,
        "num_filter_bands": 80,
    }
    audio_dest_folder = os.path.join(tmpdir, "generated_audio")
    input_features = [audio_feature(folder=audio_dest_folder, preprocessing=preprocessing_params)]
    output_features = [binary_feature()]

    config = {
        "input_features": input_features,
        "output_features": output_features,
        TRAINER: {"epochs": 2, "batch_size": 8},
    }

    # Use the `tmpdir` fixture directly: the original version shadowed the
    # fixture with a second `tempfile.TemporaryDirectory()`, creating a
    # redundant directory and a confusing rebinding of `tmpdir`.
    backend_config = {**RAY_BACKEND_CONFIG}
    # num_cpu triggers placement-group provisioning during preprocessing.
    backend_config["preprocessor_kwargs"] = {"num_cpu": 1}
    csv_filename = os.path.join(tmpdir, "dataset.csv")
    dataset_csv = generate_data(input_features, output_features, csv_filename, num_examples=100)
    dataset = create_data_set_to_use("csv", dataset_csv, nan_percent=0.0)
    model = LudwigModel(config, backend=backend_config)
    _, _, output_dir = model.train(
        dataset=dataset,
        training_set=dataset,
        skip_save_processed_input=True,
        skip_save_progress=True,
        skip_save_unprocessed_output=True,
        skip_save_log=True,
    )
    preds, _ = model.predict(dataset=dataset)