Skip to content

Add support for async GRAPH module #2273

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 24 commits into from
Jul 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
5e48aac
Add support for async graph
dvora-h Jul 12, 2022
f5ee75e
linters
dvora-h Jul 12, 2022
db0b650
Merge branch 'master' into async-graph
chayim Jul 17, 2022
224778f
fix docstring
dvora-h Jul 19, 2022
fb2c74b
Use retry mechanism in async version of Connection objects (#2271)
szumka Jul 21, 2022
439f2ea
fix is_connected (#2278)
dvora-h Jul 21, 2022
d7e4ea1
fix: workaround asyncio bug on connection reset by peer (#2259)
sileht Jul 24, 2022
13941b8
Fix crash: key expire while search (#2270)
dvora-h Jul 24, 2022
e993e2c
docs: Fix a few typos (#2274)
timgates42 Jul 24, 2022
efe7c7a
async_cluster: fix concurrent pipeline (#2280)
utkarshgupta137 Jul 24, 2022
b12b025
Add support for TIMESERIES 1.8 (#2296)
dvora-h Jul 24, 2022
e9f8447
Remove verbose logging from `redis-py/redis/cluster.py` (#2238)
nialdaly Jul 24, 2022
8529170
redis stream example (#2269)
pedrofrazao Jul 24, 2022
8e2ea28
Fix: `start_id` type for `XAUTOCLAIM` (#2257)
GaMeRaM Jul 24, 2022
20fa54f
Doc add timeseries example (#2267)
Iglesys347 Jul 25, 2022
a44b952
Fix warnings and resource usage problems in asyncio unittests (#2258)
kristjanvalur Jul 26, 2022
4a83d67
Graph - add counters for removed labels and properties (#2292)
DvirDukhan Jul 26, 2022
115c259
cleaning up the readme and moving docs into readthedocs (#2291)
chayim Jul 27, 2022
fe93f41
async_cluster: fix max_connections/ssl & improve args (#2217)
utkarshgupta137 Jul 27, 2022
34c5177
fix review comments
dvora-h Jul 27, 2022
24413b1
Merge branch 'master' into async-graph
dvora-h Jul 27, 2022
e60ad1c
fix
dvora-h Jul 27, 2022
2e9cab8
fix review comments
dvora-h Jul 28, 2022
85732f4
fix review comments
dvora-h Jul 28, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
129 changes: 111 additions & 18 deletions redis/commands/graph/__init__.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
from ..helpers import quote_string, random_string, stringify_param_value
from .commands import GraphCommands
from .commands import AsyncGraphCommands, GraphCommands
from .edge import Edge # noqa
from .node import Node # noqa
from .path import Path # noqa

DB_LABELS = "DB.LABELS"
DB_RAELATIONSHIPTYPES = "DB.RELATIONSHIPTYPES"
DB_PROPERTYKEYS = "DB.PROPERTYKEYS"


class Graph(GraphCommands):
"""
Expand Down Expand Up @@ -44,25 +48,19 @@ def _refresh_labels(self):
lbls = self.labels()

# Unpack data.
self._labels = [None] * len(lbls)
for i, l in enumerate(lbls):
self._labels[i] = l[0]
self._labels = [l[0] for _, l in enumerate(lbls)]

def _refresh_relations(self):
rels = self.relationship_types()

# Unpack data.
self._relationship_types = [None] * len(rels)
for i, r in enumerate(rels):
self._relationship_types[i] = r[0]
self._relationship_types = [r[0] for _, r in enumerate(rels)]

def _refresh_attributes(self):
props = self.property_keys()

# Unpack data.
self._properties = [None] * len(props)
for i, p in enumerate(props):
self._properties[i] = p[0]
self._properties = [p[0] for _, p in enumerate(props)]

def get_label(self, idx):
"""
Expand Down Expand Up @@ -108,12 +106,12 @@ def get_property(self, idx):
The index of the property
"""
try:
propertie = self._properties[idx]
p = self._properties[idx]
except IndexError:
# Refresh properties.
self._refresh_attributes()
propertie = self._properties[idx]
return propertie
p = self._properties[idx]
return p

def add_node(self, node):
"""
Expand All @@ -133,6 +131,8 @@ def add_edge(self, edge):
self.edges.append(edge)

def _build_params_header(self, params):
if params is None:
return ""
if not isinstance(params, dict):
raise TypeError("'params' must be a dict")
# Header starts with "CYPHER"
Expand All @@ -147,16 +147,109 @@ def call_procedure(self, procedure, *args, read_only=False, **kwagrs):
q = f"CALL {procedure}({','.join(args)})"

y = kwagrs.get("y", None)
if y:
q += f" YIELD {','.join(y)}"
if y is not None:
q += f"YIELD {','.join(y)}"

return self.query(q, read_only=read_only)

def labels(self):
return self.call_procedure("db.labels", read_only=True).result_set
return self.call_procedure(DB_LABELS, read_only=True).result_set

def relationship_types(self):
return self.call_procedure("db.relationshipTypes", read_only=True).result_set
return self.call_procedure(DB_RAELATIONSHIPTYPES, read_only=True).result_set

def property_keys(self):
return self.call_procedure("db.propertyKeys", read_only=True).result_set
return self.call_procedure(DB_PROPERTYKEYS, read_only=True).result_set


class AsyncGraph(Graph, AsyncGraphCommands):
"""Async version for Graph"""

async def _refresh_labels(self):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

docstrings... every function please

lbls = await self.labels()

# Unpack data.
self._labels = [l[0] for _, l in enumerate(lbls)]

async def _refresh_attributes(self):
props = await self.property_keys()

# Unpack data.
self._properties = [p[0] for _, p in enumerate(props)]

async def _refresh_relations(self):
rels = await self.relationship_types()

# Unpack data.
self._relationship_types = [r[0] for _, r in enumerate(rels)]

async def get_label(self, idx):
"""
Returns a label by it's index

Args:

idx:
The index of the label
"""
try:
label = self._labels[idx]
except IndexError:
# Refresh labels.
await self._refresh_labels()
label = self._labels[idx]
return label

async def get_property(self, idx):
"""
Returns a property by it's index

Args:

idx:
The index of the property
"""
try:
p = self._properties[idx]
except IndexError:
# Refresh properties.
await self._refresh_attributes()
p = self._properties[idx]
return p

async def get_relation(self, idx):
"""
Returns a relationship type by it's index

Args:

idx:
The index of the relation
"""
try:
relationship_type = self._relationship_types[idx]
except IndexError:
# Refresh relationship types.
await self._refresh_relations()
relationship_type = self._relationship_types[idx]
return relationship_type

async def call_procedure(self, procedure, *args, read_only=False, **kwagrs):
args = [quote_string(arg) for arg in args]
q = f"CALL {procedure}({','.join(args)})"

y = kwagrs.get("y", None)
if y is not None:
f"YIELD {','.join(y)}"
return await self.query(q, read_only=read_only)

async def labels(self):
return ((await self.call_procedure(DB_LABELS, read_only=True))).result_set

async def property_keys(self):
return (await self.call_procedure(DB_PROPERTYKEYS, read_only=True)).result_set

async def relationship_types(self):
return (
await self.call_procedure(DB_RAELATIONSHIPTYPES, read_only=True)
).result_set
146 changes: 123 additions & 23 deletions redis/commands/graph/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,16 @@

from .exceptions import VersionMismatchException
from .execution_plan import ExecutionPlan
from .query_result import QueryResult
from .query_result import AsyncQueryResult, QueryResult

PROFILE_CMD = "GRAPH.PROFILE"
RO_QUERY_CMD = "GRAPH.RO_QUERY"
QUERY_CMD = "GRAPH.QUERY"
DELETE_CMD = "GRAPH.DELETE"
SLOWLOG_CMD = "GRAPH.SLOWLOG"
CONFIG_CMD = "GRAPH.CONFIG"
LIST_CMD = "GRAPH.LIST"
EXPLAIN_CMD = "GRAPH.EXPLAIN"


class GraphCommands:
Expand Down Expand Up @@ -52,33 +61,28 @@ def query(self, q, params=None, timeout=None, read_only=False, profile=False):
query = q

# handle query parameters
if params is not None:
query = self._build_params_header(params) + query
query = self._build_params_header(params) + query

# construct query command
# ask for compact result-set format
# specify known graph version
if profile:
cmd = "GRAPH.PROFILE"
cmd = PROFILE_CMD
else:
cmd = "GRAPH.RO_QUERY" if read_only else "GRAPH.QUERY"
cmd = RO_QUERY_CMD if read_only else QUERY_CMD
command = [cmd, self.name, query, "--compact"]

# include timeout is specified
if timeout:
if not isinstance(timeout, int):
raise Exception("Timeout argument must be a positive integer")
command += ["timeout", timeout]
if isinstance(timeout, int):
command.extend(["timeout", timeout])
elif timeout is not None:
raise Exception("Timeout argument must be a positive integer")

# issue query
try:
response = self.execute_command(*command)
return QueryResult(self, response, profile)
except ResponseError as e:
if "wrong number of arguments" in str(e):
print(
"Note: RedisGraph Python requires server version 2.2.8 or above"
) # noqa
if "unknown command" in str(e) and read_only:
# `GRAPH.RO_QUERY` is unavailable in older versions.
return self.query(q, params, timeout, read_only=False)
Expand Down Expand Up @@ -106,7 +110,7 @@ def delete(self):
For more information see `DELETE <https://redis.io/commands/graph.delete>`_. # noqa
"""
self._clear_schema()
return self.execute_command("GRAPH.DELETE", self.name)
return self.execute_command(DELETE_CMD, self.name)

# declared here, to override the built in redis.db.flush()
def flush(self):
Expand Down Expand Up @@ -146,7 +150,7 @@ def slowlog(self):
3. The issued query.
4. The amount of time needed for its execution, in milliseconds.
"""
return self.execute_command("GRAPH.SLOWLOG", self.name)
return self.execute_command(SLOWLOG_CMD, self.name)

def config(self, name, value=None, set=False):
"""
Expand All @@ -170,14 +174,14 @@ def config(self, name, value=None, set=False):
raise DataError(
"``value`` can be provided only when ``set`` is True"
) # noqa
return self.execute_command("GRAPH.CONFIG", *params)
return self.execute_command(CONFIG_CMD, *params)

def list_keys(self):
"""
Lists all graph keys in the keyspace.
For more information see `GRAPH.LIST <https://redis.io/commands/graph.list>`_. # noqa
"""
return self.execute_command("GRAPH.LIST")
return self.execute_command(LIST_CMD)

def execution_plan(self, query, params=None):
"""
Expand All @@ -188,10 +192,9 @@ def execution_plan(self, query, params=None):
query: the query that will be executed
params: query parameters
"""
if params is not None:
query = self._build_params_header(params) + query
query = self._build_params_header(params) + query

plan = self.execute_command("GRAPH.EXPLAIN", self.name, query)
plan = self.execute_command(EXPLAIN_CMD, self.name, query)
if isinstance(plan[0], bytes):
plan = [b.decode() for b in plan]
return "\n".join(plan)
Expand All @@ -206,8 +209,105 @@ def explain(self, query, params=None):
query: the query that will be executed
params: query parameters
"""
if params is not None:
query = self._build_params_header(params) + query
query = self._build_params_header(params) + query

plan = self.execute_command(EXPLAIN_CMD, self.name, query)
return ExecutionPlan(plan)


class AsyncGraphCommands(GraphCommands):
async def query(self, q, params=None, timeout=None, read_only=False, profile=False):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

type hints

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"""
Executes a query against the graph.
For more information see `GRAPH.QUERY <https://oss.redis.com/redisgraph/master/commands/#graphquery>`_. # noqa

Args:

q : str
The query.
params : dict
Query parameters.
timeout : int
Maximum runtime for read queries in milliseconds.
read_only : bool
Executes a readonly query if set to True.
profile : bool
Return details on results produced by and time
spent in each operation.
"""

# maintain original 'q'
query = q

# handle query parameters
query = self._build_params_header(params) + query

# construct query command
# ask for compact result-set format
# specify known graph version
if profile:
cmd = PROFILE_CMD
else:
cmd = RO_QUERY_CMD if read_only else QUERY_CMD
command = [cmd, self.name, query, "--compact"]

# include timeout is specified
if isinstance(timeout, int):
command.extend(["timeout", timeout])
elif timeout is not None:
raise Exception("Timeout argument must be a positive integer")

# issue query
try:
response = await self.execute_command(*command)
return await AsyncQueryResult().initialize(self, response, profile)
except ResponseError as e:
if "unknown command" in str(e) and read_only:
# `GRAPH.RO_QUERY` is unavailable in older versions.
return await self.query(q, params, timeout, read_only=False)
raise e
except VersionMismatchException as e:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pattern for possible client side caching in the future... just a thought?

# client view over the graph schema is out of sync
# set client version and refresh local schema
self.version = e.version
self._refresh_schema()
# re-issue query
return await self.query(q, params, timeout, read_only)

async def execution_plan(self, query, params=None):
"""
Get the execution plan for given query,
GRAPH.EXPLAIN returns an array of operations.

Args:
query: the query that will be executed
params: query parameters
"""
query = self._build_params_header(params) + query

plan = self.execute_command("GRAPH.EXPLAIN", self.name, query)
plan = await self.execute_command(EXPLAIN_CMD, self.name, query)
if isinstance(plan[0], bytes):
plan = [b.decode() for b in plan]
return "\n".join(plan)

async def explain(self, query, params=None):
"""
Get the execution plan for given query,
GRAPH.EXPLAIN returns ExecutionPlan object.

Args:
query: the query that will be executed
params: query parameters
"""
query = self._build_params_header(params) + query

plan = await self.execute_command(EXPLAIN_CMD, self.name, query)
return ExecutionPlan(plan)

async def flush(self):
"""
Commit the graph and reset the edges and the nodes to zero length.
"""
await self.commit()
self.nodes = {}
self.edges = []
Loading