diff --git a/.github/workflows/pr_integration_tests.yml b/.github/workflows/pr_integration_tests.yml index e04b78ec32..000f9e9728 100644 --- a/.github/workflows/pr_integration_tests.yml +++ b/.github/workflows/pr_integration_tests.yml @@ -145,6 +145,10 @@ jobs: run: pip install pip-tools - name: Install dependencies run: make install-python-ci-dependencies + - name: Setup Redis Cluster + run: | + docker pull vishnunair/docker-redis-cluster:latest + docker run -d -p 6001:6379 -p 6002:6380 -p 6003:6381 -p 6004:6382 -p 6005:6383 -p 6006:6384 --name redis-cluster vishnunair/docker-redis-cluster - name: Test python if: ${{ always() }} # this will guarantee that step won't be canceled and resources won't leak env: diff --git a/docs/how-to-guides/adding-or-reusing-tests.md b/docs/how-to-guides/adding-or-reusing-tests.md index 1730abe209..5a29342d6e 100644 --- a/docs/how-to-guides/adding-or-reusing-tests.md +++ b/docs/how-to-guides/adding-or-reusing-tests.md @@ -79,7 +79,6 @@ def test_historical_features(environment, universal_data_sources, full_feature_n datasets["global"], datasets["entity"], ) - # ... more test code customer_fv, driver_fv, driver_odfv, order_fv, global_fv = ( @@ -93,7 +92,7 @@ def test_historical_features(environment, universal_data_sources, full_feature_n feature_service = FeatureService( "convrate_plus100", features=[ - feature_views["driver"][["conv_rate"]], + feature_views["driver"][["conv_rate"]], feature_views["driver_odfv"] ], ) @@ -112,7 +111,6 @@ def test_historical_features(environment, universal_data_sources, full_feature_n ] ) store.apply(feast_objects) - # ... more test code job_from_df = store.get_historical_features( @@ -132,13 +130,11 @@ def test_historical_features(environment, universal_data_sources, full_feature_n full_feature_names=full_feature_names, ) actual_df_from_df_entities = job_from_df.to_df() - # ... more test code assert_frame_equal( expected_df, actual_df_from_df_entities, check_dtype=False, ) - # ... more test code ``` {% endtab %} @@ -186,6 +182,24 @@ def your_test(environment: Environment): your_fv = driver_feature_view(data_source) entity = driver(value_type=ValueType.UNKNOWN) fs.apply([fv, entity]) - + # ... run test ``` + +### Running your own redis cluster for testing + +* Install redis on your computer. If you are a mac user, you should be able to `brew install redis`. + * Running `redis-server --help` and `redis-cli --help` should show corresponding help menus. +* Run `cd scripts/create-cluster` and run `./create-cluster start` then `./create-cluster create` to start the server. You should see output that looks like this: +~~~~ +Starting 6001 +Starting 6002 +Starting 6003 +Starting 6004 +Starting 6005 +Starting 6006 +~~~~ +* You should be able to run the integration tests and have the redis cluster tests pass. +* If you would like to run your own redis cluster, you can run the above commands with your own specified ports and connect to the newly configured cluster. +* To stop the cluster, run `./create-cluster stop` and then `./create-cluster clean`. + diff --git a/scripts/create-cluster b/scripts/create-cluster new file mode 100755 index 0000000000..06a1afd232 --- /dev/null +++ b/scripts/create-cluster @@ -0,0 +1,97 @@ +# Settings +# Make sure you run "brew install redis" + +# BIN_PATH="/opt/homebrew/bin" +REDIS_CLI=`which redis-cli` +REDIS_SERVER=`which redis-server` +CLUSTER_HOST=127.0.0.1 +# Creates a cluster at ports 6001-6006 with 3 masters 6001-6003 and 3 slaves 6004-6006 +PORT=${2:-6000} +TIMEOUT=2000 +NODES=6 +REPLICAS=1 +PROTECTED_MODE=yes +ADDITIONAL_OPTIONS="" + +if [ -a config.sh ] +then + source "config.sh" +fi + +# Computed vars +ENDPORT=$((PORT+NODES)) + +if [ "$1" == "start" ] +then + while [ $((PORT < ENDPORT)) != "0" ]; do + PORT=$((PORT+1)) + echo "Starting $PORT" + $REDIS_SERVER --port $PORT --protected-mode $PROTECTED_MODE --cluster-enabled yes --cluster-config-file nodes-${PORT}.conf --cluster-node-timeout $TIMEOUT --appendonly yes --appendfilename appendonly-${PORT}.aof --dbfilename dump-${PORT}.rdb --logfile ${PORT}.log --daemonize yes ${ADDITIONAL_OPTIONS} + done + exit 0 +fi + +if [ "$1" == "create" ] +then + HOSTS="" + while [ $((PORT < ENDPORT)) != "0" ]; do + PORT=$((PORT+1)) + HOSTS="$HOSTS $CLUSTER_HOST:$PORT" + done + OPT_ARG="" + if [ "$2" == "-f" ]; then + OPT_ARG="--cluster-yes" + fi + $REDIS_CLI --cluster create $HOSTS --cluster-replicas $REPLICAS $OPT_ARG + exit 0 +fi + +if [ "$1" == "stop" ] +then + while [ $((PORT < ENDPORT)) != "0" ]; do + PORT=$((PORT+1)) + echo "Stopping $PORT" + $REDIS_CLI -p $PORT shutdown nosave + done + exit 0 +fi + +if [ "$1" == "watch" ] +then + PORT=$((PORT+1)) + while [ 1 ]; do + clear + date + $REDIS_CLI -p $PORT cluster nodes | head -30 + sleep 1 + done + exit 0 +fi + +if [ "$1" == "clean" ] +then + echo "Cleaning *.log" + rm -rf *.log + echo "Cleaning appendonly-*" + rm -rf appendonly-* + echo "Cleaning dump-*.rdb" + rm -rf dump-*.rdb + echo "Cleaning nodes-*.conf" + rm -rf nodes-*.conf + exit 0 +fi + +if [ "$1" == "clean-logs" ] +then + echo "Cleaning *.log" + rm -rf *.log + exit 0 +fi + +echo "Usage: $0 [start|create|stop|watch|clean|clean-logs|call]" +echo "start [PORT] -- Launch Redis Cluster instances." +echo "create [PORT] [-f] -- Create a cluster using redis-cli --cluster create." +echo "stop [PORT] -- Stop Redis Cluster instances." +echo "watch [PORT] -- Show CLUSTER NODES output (first 30 lines) of first node." +echo "clean -- Remove all instances data, logs, configs." +echo "clean-logs -- Remove just instances logs." diff --git a/sdk/python/feast/infra/online_stores/redis.py b/sdk/python/feast/infra/online_stores/redis.py index 0420751bbd..b557fddc68 100644 --- a/sdk/python/feast/infra/online_stores/redis.py +++ b/sdk/python/feast/infra/online_stores/redis.py @@ -41,7 +41,7 @@ try: from redis import Redis - from redis.cluster import RedisCluster + from redis.cluster import ClusterNode, RedisCluster except ImportError as e: from feast.errors import FeastExtrasDependencyImportError @@ -160,7 +160,9 @@ def _get_client(self, online_store_config: RedisOnlineStoreConfig): online_store_config.connection_string ) if online_store_config.redis_type == RedisType.redis_cluster: - kwargs["startup_nodes"] = startup_nodes + kwargs["startup_nodes"] = [ + ClusterNode(**node) for node in startup_nodes + ] self._client = RedisCluster(**kwargs) else: kwargs["host"] = startup_nodes[0]["host"] diff --git a/sdk/python/tests/conftest.py b/sdk/python/tests/conftest.py index 49f32379a3..624b610fe4 100644 --- a/sdk/python/tests/conftest.py +++ b/sdk/python/tests/conftest.py @@ -30,6 +30,7 @@ ) from tests.integration.feature_repos.repo_configuration import ( FULL_REPO_CONFIGS, + REDIS_CLUSTER_CONFIG, REDIS_CONFIG, Environment, construct_test_environment, @@ -170,10 +171,14 @@ def cleanup(): return e -@pytest.fixture() +@pytest.fixture( + scope="session", + params=[REDIS_CONFIG, REDIS_CLUSTER_CONFIG], + ids=[str(c) for c in [REDIS_CONFIG, REDIS_CLUSTER_CONFIG]], +) def local_redis_environment(request, worker_id): e = construct_test_environment( - IntegrationTestRepoConfig(online_store=REDIS_CONFIG), worker_id=worker_id + IntegrationTestRepoConfig(online_store=request.param), worker_id=worker_id ) def cleanup(): diff --git a/sdk/python/tests/integration/feature_repos/repo_configuration.py b/sdk/python/tests/integration/feature_repos/repo_configuration.py index 7de2effc5d..976c807acb 100644 --- a/sdk/python/tests/integration/feature_repos/repo_configuration.py +++ b/sdk/python/tests/integration/feature_repos/repo_configuration.py @@ -46,6 +46,12 @@ DYNAMO_CONFIG = {"type": "dynamodb", "region": "us-west-2"} REDIS_CONFIG = {"type": "redis", "connection_string": "localhost:6379,db=0"} +REDIS_CLUSTER_CONFIG = { + "type": "redis", + "redis_type": "redis_cluster", + # Redis Cluster Port Forwarding is setup in "pr_integration_tests.yaml" under "Setup Redis Cluster". + "connection_string": "127.0.0.1:6001,127.0.0.1:6002,127.0.0.1:6003", +} # FULL_REPO_CONFIGS contains the repo configurations (e.g. provider, offline store, # online store, test data, and more parameters) that most integration tests will test @@ -62,7 +68,9 @@ if os.getenv("FEAST_IS_LOCAL_TEST", "False") != "True": DEFAULT_FULL_REPO_CONFIGS.extend( [ + # Redis configurations IntegrationTestRepoConfig(online_store=REDIS_CONFIG), + IntegrationTestRepoConfig(online_store=REDIS_CLUSTER_CONFIG), # GCP configurations IntegrationTestRepoConfig( provider="gcp",