diff --git a/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_engine.py b/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_engine.py index b222128bbb..21b7a5da1f 100644 --- a/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_engine.py +++ b/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_engine.py @@ -61,6 +61,12 @@ class BytewaxMaterializationEngineConfig(FeastConfigBaseModel): include_security_context_capabilities: bool = True """ (optional) Include security context capabilities in the init and job container spec """ + labels: dict = {} + """ (optional) additional labels to append to kubernetes objects """ + + max_parallelism: int = 10 + """ (optional) Maximum number of pods (default 10) allowed to run in parallel per job""" + class BytewaxMaterializationEngine(BatchMaterializationEngine): def __init__( @@ -82,7 +88,7 @@ def __init__( self.online_store = online_store # TODO: Configure k8s here - k8s_config.load_kube_config() + k8s_config.load_config() self.k8s_client = client.api_client.ApiClient() self.v1 = client.CoreV1Api(self.k8s_client) @@ -196,14 +202,13 @@ def _create_configuration_map(self, job_id, paths, feature_view, namespace): {"paths": paths, "feature_view": feature_view.name} ) + labels = {"feast-bytewax-materializer": "configmap"} configmap_manifest = { "kind": "ConfigMap", "apiVersion": "v1", "metadata": { "name": f"feast-{job_id}", - "labels": { - "feast-bytewax-materializer": "configmap", - }, + "labels": {**labels, **self.batch_engine_config.labels}, }, "data": { "feature_store.yaml": feature_store_configuration, @@ -260,27 +265,25 @@ def _create_job_definition(self, job_id, namespace, pods, env): "drop": ["ALL"], } + job_labels = {"feast-bytewax-materializer": "job"} + pod_labels = {"feast-bytewax-materializer": "pod"} job_definition = { "apiVersion": "batch/v1", "kind": "Job", "metadata": { "name": f"dataflow-{job_id}", "namespace": namespace, - "labels": { - "feast-bytewax-materializer": "job", - }, + "labels": {**job_labels, **self.batch_engine_config.labels}, }, "spec": { "ttlSecondsAfterFinished": 3600, "completions": pods, - "parallelism": pods, + "parallelism": min(pods, self.batch_engine_config.max_parallelism), "completionMode": "Indexed", "template": { "metadata": { "annotations": self.batch_engine_config.annotations, - "labels": { - "feast-bytewax-materializer": "pod", - }, + "labels": {**pod_labels, **self.batch_engine_config.labels}, }, "spec": { "restartPolicy": "Never", diff --git a/setup.py b/setup.py index f7b1ff0417..047100f03e 100644 --- a/setup.py +++ b/setup.py @@ -98,7 +98,7 @@ "hiredis>=2.0.0,<3", ] -AWS_REQUIRED = ["boto3>=1.17.0,<2", "docker>=5.0.2"] +AWS_REQUIRED = ["boto3>=1.17.0,<2", "docker>=5.0.2", "s3fs"] BYTEWAX_REQUIRED = ["bytewax==0.15.1", "docker>=5.0.2", "kubernetes<=20.13.0"]