Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding support for triggered functions (TFUNCTION) #2861

Merged
merged 27 commits into from
Aug 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions dockers/cluster.redis.conf
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ loadmodule /opt/redis-stack/lib/redisgraph.so
loadmodule /opt/redis-stack/lib/redistimeseries.so
loadmodule /opt/redis-stack/lib/rejson.so
loadmodule /opt/redis-stack/lib/redisbloom.so
loadmodule /opt/redis-stack/lib/redisgears.so v8-plugin-path /opt/redis-stack/lib/libredisgears_v8_plugin.so
6 changes: 6 additions & 0 deletions redis/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,11 @@ class AbstractRedisCluster:
"READONLY",
"READWRITE",
"TIME",
"TFUNCTION LOAD",
"TFUNCTION DELETE",
"TFUNCTION LIST",
"TFCALL",
"TFCALLASYNC",
"GRAPH.CONFIG",
"LATENCY HISTORY",
"LATENCY LATEST",
Expand All @@ -298,6 +303,7 @@ class AbstractRedisCluster:
"FUNCTION LIST",
"FUNCTION LOAD",
"FUNCTION RESTORE",
"REDISGEARS_2.REFRESHCLUSTER",
"SCAN",
"SCRIPT EXISTS",
"SCRIPT FLUSH",
Expand Down
10 changes: 10 additions & 0 deletions redis/commands/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,12 @@
AsyncACLCommands,
AsyncDataAccessCommands,
AsyncFunctionCommands,
AsyncGearsCommands,
AsyncManagementCommands,
AsyncScriptCommands,
DataAccessCommands,
FunctionCommands,
GearsCommands,
ManagementCommands,
PubSubCommands,
ResponseT,
Expand Down Expand Up @@ -689,6 +691,12 @@ def readwrite(self, target_nodes: Optional["TargetNodesT"] = None) -> ResponseT:
self.read_from_replicas = False
return self.execute_command("READWRITE", target_nodes=target_nodes)

def gears_refresh_cluster(self, **kwargs) -> ResponseT:
"""
On an OSS cluster, before executing any gears function, you must call this command. # noqa
"""
return self.execute_command("REDISGEARS_2.REFRESHCLUSTER", **kwargs)


class AsyncClusterManagementCommands(
ClusterManagementCommands, AsyncManagementCommands
Expand Down Expand Up @@ -864,6 +872,7 @@ class RedisClusterCommands(
ClusterDataAccessCommands,
ScriptCommands,
FunctionCommands,
GearsCommands,
RedisModuleCommands,
):
"""
Expand Down Expand Up @@ -893,6 +902,7 @@ class AsyncRedisClusterCommands(
AsyncClusterDataAccessCommands,
AsyncScriptCommands,
AsyncFunctionCommands,
AsyncGearsCommands,
):
"""
A class for all Redis Cluster commands
Expand Down
127 changes: 127 additions & 0 deletions redis/commands/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -6105,6 +6105,131 @@ def function_stats(self) -> Union[Awaitable[List], List]:
AsyncFunctionCommands = FunctionCommands


class GearsCommands:
def tfunction_load(
self, lib_code: str, replace: bool = False, config: Union[str, None] = None
) -> ResponseT:
"""
Load a new library to RedisGears.

``lib_code`` - the library code.
``config`` - a string representation of a JSON object
that will be provided to the library on load time,
for more information refer to
https://github.com/RedisGears/RedisGears/blob/master/docs/function_advance_topics.md#library-configuration
``replace`` - an optional argument, instructs RedisGears to replace the
function if its already exists

For more information see https://redis.io/commands/tfunction-load/
"""
pieces = []
if replace:
pieces.append("REPLACE")
if config is not None:
pieces.extend(["CONFIG", config])
pieces.append(lib_code)
return self.execute_command("TFUNCTION LOAD", *pieces)

def tfunction_delete(self, lib_name: str) -> ResponseT:
"""
Delete a library from RedisGears.

``lib_name`` the library name to delete.

For more information see https://redis.io/commands/tfunction-delete/
"""
return self.execute_command("TFUNCTION DELETE", lib_name)

def tfunction_list(
self,
with_code: bool = False,
verbose: int = 0,
lib_name: Union[str, None] = None,
) -> ResponseT:
"""
List the functions with additional information about each function.

``with_code`` Show libraries code.
``verbose`` output verbosity level, higher number will increase verbosity level
``lib_name`` specifying a library name (can be used multiple times to show multiple libraries in a single command) # noqa

For more information see https://redis.io/commands/tfunction-list/
"""
pieces = []
if with_code:
pieces.append("WITHCODE")
if verbose >= 1 and verbose <= 3:
pieces.append("v" * verbose)
else:
raise DataError("verbose can be 1, 2 or 3")
if lib_name is not None:
pieces.append("LIBRARY")
pieces.append(lib_name)

return self.execute_command("TFUNCTION LIST", *pieces)

def _tfcall(
self,
lib_name: str,
func_name: str,
keys: KeysT = None,
_async: bool = False,
*args: List,
) -> ResponseT:
pieces = [f"{lib_name}.{func_name}"]
if keys is not None:
pieces.append(len(keys))
pieces.extend(keys)
else:
pieces.append(0)
if args is not None:
pieces.extend(args)
if _async:
return self.execute_command("TFCALLASYNC", *pieces)
return self.execute_command("TFCALL", *pieces)

def tfcall(
self,
lib_name: str,
func_name: str,
keys: KeysT = None,
*args: List,
) -> ResponseT:
"""
Invoke a function.

``lib_name`` - the library name contains the function.
``func_name`` - the function name to run.
``keys`` - the keys that will be touched by the function.
``args`` - Additional argument to pass to the function.

For more information see https://redis.io/commands/tfcall/
"""
return self._tfcall(lib_name, func_name, keys, False, *args)

def tfcall_async(
self,
lib_name: str,
func_name: str,
keys: KeysT = None,
*args: List,
) -> ResponseT:
"""
Invoke an async function (coroutine).

``lib_name`` - the library name contains the function.
``func_name`` - the function name to run.
``keys`` - the keys that will be touched by the function.
``args`` - Additional argument to pass to the function.

For more information see https://redis.io/commands/tfcall/
"""
return self._tfcall(lib_name, func_name, keys, True, *args)


AsyncGearsCommands = GearsCommands


class DataAccessCommands(
BasicKeyCommands,
HyperlogCommands,
Expand Down Expand Up @@ -6148,6 +6273,7 @@ class CoreCommands(
PubSubCommands,
ScriptCommands,
FunctionCommands,
GearsCommands,
):
"""
A class containing all of the implemented redis commands. This class is
Expand All @@ -6164,6 +6290,7 @@ class AsyncCoreCommands(
AsyncPubSubCommands,
AsyncScriptCommands,
AsyncFunctionCommands,
AsyncGearsCommands,
):
"""
A class containing all of the implemented redis commands. This class is
Expand Down
51 changes: 51 additions & 0 deletions tests/test_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -2431,6 +2431,57 @@ def teardown():
assert "client-info" in r.acl_log(count=1, target_nodes=node)[0]
assert r.acl_log_reset(target_nodes=node)

def generate_lib_code(self, lib_name):
return f"""#!js api_version=1.0 name={lib_name}\n redis.registerFunction('foo', ()=>{{return 'bar'}})""" # noqa

def try_delete_libs(self, r, *lib_names):
for lib_name in lib_names:
try:
r.tfunction_delete(lib_name)
except Exception:
pass

@skip_if_server_version_lt("7.1.140")
def test_tfunction_load_delete(self, r):
r.gears_refresh_cluster()
self.try_delete_libs(r, "lib1")
lib_code = self.generate_lib_code("lib1")
assert r.tfunction_load(lib_code)
assert r.tfunction_delete("lib1")

@skip_if_server_version_lt("7.1.140")
def test_tfunction_list(self, r):
r.gears_refresh_cluster()
self.try_delete_libs(r, "lib1", "lib2", "lib3")
assert r.tfunction_load(self.generate_lib_code("lib1"))
assert r.tfunction_load(self.generate_lib_code("lib2"))
assert r.tfunction_load(self.generate_lib_code("lib3"))

# test error thrown when verbose > 4
with pytest.raises(DataError):
assert r.tfunction_list(verbose=8)

functions = r.tfunction_list(verbose=1)
assert len(functions) == 3

expected_names = [b"lib1", b"lib2", b"lib3"]
actual_names = [functions[0][13], functions[1][13], functions[2][13]]

assert sorted(expected_names) == sorted(actual_names)
assert r.tfunction_delete("lib1")
assert r.tfunction_delete("lib2")
assert r.tfunction_delete("lib3")

@skip_if_server_version_lt("7.1.140")
def test_tfcall(self, r):
r.gears_refresh_cluster()
self.try_delete_libs(r, "lib1")
assert r.tfunction_load(self.generate_lib_code("lib1"))
assert r.tfcall("lib1", "foo") == b"bar"
assert r.tfcall_async("lib1", "foo") == b"bar"

assert r.tfunction_delete("lib1")


@pytest.mark.onlycluster
class TestNodesManager:
Expand Down
51 changes: 51 additions & 0 deletions tests/test_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -1791,6 +1791,57 @@ def test_substr(self, r):
assert r.substr("a", 3, 5) == b"345"
assert r.substr("a", 3, -2) == b"345678"

def generate_lib_code(self, lib_name):
return f"""#!js api_version=1.0 name={lib_name}\n redis.registerFunction('foo', ()=>{{return 'bar'}})""" # noqa

def try_delete_libs(self, r, *lib_names):
for lib_name in lib_names:
try:
r.tfunction_delete(lib_name)
except Exception:
pass

@pytest.mark.onlynoncluster
@skip_if_server_version_lt("7.1.140")
def test_tfunction_load_delete(self, r):
self.try_delete_libs(r, "lib1")
lib_code = self.generate_lib_code("lib1")
assert r.tfunction_load(lib_code)
assert r.tfunction_delete("lib1")

@pytest.mark.onlynoncluster
@skip_if_server_version_lt("7.1.140")
def test_tfunction_list(self, r):
self.try_delete_libs(r, "lib1", "lib2", "lib3")
assert r.tfunction_load(self.generate_lib_code("lib1"))
assert r.tfunction_load(self.generate_lib_code("lib2"))
assert r.tfunction_load(self.generate_lib_code("lib3"))

# test error thrown when verbose > 4
with pytest.raises(redis.exceptions.DataError):
assert r.tfunction_list(verbose=8)

functions = r.tfunction_list(verbose=1)
assert len(functions) == 3

expected_names = [b"lib1", b"lib2", b"lib3"]
actual_names = [functions[0][13], functions[1][13], functions[2][13]]

assert sorted(expected_names) == sorted(actual_names)
assert r.tfunction_delete("lib1")
assert r.tfunction_delete("lib2")
assert r.tfunction_delete("lib3")

@pytest.mark.onlynoncluster
@skip_if_server_version_lt("7.1.140")
def test_tfcall(self, r):
self.try_delete_libs(r, "lib1")
assert r.tfunction_load(self.generate_lib_code("lib1"))
assert r.tfcall("lib1", "foo") == b"bar"
assert r.tfcall_async("lib1", "foo") == b"bar"

assert r.tfunction_delete("lib1")

def test_ttl(self, r):
r["a"] = "1"
assert r.expire("a", 10)
Expand Down