Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Updated schedule logic for placement groups for ray backend #2523

Merged
merged 9 commits into from
Sep 20, 2022
16 changes: 15 additions & 1 deletion ludwig/backend/ray.py
Original file line number Diff line number Diff line change
Expand Up @@ -812,6 +812,18 @@ def initialize(self):
# Disable placement groups on dask
dask.config.set(annotations={"ray_remote_args": {"placement_group": None}})

def generate_bundles(self, num_cpu):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: prefix with underscore since this is not really meant to be used outside this package.

# Ray requires that each bundle be schedulable on a single node, so a single
# bundle of 320 CPUs would never get scheduled. As a simple heuristic for now,
# request CPUs in bundles of at most 4.
if num_cpu <= 4:
magdyksaleh marked this conversation as resolved.
Show resolved Hide resolved
bundles = [{"CPU": num_cpu}]
else:
bundles = [{"CPU": 4} for _ in range(int(num_cpu // 4))]
if num_cpu % 4:
bundles.append({"CPU": num_cpu % 4})
return bundles

@contextlib.contextmanager
def provision_preprocessing_workers(self):
if not self._preprocessor_kwargs.get("use_preprocessing_placement_group", False):
Expand All @@ -822,7 +834,9 @@ def provision_preprocessing_workers(self):
yield
else:
num_cpu = self._preprocessor_kwargs["num_cpu_workers"]
self._preprocessor_pg = placement_group([{"CPU": num_cpu}])
bundles = self.generate_bundles(num_cpu)
magdyksaleh marked this conversation as resolved.
Show resolved Hide resolved
logger.info("Requesting bundles of %s for preprocessing", bundles)
self._preprocessor_pg = placement_group(bundles)
ready = self._preprocessor_pg.wait(FIFTEEN_MINS_IN_S)

if not ready:
Expand Down