diff --git a/.github/workflows/complete.yml b/.github/workflows/complete.yml index ef3715bcd8..ba7b110c8e 100644 --- a/.github/workflows/complete.yml +++ b/.github/workflows/complete.yml @@ -156,3 +156,54 @@ jobs: - name: copy to gs run: gsutil cp ./spark/ingestion/target/feast-ingestion-spark-${GITHUB_SHA}.jar gs://feast-jobs/spark/ingestion/ + test-end-to-end: + runs-on: [self-hosted] + steps: + - uses: actions/checkout@v2 + - uses: actions/setup-java@v1 + with: + java-version: '11' + - uses: stCarolas/setup-maven@v3 + with: + maven-version: 3.6.3 + - uses: actions/setup-python@v2 + with: + python-version: 3.6 + - name: install + run: | + apt-get update && apt-get install -y redis-server postgresql libpq-dev + make build-java-no-tests REVISION=develop + python -m pip install --upgrade pip setuptools wheel + make install-python + python -m pip install -qr tests/requirements.txt + - name: run tests + run: su -p postgres -c "PATH=$PATH HOME=/tmp pytest tests/e2e/ --feast-version develop" + + test-end-to-end-gcp: + runs-on: [self-hosted] + env: + DISABLE_SERVICE_FIXTURES: true + steps: + - uses: actions/checkout@v2 + - uses: actions/setup-java@v1 + with: + java-version: '11' + - uses: stCarolas/setup-maven@v3 + with: + maven-version: 3.6.3 + - uses: actions/setup-python@v2 + with: + python-version: 3.6 + - name: install + run: | + apt-get update && apt-get install -y redis-server postgresql libpq-dev + make build-java-no-tests REVISION=develop + python -m pip install --upgrade pip setuptools wheel + make install-python + python -m pip install -qr tests/requirements.txt + - name: run tests + run: > + su -p postgres -c "PATH=$PATH HOME=/tmp pytest tests/e2e/ + --feast-version develop --env=gcloud --dataproc-cluster-name feast-e2e + --dataproc-project kf-feast --dataproc-region us-central1 + --redis-url 10.128.0.105:6379 --redis-cluster --kafka-brokers 10.128.0.103:9094" diff --git a/.prow/config.yaml b/.prow/config.yaml index 39c275603d..ec18e74586 100644 --- a/.prow/config.yaml +++ b/.prow/config.yaml @@ -141,118 +141,6 @@ presubmits: - image: golang:1.13 command: ["infra/scripts/test-golang-sdk.sh"] - - name: test-end-to-end - decorate: true - always_run: true - spec: - containers: - - image: maven:3.6-jdk-11 - command: ["infra/scripts/test-end-to-end.sh"] - resources: - requests: - cpu: "6" - memory: "6144Mi" - env: - - name: GOOGLE_APPLICATION_CREDENTIALS - value: /etc/gcloud/service-account.json - volumeMounts: - - mountPath: /etc/gcloud/service-account.json - name: service-account - readOnly: true - subPath: service-account.json - volumes: - - name: service-account - secret: - secretName: feast-service-account - skip_branches: - - ^v0\.(3|4)-branch$ - - - name: test-end-to-end-auth - decorate: true - always_run: true - spec: - containers: - - image: maven:3.6-jdk-11 - command: ["infra/scripts/test-end-to-end.sh", "True"] - resources: - requests: - cpu: "6" - memory: "6144Mi" - env: - - name: GOOGLE_APPLICATION_CREDENTIALS - value: /etc/gcloud/service-account.json - volumeMounts: - - mountPath: /etc/gcloud/service-account.json - name: service-account - readOnly: true - subPath: service-account.json - volumes: - - name: service-account - secret: - secretName: feast-service-account - skip_branches: - - ^v0\.(3|4)-branch$ - - - name: test-end-to-end-redis-cluster - decorate: true - always_run: true - spec: - containers: - - image: maven:3.6-jdk-11 - command: ["infra/scripts/test-end-to-end-redis-cluster.sh"] - resources: - requests: - cpu: "6" - memory: "6144Mi" - env: - - name: 
GOOGLE_APPLICATION_CREDENTIALS - value: /etc/gcloud/service-account.json - volumeMounts: - - mountPath: /etc/gcloud/service-account.json - name: service-account - readOnly: true - subPath: service-account.json - volumes: - - name: service-account - secret: - secretName: feast-service-account - skip_branches: - - ^v0\.(3|4)-branch$ - - - name: test-end-to-end-java-8 - decorate: true - always_run: true - spec: - containers: - - image: maven:3.6-jdk-8 - command: ["infra/scripts/test-end-to-end.sh"] - resources: - requests: - cpu: "6" - memory: "6144Mi" - branches: - - ^v0\.(3|4)-branch$ - - - name: test-end-to-end-batch-java-8 - decorate: true - always_run: true - spec: - volumes: - - name: service-account - secret: - secretName: feast-service-account - containers: - - image: maven:3.6-jdk-8 - command: ["infra/scripts/test-end-to-end-batch.sh"] - resources: - requests: - cpu: "6" - memory: "6144Mi" - volumeMounts: - - name: service-account - mountPath: "/etc/service-account" - branches: - - ^v0\.(3|4)-branch$ postsubmits: feast-dev/feast: diff --git a/Makefile b/Makefile index de0a1b1c4c..f0f0cc5f1f 100644 --- a/Makefile +++ b/Makefile @@ -70,7 +70,7 @@ compile-protos-python: install-python-ci-dependencies cd ${ROOT_DIR}/protos; python -m grpc_tools.protoc -I. --python_out=../sdk/python/ --grpc_python_out=../sdk/python/ --mypy_out=../sdk/python/ feast/third_party/grpc/health/v1/*.proto install-python: compile-protos-python - cd sdk/python; python setup.py develop + python -m pip install -e sdk/python test-python: pytest --verbose --color=yes sdk/python/tests @@ -90,10 +90,10 @@ lint-python: cd ${ROOT_DIR}/sdk/python; flake8 feast/ tests/ cd ${ROOT_DIR}/sdk/python; black --check feast tests - cd ${ROOT_DIR}/tests/e2e; mypy . - cd ${ROOT_DIR}/tests/e2e; isort . --check-only - cd ${ROOT_DIR}/tests/e2e; flake8 . - cd ${ROOT_DIR}/tests/e2e; black --check . 
+ cd ${ROOT_DIR}/tests; mypy e2e + cd ${ROOT_DIR}/tests; isort e2e --check-only + cd ${ROOT_DIR}/tests; flake8 e2e + cd ${ROOT_DIR}/tests; black --check e2e # Go SDK diff --git a/infra/scripts/helm/kafka-values.tpl.yaml b/infra/scripts/helm/kafka-values.tpl.yaml new file mode 100644 index 0000000000..206323f337 --- /dev/null +++ b/infra/scripts/helm/kafka-values.tpl.yaml @@ -0,0 +1,18 @@ +externalAccess: + enabled: true + service: + loadBalancerIPs: + - $feast_kafka_ip + annotations: + cloud.google.com/load-balancer-type: Internal + loadBalancerSourceRanges: + - 10.0.0.0/8 + - 172.16.0.0/12 + - 192.168.0.0/16 + +persistence: + enabled: false + +zookeeper: + persistence: + enabled: false \ No newline at end of file diff --git a/infra/scripts/helm/redis-cluster-values.tpl.yaml b/infra/scripts/helm/redis-cluster-values.tpl.yaml new file mode 100644 index 0000000000..5b15252495 --- /dev/null +++ b/infra/scripts/helm/redis-cluster-values.tpl.yaml @@ -0,0 +1,17 @@ +cluster: + nodes: 3 + replicas: 0 + externalAccess: + enabled: true + service: + annotations: + cloud.google.com/load-balancer-type: Internal + loadBalancerIP: + - $feast_redis_1_ip + - $feast_redis_2_ip + - $feast_redis_3_ip + +persistence: + enabled: false + +usePassword: false \ No newline at end of file diff --git a/infra/scripts/setup-common-functions.sh b/infra/scripts/setup-common-functions.sh index 40d5b6badf..a70dacb6d4 100755 --- a/infra/scripts/setup-common-functions.sh +++ b/infra/scripts/setup-common-functions.sh @@ -9,136 +9,6 @@ install_test_tools() { apt-get -y install wget netcat kafkacat build-essential } -install_gcloud_sdk() { - print_banner "Installing Google Cloud SDK" - if [[ ! $(command -v gsutil) ]]; then - CURRENT_DIR=$(dirname "$BASH_SOURCE") - . "${CURRENT_DIR}"/install-google-cloud-sdk.sh - fi - - export GOOGLE_APPLICATION_CREDENTIALS - gcloud auth activate-service-account --key-file ${GOOGLE_APPLICATION_CREDENTIALS} -} - -install_and_start_local_redis() { - print_banner "Installing and tarting Redis at localhost:6379" - # Allow starting serving in this Maven Docker image. Default set to not allowed. 
- echo "exit 0" >/usr/sbin/policy-rc.d - apt-get -y install redis-server >/var/log/redis.install.log - redis-server --daemonize yes - redis-cli ping -} - -install_and_start_local_redis_cluster() { - print_banner "Installing Redis at localhost:6379" - echo "exit 0" >/usr/sbin/policy-rc.d - ${SCRIPTS_DIR}/setup-redis-cluster.sh - redis-cli -c -p 7000 ping -} - -install_and_start_local_postgres() { - print_banner "Installing and starting Postgres at localhost:5432" - apt-get -y install postgresql >/var/log/postgresql.install.log - service postgresql start - # Initialize with database: 'postgres', user: 'postgres', password: 'password' - cat </tmp/update-postgres-role.sh -psql -c "ALTER USER postgres PASSWORD 'password';" -EOF - chmod +x /tmp/update-postgres-role.sh - su -s /bin/bash -c /tmp/update-postgres-role.sh postgres - export PGPASSWORD=password - pg_isready -} - -install_and_start_local_zookeeper_and_kafka() { - print_banner "Installing and starting Zookeeper at localhost:2181 and Kafka at localhost:9092" - wget -qO- https://www-eu.apache.org/dist/kafka/2.3.0/kafka_2.12-2.3.0.tgz | tar xz - mv kafka_2.12-2.3.0/ /tmp/kafka - - nohup /tmp/kafka/bin/zookeeper-server-start.sh /tmp/kafka/config/zookeeper.properties &>/var/log/zookeeper.log 2>&1 & - ${SCRIPTS_DIR}/wait-for-it.sh localhost:2181 --timeout=20 - tail -n10 /var/log/zookeeper.log - - nohup /tmp/kafka/bin/kafka-server-start.sh /tmp/kafka/config/server.properties &>/var/log/kafka.log 2>&1 & - ${SCRIPTS_DIR}/wait-for-it.sh localhost:9092 --timeout=40 - tail -n10 /var/log/kafka.log - kafkacat -b localhost:9092 -L -} - -build_feast_core_and_serving() { - print_banner "Building Feast Core and Feast Serving" - infra/scripts/download-maven-cache.sh \ - --archive-uri gs://feast-templocation-kf-feast/.m2.2020-08-19.tar \ - --output-dir /root/ - - # Build jars for Feast - mvn --quiet --batch-mode -Dmaven.javadoc.skip=true -Dgpg.skip -DskipUTs=true clean package - - ls -lh core/target/*jar - ls -lh serving/target/*jar - ls -lh job-controller/target/*jar -} - -start_feast_core() { - print_banner "Starting Feast Core" - - if [ -n "$1" ]; then - echo "Custom Spring application.yml location provided: $1" - export CONFIG_ARG="--spring.config.location=classpath:/application.yml,file://$1" - fi - - nohup java -jar core/target/feast-core-$FEAST_BUILD_VERSION-exec.jar $CONFIG_ARG &>/var/log/feast-core.log & - ${SCRIPTS_DIR}/wait-for-it.sh localhost:6565 --timeout=90 - - tail -n10 /var/log/feast-core.log - nc -w2 localhost 6565 /var/log/feast-jobcontroller.log & - ${SCRIPTS_DIR}/wait-for-it.sh localhost:6570 --timeout=90 - - tail -n10 /var/log/feast-jobcontroller.log - nc -w2 localhost 6570 /var/log/feast-serving-online.log & - ${SCRIPTS_DIR}/wait-for-it.sh localhost:6566 --timeout=60 - - tail -n100 /var/log/feast-serving-online.log - nc -w2 localhost 6566 helm/kafka-values.yaml +envsubst '$feast_redis_1_ip,$feast_redis_2_ip,$feast_redis_3_ip' < helm/redis-cluster-values.tpl.yaml > helm/redis-cluster-values.yaml + +helm install e2e-kafka bitnami/kafka \ + --values helm/kafka-values.yaml --namespace infra --create-namespace + +helm install e2e-redis-cluster bitnami/redis-cluster \ + --values helm/redis-cluster-values.yaml --namespace infra \ + --create-namespace \ No newline at end of file diff --git a/infra/scripts/setup-redis-cluster.sh b/infra/scripts/setup-redis-cluster.sh deleted file mode 100755 index a193970531..0000000000 --- a/infra/scripts/setup-redis-cluster.sh +++ /dev/null @@ -1,16 +0,0 @@ -#!/usr/bin/env bash - -apt-get -y install 
redis-server > /var/log/redis.install.log - -mkdir 7000 7001 7002 7003 7004 7005 -for i in {0..5} ; do -echo "port 700$i -cluster-enabled yes -cluster-config-file nodes-$i.conf -cluster-node-timeout 5000 -appendonly yes" > 700$i/redis.conf -redis-server 700$i/redis.conf --daemonize yes -done -echo yes | redis-cli --cluster create 127.0.0.1:7000 127.0.0.1:7001 \ -127.0.0.1:7002 127.0.0.1:7003 127.0.0.1:7004 127.0.0.1:7005 \ ---cluster-replicas 1 diff --git a/infra/scripts/test-docker-compose.sh b/infra/scripts/test-docker-compose.sh index 8ace2cdc23..f44f6da1cb 100755 --- a/infra/scripts/test-docker-compose.sh +++ b/infra/scripts/test-docker-compose.sh @@ -57,5 +57,9 @@ export FEAST_ONLINE_SERVING_CONTAINER_IP_ADDRESS=$(docker inspect -f '{{range .N ${PROJECT_ROOT_DIR}/infra/scripts/wait-for-it.sh ${FEAST_ONLINE_SERVING_CONTAINER_IP_ADDRESS}:6566 --timeout=120 # Run e2e tests for Redis -docker exec -e FEAST_VERSION=${FEAST_VERSION} feast_jupyter_1 bash \ --c 'cd /feast/tests/e2e && unset GOOGLE_APPLICATION_CREDENTIALS && pytest *.py -m "not bq" --ingestion-jar gs://feast-jobs/spark/ingestion/feast-ingestion-spark-${FEAST_VERSION}.jar --redis-url redis:6379 --core_url core:6565 --serving_url=online_serving:6566 --kafka_brokers=kafka:9092' +docker exec \ + -e FEAST_VERSION=${FEAST_VERSION} \ + -e DISABLE_SERVICE_FIXTURES=true \ + -e DISABLE_FEAST_SERVICE_FIXTURES=true \ + feast_jupyter_1 bash \ + -c 'cd /feast/tests && python -m pip install -r requirements.txt && pytest e2e/ -m "not bq" --ingestion-jar gs://feast-jobs/spark/ingestion/feast-ingestion-spark-${FEAST_VERSION}.jar --redis-url redis:6379 --core-url core:6565 --serving-url online_serving:6566 --kafka-brokers kafka:9092' diff --git a/infra/scripts/test-end-to-end-redis-cluster.sh b/infra/scripts/test-end-to-end-redis-cluster.sh deleted file mode 100755 index a198998f20..0000000000 --- a/infra/scripts/test-end-to-end-redis-cluster.sh +++ /dev/null @@ -1,120 +0,0 @@ -#!/usr/bin/env bash - -set -e -set -o pipefail - -test -z ${GOOGLE_APPLICATION_CREDENTIALS} && GOOGLE_APPLICATION_CREDENTIALS="/etc/gcloud/service-account.json" -test -z ${SKIP_BUILD_JARS} && SKIP_BUILD_JARS="false" -test -z ${GOOGLE_CLOUD_PROJECT} && GOOGLE_CLOUD_PROJECT="kf-feast" -test -z ${TEMP_BUCKET} && TEMP_BUCKET="feast-templocation-kf-feast" -test -z ${JOBS_STAGING_LOCATION} && JOBS_STAGING_LOCATION="gs://${TEMP_BUCKET}/staging-location" - -# Get the current build version using maven (and pom.xml) -export FEAST_BUILD_VERSION=$(mvn help:evaluate -Dexpression=project.version -q -DforceStdout) -echo Building version: $FEAST_BUILD_VERSION - -# Get Feast project repository root and scripts directory -export PROJECT_ROOT_DIR=$(git rev-parse --show-toplevel) -export SCRIPTS_DIR=${PROJECT_ROOT_DIR}/infra/scripts - -echo " -This script will run end-to-end tests for Feast Core and Online Serving. - -1. Install Redis as the store for Feast Online Serving. -2. Install Postgres for persisting Feast metadata. -3. Install Kafka and Zookeeper as the Source in Feast. -4. Install Python 3.7.4, Feast Python SDK and run end-to-end tests from - tests/e2e via pytest. 
-" - -source ${SCRIPTS_DIR}/setup-common-functions.sh - -install_test_tools -install_gcloud_sdk -install_and_start_local_redis_cluster -install_and_start_local_postgres -install_and_start_local_zookeeper_and_kafka - -if [[ ${SKIP_BUILD_JARS} != "true" ]]; then - build_feast_core_and_serving -else - echo "[DEBUG] Skipping building jars" -fi - -# Start Feast Core with auth if enabled -cat < /tmp/jc.warehouse.application.yml -feast: - core-host: localhost - core-port: 6565 - jobs: - polling_interval_milliseconds: 5000 - active_runner: direct - runners: - - name: direct - type: DirectRunner - options: {} -EOF - -start_feast_core -start_feast_jobcontroller /tmp/jc.warehouse.application.yml - -cat < /tmp/serving.online.application.yml -feast: - core-host: localhost - core-grpc-port: 6565 - - active_store: online - - # List of store configurations - stores: - - name: online # Name of the store (referenced by active_store) - type: REDIS_CLUSTER # Type of the store. REDIS, BIGQUERY are available options - config: - # Connection string specifies the IP and ports of Redis instances in Redis cluster - connection_string: "localhost:7000,localhost:7001,localhost:7002,localhost:7003,localhost:7004,localhost:7005" - flush_frequency_seconds: 1 - # Subscriptions indicate which feature tables needs to be retrieved and used to populate this store - subscriptions: - # Wildcards match all options. No filtering is done. - - name: "*" - project: "*" - version: "*" - - tracing: - enabled: false - -spring: - main: - web-environment: false - -EOF - -start_feast_serving /tmp/serving.online.application.yml - -install_python_with_miniconda_and_feast_sdk - -print_banner "Running end-to-end tests with pytest at 'tests/e2e'" - -# Default artifact location setting in Prow jobs -LOGS_ARTIFACT_PATH=/logs/artifacts - -ORIGINAL_DIR=$(pwd) -cd tests/e2e - -set +e -CORE_NO=$(nproc --all) -pytest *.py -n ${CORE_NO} --redis-url localhost:7000 \ - --dist=loadscope --junitxml=${LOGS_ARTIFACT_PATH}/python-sdk-test-report.xml -TEST_EXIT_CODE=$? - -if [[ ${TEST_EXIT_CODE} != 0 ]]; then - echo "[DEBUG] Printing logs" - ls -ltrh /var/log/feast* - cat /var/log/feast-serving-online.log /var/log/feast-core.log - - echo "[DEBUG] Printing Python packages list" - pip list -fi - -cd ${ORIGINAL_DIR} -exit ${TEST_EXIT_CODE} diff --git a/infra/scripts/test-end-to-end.sh b/infra/scripts/test-end-to-end.sh deleted file mode 100755 index 51b55b1763..0000000000 --- a/infra/scripts/test-end-to-end.sh +++ /dev/null @@ -1,136 +0,0 @@ -#!/usr/bin/env bash - -set -e -set -o pipefail -[[ $1 == "True" ]] && ENABLE_AUTH="true" || ENABLE_AUTH="false" -echo "Authenication enabled : ${ENABLE_AUTH}" - -test -z ${GOOGLE_APPLICATION_CREDENTIALS} && GOOGLE_APPLICATION_CREDENTIALS="/etc/gcloud/service-account.json" -test -z ${SKIP_BUILD_JARS} && SKIP_BUILD_JARS="false" -test -z ${GOOGLE_CLOUD_PROJECT} && GOOGLE_CLOUD_PROJECT="kf-feast" -test -z ${TEMP_BUCKET} && TEMP_BUCKET="feast-templocation-kf-feast" -test -z ${JOBS_STAGING_LOCATION} && JOBS_STAGING_LOCATION="gs://${TEMP_BUCKET}/staging-location" - -# Get the current build version using maven (and pom.xml) -export FEAST_BUILD_VERSION=$(mvn help:evaluate -Dexpression=project.version -q -DforceStdout) -echo Building version: $FEAST_BUILD_VERSION - -# Get Feast project repository root and scripts directory -export PROJECT_ROOT_DIR=$(git rev-parse --show-toplevel) -export SCRIPTS_DIR=${PROJECT_ROOT_DIR}/infra/scripts - -echo " -This script will run end-to-end tests for Feast Core and Online Serving. - -1. 
Install Redis as the store for Feast Online Serving. -2. Install Postgres for persisting Feast metadata. -3. Install Kafka and Zookeeper as the Source in Feast. -4. Install Python 3.7.4, Feast Python SDK and run end-to-end tests from - tests/e2e via pytest. -" - -source ${SCRIPTS_DIR}/setup-common-functions.sh - -install_test_tools -install_gcloud_sdk -install_and_start_local_redis -install_and_start_local_postgres -install_and_start_local_zookeeper_and_kafka - -if [[ ${SKIP_BUILD_JARS} != "true" ]]; then - build_feast_core_and_serving -else - echo "[DEBUG] Skipping building jars" -fi - -# Start Feast Core with auth if enabled -cat < /tmp/core.warehouse.application.yml -feast: - security: - authentication: - enabled: true - provider: jwt - options: - jwkEndpointURI: "https://www.googleapis.com/oauth2/v3/certs" - authorization: - enabled: false - provider: none -EOF - -cat < /tmp/jc.warehouse.application.yml -feast: - core-host: localhost - core-port: 6565 - jobs: - polling_interval_milliseconds: 5000 - active_runner: direct - runners: - - name: direct - type: DirectRunner - options: {} -EOF - -cat < /tmp/serving.warehouse.application.yml -feast: - stores: - - name: online - type: REDIS - config: - host: localhost - port: 6379 - flush_frequency_seconds: 1 - subscriptions: - - name: "*" - project: "*" - core-authentication: - enabled: $ENABLE_AUTH - provider: google - security: - authentication: - enabled: $ENABLE_AUTH - provider: jwt - authorization: - enabled: false - provider: none -EOF - -if [[ ${ENABLE_AUTH} = "true" ]]; - then - print_banner "Starting Feast core with auth" - start_feast_core /tmp/core.warehouse.application.yml - print_banner "Starting Feast Serving with auth" - else - print_banner "Starting Feast core without auth" - start_feast_core - print_banner "Starting Feast Serving without auth" -fi - -start_feast_jobcontroller /tmp/jc.warehouse.application.yml -start_feast_serving /tmp/serving.warehouse.application.yml -install_python_with_miniconda_and_feast_sdk - -print_banner "Running end-to-end tests with pytest at 'tests/e2e'" - -# Default artifact location setting in Prow jobs -LOGS_ARTIFACT_PATH=/logs/artifacts - -ORIGINAL_DIR=$(pwd) -cd tests/e2e - -set +e -export GOOGLE_APPLICATION_CREDENTIALS=/etc/gcloud/service-account.json -CORE_NO=$(nproc --all) -pytest *.py -n ${CORE_NO} --dist=loadscope --enable_auth=${ENABLE_AUTH} --junitxml=${LOGS_ARTIFACT_PATH}/python-sdk-test-report.xml -TEST_EXIT_CODE=$? 
- -if [[ ${TEST_EXIT_CODE} != 0 ]]; then - echo "[DEBUG] Printing logs" - ls -ltrh /var/log/feast* - cat /var/log/feast-serving-online.log /var/log/feast-core.log - - echo "[DEBUG] Printing Python packages list" - pip list -fi - -cd ${ORIGINAL_DIR} -exit ${TEST_EXIT_CODE} diff --git a/infra/scripts/test-templates/values-end-to-end-batch-dataflow.yaml b/infra/scripts/test-templates/values-end-to-end-batch-dataflow.yaml deleted file mode 100644 index 7eea4d40ef..0000000000 --- a/infra/scripts/test-templates/values-end-to-end-batch-dataflow.yaml +++ /dev/null @@ -1,178 +0,0 @@ -feast-core: - # feast-core.enabled -- Flag to install Feast Core - enabled: true - gcpServiceAccount: - enabled: true - postgresql: - existingSecret: feast-postgresql - service: - type: LoadBalancer - image: - tag: $IMAGE_TAG - logLevel: INFO - application-override.yaml: - feast: - stream: - options: - bootstrapServers: $feast_kafka_1_ip:31090 - topic: $FEATURES_TOPIC - -feast-jobcontroller: - enabled: true - gcpServiceAccount: - enabled: true - service: - type: LoadBalancer - image: - tag: $IMAGE_TAG - application-override.yaml: - feast: - stream: - options: - bootstrapServers: $feast_kafka_1_ip:31090 - specsOptions: - specsTopic: $SPECS_TOPIC - specsAckTopic: $SPECS_TOPIC-ack - jobs: - active_runner: dataflow - controller: - consolidate-jobs-per-source: true - jobSelector: - application: feast - tag: $IMAGE_TAG - featureSetSelector: - - project: "*" - name: "*" - whitelisted-stores: - - online - - historical - runners: - - name: dataflow - type: DataflowRunner - options: - project: $GCLOUD_PROJECT - region: $GCLOUD_REGION - workerZone: $GCLOUD_REGION-a - tempLocation: gs://$TEMP_BUCKET/tempLocation - network: $GCLOUD_NETWORK - subnetwork: regions/$GCLOUD_REGION/subnetworks/$GCLOUD_SUBNET - maxNumWorkers: 1 - autoscalingAlgorithm: THROUGHPUT_BASED - usePublicIps: false - workerMachineType: n1-standard-1 - deadLetterTableSpec: $GCLOUD_PROJECT:$DATASET_NAME.deadletter - - metrics: - enabled: true - host: $feast_statsd_ip - -feast-online-serving: - # feast-online-serving.enabled -- Flag to install Feast Online Serving - enabled: true - image: - tag: $IMAGE_TAG - service: - type: LoadBalancer - application-override.yaml: - feast: - active_store: online - - # List of store configurations - stores: - - name: online - type: REDIS - config: - host: $feast_redis_ip - port: 6379 - subscriptions: - - name: "*" - project: "*" - version: "*" - -feast-batch-serving: - # feast-batch-serving.enabled -- Flag to install Feast Batch Serving - enabled: true - image: - tag: $IMAGE_TAG - gcpServiceAccount: - enabled: true - service: - type: LoadBalancer - application-override.yaml: - feast: - active_store: historical - - # List of store configurations - stores: - - name: historical - type: BIGQUERY - config: - project_id: $GCLOUD_PROJECT - dataset_id: $DATASET_NAME - staging_location: gs://$TEMP_BUCKET/stagingLocation - initial_retry_delay_seconds: 3 - total_timeout_seconds: 21600 - write_triggering_frequency_seconds: 1 - subscriptions: - - name: "*" - project: "*" - version: "*" - job_store: - redis_host: $HELM_COMMON_NAME-redis-master - -postgresql: - # postgresql.enabled -- Flag to install Postgresql - enabled: true - existingSecret: feast-postgresql - -kafka: - # kafka.enabled -- Flag to install Kafka - enabled: true - external: - enabled: true - type: LoadBalancer - annotations: - cloud.google.com/load-balancer-type: Internal - loadBalancerSourceRanges: - - 10.0.0.0/8 - - 172.16.0.0/12 - - 192.168.0.0/16 - firstListenerPort: 31090 - 
loadBalancerIP: - - $feast_kafka_1_ip - - $feast_kafka_2_ip - - $feast_kafka_3_ip - configurationOverrides: - "advertised.listeners": |- - EXTERNAL://${LOAD_BALANCER_IP}:31090 - "listener.security.protocol.map": |- - PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT - "log.retention.hours": 1 - -redis: - # redis.enabled -- Flag to install Redis - enabled: true - usePassword: false - master: - service: - type: LoadBalancer - loadBalancerIP: $feast_redis_ip - annotations: - cloud.google.com/load-balancer-type: Internal - loadBalancerSourceRanges: - - 10.0.0.0/8 - - 172.16.0.0/12 - - 192.168.0.0/16 - -prometheus-statsd-exporter: - # prometheus-statsd-exporter.enabled -- Flag to install StatsD to Prometheus Exporter - enabled: true - service: - type: LoadBalancer - annotations: - cloud.google.com/load-balancer-type: Internal - loadBalancerSourceRanges: - - 10.0.0.0/8 - - 172.16.0.0/12 - - 192.168.0.0/16 - loadBalancerIP: $feast_statsd_ip diff --git a/sdk/python/feast/pyspark/launchers/gcloud/dataproc.py b/sdk/python/feast/pyspark/launchers/gcloud/dataproc.py index 8777fe9f38..f3fa46032a 100644 --- a/sdk/python/feast/pyspark/launchers/gcloud/dataproc.py +++ b/sdk/python/feast/pyspark/launchers/gcloud/dataproc.py @@ -1,11 +1,11 @@ import os import uuid -from typing import List, cast +from functools import partial +from typing import Any, Callable, Dict, List from urllib.parse import urlparse from google.api_core.operation import Operation from google.cloud import dataproc_v1 -from google.cloud.dataproc_v1 import Job as DataprocJob from google.cloud.dataproc_v1 import JobStatus from feast.pyspark.abc import ( @@ -25,29 +25,36 @@ class DataprocJobMixin: - def __init__(self, operation: Operation): + def __init__(self, operation: Operation, cancel_fn: Callable[[], None]): """ :param operation: (google.api.core.operation.Operation): A Future for the spark job result, returned by the dataproc client. """ self._operation = operation + self._cancel_fn = cancel_fn def get_id(self) -> str: return self._operation.metadata.job_id def get_status(self) -> SparkJobStatus: - if self._operation.running(): - return SparkJobStatus.IN_PROGRESS + self._operation._refresh_and_update() - job = cast(DataprocJob, self._operation.result()) - status = cast(JobStatus, job.status) - if status.state == JobStatus.State.DONE: - return SparkJobStatus.COMPLETED + status = self._operation.metadata.status + if status.state == JobStatus.State.ERROR: + return SparkJobStatus.FAILED + elif status.state == JobStatus.State.RUNNING: + return SparkJobStatus.IN_PROGRESS + elif status.state in ( + JobStatus.State.PENDING, + JobStatus.State.SETUP_DONE, + JobStatus.State.STATE_UNSPECIFIED, + ): + return SparkJobStatus.STARTING - return SparkJobStatus.FAILED + return SparkJobStatus.COMPLETED def cancel(self): - self._operation.cancel() + self._cancel_fn() class DataprocRetrievalJob(DataprocJobMixin, RetrievalJob): @@ -55,14 +62,16 @@ class DataprocRetrievalJob(DataprocJobMixin, RetrievalJob): Historical feature retrieval job result for a Dataproc cluster """ - def __init__(self, operation: Operation, output_file_uri: str): + def __init__( + self, operation: Operation, cancel_fn: Callable[[], None], output_file_uri: str + ): """ This is the returned historical feature retrieval job result for DataprocClusterLauncher. Args: output_file_uri (str): Uri to the historical feature retrieval job output file. 
""" - super().__init__(operation) + super().__init__(operation, cancel_fn) self._output_file_uri = output_file_uri def get_output_file_uri(self, timeout_sec=None): @@ -130,21 +139,36 @@ def _stage_files(self, pyspark_script: str, job_id: str) -> str: blob_path = os.path.join( self.remote_path, job_id, os.path.basename(pyspark_script), ) - staging_client.upload_file(blob_path, self.staging_bucket, pyspark_script) + staging_client.upload_file(pyspark_script, self.staging_bucket, blob_path) return f"gs://{self.staging_bucket}/{blob_path}" def dataproc_submit(self, job_params: SparkJobParameters) -> Operation: local_job_id = str(uuid.uuid4()) - pyspark_gcs = self._stage_files(job_params.get_main_file_path(), local_job_id) - job_config = { + main_file_uri = self._stage_files(job_params.get_main_file_path(), local_job_id) + job_config: Dict[str, Any] = { "reference": {"job_id": local_job_id}, "placement": {"cluster_name": self.cluster_name}, - "pyspark_job": { - "main_python_file_uri": pyspark_gcs, - "args": job_params.get_arguments(), - }, } + if job_params.get_class_name(): + job_config.update( + { + "spark_job": { + "jar_file_uris": [main_file_uri], + "main_class": job_params.get_class_name(), + "args": job_params.get_arguments(), + } + } + ) + else: + job_config.update( + { + "pyspark_job": { + "main_python_file_uri": main_file_uri, + "args": job_params.get_arguments(), + } + } + ) return self.job_client.submit_job_as_operation( request={ "project_id": self.project_id, @@ -153,22 +177,33 @@ def dataproc_submit(self, job_params: SparkJobParameters) -> Operation: } ) + def dataproc_cancel(self, job_id): + self.job_client.cancel_job( + project_id=self.project_id, region=self.region, job_id=job_id + ) + def historical_feature_retrieval( self, job_params: RetrievalJobParameters ) -> RetrievalJob: + operation = self.dataproc_submit(job_params) + cancel_fn = partial(self.dataproc_cancel, operation.metadata.job_id) return DataprocRetrievalJob( - self.dataproc_submit(job_params), job_params.get_destination_path() + operation, cancel_fn, job_params.get_destination_path() ) def offline_to_online_ingestion( self, ingestion_job_params: BatchIngestionJobParameters ) -> BatchIngestionJob: - return DataprocBatchIngestionJob(self.dataproc_submit(ingestion_job_params)) + operation = self.dataproc_submit(ingestion_job_params) + cancel_fn = partial(self.dataproc_cancel, operation.metadata.job_id) + return DataprocBatchIngestionJob(operation, cancel_fn) def start_stream_to_online_ingestion( self, ingestion_job_params: StreamIngestionJobParameters ) -> StreamIngestionJob: - return DataprocStreamingIngestionJob(self.dataproc_submit(ingestion_job_params)) + operation = self.dataproc_submit(ingestion_job_params) + cancel_fn = partial(self.dataproc_cancel, operation.metadata.job_id) + return DataprocStreamingIngestionJob(operation, cancel_fn) def stage_dataframe( self, df, event_timestamp_column: str, created_timestamp_column: str, diff --git a/sdk/python/requirements-ci.txt b/sdk/python/requirements-ci.txt index 2b0b87bd81..258bca8aab 100644 --- a/sdk/python/requirements-ci.txt +++ b/sdk/python/requirements-ci.txt @@ -2,13 +2,8 @@ cryptography==3.1 flake8 black==19.10b0 isort>=5 -grpcio-tools +grpcio-tools==1.31.0 mypy-protobuf -pytest -pytest-lazy-fixture==0.6.3 -pytest-mock -pytest-timeout -pytest-ordering==0.6.* pyspark==2.4.2 pandas~=1.0.0 mock==2.0.0 @@ -17,6 +12,11 @@ moto mypy mypy-protobuf avro==1.10.0 -confluent_kafka gcsfs -urllib3>=1.25.4 \ No newline at end of file +urllib3>=1.25.4 
+google-cloud-dataproc==2.0.2 +pytest==6.0.0 +pytest-lazy-fixture==0.6.3 +pytest-timeout==1.4.2 +pytest-ordering==0.6.* +pytest-mock==1.10.4 \ No newline at end of file diff --git a/sdk/python/requirements-dev.txt b/sdk/python/requirements-dev.txt index f34c0c8924..ca845e7f5a 100644 --- a/sdk/python/requirements-dev.txt +++ b/sdk/python/requirements-dev.txt @@ -1,15 +1,15 @@ Click==7.* -google-api-core==1.* -google-auth==1.* -google-cloud-bigquery==1.* -google-cloud-bigquery-storage==0.* -google-cloud-dataproc==2.* -google-cloud-storage==1.* +google-api-core==1.22.4 +google-auth==1.22.1 +google-cloud-bigquery==1.18 +google-cloud-bigquery-storage==0.7.0 +google-cloud-dataproc==2.0.2 +google-cloud-storage==1.20.0 google-resumable-media>=0.5 -googleapis-common-protos==1.* -grpcio==1.* -grpcio-testing==1.* -grpcio-tools +googleapis-common-protos==1.52.0 +grpcio==1.31.0 +grpcio-testing==1.31.0 +grpcio-tools==1.31.0 numpy mock==2.0.0 pandas~=1.0.0 @@ -28,7 +28,7 @@ toml==0.10.* tqdm==4.* google pandavro==1.5.* -kafka-python==1.* +kafka-python==2.0.2 tabulate==0.8.* isort>=5 mypy diff --git a/sdk/python/setup.py b/sdk/python/setup.py index 19e567ecd3..b8481f23d5 100644 --- a/sdk/python/setup.py +++ b/sdk/python/setup.py @@ -26,14 +26,14 @@ REQUIRED = [ "Click==7.*", - "google-api-core==1.20.*", + "google-api-core==1.22.4", "google-auth<2.0dev,>=1.14.0", "google-cloud-bigquery==1.18.*", "google-cloud-storage==1.20.*", "google-cloud-core==1.0.*", - "googleapis-common-protos==1.*", + "googleapis-common-protos==1.52.*", "google-cloud-bigquery-storage==0.7.*", - "grpcio==1.*", + "grpcio==1.31.0", "pandas~=1.0.0", "pandavro==1.5.*", "protobuf>=3.10", diff --git a/spark/ingestion/src/main/scala/org/apache/spark/metrics/sink/StatsdSinkWithTags.scala b/spark/ingestion/src/main/scala/org/apache/spark/metrics/sink/StatsdSinkWithTags.scala index 5c70da2b86..458bcc21d0 100644 --- a/spark/ingestion/src/main/scala/org/apache/spark/metrics/sink/StatsdSinkWithTags.scala +++ b/spark/ingestion/src/main/scala/org/apache/spark/metrics/sink/StatsdSinkWithTags.scala @@ -25,7 +25,7 @@ import org.apache.spark.SecurityManager import org.apache.spark.internal.Logging import org.apache.spark.metrics.MetricsSystem -private[spark] class StatsdSinkWithTags( +class StatsdSinkWithTags( val property: Properties, val registry: MetricRegistry, securityMgr: SecurityManager diff --git a/tests/e2e/__init__.py b/tests/e2e/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/tests/e2e/conftest.py b/tests/e2e/conftest.py index 321ba5dcfc..c96ff7c3be 100644 --- a/tests/e2e/conftest.py +++ b/tests/e2e/conftest.py @@ -1,21 +1,15 @@ import os -from pathlib import Path -import pyspark import pytest -from feast import Client - def pytest_addoption(parser): - parser.addoption("--core_url", action="store", default="localhost:6565") - parser.addoption("--serving_url", action="store", default="localhost:6566") - parser.addoption("--allow_dirty", action="store", default="False") + parser.addoption("--core-url", action="store", default="localhost:6565") + parser.addoption("--serving-url", action="store", default="localhost:6566") parser.addoption( "--gcs_path", action="store", default="gs://feast-templocation-kf-feast/" ) - parser.addoption("--enable_auth", action="store", default="False") - parser.addoption("--kafka_brokers", action="store", default="localhost:9092") + parser.addoption("--kafka-brokers", action="store", default="localhost:9092") parser.addoption("--env", action="store", help="local|aws|gcloud", 
default="local") parser.addoption( @@ -26,6 +20,8 @@ def pytest_addoption(parser): parser.addoption("--dataproc-project", action="store") parser.addoption("--ingestion-jar", action="store") parser.addoption("--redis-url", action="store", default="localhost:6379") + parser.addoption("--redis-cluster", action="store_true") + parser.addoption("--feast-version", action="store") def pytest_runtest_makereport(item, call): @@ -42,50 +38,29 @@ def pytest_runtest_setup(item): pytest.xfail("previous test failed (%s)" % previousfailed.name) -@pytest.fixture(scope="session") -def feast_version(): - return "0.8-SNAPSHOT" - - -@pytest.fixture(scope="session") -def ingestion_job_jar(pytestconfig, feast_version): - default_path = ( - Path(__file__).parent.parent.parent - / "spark" - / "ingestion" - / "target" - / f"feast-ingestion-spark-{feast_version}.jar" +from .fixtures.base import project_root, project_version # noqa +from .fixtures.client import ( # noqa + feast_client, + global_staging_path, + ingestion_job_jar, + local_staging_path, +) + +if not os.environ.get("DISABLE_SERVICE_FIXTURES"): + from .fixtures.services import ( # noqa + kafka_port, + kafka_server, + redis_server, + zookeeper_server, + ) +else: + from .fixtures.external_services import kafka_server, redis_server # noqa + +if not os.environ.get("DISABLE_FEAST_SERVICE_FIXTURES"): + from .fixtures.feast_services import * # type: ignore # noqa + from .fixtures.services import postgres_server # noqa +else: + from .fixtures.external_services import ( # type: ignore # noqa + feast_core, + feast_serving, ) - - return pytestconfig.getoption("ingestion_jar") or f"file://{default_path}" - - -@pytest.fixture(scope="session") -def feast_client(pytestconfig, ingestion_job_jar): - redis_host, redis_port = pytestconfig.getoption("redis_url").split(":") - - if pytestconfig.getoption("env") == "local": - return Client( - core_url=pytestconfig.getoption("core_url"), - serving_url=pytestconfig.getoption("serving_url"), - spark_launcher="standalone", - spark_standalone_master="local", - spark_home=os.getenv("SPARK_HOME") or os.path.dirname(pyspark.__file__), - spark_ingestion_jar=ingestion_job_jar, - redis_host=redis_host, - redis_port=redis_port, - ) - - if pytestconfig.getoption("env") == "gcloud": - return Client( - core_url=pytestconfig.getoption("core_url"), - serving_url=pytestconfig.getoption("serving_url"), - spark_launcher="dataproc", - dataproc_cluster_name=pytestconfig.getoption("dataproc_cluster_name"), - dataproc_project=pytestconfig.getoption("dataproc_project"), - dataproc_region=pytestconfig.getoption("dataproc_region"), - dataproc_staging_location=os.path.join( - pytestconfig.getoption("staging_path"), "dataproc" - ), - spark_ingestion_jar=ingestion_job_jar, - ) diff --git a/tests/e2e/fixtures/__init__.py b/tests/e2e/fixtures/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/tests/e2e/fixtures/base.py b/tests/e2e/fixtures/base.py new file mode 100644 index 0000000000..68b9be5a4d --- /dev/null +++ b/tests/e2e/fixtures/base.py @@ -0,0 +1,16 @@ +from pathlib import Path + +import pytest + + +@pytest.fixture(scope="session") +def project_root(): + return Path(__file__).parent.parent.parent.parent + + +@pytest.fixture(scope="session") +def project_version(pytestconfig): + if pytestconfig.getoption("feast_version"): + return pytestconfig.getoption("feast_version") + + return "0.8-SNAPSHOT" diff --git a/tests/e2e/fixtures/client.py b/tests/e2e/fixtures/client.py new file mode 100644 index 0000000000..366b0aa711 --- 
/dev/null +++ b/tests/e2e/fixtures/client.py @@ -0,0 +1,81 @@ +import os +import tempfile +import uuid +from typing import Tuple + +import pyspark +import pytest +from pytest_redis.executor import RedisExecutor + +from feast import Client + + +@pytest.fixture +def feast_client( + pytestconfig, + ingestion_job_jar, + redis_server: RedisExecutor, + feast_core: Tuple[str, int], + feast_serving: Tuple[str, int], + local_staging_path, +): + if pytestconfig.getoption("env") == "local": + return Client( + core_url=f"{feast_core[0]}:{feast_core[1]}", + serving_url=f"{feast_serving[0]}:{feast_serving[1]}", + spark_launcher="standalone", + spark_standalone_master="local", + spark_home=os.getenv("SPARK_HOME") or os.path.dirname(pyspark.__file__), + spark_ingestion_jar=ingestion_job_jar, + redis_host=redis_server.host, + redis_port=redis_server.port, + spark_staging_location=os.path.join(local_staging_path, "spark"), + historical_feature_output_location=os.path.join( + local_staging_path, "historical_output" + ), + ) + + if pytestconfig.getoption("env") == "gcloud": + return Client( + core_url=f"{feast_core[0]}:{feast_core[1]}", + serving_url=f"{feast_serving[0]}:{feast_serving[1]}", + spark_launcher="dataproc", + dataproc_cluster_name=pytestconfig.getoption("dataproc_cluster_name"), + dataproc_project=pytestconfig.getoption("dataproc_project"), + dataproc_region=pytestconfig.getoption("dataproc_region"), + spark_staging_location=os.path.join(local_staging_path, "dataproc"), + spark_ingestion_jar=ingestion_job_jar, + redis_host=pytestconfig.getoption("redis_url").split(":")[0], + redis_port=pytestconfig.getoption("redis_url").split(":")[1], + historical_feature_output_location=os.path.join( + local_staging_path, "historical_output" + ), + ) + + +@pytest.fixture(scope="session") +def global_staging_path(pytestconfig): + if pytestconfig.getoption("env") == "local": + tmp_path = tempfile.mkdtemp() + return f"file://{tmp_path}" + + staging_path = pytestconfig.getoption("staging_path") + return os.path.join(staging_path, str(uuid.uuid4())) + + +@pytest.fixture(scope="function") +def local_staging_path(global_staging_path): + return os.path.join(global_staging_path, str(uuid.uuid4())) + + +@pytest.fixture(scope="session") +def ingestion_job_jar(pytestconfig, project_root, project_version): + default_path = ( + project_root + / "spark" + / "ingestion" + / "target" + / f"feast-ingestion-spark-{project_version}.jar" + ) + + return pytestconfig.getoption("ingestion_jar") or f"file://{default_path}" diff --git a/tests/e2e/fixtures/external_services.py b/tests/e2e/fixtures/external_services.py new file mode 100644 index 0000000000..6929c16ec1 --- /dev/null +++ b/tests/e2e/fixtures/external_services.py @@ -0,0 +1,28 @@ +import pytest +from pytest_redis.executor import NoopRedis + +__all__ = ("feast_core", "feast_serving", "redis_server", "kafka_server") + + +@pytest.fixture(scope="session") +def redis_server(pytestconfig): + host, port = pytestconfig.getoption("redis_url").split(":") + return NoopRedis(host, port, None) + + +@pytest.fixture(scope="session") +def feast_core(pytestconfig): + host, port = pytestconfig.getoption("core_url").split(":") + return host, port + + +@pytest.fixture(scope="session") +def feast_serving(pytestconfig): + host, port = pytestconfig.getoption("serving_url").split(":") + return host, port + + +@pytest.fixture(scope="session") +def kafka_server(pytestconfig): + host, port = pytestconfig.getoption("kafka_brokers").split(":") + return host, port diff --git 
a/tests/e2e/fixtures/feast_services.py b/tests/e2e/fixtures/feast_services.py new file mode 100644 index 0000000000..ce7f854691 --- /dev/null +++ b/tests/e2e/fixtures/feast_services.py @@ -0,0 +1,140 @@ +import os +import shutil +import socket +import subprocess +import tempfile +import time +from typing import Any, Dict + +import pytest +import yaml +from pytest_postgresql.executor import PostgreSQLExecutor +from pytest_redis.executor import RedisExecutor + +__all__ = ( + "feast_core", + "feast_serving", + "enable_auth", +) + + +def _start_jar(jar, options=None) -> subprocess.Popen: + if not os.path.isfile(jar): + raise ValueError(f"{jar} doesn't exist") + + cmd = [shutil.which("java"), "-jar", jar] + if options: + cmd.extend(options) + + return subprocess.Popen(cmd) # type: ignore + + +def _wait_port_open(port, max_wait=60): + print(f"Waiting for port {port}") + start = time.time() + + while True: + try: + socket.create_connection(("localhost", port), timeout=1) + except OSError: + if time.time() - start > max_wait: + raise + + time.sleep(1) + else: + return + + +@pytest.fixture(scope="session", params=[True, False]) +def enable_auth(request): + return request.param + + +@pytest.fixture(scope="session") +def feast_core( + project_root, project_version, enable_auth, postgres_server: PostgreSQLExecutor +): + jar = str( + project_root / "core" / "target" / f"feast-core-{project_version}-exec.jar" + ) + config = dict( + feast=dict( + security=dict( + enabled=enable_auth, + provider="jwt", + options=dict( + jwkEndpointURI="https://www.googleapis.com/oauth2/v3/certs" + ), + ) + ), + spring=dict( + datasource=dict( + url=f"jdbc:postgresql://{postgres_server.host}:{postgres_server.port}/postgres" + ) + ), + ) + + with tempfile.NamedTemporaryFile(suffix=".yaml", mode="w+") as config_file: + yaml.dump(config, config_file) + config_file.flush() + + process = _start_jar( + jar, + [ + f"--spring.config.location=classpath:/application.yml,file://{config_file.name}" + ], + ) + _wait_port_open(6565) + yield "localhost", 6565 + process.terminate() + + +@pytest.fixture(scope="session") +def feast_serving( + project_root, + project_version, + enable_auth, + redis_server: RedisExecutor, + feast_core, + pytestconfig, +): + jar = str( + project_root + / "serving" + / "target" + / f"feast-serving-{project_version}-exec.jar" + ) + if pytestconfig.getoption("redis_cluster"): + store: Dict[str, Any] = dict( + name="online", + type="REDIS_CLUSTER", + config=dict(connection_string=f"{redis_server.host}:{redis_server.port}"), + ) + else: + store = dict( + name="online", + type="REDIS", + config=dict(host=redis_server.host, port=redis_server.port), + ) + + config = dict( + feast=dict( + stores=[store], + coreAuthentication=dict(enabled=enable_auth, provider="google"), + security=dict(authentication=dict(enabled=enable_auth, provider="jwt")), + ) + ) + + with tempfile.NamedTemporaryFile(suffix=".yaml", mode="w+") as config_file: + yaml.dump(config, config_file) + config_file.flush() + + process = _start_jar( + jar, + [ + f"--spring.config.location=classpath:/application.yml,file://{config_file.name}" + ], + ) + _wait_port_open(6566) + yield "localhost", 6566 + process.terminate() diff --git a/tests/e2e/fixtures/services.py b/tests/e2e/fixtures/services.py new file mode 100644 index 0000000000..38927a0729 --- /dev/null +++ b/tests/e2e/fixtures/services.py @@ -0,0 +1,53 @@ +import pathlib +import shutil +import tempfile + +import pytest +import requests +from pytest_kafka import make_kafka_server, make_zookeeper_process 
+from pytest_postgresql import factories as pg_factories +from pytest_redis import factories as redis_factories + +__all__ = ( + "kafka_server", + "kafka_port", + "zookeeper_server", + "postgres_server", + "redis_server", +) + + +def download_kafka(version="2.12-2.6.0"): + r = requests.get(f"https://downloads.apache.org/kafka/2.6.0/kafka_{version}.tgz") + temp_dir = pathlib.Path(tempfile.mkdtemp()) + local_path = temp_dir / "kafka.tgz" + + with open(local_path, "wb") as f: + f.write(r.content) + + shutil.unpack_archive(str(local_path), str(temp_dir)) + return temp_dir / f"kafka_{version}" / "bin" + + +@pytest.fixture +def kafka_server(kafka_port): + _, port = kafka_port + return "localhost", port + + +postgres_server = pg_factories.postgresql_proc(password="password") +redis_server = redis_factories.redis_proc(executable=shutil.which("redis-server")) + +KAFKA_BIN = download_kafka() +zookeeper_server = make_zookeeper_process( + str(KAFKA_BIN / "zookeeper-server-start.sh"), + zk_config_template=""" +dataDir={zk_data_dir} +clientPort={zk_port} +maxClientCnxns=0 +admin.enableServer=false""", +) +kafka_port = make_kafka_server( + kafka_bin=str(KAFKA_BIN / "kafka-server-start.sh"), + zookeeper_fixture_name="zookeeper_server", +) diff --git a/tests/e2e/requirements.txt b/tests/e2e/requirements.txt deleted file mode 100644 index 80380451c5..0000000000 --- a/tests/e2e/requirements.txt +++ /dev/null @@ -1,14 +0,0 @@ -mock==2.0.0 -numpy==1.16.4 -pandas~=1.0.0 -pandavro==1.5.* -pyspark==2.4.2 -pytest==6.0.0 -pytest-benchmark==3.2.2 -pytest-mock==1.10.4 -pytest-timeout==1.3.3 -pytest-ordering==0.6.* -pytest-xdist==2.1.0 -deepdiff==4.3.2 -confluent_kafka -avro==1.10.0 \ No newline at end of file diff --git a/tests/e2e/test_historical_features.py b/tests/e2e/test_historical_features.py index 84c1af139f..e6e909bb04 100644 --- a/tests/e2e/test_historical_features.py +++ b/tests/e2e/test_historical_features.py @@ -1,35 +1,36 @@ import os -import tempfile -import uuid from datetime import datetime, timedelta from urllib.parse import urlparse +import gcsfs import numpy as np import pandas as pd -import pytest from google.protobuf.duration_pb2 import Duration from pandas._testing import assert_frame_equal +from pyarrow import parquet from feast import Client, Entity, Feature, FeatureTable, FileSource, ValueType from feast.data_format import ParquetFormat -from feast.staging.storage_client import get_staging_client np.random.seed(0) -@pytest.fixture(scope="function") -def staging_path(pytestconfig, tmp_path): - if pytestconfig.getoption("env") == "local": - return f"file://{tmp_path}" +def read_parquet(uri): + parsed_uri = urlparse(uri) + if parsed_uri.scheme == "file": + return pd.read_parquet(parsed_uri.path) + elif parsed_uri.scheme == "gs": + fs = gcsfs.GCSFileSystem() + files = ["gs://" + path for path in gcsfs.GCSFileSystem().glob(uri + "/part-*")] + ds = parquet.ParquetDataset(files, filesystem=fs) + return ds.read().to_pandas() + else: + raise ValueError("Unsupported scheme") - staging_path = pytestconfig.getoption("staging_path") - return os.path.join(staging_path, str(uuid.uuid4())) - -@pytest.mark.skip -def test_historical_features(feast_client: Client, staging_path: str): +def test_historical_features(feast_client: Client, local_staging_path: str): customer_entity = Entity( - name="customer_id", description="Customer", value_type=ValueType.INT64 + name="user_id", description="Customer", value_type=ValueType.INT64 ) feast_client.apply_entity(customer_entity) @@ -38,7 +39,7 @@ def 
test_historical_features(feast_client: Client, staging_path: str): transactions_feature_table = FeatureTable( name="transactions", - entities=["customer_id"], + entities=["user_id"], features=[ Feature("daily_transactions", ValueType.DOUBLE), Feature("total_transactions", ValueType.DOUBLE), @@ -47,7 +48,7 @@ def test_historical_features(feast_client: Client, staging_path: str): "event_timestamp", "created_timestamp", ParquetFormat(), - os.path.join(staging_path, "transactions"), + os.path.join(local_staging_path, "transactions"), ), max_age=max_age, ) @@ -71,7 +72,7 @@ def test_historical_features(feast_client: Client, staging_path: str): { "event_timestamp": [event_date for _ in customers], "created_timestamp": [creation_date for _ in customers], - "customer_id": customers, + "user_id": customers, "daily_transactions": daily_transactions, "total_transactions": total_transactions, } @@ -85,44 +86,27 @@ def test_historical_features(feast_client: Client, staging_path: str): { "event_timestamp": [retrieval_date for _ in customers] + [retrieval_outside_max_age_date for _ in customers], - "customer_id": customers + customers, + "user_id": customers + customers, } ) - with tempfile.TemporaryDirectory() as tempdir: - df_export_path = os.path.join(tempdir, "customers.parquets") - customer_df.to_parquet(df_export_path) - scheme, _, remote_path, _, _, _ = urlparse(staging_path) - staging_client = get_staging_client(scheme) - staging_client.upload_file(df_export_path, None, remote_path) - customer_source = FileSource( - "event_timestamp", - "event_timestamp", - ParquetFormat(), - os.path.join(staging_path, os.path.basename(df_export_path)), - ) - - job = feast_client.get_historical_features(feature_refs, customer_source) - output_dir = job.get_output_file_uri() - - _, _, joined_df_destination_path, _, _, _ = urlparse(output_dir) - joined_df = pd.read_parquet(joined_df_destination_path) - - expected_joined_df = pd.DataFrame( - { - "event_timestamp": [retrieval_date for _ in customers] - + [retrieval_outside_max_age_date for _ in customers], - "customer_id": customers + customers, - "transactions__daily_transactions": daily_transactions - + [None] * len(customers), - } - ) - - assert_frame_equal( - joined_df.sort_values(by=["customer_id", "event_timestamp"]).reset_index( - drop=True - ), - expected_joined_df.sort_values( - by=["customer_id", "event_timestamp"] - ).reset_index(drop=True), - ) + job = feast_client.get_historical_features(feature_refs, customer_df) + output_dir = job.get_output_file_uri() + joined_df = read_parquet(output_dir) + + expected_joined_df = pd.DataFrame( + { + "event_timestamp": [retrieval_date for _ in customers] + + [retrieval_outside_max_age_date for _ in customers], + "user_id": customers + customers, + "transactions__daily_transactions": daily_transactions + + [None] * len(customers), + } + ) + + assert_frame_equal( + joined_df.sort_values(by=["user_id", "event_timestamp"]).reset_index(drop=True), + expected_joined_df.sort_values(by=["user_id", "event_timestamp"]).reset_index( + drop=True + ), + ) diff --git a/tests/e2e/test_online_features.py b/tests/e2e/test_online_features.py index 8d81a9b79a..cb1ccb69ff 100644 --- a/tests/e2e/test_online_features.py +++ b/tests/e2e/test_online_features.py @@ -8,10 +8,10 @@ import avro.schema import numpy as np import pandas as pd -import pytest import pytz from avro.io import BinaryEncoder, DatumWriter -from confluent_kafka import Producer +from kafka.admin import KafkaAdminClient +from kafka.producer import KafkaProducer from feast 
import ( Client, @@ -39,16 +39,7 @@ def generate_data(): return df -@pytest.fixture(scope="function") -def staging_path(pytestconfig, tmp_path): - if pytestconfig.getoption("env") == "local": - return f"file://{tmp_path}" - - staging_path = pytestconfig.getoption("staging_path") - return os.path.join(staging_path, str(uuid.uuid4())) - - -def test_offline_ingestion(feast_client: Client, staging_path: str): +def test_offline_ingestion(feast_client: Client, local_staging_path: str): entity = Entity(name="s2id", description="S2id", value_type=ValueType.INT64,) feature_table = FeatureTable( @@ -59,7 +50,7 @@ def test_offline_ingestion(feast_client: Client, staging_path: str): "event_timestamp", "event_timestamp", ParquetFormat(), - os.path.join(staging_path, "batch-storage"), + os.path.join(local_staging_path, "batch-storage"), ), ) @@ -89,8 +80,12 @@ def test_offline_ingestion(feast_client: Client, staging_path: str): ) -def test_streaming_ingestion(feast_client: Client, staging_path: str, pytestconfig): +def test_streaming_ingestion( + feast_client: Client, local_staging_path: str, kafka_server +): entity = Entity(name="s2id", description="S2id", value_type=ValueType.INT64,) + kafka_broker = f"{kafka_server[0]}:{kafka_server[1]}" + topic_name = f"avro-{uuid.uuid4()}" feature_table = FeatureTable( name="drivers_stream", @@ -100,14 +95,14 @@ def test_streaming_ingestion(feast_client: Client, staging_path: str, pytestconf "event_timestamp", "event_timestamp", ParquetFormat(), - os.path.join(staging_path, "batch-storage"), + os.path.join(local_staging_path, "batch-storage"), ), stream_source=KafkaSource( "event_timestamp", "event_timestamp", - pytestconfig.getoption("kafka_brokers"), + kafka_broker, AvroFormat(avro_schema()), - topic="avro", + topic=topic_name, ), ) @@ -120,6 +115,10 @@ def test_streaming_ingestion(feast_client: Client, staging_path: str, pytestconf lambda: (None, job.get_status() == SparkJobStatus.IN_PROGRESS), 60 ) + wait_retry_backoff( + lambda: (None, check_consumer_exist(kafka_broker, topic_name)), 60 + ) + try: original = generate_data()[["s2id", "unique_drivers", "event_timestamp"]] for record in original.to_dict("records"): @@ -128,9 +127,9 @@ def test_streaming_ingestion(feast_client: Client, staging_path: str, pytestconf ) send_avro_record_to_kafka( - "avro", + topic_name, record, - bootstrap_servers=pytestconfig.getoption("kafka_brokers"), + bootstrap_servers=kafka_broker, avro_schema_json=avro_schema(), ) @@ -174,12 +173,7 @@ def avro_schema(): def send_avro_record_to_kafka(topic, value, bootstrap_servers, avro_schema_json): value_schema = avro.schema.parse(avro_schema_json) - producer_config = { - "bootstrap.servers": bootstrap_servers, - "request.timeout.ms": "1000", - } - - producer = Producer(producer_config) + producer = KafkaProducer(bootstrap_servers=bootstrap_servers) writer = DatumWriter(value_schema) bytes_writer = io.BytesIO() @@ -188,7 +182,7 @@ def send_avro_record_to_kafka(topic, value, bootstrap_servers, avro_schema_json) writer.write(value, encoder) try: - producer.produce(topic=topic, value=bytes_writer.getvalue()) + producer.send(topic=topic, value=bytes_writer.getvalue()) except Exception as e: print( f"Exception while producing record value - {value} to topic - {topic}: {e}" @@ -197,3 +191,18 @@ def send_avro_record_to_kafka(topic, value, bootstrap_servers, avro_schema_json) print(f"Successfully producing record value - {value} to topic - {topic}") producer.flush() + + +def check_consumer_exist(bootstrap_servers, topic_name): + admin = 
KafkaAdminClient(bootstrap_servers=bootstrap_servers) + consumer_groups = admin.describe_consumer_groups( + group_ids=[group_id for group_id, _ in admin.list_consumer_groups()] + ) + subscriptions = { + subscription + for group in consumer_groups + for member in group.members + if not isinstance(member.member_metadata, bytes) + for subscription in member.member_metadata.subscription + } + return topic_name in subscriptions diff --git a/tests/e2e/test-register.py b/tests/e2e/test_register.py similarity index 83% rename from tests/e2e/test-register.py rename to tests/e2e/test_register.py index 4bd8966817..0c89ee69ce 100644 --- a/tests/e2e/test-register.py +++ b/tests/e2e/test_register.py @@ -23,18 +23,6 @@ SUFFIX = str(int(datetime.now().timestamp())) -@pytest.fixture(scope="module") -def client(pytestconfig): - core_url = pytestconfig.getoption("core_url") - serving_url = pytestconfig.getoption("serving_url") - - client = Client(core_url=core_url, serving_url=serving_url,) - - client.set_project(PROJECT_NAME) - - return client - - @pytest.fixture def bq_table_id(): return f"kf-feast:feaste2e.table{SUFFIX}" @@ -177,76 +165,80 @@ def alltypes_featuretable(): def test_get_list_basic( - client: Client, + feast_client: Client, customer_entity: Entity, driver_entity: Entity, basic_featuretable: FeatureTable, ): # ApplyEntity - client.apply_entity(customer_entity) - client.apply_entity(driver_entity) + feast_client.apply_entity(customer_entity) + feast_client.apply_entity(driver_entity) # GetEntity Check - assert client.get_entity(name="customer_id") == customer_entity - assert client.get_entity(name="driver_id") == driver_entity + assert feast_client.get_entity(name="customer_id") == customer_entity + assert feast_client.get_entity(name="driver_id") == driver_entity # ListEntities Check common_filtering_labels = {"common_key": "common_val"} matchmaking_filtering_labels = {"team": "matchmaking"} - actual_common_entities = client.list_entities(labels=common_filtering_labels) - actual_matchmaking_entities = client.list_entities( + actual_common_entities = feast_client.list_entities(labels=common_filtering_labels) + actual_matchmaking_entities = feast_client.list_entities( labels=matchmaking_filtering_labels ) assert len(actual_common_entities) == 2 assert len(actual_matchmaking_entities) == 1 # ApplyFeatureTable - client.apply_feature_table(basic_featuretable) + feast_client.apply_feature_table(basic_featuretable) # GetFeatureTable Check - actual_get_feature_table = client.get_feature_table(name="basic_featuretable") + actual_get_feature_table = feast_client.get_feature_table(name="basic_featuretable") assert actual_get_feature_table == basic_featuretable # ListFeatureTables Check actual_list_feature_table = [ - ft for ft in client.list_feature_tables() if ft.name == "basic_featuretable" + ft + for ft in feast_client.list_feature_tables() + if ft.name == "basic_featuretable" ][0] assert actual_list_feature_table == basic_featuretable def test_get_list_alltypes( - client: Client, alltypes_entity: Entity, alltypes_featuretable: FeatureTable + feast_client: Client, alltypes_entity: Entity, alltypes_featuretable: FeatureTable ): # ApplyEntity - client.apply_entity(alltypes_entity) + feast_client.apply_entity(alltypes_entity) # GetEntity Check - assert client.get_entity(name="alltypes_id") == alltypes_entity + assert feast_client.get_entity(name="alltypes_id") == alltypes_entity # ListEntities Check alltypes_filtering_labels = {"cat": "alltypes"} - actual_alltypes_entities = 
client.list_entities(labels=alltypes_filtering_labels) + actual_alltypes_entities = feast_client.list_entities( + labels=alltypes_filtering_labels + ) assert len(actual_alltypes_entities) == 1 # ApplyFeatureTable - client.apply_feature_table(alltypes_featuretable) + feast_client.apply_feature_table(alltypes_featuretable) # GetFeatureTable Check - actual_get_feature_table = client.get_feature_table(name="alltypes") + actual_get_feature_table = feast_client.get_feature_table(name="alltypes") assert actual_get_feature_table == alltypes_featuretable # ListFeatureTables Check actual_list_feature_table = [ - ft for ft in client.list_feature_tables() if ft.name == "alltypes" + ft for ft in feast_client.list_feature_tables() if ft.name == "alltypes" ][0] assert actual_list_feature_table == alltypes_featuretable @pytest.mark.bq def test_ingest( - client: Client, + feast_client: Client, customer_entity: Entity, driver_entity: Entity, bq_featuretable: FeatureTable, @@ -257,12 +249,12 @@ def test_ingest( bq_table_id = bq_table_id.replace(":", ".") # ApplyEntity - client.apply_entity(customer_entity) - client.apply_entity(driver_entity) + feast_client.apply_entity(customer_entity) + feast_client.apply_entity(driver_entity) # ApplyFeatureTable - client.apply_feature_table(bq_featuretable) - client.ingest(bq_featuretable, bq_dataset, timeout=120) + feast_client.apply_feature_table(bq_featuretable) + feast_client.ingest(bq_featuretable, bq_dataset, timeout=120) from google.api_core.exceptions import NotFound from google.cloud import bigquery diff --git a/tests/e2e/pyproject.toml b/tests/pyproject.toml similarity index 100% rename from tests/e2e/pyproject.toml rename to tests/pyproject.toml diff --git a/tests/e2e/pytest.ini b/tests/pytest.ini similarity index 100% rename from tests/e2e/pytest.ini rename to tests/pytest.ini diff --git a/tests/requirements.txt b/tests/requirements.txt new file mode 100644 index 0000000000..790432d471 --- /dev/null +++ b/tests/requirements.txt @@ -0,0 +1,13 @@ +pytest==6.0.0 +pytest-lazy-fixture==0.6.3 +pytest-timeout==1.4.2 +pytest-ordering==0.6.* +pytest-benchmark==3.2.2 +pytest-mock==1.10.4 +pytest-ordering==0.6.* +pytest-xdist==2.1.0 +pytest-postgresql==2.5.1 +pytest-redis==2.0.0 +pytest-kafka==0.4.0 +deepdiff==4.3.2 +kafka-python==2.0.2 \ No newline at end of file diff --git a/tests/e2e/setup.cfg b/tests/setup.cfg similarity index 100% rename from tests/e2e/setup.cfg rename to tests/setup.cfg
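
The reorganized e2e suite above can also be run outside CI. A minimal sketch, using only commands that already appear in the new GitHub Actions jobs and in infra/scripts/test-docker-compose.sh: the `develop` version tag, the `kf-feast`/`feast-e2e` Dataproc settings and the 10.128.0.x addresses are values taken from those jobs, not requirements, and should be swapped for your own environment (the workflow additionally wraps the local pytest invocation in `su -p postgres -c "..."`).

    # Local run: the pytest-postgresql/pytest-redis/pytest-kafka fixtures start the
    # backing services, and Feast Core/Serving are launched from the locally built jars.
    make build-java-no-tests REVISION=develop
    make install-python
    python -m pip install -r tests/requirements.txt
    pytest tests/e2e/ --feast-version develop

    # Run against pre-provisioned infrastructure: DISABLE_SERVICE_FIXTURES swaps the
    # Kafka/Redis/Postgres fixtures for external endpoints, and
    # DISABLE_FEAST_SERVICE_FIXTURES additionally skips launching Core/Serving
    # (as done in test-docker-compose.sh via --core-url/--serving-url).
    DISABLE_SERVICE_FIXTURES=true pytest tests/e2e/ --feast-version develop \
      --env=gcloud --dataproc-cluster-name feast-e2e --dataproc-project kf-feast \
      --dataproc-region us-central1 --redis-url 10.128.0.105:6379 --redis-cluster \
      --kafka-brokers 10.128.0.103:9094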