Add resource limits for Vertex (#529)
This was necessary to be able to allocate enough memory on Vertex. Added the
missing resource settings to the KFP compiler as well.

Proof that it works:
https://console.cloud.google.com/vertex-ai/locations/europe-west1/pipelines/runs/datacomp-filtering-pipeline-20231017201538?project=soy-audio-379412
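
As a usage sketch of what this change enables (component directory and values below are illustrative, not taken from this commit), a pipeline step can now request and cap CPU and memory directly on the ComponentOp:

from fondant.pipeline import ComponentOp

# Illustrative values; CPU values are strings ("m" = millicpu),
# memory values accept a number with an E/P/T/G/M/K suffix.
filter_op = ComponentOp(
    component_dir="components/filter_images",  # hypothetical component
    cpu_request="4",
    cpu_limit="8",
    memory_request="16G",
    memory_limit="32G",
)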
RobbeSneyders authored Oct 18, 2023
1 parent 02ac6eb commit 60f0ee2
Showing 3 changed files with 91 additions and 55 deletions.
12 changes: 12 additions & 0 deletions src/fondant/compiler.py
@@ -407,10 +407,16 @@ def _set_configuration(self, task, fondant_component_operation):
accelerator_name = fondant_component_operation.accelerator_name
node_pool_label = fondant_component_operation.node_pool_label
node_pool_name = fondant_component_operation.node_pool_name
cpu_request = fondant_component_operation.cpu_request
cpu_limit = fondant_component_operation.cpu_limit
memory_request = fondant_component_operation.memory_request
memory_limit = fondant_component_operation.memory_limit

# Assign optional specification
if cpu_request is not None:
task.set_cpu_request(cpu_request)
if cpu_limit is not None:
task.set_cpu_limit(cpu_limit)
if memory_request is not None:
task.set_memory_request(memory_request)
if memory_limit is not None:
@@ -459,10 +465,16 @@ def resolve_imports(self):
@staticmethod
def _set_configuration(task, fondant_component_operation):
# Unpack optional specifications
cpu_limit = fondant_component_operation.cpu_limit
memory_limit = fondant_component_operation.memory_limit
number_of_accelerators = fondant_component_operation.number_of_accelerators
accelerator_name = fondant_component_operation.accelerator_name

# Assign optional specification
if cpu_limit is not None:
task.set_cpu_limit(cpu_limit)
if memory_limit is not None:
task.set_memory_limit(memory_limit)
if number_of_accelerators is not None:
task.set_accelerator_limit(number_of_accelerators)
if accelerator_name not in valid_vertex_accelerator_types:
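
Taken together, the two hunks above amount to the following mapping from the ComponentOp resource fields onto KFP v2 task setters (a minimal consolidated sketch, not the full compiler code; the helper name is made up):

def apply_resources(task, op):
    # Forward optional resource settings from a ComponentOp-like object
    # onto a kfp.dsl.PipelineTask; only set the values the user provided.
    if op.cpu_request is not None:
        task.set_cpu_request(op.cpu_request)        # e.g. "500m" or "2"
    if op.cpu_limit is not None:
        task.set_cpu_limit(op.cpu_limit)
    if op.memory_request is not None:
        task.set_memory_request(op.memory_request)  # e.g. "512M" or "4G"
    if op.memory_limit is not None:
        task.set_memory_limit(op.memory_limit)
    return task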
30 changes: 26 additions & 4 deletions src/fondant/pipeline.py
@@ -61,6 +61,12 @@ class ComponentOp:
node_pool_label: The label of the node pool to which the operation will be assigned.
node_pool_name: The name of the node pool to which the operation will be assigned.
cache: Set to False to disable caching, True by default.
cpu_request: the amount of CPU requested by the component. The value should be a
string: either a plain number of CPU cores or a number followed by “m”, which means
1/1000 of a core (millicpu).
cpu_limit: the maximum amount of CPU that can be used by the component. The value
should be a string: either a plain number of CPU cores or a number followed by “m”,
which means 1/1000 of a core (millicpu).
memory_request: the memory requested by the component. The value can be a number or a
number followed by one of “E”, “P”, “T”, “G”, “M”, “K”.
memory_limit: the maximum memory that can be used by the component. The value can be a
@@ -98,8 +104,10 @@ def __init__(
preemptible: t.Optional[bool] = False,
cluster_type: t.Optional[str] = "default",
client_kwargs: t.Optional[dict] = None,
memory_request: t.Optional[t.Union[str, int]] = None,
memory_limit: t.Optional[t.Union[str, int]] = None,
cpu_request: t.Optional[str] = None,
cpu_limit: t.Optional[str] = None,
memory_request: t.Optional[str] = None,
memory_limit: t.Optional[str] = None,
) -> None:
self.component_dir = Path(component_dir)
self.input_partition_rows = input_partition_rows
@@ -119,6 +127,8 @@ def __init__(

self.arguments.setdefault("component_spec", self.component_spec.specification)

self.cpu_request = cpu_request
self.cpu_limit = cpu_limit
self.memory_request = memory_request
self.memory_limit = memory_limit
self.node_pool_label, self.node_pool_name = self._validate_node_pool_spec(
@@ -231,8 +241,10 @@ def from_registry(
preemptible: t.Optional[bool] = False,
cluster_type: t.Optional[str] = "default",
client_kwargs: t.Optional[dict] = None,
memory_request: t.Optional[t.Union[str, int]] = None,
memory_limit: t.Optional[t.Union[str, int]] = None,
cpu_request: t.Optional[str] = None,
cpu_limit: t.Optional[str] = None,
memory_request: t.Optional[str] = None,
memory_limit: t.Optional[str] = None,
) -> "ComponentOp":
"""Load a reusable component by its name.
@@ -254,6 +266,14 @@
Requires the setup and assignment of a preemptible node pool. Note that preemptibles
only work when KFP is setup on GCP. More info here:
https://v1-6-branch.kubeflow.org/docs/distributions/gke/pipelines/preemptible/
cluster_type: The type of cluster to use for distributed execution (default is "default").
client_kwargs: Keyword arguments used to initialise the dask client.
cpu_request: the amount of CPU requested by the component. The value should be a
string: either a plain number of CPU cores or a number followed by “m”, which means
1/1000 of a core (millicpu).
cpu_limit: the maximum amount of CPU that can be used by the component. The value
should be a string: either a plain number of CPU cores or a number followed by “m”,
which means 1/1000 of a core (millicpu).
memory_request: the memory requested by the component. The value can be a number or a
number followed by one of “E”, “P”, “T”, “G”, “M”, “K”.
memory_limit: the maximum memory that can be used by the component. The value can be a
@@ -279,6 +299,8 @@ def from_registry(
preemptible=preemptible,
cluster_type=cluster_type,
client_kwargs=client_kwargs,
cpu_request=cpu_request,
cpu_limit=cpu_limit,
memory_request=memory_request,
memory_limit=memory_limit,
)
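
As a usage illustration of the new from_registry arguments (component name, arguments and values here are illustrative only, not part of this commit):

from fondant.pipeline import ComponentOp

# CPU values are strings ("m" = millicpu); memory values accept E/P/T/G/M/K suffixes.
load_op = ComponentOp.from_registry(
    name="load_from_hf_hub",                      # hypothetical registry component
    arguments={"dataset_name": "user/dataset"},   # hypothetical arguments
    cpu_request="500m",
    cpu_limit="2",
    memory_request="8G",
    memory_limit="16G",
)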
@@ -94,44 +94,46 @@ deploymentSpec:
exec-first-component:
container:
args:
- --storage_args
- '{{$.inputs.parameters[''storage_args'']}}'
- --input_partition_rows
- '{{$.inputs.parameters[''input_partition_rows'']}}'
- --cache
- '{{$.inputs.parameters[''cache'']}}'
- --cluster_type
- '{{$.inputs.parameters[''cluster_type'']}}'
- --component_spec
- '{{$.inputs.parameters[''component_spec'']}}'
- --output_manifest_path
- '{{$.inputs.parameters[''output_manifest_path'']}}'
- --metadata
- '{{$.inputs.parameters[''metadata'']}}'
- "--storage_args"
- "{{$.inputs.parameters['storage_args']}}"
- "--input_partition_rows"
- "{{$.inputs.parameters['input_partition_rows']}}"
- "--cache"
- "{{$.inputs.parameters['cache']}}"
- "--cluster_type"
- "{{$.inputs.parameters['cluster_type']}}"
- "--component_spec"
- "{{$.inputs.parameters['component_spec']}}"
- "--output_manifest_path"
- "{{$.inputs.parameters['output_manifest_path']}}"
- "--metadata"
- "{{$.inputs.parameters['metadata']}}"
command:
- fondant
- execute
- main
image: example_component:latest
resources:
memoryLimit: 0.512
exec-second-component:
container:
args:
- --storage_args
- '{{$.inputs.parameters[''storage_args'']}}'
- --input_partition_rows
- '{{$.inputs.parameters[''input_partition_rows'']}}'
- --cache
- '{{$.inputs.parameters[''cache'']}}'
- --cluster_type
- '{{$.inputs.parameters[''cluster_type'']}}'
- --component_spec
- '{{$.inputs.parameters[''component_spec'']}}'
- --output_manifest_path
- '{{$.inputs.parameters[''output_manifest_path'']}}'
- --metadata
- '{{$.inputs.parameters[''metadata'']}}'
- --input_manifest_path
- '{{$.inputs.parameters[''input_manifest_path'']}}'
- "--storage_args"
- "{{$.inputs.parameters['storage_args']}}"
- "--input_partition_rows"
- "{{$.inputs.parameters['input_partition_rows']}}"
- "--cache"
- "{{$.inputs.parameters['cache']}}"
- "--cluster_type"
- "{{$.inputs.parameters['cluster_type']}}"
- "--component_spec"
- "{{$.inputs.parameters['component_spec']}}"
- "--output_manifest_path"
- "{{$.inputs.parameters['output_manifest_path']}}"
- "--metadata"
- "{{$.inputs.parameters['metadata']}}"
- "--input_manifest_path"
- "{{$.inputs.parameters['input_manifest_path']}}"
command:
- fondant
- execute
@@ -140,20 +142,20 @@
exec-third-component:
container:
args:
- --storage_args
- '{{$.inputs.parameters[''storage_args'']}}'
- --cache
- '{{$.inputs.parameters[''cache'']}}'
- --cluster_type
- '{{$.inputs.parameters[''cluster_type'']}}'
- --component_spec
- '{{$.inputs.parameters[''component_spec'']}}'
- --output_manifest_path
- '{{$.inputs.parameters[''output_manifest_path'']}}'
- --metadata
- '{{$.inputs.parameters[''metadata'']}}'
- --input_manifest_path
- '{{$.inputs.parameters[''input_manifest_path'']}}'
- "--storage_args"
- "{{$.inputs.parameters['storage_args']}}"
- "--cache"
- "{{$.inputs.parameters['cache']}}"
- "--cluster_type"
- "{{$.inputs.parameters['cluster_type']}}"
- "--component_spec"
- "{{$.inputs.parameters['component_spec']}}"
- "--output_manifest_path"
- "{{$.inputs.parameters['output_manifest_path']}}"
- "--metadata"
- "{{$.inputs.parameters['metadata']}}"
- "--input_manifest_path"
- "{{$.inputs.parameters['input_manifest_path']}}"
command:
- fondant
- execute
@@ -198,15 +200,15 @@ root:
type: binary
input_partition_rows:
runtimeValue:
constant: 10.0
constant: 10
metadata:
runtimeValue:
constant: '{"base_path": "/foo/bar", "pipeline_name": "testpipeline",
"run_id": "testpipeline-20230101000000", "component_id": "first_component",
"cache_key": "1"}'
output_manifest_path:
runtimeValue:
constant: /foo/bar/testpipeline/testpipeline-20230101000000/first_component/manifest.json
constant: "/foo/bar/testpipeline/testpipeline-20230101000000/first_component/manifest.json"
storage_args:
runtimeValue:
constant: a dummy string arg
@@ -250,18 +252,18 @@ root:
type: array
input_manifest_path:
runtimeValue:
constant: /foo/bar/testpipeline/testpipeline-20230101000000/first_component/manifest.json
constant: "/foo/bar/testpipeline/testpipeline-20230101000000/first_component/manifest.json"
input_partition_rows:
runtimeValue:
constant: 10.0
constant: 10
metadata:
runtimeValue:
constant: '{"base_path": "/foo/bar", "pipeline_name": "testpipeline",
"run_id": "testpipeline-20230101000000", "component_id": "second_component",
"cache_key": "2"}'
output_manifest_path:
runtimeValue:
constant: /foo/bar/testpipeline/testpipeline-20230101000000/second_component/manifest.json
constant: "/foo/bar/testpipeline/testpipeline-20230101000000/second_component/manifest.json"
storage_args:
runtimeValue:
constant: a dummy string arg
@@ -314,15 +316,15 @@ root:
type: binary
input_manifest_path:
runtimeValue:
constant: /foo/bar/testpipeline/testpipeline-20230101000000/second_component/manifest.json
constant: "/foo/bar/testpipeline/testpipeline-20230101000000/second_component/manifest.json"
metadata:
runtimeValue:
constant: '{"base_path": "/foo/bar", "pipeline_name": "testpipeline",
"run_id": "testpipeline-20230101000000", "component_id": "third_component",
"cache_key": "3"}'
output_manifest_path:
runtimeValue:
constant: /foo/bar/testpipeline/testpipeline-20230101000000/third_component/manifest.json
constant: "/foo/bar/testpipeline/testpipeline-20230101000000/third_component/manifest.json"
storage_args:
runtimeValue:
constant: a dummy string arg
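
The YAML above is a compiled pipeline spec used as a test fixture. For context, a hedged sketch of how such a spec might be produced with the compiler module edited in this commit (class names, constructor arguments and paths below are assumptions, not verified against this commit):

from fondant.pipeline import Pipeline
from fondant.compiler import VertexCompiler

# Hypothetical pipeline; name, base path and output path are placeholders.
pipeline = Pipeline(pipeline_name="demo-pipeline", base_path="gs://my-bucket/artifacts")
# ... ComponentOps with resource settings would be added to the pipeline here ...

compiler = VertexCompiler()
compiler.compile(pipeline=pipeline, output_path="vertex_pipeline.yaml")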
