Skip to content

update internal cached mapping upon graph version change #94

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Nov 10, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions redisgraph/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
class VersionMismatchException(Exception):
def __init__(self, version):
self.version = version

105 changes: 77 additions & 28 deletions redisgraph/graph.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
from .util import *
import redis
from .query_result import QueryResult
from .exceptions import VersionMismatchException

class Graph(object):
"""
Graph, collection of nodes and edges.
Expand All @@ -9,39 +12,65 @@ def __init__(self, name, redis_con):
"""
Create a new graph.
"""
self.name = name
self.name = name # Graph key
self.redis_con = redis_con
self.nodes = {}
self.edges = []
self._labels = [] # List of node labels.
self._properties = [] # List of properties.
self._relationshipTypes = [] # List of relation types.
self._labels = [] # List of node labels.
self._properties = [] # List of properties.
self._relationshipTypes = [] # List of relation types.
self.version = 0 # Graph version

def _clear_schema(self):
self._labels = []
self._properties = []
self._relationshipTypes = []

def _refresh_schema(self):
self._clear_schema()
self._refresh_labels()
self._refresh_relations()
self._refresh_attributes()

def _refresh_labels(self):
lbls = self.labels()

# Unpack data.
self._labels = [None] * len(lbls)
for i, l in enumerate(lbls):
self._labels[i] = l[0]

def _refresh_relations(self):
rels = self.relationshipTypes()

# Unpack data.
self._relationshipTypes = [None] * len(rels)
for i, r in enumerate(rels):
self._relationshipTypes[i] = r[0]

def _refresh_attributes(self):
props = self.propertyKeys()

# Unpack data.
self._properties = [None] * len(props)
for i, p in enumerate(props):
self._properties[i] = p[0]

def get_label(self, idx):
try:
label = self._labels[idx]
except IndexError:
# Refresh graph labels.
lbls = self.labels()
# Unpack data.
self._labels = [None] * len(lbls)
for i, l in enumerate(lbls):
self._labels[i] = l[0]

# Refresh labels.
self._refresh_labels()
label = self._labels[idx]
return label

def get_relation(self, idx):
try:
relationshipType = self._relationshipTypes[idx]
except IndexError:
# Refresh graph relations.
rels = self.relationshipTypes()
# Unpack data.
self._relationshipTypes = [None] * len(rels)
for i, r in enumerate(rels):
self._relationshipTypes[i] = r[0]

# Refresh relationship types.
self._refresh_relations()
relationshipType = self._relationshipTypes[idx]
return relationshipType

Expand All @@ -50,12 +79,7 @@ def get_property(self, idx):
propertie = self._properties[idx]
except IndexError:
# Refresh properties.
props = self.propertyKeys()
# Unpack data.
self._properties = [None] * len(props)
for i, p in enumerate(props):
self._properties[i] = p[0]

self._refresh_attributes()
propertie = self._properties[idx]
return propertie

Expand Down Expand Up @@ -121,16 +145,40 @@ def query(self, q, params=None, timeout=None):
"""
Executes a query against the graph.
"""

# maintain original 'q'
query = q

# handle query parameters
if params is not None:
q = self.build_params_header(params) + q
query = self.build_params_header(params) + query

# construct query command
# ask for compact result-set format
# specify known graph version
command = ["GRAPH.QUERY", self.name, query, "--compact", "version", self.version]
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use version only when server supports this feature, for older version of RedisGraph this will trigger arity error


command = ["GRAPH.QUERY", self.name, q, "--compact"]
# include timeout is specified
if timeout:
if not isinstance(timeout, int):
raise Exception("Timeout argument must be a positive integer")
command += ["timeout", timeout]
response = self.redis_con.execute_command(*command)
return QueryResult(self, response)

# issue query
try:
response = self.redis_con.execute_command(*command)
return QueryResult(self, response)
except redis.exceptions.ResponseError as e:
if "wrong number of arguments" in str(e):
print ("Note: RedisGraph Python requires server version 2.2.8 or above")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for now, this is ok
if we introduce new flags there should be a module version check, so each flag is verified with its version and the query will not run if we have a version mismatch

raise e
except VersionMismatchException as e:
# client view over the graph schema is out of sync
# set client version and refresh local schema
self.version = e.version
self._refresh_schema()
# re-issue query
return self.query(q, params, timeout)

def _execution_plan_to_string(self, plan):
return "\n".join(plan)
Expand All @@ -151,6 +199,7 @@ def delete(self):
"""
Deletes graph.
"""
self._clear_schema()
return self.redis_con.execute_command("GRAPH.DELETE", self.name)

def merge(self, pattern):
Expand Down
80 changes: 48 additions & 32 deletions redisgraph/query_result.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,31 @@
from .node import Node
from .edge import Edge
from .path import Path
from .exceptions import VersionMismatchException
from prettytable import PrettyTable
from redis import ResponseError

LABELS_ADDED = 'Labels added'
NODES_CREATED = 'Nodes created'
NODES_DELETED = 'Nodes deleted'
RELATIONSHIPS_DELETED = 'Relationships deleted'
PROPERTIES_SET = 'Properties set'
RELATIONSHIPS_CREATED = 'Relationships created'
INDICES_CREATED = "Indices created"
INDICES_DELETED = "Indices deleted"
CACHED_EXECUTION = "Cached execution"
INTERNAL_EXECUTION_TIME = 'internal execution time'

STATS = [LABELS_ADDED, NODES_CREATED, PROPERTIES_SET, RELATIONSHIPS_CREATED,
NODES_DELETED, RELATIONSHIPS_DELETED, INDICES_CREATED, INDICES_DELETED,
CACHED_EXECUTION, INTERNAL_EXECUTION_TIME]

class ResultSetColumnTypes(object):
COLUMN_UNKNOWN = 0
COLUMN_SCALAR = 1
COLUMN_NODE = 2 # Unused as of RedisGraph v2.1.0, retained for backwards compatibility.
COLUMN_RELATION = 3 # Unused as of RedisGraph v2.1.0, retained for backwards compatibility.


class ResultSetScalarTypes(object):
VALUE_UNKNOWN = 0
VALUE_NULL = 1
Expand All @@ -25,31 +39,33 @@ class ResultSetScalarTypes(object):
VALUE_PATH = 9

class QueryResult(object):
LABELS_ADDED = 'Labels added'
NODES_CREATED = 'Nodes created'
NODES_DELETED = 'Nodes deleted'
RELATIONSHIPS_DELETED = 'Relationships deleted'
PROPERTIES_SET = 'Properties set'
RELATIONSHIPS_CREATED = 'Relationships created'
INDICES_CREATED = "Indices created"
INDICES_DELETED = "Indices deleted"
CACHED_EXECUTION = "Cached execution"
INTERNAL_EXECUTION_TIME = 'internal execution time'

def __init__(self, graph, response):
self.graph = graph
self.header = []
self.result_set = []

# If we encountered a run-time error, the last response element will be an exception.
if isinstance(response[-1], ResponseError):
raise response[-1]
# incase of an error an exception will be raised
self._check_for_errors(response)

if len(response) == 1:
self.parse_statistics(response[0])
else:
self.parse_results(response)
# start by parsing statistics, matches the one we have
self.parse_statistics(response[-1]) # Last element.
self.parse_results(response)

def _check_for_errors(self, response):
if isinstance(response[0], ResponseError):
error = response[0]
if str(error) == "version mismatch":
version = response[1]
error = VersionMismatchException(version)
raise error

# If we encountered a run-time error, the last response element will be an exception.
if isinstance(response[-1], ResponseError):
raise response[-1]

def parse_results(self, raw_result_set):
self.header = self.parse_header(raw_result_set)
Expand All @@ -63,10 +79,12 @@ def parse_results(self, raw_result_set):
def parse_statistics(self, raw_statistics):
self.statistics = {}

stats = [self.LABELS_ADDED, self.NODES_CREATED, self.PROPERTIES_SET, self.RELATIONSHIPS_CREATED,
self.NODES_DELETED, self.RELATIONSHIPS_DELETED, self.INDICES_CREATED, self.INDICES_DELETED,
self.CACHED_EXECUTION, self.INTERNAL_EXECUTION_TIME]
for s in stats:
# decode statistics
for idx, stat in enumerate(raw_statistics):
if isinstance(stat, bytes):
raw_statistics[idx] = stat.decode()

for s in STATS:
v = self._get_value(s, raw_statistics)
if v is not None:
self.statistics[s] = v
Expand Down Expand Up @@ -223,53 +241,51 @@ def is_empty(self):
@staticmethod
def _get_value(prop, statistics):
for stat in statistics:
if isinstance(stat, bytes):
stat = stat.decode()
if prop in stat:
return float(stat.split(': ')[1].split(' ')[0])


return None

def _get_stat(self, stat):
return self.statistics[stat] if stat in self.statistics else 0

@property
def labels_added(self):
return self._get_stat(self.LABELS_ADDED)
return self._get_stat(LABELS_ADDED)

@property
def nodes_created(self):
return self._get_stat(self.NODES_CREATED)
return self._get_stat(NODES_CREATED)

@property
def nodes_deleted(self):
return self._get_stat(self.NODES_DELETED)
return self._get_stat(NODES_DELETED)

@property
def properties_set(self):
return self._get_stat(self.PROPERTIES_SET)
return self._get_stat(PROPERTIES_SET)

@property
def relationships_created(self):
return self._get_stat(self.RELATIONSHIPS_CREATED)
return self._get_stat(RELATIONSHIPS_CREATED)

@property
def relationships_deleted(self):
return self._get_stat(self.RELATIONSHIPS_DELETED)
return self._get_stat(RELATIONSHIPS_DELETED)

@property
def indices_created(self):
return self._get_stat(self.INDICES_CREATED)
return self._get_stat(INDICES_CREATED)

@property
def indices_deleted(self):
return self._get_stat(self.INDICES_DELETED)
return self._get_stat(INDICES_DELETED)

@property
def cached_execution(self):
return self._get_stat(self.CACHED_EXECUTION) == 1
return self._get_stat(CACHED_EXECUTION) == 1

@property
def run_time_ms(self):
return self._get_stat(self.INTERNAL_EXECUTION_TIME)
return self._get_stat(INTERNAL_EXECUTION_TIME)

Loading