diff --git a/ludwig/backend/ray.py b/ludwig/backend/ray.py
index ab238208272..d8076b74d51 100644
--- a/ludwig/backend/ray.py
+++ b/ludwig/backend/ray.py
@@ -812,17 +812,24 @@ def initialize(self):
         # Disable placement groups on dask
         dask.config.set(annotations={"ray_remote_args": {"placement_group": None}})
 
+    def generate_bundles(self, num_cpu):
+        # Ray requires that each bundle be schedulable on a single node,
+        # so a bundle of 320 CPUs would never get scheduled. For now, a
+        # simple heuristic is to just request 1 CPU at a time.
+        return [{"CPU": 1} for _ in range(int(num_cpu))]
+
     @contextlib.contextmanager
     def provision_preprocessing_workers(self):
-        if not self._preprocessor_kwargs.get("use_preprocessing_placement_group", False):
-            logger.warning(
-                "Backend config has use_preprocessing_placement_group set to False or did not set it at all."
-                " provision_preprocessing_workers() is a no-op in this case."
+        num_cpu = self._preprocessor_kwargs.get("num_cpu")
+        if not num_cpu:
+            logger.info(
+                "Backend config has no num_cpu set. provision_preprocessing_workers() is a no-op in this case."
             )
             yield
         else:
-            num_cpu = self._preprocessor_kwargs["num_cpu_workers"]
-            self._preprocessor_pg = placement_group([{"CPU": num_cpu}])
+            bundles = self.generate_bundles(num_cpu)
+            logger.info("Requesting bundles of %s for preprocessing", bundles)
+            self._preprocessor_pg = placement_group(bundles)
 
             ready = self._preprocessor_pg.wait(FIFTEEN_MINS_IN_S)
             if not ready:
@@ -840,12 +847,6 @@ def provision_preprocessing_workers(self):
             self._release_preprocessing_workers()
 
     def _release_preprocessing_workers(self):
-        if not self._preprocessor_kwargs.get("use_preprocessing_placement_group", False):
-            logger.warning(
-                "Backend config has use_preprocessing_placement_group set to False or did not set it at all."
-                " _release_preprocessing_workers() is a no-op in this case."
-            )
-            return
         if self._preprocessor_pg is not None:
             remove_placement_group(self._preprocessor_pg)
         self._preprocessor_pg = None
diff --git a/tests/integration_tests/test_ray.py b/tests/integration_tests/test_ray.py
index 2fad952ac62..a8ee5aa8742 100644
--- a/tests/integration_tests/test_ray.py
+++ b/tests/integration_tests/test_ray.py
@@ -631,3 +631,44 @@ def test_ray_distributed_predict(tmpdir, ray_cluster_2cpu):
     # compute the predictions
     preds = preds.compute()
     assert preds.iloc[1].name != preds.iloc[42].name
+
+
+@pytest.mark.distributed
+def test_ray_preprocessing_placement_group(tmpdir, ray_cluster_2cpu):
+    preprocessing_params = {
+        "audio_file_length_limit_in_s": 3.0,
+        "missing_value_strategy": BFILL,
+        "in_memory": True,
+        "padding_value": 0,
+        "norm": "per_file",
+        "type": "fbank",
+        "window_length_in_s": 0.04,
+        "window_shift_in_s": 0.02,
+        "num_filter_bands": 80,
+    }
+    audio_dest_folder = os.path.join(tmpdir, "generated_audio")
+    input_features = [audio_feature(folder=audio_dest_folder, preprocessing=preprocessing_params)]
+    output_features = [binary_feature()]
+
+    config = {
+        "input_features": input_features,
+        "output_features": output_features,
+        TRAINER: {"epochs": 2, "batch_size": 8},
+    }
+
+    with tempfile.TemporaryDirectory() as tmpdir:
+        backend_config = {**RAY_BACKEND_CONFIG}
+        backend_config["preprocessor_kwargs"] = {"num_cpu": 1}
+        csv_filename = os.path.join(tmpdir, "dataset.csv")
+        dataset_csv = generate_data(input_features, output_features, csv_filename, num_examples=100)
+        dataset = create_data_set_to_use("csv", dataset_csv, nan_percent=0.0)
+        model = LudwigModel(config, backend=backend_config)
+        _, _, output_dir = model.train(
+            dataset=dataset,
+            training_set=dataset,
+            skip_save_processed_input=True,
+            skip_save_progress=True,
+            skip_save_unprocessed_output=True,
+            skip_save_log=True,
+        )
+        preds, _ = model.predict(dataset=dataset)
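
Usage note: with this change, preprocessing placement groups are driven by preprocessor_kwargs.num_cpu in the backend config, replacing the old use_preprocessing_placement_group / num_cpu_workers pair. A minimal sketch of how a caller would enable it, following the pattern in the new test (the model config here is a placeholder, not part of this diff):

    from ludwig.api import LudwigModel

    # Placeholder model config; any valid Ludwig config works.
    config = {
        "input_features": [{"name": "text", "type": "text"}],
        "output_features": [{"name": "label", "type": "binary"}],
    }

    backend_config = {
        "type": "ray",
        # Reserve 4 CPUs for preprocessing: provision_preprocessing_workers()
        # will request four 1-CPU bundles via generate_bundles(4).
        # Omitting num_cpu (or setting it to 0) makes provisioning a no-op.
        "preprocessor_kwargs": {"num_cpu": 4},
    }

    model = LudwigModel(config, backend=backend_config)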
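For reviewers unfamiliar with Ray placement groups, here is a standalone sketch of the provision/wait/release lifecycle that provision_preprocessing_workers() wraps in a context manager. The 60-second timeout is a stand-in for FIFTEEN_MINS_IN_S, and the local two-CPU cluster is an assumption for the example:

    import ray
    from ray.util.placement_group import placement_group, remove_placement_group

    ray.init(num_cpus=2)

    # One 1-CPU bundle per requested CPU, mirroring generate_bundles():
    # N single-CPU bundles can be spread across nodes, whereas one N-CPU
    # bundle must fit entirely on a single node.
    bundles = [{"CPU": 1} for _ in range(2)]

    pg = placement_group(bundles)
    try:
        # Block until every bundle has been placed (or the timeout expires).
        if not pg.wait(timeout_seconds=60):
            raise TimeoutError("placement group was not scheduled in time")
        # ... run preprocessing tasks against the reserved CPUs here ...
    finally:
        remove_placement_group(pg)
        ray.shutdown()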