diff --git a/dockers/cluster.redis.conf b/dockers/cluster.redis.conf index cd5c08b7b8..d4de46fbed 100644 --- a/dockers/cluster.redis.conf +++ b/dockers/cluster.redis.conf @@ -5,3 +5,4 @@ loadmodule /opt/redis-stack/lib/redisgraph.so loadmodule /opt/redis-stack/lib/redistimeseries.so loadmodule /opt/redis-stack/lib/rejson.so loadmodule /opt/redis-stack/lib/redisbloom.so +loadmodule /opt/redis-stack/lib/redisgears.so v8-plugin-path /opt/redis-stack/lib/libredisgears_v8_plugin.so diff --git a/redis/cluster.py b/redis/cluster.py index 3549ced35d..0c33fd2c68 100644 --- a/redis/cluster.py +++ b/redis/cluster.py @@ -282,6 +282,11 @@ class AbstractRedisCluster: "READONLY", "READWRITE", "TIME", + "TFUNCTION LOAD", + "TFUNCTION DELETE", + "TFUNCTION LIST", + "TFCALL", + "TFCALLASYNC", "GRAPH.CONFIG", "LATENCY HISTORY", "LATENCY LATEST", @@ -298,6 +303,7 @@ class AbstractRedisCluster: "FUNCTION LIST", "FUNCTION LOAD", "FUNCTION RESTORE", + "REDISGEARS_2.REFRESHCLUSTER", "SCAN", "SCRIPT EXISTS", "SCRIPT FLUSH", diff --git a/redis/commands/cluster.py b/redis/commands/cluster.py index cd93a85aba..691cab3def 100644 --- a/redis/commands/cluster.py +++ b/redis/commands/cluster.py @@ -30,10 +30,12 @@ AsyncACLCommands, AsyncDataAccessCommands, AsyncFunctionCommands, + AsyncGearsCommands, AsyncManagementCommands, AsyncScriptCommands, DataAccessCommands, FunctionCommands, + GearsCommands, ManagementCommands, PubSubCommands, ResponseT, @@ -689,6 +691,12 @@ def readwrite(self, target_nodes: Optional["TargetNodesT"] = None) -> ResponseT: self.read_from_replicas = False return self.execute_command("READWRITE", target_nodes=target_nodes) + def gears_refresh_cluster(self, **kwargs) -> ResponseT: + """ + On an OSS cluster, before executing any gears function, you must call this command. # noqa + """ + return self.execute_command("REDISGEARS_2.REFRESHCLUSTER", **kwargs) + class AsyncClusterManagementCommands( ClusterManagementCommands, AsyncManagementCommands @@ -864,6 +872,7 @@ class RedisClusterCommands( ClusterDataAccessCommands, ScriptCommands, FunctionCommands, + GearsCommands, RedisModuleCommands, ): """ @@ -893,6 +902,7 @@ class AsyncRedisClusterCommands( AsyncClusterDataAccessCommands, AsyncScriptCommands, AsyncFunctionCommands, + AsyncGearsCommands, ): """ A class for all Redis Cluster commands diff --git a/redis/commands/core.py b/redis/commands/core.py index 8b1b711df9..09ec59f47c 100644 --- a/redis/commands/core.py +++ b/redis/commands/core.py @@ -6105,6 +6105,131 @@ def function_stats(self) -> Union[Awaitable[List], List]: AsyncFunctionCommands = FunctionCommands +class GearsCommands: + def tfunction_load( + self, lib_code: str, replace: bool = False, config: Union[str, None] = None + ) -> ResponseT: + """ + Load a new library to RedisGears. + + ``lib_code`` - the library code. + ``config`` - a string representation of a JSON object + that will be provided to the library on load time, + for more information refer to + https://github.com/RedisGears/RedisGears/blob/master/docs/function_advance_topics.md#library-configuration + ``replace`` - an optional argument, instructs RedisGears to replace the + function if its already exists + + For more information see https://redis.io/commands/tfunction-load/ + """ + pieces = [] + if replace: + pieces.append("REPLACE") + if config is not None: + pieces.extend(["CONFIG", config]) + pieces.append(lib_code) + return self.execute_command("TFUNCTION LOAD", *pieces) + + def tfunction_delete(self, lib_name: str) -> ResponseT: + """ + Delete a library from RedisGears. + + ``lib_name`` the library name to delete. + + For more information see https://redis.io/commands/tfunction-delete/ + """ + return self.execute_command("TFUNCTION DELETE", lib_name) + + def tfunction_list( + self, + with_code: bool = False, + verbose: int = 0, + lib_name: Union[str, None] = None, + ) -> ResponseT: + """ + List the functions with additional information about each function. + + ``with_code`` Show libraries code. + ``verbose`` output verbosity level, higher number will increase verbosity level + ``lib_name`` specifying a library name (can be used multiple times to show multiple libraries in a single command) # noqa + + For more information see https://redis.io/commands/tfunction-list/ + """ + pieces = [] + if with_code: + pieces.append("WITHCODE") + if verbose >= 1 and verbose <= 3: + pieces.append("v" * verbose) + else: + raise DataError("verbose can be 1, 2 or 3") + if lib_name is not None: + pieces.append("LIBRARY") + pieces.append(lib_name) + + return self.execute_command("TFUNCTION LIST", *pieces) + + def _tfcall( + self, + lib_name: str, + func_name: str, + keys: KeysT = None, + _async: bool = False, + *args: List, + ) -> ResponseT: + pieces = [f"{lib_name}.{func_name}"] + if keys is not None: + pieces.append(len(keys)) + pieces.extend(keys) + else: + pieces.append(0) + if args is not None: + pieces.extend(args) + if _async: + return self.execute_command("TFCALLASYNC", *pieces) + return self.execute_command("TFCALL", *pieces) + + def tfcall( + self, + lib_name: str, + func_name: str, + keys: KeysT = None, + *args: List, + ) -> ResponseT: + """ + Invoke a function. + + ``lib_name`` - the library name contains the function. + ``func_name`` - the function name to run. + ``keys`` - the keys that will be touched by the function. + ``args`` - Additional argument to pass to the function. + + For more information see https://redis.io/commands/tfcall/ + """ + return self._tfcall(lib_name, func_name, keys, False, *args) + + def tfcall_async( + self, + lib_name: str, + func_name: str, + keys: KeysT = None, + *args: List, + ) -> ResponseT: + """ + Invoke an async function (coroutine). + + ``lib_name`` - the library name contains the function. + ``func_name`` - the function name to run. + ``keys`` - the keys that will be touched by the function. + ``args`` - Additional argument to pass to the function. + + For more information see https://redis.io/commands/tfcall/ + """ + return self._tfcall(lib_name, func_name, keys, True, *args) + + +AsyncGearsCommands = GearsCommands + + class DataAccessCommands( BasicKeyCommands, HyperlogCommands, @@ -6148,6 +6273,7 @@ class CoreCommands( PubSubCommands, ScriptCommands, FunctionCommands, + GearsCommands, ): """ A class containing all of the implemented redis commands. This class is @@ -6164,6 +6290,7 @@ class AsyncCoreCommands( AsyncPubSubCommands, AsyncScriptCommands, AsyncFunctionCommands, + AsyncGearsCommands, ): """ A class containing all of the implemented redis commands. This class is diff --git a/tests/test_cluster.py b/tests/test_cluster.py index 84654e70c3..239927f484 100644 --- a/tests/test_cluster.py +++ b/tests/test_cluster.py @@ -2431,6 +2431,57 @@ def teardown(): assert "client-info" in r.acl_log(count=1, target_nodes=node)[0] assert r.acl_log_reset(target_nodes=node) + def generate_lib_code(self, lib_name): + return f"""#!js api_version=1.0 name={lib_name}\n redis.registerFunction('foo', ()=>{{return 'bar'}})""" # noqa + + def try_delete_libs(self, r, *lib_names): + for lib_name in lib_names: + try: + r.tfunction_delete(lib_name) + except Exception: + pass + + @skip_if_server_version_lt("7.1.140") + def test_tfunction_load_delete(self, r): + r.gears_refresh_cluster() + self.try_delete_libs(r, "lib1") + lib_code = self.generate_lib_code("lib1") + assert r.tfunction_load(lib_code) + assert r.tfunction_delete("lib1") + + @skip_if_server_version_lt("7.1.140") + def test_tfunction_list(self, r): + r.gears_refresh_cluster() + self.try_delete_libs(r, "lib1", "lib2", "lib3") + assert r.tfunction_load(self.generate_lib_code("lib1")) + assert r.tfunction_load(self.generate_lib_code("lib2")) + assert r.tfunction_load(self.generate_lib_code("lib3")) + + # test error thrown when verbose > 4 + with pytest.raises(DataError): + assert r.tfunction_list(verbose=8) + + functions = r.tfunction_list(verbose=1) + assert len(functions) == 3 + + expected_names = [b"lib1", b"lib2", b"lib3"] + actual_names = [functions[0][13], functions[1][13], functions[2][13]] + + assert sorted(expected_names) == sorted(actual_names) + assert r.tfunction_delete("lib1") + assert r.tfunction_delete("lib2") + assert r.tfunction_delete("lib3") + + @skip_if_server_version_lt("7.1.140") + def test_tfcall(self, r): + r.gears_refresh_cluster() + self.try_delete_libs(r, "lib1") + assert r.tfunction_load(self.generate_lib_code("lib1")) + assert r.tfcall("lib1", "foo") == b"bar" + assert r.tfcall_async("lib1", "foo") == b"bar" + + assert r.tfunction_delete("lib1") + @pytest.mark.onlycluster class TestNodesManager: diff --git a/tests/test_commands.py b/tests/test_commands.py index fdf41dc5fa..9540f7f20c 100644 --- a/tests/test_commands.py +++ b/tests/test_commands.py @@ -1791,6 +1791,57 @@ def test_substr(self, r): assert r.substr("a", 3, 5) == b"345" assert r.substr("a", 3, -2) == b"345678" + def generate_lib_code(self, lib_name): + return f"""#!js api_version=1.0 name={lib_name}\n redis.registerFunction('foo', ()=>{{return 'bar'}})""" # noqa + + def try_delete_libs(self, r, *lib_names): + for lib_name in lib_names: + try: + r.tfunction_delete(lib_name) + except Exception: + pass + + @pytest.mark.onlynoncluster + @skip_if_server_version_lt("7.1.140") + def test_tfunction_load_delete(self, r): + self.try_delete_libs(r, "lib1") + lib_code = self.generate_lib_code("lib1") + assert r.tfunction_load(lib_code) + assert r.tfunction_delete("lib1") + + @pytest.mark.onlynoncluster + @skip_if_server_version_lt("7.1.140") + def test_tfunction_list(self, r): + self.try_delete_libs(r, "lib1", "lib2", "lib3") + assert r.tfunction_load(self.generate_lib_code("lib1")) + assert r.tfunction_load(self.generate_lib_code("lib2")) + assert r.tfunction_load(self.generate_lib_code("lib3")) + + # test error thrown when verbose > 4 + with pytest.raises(redis.exceptions.DataError): + assert r.tfunction_list(verbose=8) + + functions = r.tfunction_list(verbose=1) + assert len(functions) == 3 + + expected_names = [b"lib1", b"lib2", b"lib3"] + actual_names = [functions[0][13], functions[1][13], functions[2][13]] + + assert sorted(expected_names) == sorted(actual_names) + assert r.tfunction_delete("lib1") + assert r.tfunction_delete("lib2") + assert r.tfunction_delete("lib3") + + @pytest.mark.onlynoncluster + @skip_if_server_version_lt("7.1.140") + def test_tfcall(self, r): + self.try_delete_libs(r, "lib1") + assert r.tfunction_load(self.generate_lib_code("lib1")) + assert r.tfcall("lib1", "foo") == b"bar" + assert r.tfcall_async("lib1", "foo") == b"bar" + + assert r.tfunction_delete("lib1") + def test_ttl(self, r): r["a"] = "1" assert r.expire("a", 10)