Updated scheduling logic for placement groups for Ray backend (#2523)
* updated scheduling logic for resource isolation

* fixes

* fixes

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Add test and clean up remove_pg logic

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* fix test

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
magdyksaleh and pre-commit-ci[bot] authored Sep 20, 2022
1 parent f563531 commit 3e655dc
Showing 2 changed files with 54 additions and 12 deletions.
25 changes: 13 additions & 12 deletions ludwig/backend/ray.py
@@ -812,17 +812,24 @@ def initialize(self):
         # Disable placement groups on dask
         dask.config.set(annotations={"ray_remote_args": {"placement_group": None}})
 
+    def generate_bundles(self, num_cpu):
+        # Ray requires that each bundle be scheduleable on a single node.
+        # So a bundle of 320 cpus would never get scheduled. For now a simple heuristic
+        # to be used is to just request 1 cpu at a time.
+        return [{"CPU": 1} for _ in range(int(num_cpu))]
+
     @contextlib.contextmanager
     def provision_preprocessing_workers(self):
-        if not self._preprocessor_kwargs.get("use_preprocessing_placement_group", False):
-            logger.warning(
-                "Backend config has use_preprocessing_placement_group set to False or did not set it at all."
-                " provision_preprocessing_workers() is a no-op in this case."
+        num_cpu = self._preprocessor_kwargs.get("num_cpu")
+        if not num_cpu:
+            logger.info(
+                "Backend config has num_cpu not set." " provision_preprocessing_workers() is a no-op in this case."
             )
             yield
         else:
-            num_cpu = self._preprocessor_kwargs["num_cpu_workers"]
-            self._preprocessor_pg = placement_group([{"CPU": num_cpu}])
+            bundles = self.generate_bundles(num_cpu)
+            logger.info("Requesting bundles of %s for preprocessing", bundles)
+            self._preprocessor_pg = placement_group(bundles)
             ready = self._preprocessor_pg.wait(FIFTEEN_MINS_IN_S)
 
             if not ready:
@@ -840,12 +847,6 @@ def provision_preprocessing_workers(self):
             self._release_preprocessing_workers()
 
     def _release_preprocessing_workers(self):
-        if not self._preprocessor_kwargs.get("use_preprocessing_placement_group", False):
-            logger.warning(
-                "Backend config has use_preprocessing_placement_group set to False or did not set it at all."
-                " _release_preprocessing_workers() is a no-op in this case."
-            )
-            return
         if self._preprocessor_pg is not None:
             remove_placement_group(self._preprocessor_pg)
             self._preprocessor_pg = None
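The comment in generate_bundles above explains the motivation: Ray only schedules a bundle if it fits on a single node, so one large CPU bundle may never be placed, while many 1-CPU bundles can be spread across the cluster. A minimal sketch of the same pattern in plain Ray (not Ludwig code; the num_cpu value and timeout are assumptions for illustration):

import ray
from ray.util.placement_group import placement_group, remove_placement_group

ray.init(ignore_reinit_error=True)

num_cpu = 4  # hypothetical preprocessing CPU budget
bundles = [{"CPU": 1} for _ in range(int(num_cpu))]  # mirrors generate_bundles(): one 1-CPU bundle per CPU

pg = placement_group(bundles)
try:
    # wait() returns True once every bundle has been placed on some node
    if not pg.wait(timeout_seconds=60):
        raise TimeoutError("placement group could not be scheduled")
    print("placement group ready:", pg.bundle_specs)
finally:
    remove_placement_group(pg)
    ray.shutdown()
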
41 changes: 41 additions & 0 deletions tests/integration_tests/test_ray.py
@@ -631,3 +631,44 @@ def test_ray_distributed_predict(tmpdir, ray_cluster_2cpu):
     # compute the predictions
     preds = preds.compute()
     assert preds.iloc[1].name != preds.iloc[42].name
+
+
+@pytest.mark.distributed
+def test_ray_preprocessing_placement_group(tmpdir, ray_cluster_2cpu):
+    preprocessing_params = {
+        "audio_file_length_limit_in_s": 3.0,
+        "missing_value_strategy": BFILL,
+        "in_memory": True,
+        "padding_value": 0,
+        "norm": "per_file",
+        "type": "fbank",
+        "window_length_in_s": 0.04,
+        "window_shift_in_s": 0.02,
+        "num_filter_bands": 80,
+    }
+    audio_dest_folder = os.path.join(tmpdir, "generated_audio")
+    input_features = [audio_feature(folder=audio_dest_folder, preprocessing=preprocessing_params)]
+    output_features = [binary_feature()]
+
+    config = {
+        "input_features": input_features,
+        "output_features": output_features,
+        TRAINER: {"epochs": 2, "batch_size": 8},
+    }
+
+    with tempfile.TemporaryDirectory() as tmpdir:
+        backend_config = {**RAY_BACKEND_CONFIG}
+        backend_config["preprocessor_kwargs"] = {"num_cpu": 1}
+        csv_filename = os.path.join(tmpdir, "dataset.csv")
+        dataset_csv = generate_data(input_features, output_features, csv_filename, num_examples=100)
+        dataset = create_data_set_to_use("csv", dataset_csv, nan_percent=0.0)
+        model = LudwigModel(config, backend=backend_config)
+        _, _, output_dir = model.train(
+            dataset=dataset,
+            training_set=dataset,
+            skip_save_processed_input=True,
+            skip_save_progress=True,
+            skip_save_unprocessed_output=True,
+            skip_save_log=True,
+        )
+        preds, _ = model.predict(dataset=dataset)
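As a usage note, the test above suggests that opting in only requires a num_cpu entry under preprocessor_kwargs in the Ray backend config. A hedged sketch of what that might look like outside the test harness (the model config and dataset path are placeholders, not part of this commit; key names are taken from the test):

from ludwig.api import LudwigModel

# Placeholder model config purely for illustration.
config = {
    "input_features": [{"name": "x", "type": "number"}],
    "output_features": [{"name": "y", "type": "binary"}],
    "trainer": {"epochs": 1},
}

# Key names follow the test above; the num_cpu value is an assumption.
backend_config = {"type": "ray", "preprocessor_kwargs": {"num_cpu": 2}}

model = LudwigModel(config, backend=backend_config)
# model.train(dataset="my_dataset.csv")  # hypothetical dataset path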
