diff --git a/infra/charts/feast/charts/feast-jobservice/templates/deployment.yaml b/infra/charts/feast/charts/feast-jobservice/templates/deployment.yaml
index 93d11dd16f..00667ccbb0 100644
--- a/infra/charts/feast/charts/feast-jobservice/templates/deployment.yaml
+++ b/infra/charts/feast/charts/feast-jobservice/templates/deployment.yaml
@@ -71,11 +71,6 @@ spec:
{{- end }}
{{- end }}
- command:
- - python
- - "-m"
- - "feast.cli"
- - server
ports:
- name: http
containerPort: {{ .Values.service.http.targetPort }}
diff --git a/infra/scripts/codebuild_runner.py b/infra/scripts/codebuild_runner.py
new file mode 100755
index 0000000000..7b5ee0ee3e
--- /dev/null
+++ b/infra/scripts/codebuild_runner.py
@@ -0,0 +1,191 @@
+#!/usr/bin/env python
+
+# This is a thin wrapper for AWS Codebuild API to kick off a build, wait for it to finish,
+# and tail build logs while it is running.
+
+import os
+import json
+from typing import Dict, Any, List, Optional, AsyncGenerator
+from datetime import datetime
+import asyncio
+import sys
+import argparse
+import boto3
+
+
+class LogTailer:
+    """Incrementally reads events from a single CloudWatch log stream.
+
+    The forward pagination token of the most recent response is kept on the
+    instance, so every request continues where the previous one stopped.
+    """
+
+    _next_token: Optional[str]
+
+    def __init__(self, client, log_group: str, log_stream: str):
+        self._client = client
+        self._log_group = log_group
+        self._log_stream = log_stream
+        # No token yet: the first request carries no `nextToken`.
+        self._next_token = None
+
+    def _get_log_events_args(self) -> Dict[str, Any]:
+        """Build the keyword arguments for the next `get_log_events` call."""
+        request: Dict[str, Any] = {
+            "logGroupName": self._log_group,
+            "logStreamName": self._log_stream,
+            "limit": 100,
+            "startFromHead": True,
+        }
+        if self._next_token:
+            request["nextToken"] = self._next_token
+        return request
+
+    def _fetch_events(self) -> List[Dict[str, str]]:
+        """Issue one `get_log_events` request and remember its forward token."""
+        response = self._client.get_log_events(**self._get_log_events_args())
+        batch = response["events"]
+        self._next_token = response.get("nextForwardToken")
+        return batch
+
+    async def tail_chunk(self) -> List[Dict[str, str]]:
+        """Return the next non-empty batch of events, or `[]` if none arrive.
+
+        Polls every 0.5 seconds and gives up once 5 seconds worth of empty
+        polls have been spent.
+        """
+        poll_interval = 0.5
+        remaining = 5.0
+
+        while remaining > 0:
+            batch = self._fetch_events()
+            if batch:
+                return batch
+            remaining -= poll_interval
+            await asyncio.sleep(poll_interval)
+        return []
+
+    async def read_all_chunks(self) -> AsyncGenerator[List[Dict[str, str]], None]:
+        """Yield batches of events until a request comes back empty."""
+        while True:
+            batch = self._fetch_events()
+            if not batch:
+                return
+            yield batch
+
+
+async def _wait_build_state(
+    client,
+    build_id,
+    desired_phase: Optional[str],
+    desired_states: List[str],
+    poll_interval: float = 2,
+) -> Dict[str, Any]:
+    """Poll CodeBuild until the build is in a desired state or has the desired phase.
+
+    Args:
+        client: object exposing `batch_get_builds(ids=[...])`.
+        build_id: id of the build to watch.
+        desired_phase: if set, return as soon as any entry of the build's
+            `phases` list has this `phaseType`; `None` disables the check.
+        desired_states: `buildStatus` values that end the wait.
+        poll_interval: seconds to sleep between two polls.
+
+    Returns:
+        The build description from the response that satisfied the condition.
+
+    Raises:
+        RuntimeError: if the response does not describe exactly one build.
+    """
+    while True:
+        resp = client.batch_get_builds(ids=[build_id])
+        builds = resp["builds"]
+        # Explicit check instead of `assert`, which is stripped under `python -O`.
+        if len(builds) != 1:
+            raise RuntimeError(
+                f"Expected exactly one build for id {build_id!r}, got {len(builds)}"
+            )
+        build = builds[0]
+
+        if build["buildStatus"] in desired_states:
+            return build
+        if desired_phase and any(
+            phase["phaseType"] == desired_phase for phase in build["phases"]
+        ):
+            return build
+
+        await asyncio.sleep(poll_interval)
+
+
+def print_log_event(event) -> None:
+    """Print one log event as `<timestamp> <message>` without adding a newline.
+
+    `event["timestamp"]` is divided by 1000 before being handed to
+    `datetime.fromtimestamp`, i.e. it is treated as milliseconds.
+    """
+    seconds = event["timestamp"] / 1000.0
+    stamp = str(datetime.fromtimestamp(seconds))
+    print(stamp, event["message"], end="")
+
+
+async def main() -> None:
+    """Parse the command line and run one CodeBuild build.
+
+    The source location and version come either from `--source-location` /
+    `--source-version`, or, with `--location-from-prow`, from the JSON document
+    in the `JOB_SPEC` environment variable. Missing inputs are reported through
+    `parser.error`, which prints the usage message and exits.
+    """
+    parser = argparse.ArgumentParser(
+        description="Start an AWS CodeBuild build, tail its logs and wait for it to finish."
+    )
+    parser.add_argument(
+        "--project-name", default="feast-ci-project", type=str, help="Project name"
+    )
+    parser.add_argument(
+        "--source-location",
+        type=str,
+        help="Source location, e.g. https://github.com/feast/feast.git",
+    )
+    parser.add_argument(
+        "--source-version", type=str, help="Source version, e.g. master"
+    )
+    parser.add_argument(
+        "--location-from-prow",
+        action="store_true",
+        help="Infer source location and version from prow environment variables",
+    )
+    args = parser.parse_args()
+
+    if args.location_from_prow:
+        raw_job_spec = os.getenv("JOB_SPEC")
+        # An unset/empty variable would otherwise surface as a JSONDecodeError.
+        if not raw_job_spec:
+            parser.error(
+                "--location-from-prow requires the JOB_SPEC environment variable"
+            )
+        job_spec = json.loads(raw_job_spec)
+        source_location = job_spec["refs"]["repo_link"]
+        source_version = source_version_from_prow_job_spec(job_spec)
+    else:
+        # Without these the build request would be sent with None values.
+        if not args.source_location or not args.source_version:
+            parser.error(
+                "--source-location and --source-version are required "
+                "unless --location-from-prow is given"
+            )
+        source_location = args.source_location
+        source_version = args.source_version
+
+    await run_build(
+        project_name=args.project_name,
+        source_location=source_location,
+        source_version=source_version,
+    )
+
+def source_version_from_prow_job_spec(job_spec: Dict[str, Any]) -> str:
+    """Build the source version string for the first pull in a prow job spec.
+
+    The result has the form `refs/pull/<number>/head^{<sha>}`, taken from
+    `job_spec["refs"]["pulls"][0]`.
+    """
+    first_pull = job_spec['refs']['pulls'][0]
+    number = first_pull["number"]
+    sha = first_pull["sha"]
+    return f'refs/pull/{number}/head^{{{sha}}}'
+
+async def run_build(
+    project_name: str,
+    source_version: str,
+    source_location: str,
+    region_name: str = "us-west-2",
+):
+    """Start a CodeBuild build, stream its log to stdout and wait for the result.
+
+    Args:
+        project_name: CodeBuild project to build.
+        source_version: value passed as `sourceVersion` to `start_build`.
+        source_location: value passed as `sourceLocationOverride`.
+        region_name: AWS region used for both the CodeBuild and logs clients.
+
+    Exits the process with status 1 if the build ends before the INSTALL phase
+    or finishes in any state other than SUCCEEDED. On interruption the build
+    is stopped via `stop_build`.
+    """
+    terminal_states = ["SUCCEEDED", "FAILED", "STOPPED", "TIMED_OUT", "FAULT"]
+
+    print(f"Building {project_name} at {source_version}", file=sys.stderr)
+    logs_client = boto3.client("logs", region_name=region_name)
+    codebuild_client = boto3.client("codebuild", region_name=region_name)
+
+    print("Submitting the build..", file=sys.stderr)
+    build_resp = codebuild_client.start_build(
+        projectName=project_name,
+        sourceLocationOverride=source_location,
+        sourceVersion=source_version,
+    )
+
+    build_id = build_resp["build"]["id"]
+
+    try:
+        print(
+            "Waiting for the INSTALL phase to start before tailing the log",
+            file=sys.stderr,
+        )
+        build = await _wait_build_state(
+            codebuild_client,
+            build_id,
+            desired_phase="INSTALL",
+            desired_states=terminal_states,
+        )
+
+        if build["buildStatus"] != "IN_PROGRESS":
+            print(
+                f"Build failed before install phase: {build['buildStatus']}",
+                file=sys.stderr,
+            )
+            sys.exit(1)
+
+        log_tailer = LogTailer(
+            logs_client,
+            log_stream=build["logs"]["streamName"],
+            log_group=build["logs"]["groupName"],
+        )
+
+        waiter_task = asyncio.create_task(
+            _wait_build_state(
+                codebuild_client,
+                build_id,
+                desired_phase=None,
+                desired_states=terminal_states,
+            )
+        )
+
+        while not waiter_task.done():
+            for event in await log_tailer.tail_chunk():
+                print_log_event(event)
+
+        # The build can finish between two polls; print whatever was logged
+        # after the last tailed chunk so the end of the log is not lost.
+        async for events in log_tailer.read_all_chunks():
+            for event in events:
+                print_log_event(event)
+
+        build_status = waiter_task.result()["buildStatus"]
+        print(f"Build {build_status}", file=sys.stderr)
+        if build_status != "SUCCEEDED":
+            sys.exit(1)
+    except KeyboardInterrupt:
+        print(f"Stopping build {build_id}", file=sys.stderr)
+        codebuild_client.stop_build(id=build_id)
+    except asyncio.CancelledError:
+        # Under asyncio.run() a Ctrl-C can reach this coroutine as a task
+        # cancellation instead of KeyboardInterrupt; stop the remote build
+        # and let the cancellation propagate.
+        print(f"Stopping build {build_id}", file=sys.stderr)
+        codebuild_client.stop_build(id=build_id)
+        raise
+
+
+# Script entry point: run the async main() to completion on a fresh event loop.
+if __name__ == "__main__":
+ asyncio.run(main())
diff --git a/infra/scripts/setup-e2e-env-aws.sh b/infra/scripts/setup-e2e-env-aws.sh
new file mode 100755
index 0000000000..6521f941e2
--- /dev/null
+++ b/infra/scripts/setup-e2e-env-aws.sh
@@ -0,0 +1,14 @@
+#!/bin/bash
+
+# Prepares the e2e test environment: compiles the Python protos, installs the
+# Python dependencies and builds the ingestion jar.
+
+# Fail fast: stop at the first failing step instead of continuing with a
+# half-prepared environment.
+set -euo pipefail
+
+make compile-protos-python
+
+python -m pip install --upgrade pip setuptools wheel
+
+python -m pip install -qr sdk/python/requirements-dev.txt
+python -m pip install -qr tests/requirements.txt
+
+# Using mvn -q to make it less verbose. This step happens after docker containers were
+# successfully built so it should be unlikely to fail.
+echo "########## Building ingestion jar"
+TIMEFORMAT='########## took %R seconds'
+time mvn -q --no-transfer-progress -Dmaven.javadoc.skip=true -Dgpg.skip -DskipUTs=true clean package
diff --git a/infra/scripts/test-end-to-end-aws.sh b/infra/scripts/test-end-to-end-aws.sh
index c8a98541d7..981118fd2e 100755
--- a/infra/scripts/test-end-to-end-aws.sh
+++ b/infra/scripts/test-end-to-end-aws.sh
@@ -1,3 +1,19 @@
#!/usr/bin/env bash
-aws sts get-caller-identity
\ No newline at end of file
+set -euo pipefail
+
+# Runs the e2e test suite against AWS. Reads CLUSTER_ID, STAGING_PATH and
+# NODE_IP from the environment; with `set -u` an unset one aborts the script.
+
+pip install "s3fs" "boto3" "urllib3>=1.25.4"
+
+export DISABLE_FEAST_SERVICE_FIXTURES=1
+export DISABLE_SERVICE_FIXTURES=1
+
+# Expansions are quoted so values are passed as single arguments, without
+# word-splitting or globbing.
+PYTHONPATH=sdk/python pytest tests/e2e/ \
+  --core-url cicd-feast-core:6565 \
+  --serving-url cicd-feast-online-serving:6566 \
+  --env aws \
+  --emr-cluster-id "${CLUSTER_ID}" \
+  --staging-path "${STAGING_PATH}" \
+  --redis-url "${NODE_IP}:32379" \
+  --emr-region us-west-2 \
+  --kafka-brokers "${NODE_IP}:30092" \
+  -m "not bq"
diff --git a/sdk/python/feast/pyspark/launchers/aws/emr_utils.py b/sdk/python/feast/pyspark/launchers/aws/emr_utils.py
index 875bde3107..634d82ce78 100644
--- a/sdk/python/feast/pyspark/launchers/aws/emr_utils.py
+++ b/sdk/python/feast/pyspark/launchers/aws/emr_utils.py
@@ -321,7 +321,11 @@ def _cancel_job(emr_client, job: EmrJobRef):
else:
step_id = job.step_id
- emr_client.cancel_steps(ClusterId=job.cluster_id, StepIds=[step_id])
+ emr_client.cancel_steps(
+ ClusterId=job.cluster_id,
+ StepIds=[step_id],
+ StepCancellationOption="TERMINATE_PROCESS",
+ )
_wait_for_job_state(
emr_client, EmrJobRef(job.cluster_id, step_id), TERMINAL_STEP_STATES, 180
diff --git a/spark/ingestion/pom.xml b/spark/ingestion/pom.xml
index 6f51489086..b67c333bd5 100644
--- a/spark/ingestion/pom.xml
+++ b/spark/ingestion/pom.xml
@@ -299,6 +299,10 @@
com.google.protobuf
com.google.protobuf.vendor
+
+ org.apache.kafka
+ org.apache.kafka.vendor
+
diff --git a/tests/e2e/conftest.py b/tests/e2e/conftest.py
index b602dc6d19..17524573dd 100644
--- a/tests/e2e/conftest.py
+++ b/tests/e2e/conftest.py
@@ -14,6 +14,8 @@ def pytest_addoption(parser):
parser.addoption("--staging-path", action="store")
parser.addoption("--dataproc-cluster-name", action="store")
parser.addoption("--dataproc-region", action="store")
+ parser.addoption("--emr-cluster-id", action="store")
+ parser.addoption("--emr-region", action="store")
parser.addoption("--dataproc-project", action="store")
parser.addoption("--ingestion-jar", action="store")
parser.addoption("--redis-url", action="store", default="localhost:6379")
diff --git a/tests/e2e/fixtures/client.py b/tests/e2e/fixtures/client.py
index 3ad3966eec..264d5dc687 100644
--- a/tests/e2e/fixtures/client.py
+++ b/tests/e2e/fixtures/client.py
@@ -3,7 +3,6 @@
import uuid
from typing import Optional, Tuple
-import pyspark
import pytest
from pytest_redis.executor import RedisExecutor
@@ -29,7 +28,9 @@ def feast_client(
)
if pytestconfig.getoption("env") == "local":
- c = Client(
+ import pyspark
+
+ return Client(
core_url=f"{feast_core[0]}:{feast_core[1]}",
serving_url=f"{feast_serving[0]}:{feast_serving[1]}",
spark_launcher="standalone",
@@ -62,6 +63,22 @@ def feast_client(
),
**job_service_env,
)
+ elif pytestconfig.getoption("env") == "aws":
+ return Client(
+ core_url=f"{feast_core[0]}:{feast_core[1]}",
+ serving_url=f"{feast_serving[0]}:{feast_serving[1]}",
+ spark_launcher="emr",
+ emr_cluster_id=pytestconfig.getoption("emr_cluster_id"),
+ emr_region=pytestconfig.getoption("emr_region"),
+ spark_staging_location=os.path.join(local_staging_path, "emr"),
+ emr_log_location=os.path.join(local_staging_path, "emr_logs"),
+ spark_ingestion_jar=ingestion_job_jar,
+ redis_host=pytestconfig.getoption("redis_url").split(":")[0],
+ redis_port=pytestconfig.getoption("redis_url").split(":")[1],
+ historical_feature_output_location=os.path.join(
+ local_staging_path, "historical_output"
+ ),
+ )
else:
raise KeyError(f"Unknown environment {pytestconfig.getoption('env')}")
diff --git a/tests/e2e/test_historical_features.py b/tests/e2e/test_historical_features.py
index 6a618c92b1..ac65cbde86 100644
--- a/tests/e2e/test_historical_features.py
+++ b/tests/e2e/test_historical_features.py
@@ -21,11 +21,18 @@ def read_parquet(uri):
return pd.read_parquet(parsed_uri.path)
elif parsed_uri.scheme == "gs":
fs = gcsfs.GCSFileSystem()
- files = ["gs://" + path for path in gcsfs.GCSFileSystem().glob(uri + "/part-*")]
+ files = ["gs://" + path for path in fs.glob(uri + "/part-*")]
+ ds = parquet.ParquetDataset(files, filesystem=fs)
+ return ds.read().to_pandas()
+ elif parsed_uri.scheme == "s3":
+ import s3fs
+
+ fs = s3fs.S3FileSystem()
+ files = ["s3://" + path for path in fs.glob(uri + "/part-*")]
ds = parquet.ParquetDataset(files, filesystem=fs)
return ds.read().to_pandas()
else:
- raise ValueError("Unsupported scheme")
+ raise ValueError(f"Unsupported URL scheme {uri}")
def generate_data():
diff --git a/tests/e2e/test_online_features.py b/tests/e2e/test_online_features.py
index c1dedd1e59..77bd2a0f7c 100644
--- a/tests/e2e/test_online_features.py
+++ b/tests/e2e/test_online_features.py
@@ -133,11 +133,11 @@ def test_streaming_ingestion(
job = feast_client.start_stream_to_online_ingestion(feature_table)
wait_retry_backoff(
- lambda: (None, job.get_status() == SparkJobStatus.IN_PROGRESS), 60
+ lambda: (None, job.get_status() == SparkJobStatus.IN_PROGRESS), 120
)
wait_retry_backoff(
- lambda: (None, check_consumer_exist(kafka_broker, topic_name)), 60
+ lambda: (None, check_consumer_exist(kafka_broker, topic_name)), 120
)
try:
@@ -183,7 +183,9 @@ def ingest_and_verify(
original.event_timestamp.max().to_pydatetime() + timedelta(seconds=1),
)
- wait_retry_backoff(lambda: (None, job.get_status() == SparkJobStatus.COMPLETED), 60)
+ wait_retry_backoff(
+ lambda: (None, job.get_status() == SparkJobStatus.COMPLETED), 180
+ )
features = feast_client.get_online_features(
[f"{feature_table.name}:unique_drivers"],