Skip to content

Commit

Permalink
aws ci/cd
Browse files Browse the repository at this point in the history
Signed-off-by: Oleg Avdeev <oleg.v.avdeev@gmail.com>
  • Loading branch information
oavdeev committed Nov 5, 2020
1 parent 39efe3c commit 021b232
Show file tree
Hide file tree
Showing 10 changed files with 266 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,6 @@ spec:
{{- end }}
{{- end }}

command:
- python
- "-m"
- "feast.cli"
- server
ports:
- name: http
containerPort: {{ .Values.service.http.targetPort }}
Expand Down
191 changes: 191 additions & 0 deletions infra/scripts/codebuild_runner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
#!/usr/bin/env python

# This is a thin wrapper for AWS Codebuild API to kick off a build, wait for it to finish,
# and tail build logs while it is running.

import os
import json
from typing import Dict, Any, List, Optional, AsyncGenerator
from datetime import datetime
import asyncio
import sys
import argparse
import boto3


class LogTailer:
""" A simple cloudwatch log tailer. """

_next_token: Optional[str]

def __init__(self, client, log_group: str, log_stream: str):
self._client = client
self._next_token = None
self._log_group = log_group
self._log_stream = log_stream

def _get_log_events_args(self) -> Dict[str, Any]:
res = dict(
logGroupName=self._log_group,
logStreamName=self._log_stream,
limit=100,
startFromHead=True,
)
if self._next_token:
res["nextToken"] = self._next_token
return res

async def tail_chunk(self) -> List[Dict[str, str]]:
max_sleep = 5.0
SLEEP_TIME = 0.5

while max_sleep > 0:
resp = self._client.get_log_events(**self._get_log_events_args())
events = resp["events"]
self._next_token = resp.get("nextForwardToken")
if events:
return events
else:
max_sleep -= SLEEP_TIME
await asyncio.sleep(SLEEP_TIME)
else:
return []

async def read_all_chunks(self) -> AsyncGenerator[List[Dict[str, str]], None]:
while True:
resp = self._client.get_log_events(**self._get_log_events_args())
events = resp["events"]
self._next_token = resp.get("nextForwardToken")
if events:
yield events
else:
return


async def _wait_build_state(
client, build_id, desired_phase: Optional[str], desired_states: List[str]
) -> Dict[str, Any]:
""" Wait until the build is in one of the desired states, or in the desired phase. """
while True:
resp = client.batch_get_builds(ids=[build_id])
assert len(resp["builds"]) == 1
build = resp["builds"][0]
if build["buildStatus"] in desired_states:
return build
for phase in build["phases"]:
if desired_phase and (phase["phaseType"] == desired_phase):
return build

await asyncio.sleep(2)


def print_log_event(event) -> None:
print(
str(datetime.fromtimestamp(event["timestamp"] / 1000.0)),
event["message"],
end="",
)


async def main() -> None:
parser = argparse.ArgumentParser(description="Process some integers.")
parser.add_argument(
"--project-name", default="feast-ci-project", type=str, help="Project name"
)
parser.add_argument(
"--source-location",
type=str,
help="Source location, e.g. https://github.com/feast/feast.git",
)
parser.add_argument(
"--source-version", type=str, help="Source version, e.g. master"
)
parser.add_argument(
"--location-from-prow", action='store_true', help="Infer source location and version from prow environment variables"
)
args = parser.parse_args()

if args.location_from_prow:
job_spec = json.loads(os.getenv('JOB_SPEC', ''))
source_location = job_spec['refs']['repo_link']
source_version = source_version_from_prow_job_spec(job_spec)
else:
source_location = args.source_location
source_version = args.source_version

await run_build(
project_name=args.project_name,
source_location=source_location,
source_version=source_version,
)

def source_version_from_prow_job_spec(job_spec: Dict[str, Any]) -> str:
pull = job_spec['refs']['pulls'][0]
return f'refs/pull/{pull["number"]}/head^{{{pull["sha"]}}}'

async def run_build(project_name: str, source_version: str, source_location: str):
print(f"Building {project_name} at {source_version}", file=sys.stderr)
logs_client = boto3.client("logs", region_name="us-west-2")
codebuild_client = boto3.client("codebuild", region_name="us-west-2")

print("Submitting the build..", file=sys.stderr)
build_resp = codebuild_client.start_build(
projectName=project_name,
sourceLocationOverride=source_location,
sourceVersion=source_version,
)

build_id = build_resp["build"]["id"]

try:
print(
"Waiting for the INSTALL phase to start before tailing the log",
file=sys.stderr,
)
build = await _wait_build_state(
codebuild_client,
build_id,
desired_phase="INSTALL",
desired_states=["SUCCEEDED", "FAILED", "STOPPED", "TIMED_OUT", "FAULT"],
)

if build["buildStatus"] != "IN_PROGRESS":
print(
f"Build failed before install phase: {build['buildStatus']}",
file=sys.stderr,
)
sys.exit(1)

log_tailer = LogTailer(
logs_client,
log_stream=build["logs"]["streamName"],
log_group=build["logs"]["groupName"],
)

waiter_task = asyncio.create_task(
_wait_build_state(
codebuild_client,
build_id,
desired_phase=None,
desired_states=["SUCCEEDED", "FAILED", "STOPPED", "TIMED_OUT", "FAULT"],
)
)

while not waiter_task.done():
events = await log_tailer.tail_chunk()
for event in events:
print_log_event(event)

build_status = waiter_task.result()["buildStatus"]
if build_status == "SUCCEEDED":
print(f"Build {build_status}", file=sys.stderr)
else:
print(f"Build {build_status}", file=sys.stderr)
sys.exit(1)
except KeyboardInterrupt:
print(f"Stopping build {build_id}", file=sys.stderr)
codebuild_client.stop_build(id=build_id)


if __name__ == "__main__":
asyncio.run(main())
14 changes: 14 additions & 0 deletions infra/scripts/setup-e2e-env-aws.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
#!/bin/bash

make compile-protos-python

python -m pip install --upgrade pip setuptools wheel

python -m pip install -qr sdk/python/requirements-dev.txt
python -m pip install -qr tests/requirements.txt

# Using mvn -q to make it less verbose. This step happens after docker containers were
# succesfully built so it should be unlikely to fail.
echo "########## Building ingestion jar"
TIMEFORMAT='########## took %R seconds'
time mvn -q --no-transfer-progress -Dmaven.javadoc.skip=true -Dgpg.skip -DskipUTs=true clean package
18 changes: 17 additions & 1 deletion infra/scripts/test-end-to-end-aws.sh
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
#!/usr/bin/env bash

aws sts get-caller-identity
set -euo pipefail

pip install "s3fs" "boto3" "urllib3>=1.25.4"

export DISABLE_FEAST_SERVICE_FIXTURES=1
export DISABLE_SERVICE_FIXTURES=1

PYTHONPATH=sdk/python pytest tests/e2e/ \
--core-url cicd-feast-core:6565 \
--serving-url cicd-feast-online-serving:6566 \
--env aws \
--emr-cluster-id $CLUSTER_ID \
--staging-path $STAGING_PATH \
--redis-url $NODE_IP:32379 \
--emr-region us-west-2 \
--kafka-brokers $NODE_IP:30092 \
-m "not bq"
6 changes: 5 additions & 1 deletion sdk/python/feast/pyspark/launchers/aws/emr_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,11 @@ def _cancel_job(emr_client, job: EmrJobRef):
else:
step_id = job.step_id

emr_client.cancel_steps(ClusterId=job.cluster_id, StepIds=[step_id])
emr_client.cancel_steps(
ClusterId=job.cluster_id,
StepIds=[step_id],
StepCancellationOption="TERMINATE_PROCESS",
)

_wait_for_job_state(
emr_client, EmrJobRef(job.cluster_id, step_id), TERMINAL_STEP_STATES, 180
Expand Down
4 changes: 4 additions & 0 deletions spark/ingestion/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,10 @@
<pattern>com.google.protobuf</pattern>
<shadedPattern>com.google.protobuf.vendor</shadedPattern>
</relocation>
<relocation>
<pattern>org.apache.kafka</pattern>
<shadedPattern>org.apache.kafka.vendor</shadedPattern>
</relocation>
</relocations>
<filters>
<filter>
Expand Down
2 changes: 2 additions & 0 deletions tests/e2e/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ def pytest_addoption(parser):
parser.addoption("--staging-path", action="store")
parser.addoption("--dataproc-cluster-name", action="store")
parser.addoption("--dataproc-region", action="store")
parser.addoption("--emr-cluster-id", action="store")
parser.addoption("--emr-region", action="store")
parser.addoption("--dataproc-project", action="store")
parser.addoption("--ingestion-jar", action="store")
parser.addoption("--redis-url", action="store", default="localhost:6379")
Expand Down
21 changes: 19 additions & 2 deletions tests/e2e/fixtures/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import uuid
from typing import Optional, Tuple

import pyspark
import pytest
from pytest_redis.executor import RedisExecutor

Expand All @@ -29,7 +28,9 @@ def feast_client(
)

if pytestconfig.getoption("env") == "local":
c = Client(
import pyspark

return Client(
core_url=f"{feast_core[0]}:{feast_core[1]}",
serving_url=f"{feast_serving[0]}:{feast_serving[1]}",
spark_launcher="standalone",
Expand Down Expand Up @@ -62,6 +63,22 @@ def feast_client(
),
**job_service_env,
)
elif pytestconfig.getoption("env") == "aws":
return Client(
core_url=f"{feast_core[0]}:{feast_core[1]}",
serving_url=f"{feast_serving[0]}:{feast_serving[1]}",
spark_launcher="emr",
emr_cluster_id=pytestconfig.getoption("emr_cluster_id"),
emr_region=pytestconfig.getoption("emr_region"),
spark_staging_location=os.path.join(local_staging_path, "emr"),
emr_log_location=os.path.join(local_staging_path, "emr_logs"),
spark_ingestion_jar=ingestion_job_jar,
redis_host=pytestconfig.getoption("redis_url").split(":")[0],
redis_port=pytestconfig.getoption("redis_url").split(":")[1],
historical_feature_output_location=os.path.join(
local_staging_path, "historical_output"
),
)
else:
raise KeyError(f"Unknown environment {pytestconfig.getoption('env')}")

Expand Down
11 changes: 9 additions & 2 deletions tests/e2e/test_historical_features.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,18 @@ def read_parquet(uri):
return pd.read_parquet(parsed_uri.path)
elif parsed_uri.scheme == "gs":
fs = gcsfs.GCSFileSystem()
files = ["gs://" + path for path in gcsfs.GCSFileSystem().glob(uri + "/part-*")]
files = ["gs://" + path for path in fs.glob(uri + "/part-*")]
ds = parquet.ParquetDataset(files, filesystem=fs)
return ds.read().to_pandas()
elif parsed_uri.scheme == "s3":
import s3fs

fs = s3fs.S3FileSystem()
files = ["s3://" + path for path in fs.glob(uri + "/part-*")]
ds = parquet.ParquetDataset(files, filesystem=fs)
return ds.read().to_pandas()
else:
raise ValueError("Unsupported scheme")
raise ValueError(f"Unsupported URL scheme {uri}")


def generate_data():
Expand Down
8 changes: 5 additions & 3 deletions tests/e2e/test_online_features.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,11 +133,11 @@ def test_streaming_ingestion(
job = feast_client.start_stream_to_online_ingestion(feature_table)

wait_retry_backoff(
lambda: (None, job.get_status() == SparkJobStatus.IN_PROGRESS), 60
lambda: (None, job.get_status() == SparkJobStatus.IN_PROGRESS), 120
)

wait_retry_backoff(
lambda: (None, check_consumer_exist(kafka_broker, topic_name)), 60
lambda: (None, check_consumer_exist(kafka_broker, topic_name)), 120
)

try:
Expand Down Expand Up @@ -183,7 +183,9 @@ def ingest_and_verify(
original.event_timestamp.max().to_pydatetime() + timedelta(seconds=1),
)

wait_retry_backoff(lambda: (None, job.get_status() == SparkJobStatus.COMPLETED), 60)
wait_retry_backoff(
lambda: (None, job.get_status() == SparkJobStatus.COMPLETED), 180
)

features = feast_client.get_online_features(
[f"{feature_table.name}:unique_drivers"],
Expand Down

0 comments on commit 021b232

Please sign in to comment.