diff --git a/redisgraph/exceptions.py b/redisgraph/exceptions.py
new file mode 100644
index 0000000..1911c78
--- /dev/null
+++ b/redisgraph/exceptions.py
@@ -0,0 +1,4 @@
+class VersionMismatchException(Exception):
+    def __init__(self, version):
+        self.version = version
+
diff --git a/redisgraph/graph.py b/redisgraph/graph.py
index dd0b8d8..2323ede 100644
--- a/redisgraph/graph.py
+++ b/redisgraph/graph.py
@@ -1,5 +1,8 @@
 from .util import *
+import redis
 from .query_result import QueryResult
+from .exceptions import VersionMismatchException
+
 
 class Graph(object):
     """
     Graph, collection of nodes and edges.
@@ -9,25 +12,56 @@ def __init__(self, name, redis_con):
         """
         Create a new graph.
         """
-        self.name = name
+        self.name = name                  # Graph key
         self.redis_con = redis_con
         self.nodes = {}
         self.edges = []
-        self._labels = []            # List of node labels.
-        self._properties = []        # List of properties.
-        self._relationshipTypes = [] # List of relation types.
+        self._labels = []                 # List of node labels.
+        self._properties = []             # List of properties.
+        self._relationshipTypes = []      # List of relation types.
+        self.version = 0                  # Graph version
+
+    def _clear_schema(self):
+        self._labels = []
+        self._properties = []
+        self._relationshipTypes = []
+
+    def _refresh_schema(self):
+        self._clear_schema()
+        self._refresh_labels()
+        self._refresh_relations()
+        self._refresh_attributes()
+
+    def _refresh_labels(self):
+        lbls = self.labels()
+
+        # Unpack data.
+        self._labels = [None] * len(lbls)
+        for i, l in enumerate(lbls):
+            self._labels[i] = l[0]
+
+    def _refresh_relations(self):
+        rels = self.relationshipTypes()
+
+        # Unpack data.
+        self._relationshipTypes = [None] * len(rels)
+        for i, r in enumerate(rels):
+            self._relationshipTypes[i] = r[0]
+
+    def _refresh_attributes(self):
+        props = self.propertyKeys()
+
+        # Unpack data.
+        self._properties = [None] * len(props)
+        for i, p in enumerate(props):
+            self._properties[i] = p[0]
 
     def get_label(self, idx):
         try:
             label = self._labels[idx]
         except IndexError:
-            # Refresh graph labels.
-            lbls = self.labels()
-            # Unpack data.
-            self._labels = [None] * len(lbls)
-            for i, l in enumerate(lbls):
-                self._labels[i] = l[0]
-
+            # Refresh labels.
+            self._refresh_labels()
             label = self._labels[idx]
         return label
@@ -35,13 +69,8 @@ def get_relation(self, idx):
         try:
             relationshipType = self._relationshipTypes[idx]
         except IndexError:
-            # Refresh graph relations.
-            rels = self.relationshipTypes()
-            # Unpack data.
-            self._relationshipTypes = [None] * len(rels)
-            for i, r in enumerate(rels):
-                self._relationshipTypes[i] = r[0]
-
+            # Refresh relationship types.
+            self._refresh_relations()
             relationshipType = self._relationshipTypes[idx]
         return relationshipType
@@ -50,12 +79,7 @@ def get_property(self, idx):
         try:
             propertie = self._properties[idx]
         except IndexError:
             # Refresh properties.
-            props = self.propertyKeys()
-            # Unpack data.
-            self._properties = [None] * len(props)
-            for i, p in enumerate(props):
-                self._properties[i] = p[0]
-
+            self._refresh_attributes()
             propertie = self._properties[idx]
         return propertie
@@ -121,16 +145,40 @@ def query(self, q, params=None, timeout=None):
         """
         Executes a query against the graph.
""" + + # maintain original 'q' + query = q + + # handle query parameters if params is not None: - q = self.build_params_header(params) + q + query = self.build_params_header(params) + query + + # construct query command + # ask for compact result-set format + # specify known graph version + command = ["GRAPH.QUERY", self.name, query, "--compact", "version", self.version] - command = ["GRAPH.QUERY", self.name, q, "--compact"] + # include timeout is specified if timeout: if not isinstance(timeout, int): raise Exception("Timeout argument must be a positive integer") command += ["timeout", timeout] - response = self.redis_con.execute_command(*command) - return QueryResult(self, response) + + # issue query + try: + response = self.redis_con.execute_command(*command) + return QueryResult(self, response) + except redis.exceptions.ResponseError as e: + if "wrong number of arguments" in str(e): + print ("Note: RedisGraph Python requires server version 2.2.8 or above") + raise e + except VersionMismatchException as e: + # client view over the graph schema is out of sync + # set client version and refresh local schema + self.version = e.version + self._refresh_schema() + # re-issue query + return self.query(q, params, timeout) def _execution_plan_to_string(self, plan): return "\n".join(plan) @@ -151,6 +199,7 @@ def delete(self): """ Deletes graph. """ + self._clear_schema() return self.redis_con.execute_command("GRAPH.DELETE", self.name) def merge(self, pattern): diff --git a/redisgraph/query_result.py b/redisgraph/query_result.py index ce79704..6830825 100644 --- a/redisgraph/query_result.py +++ b/redisgraph/query_result.py @@ -1,9 +1,24 @@ from .node import Node from .edge import Edge from .path import Path +from .exceptions import VersionMismatchException from prettytable import PrettyTable from redis import ResponseError +LABELS_ADDED = 'Labels added' +NODES_CREATED = 'Nodes created' +NODES_DELETED = 'Nodes deleted' +RELATIONSHIPS_DELETED = 'Relationships deleted' +PROPERTIES_SET = 'Properties set' +RELATIONSHIPS_CREATED = 'Relationships created' +INDICES_CREATED = "Indices created" +INDICES_DELETED = "Indices deleted" +CACHED_EXECUTION = "Cached execution" +INTERNAL_EXECUTION_TIME = 'internal execution time' + +STATS = [LABELS_ADDED, NODES_CREATED, PROPERTIES_SET, RELATIONSHIPS_CREATED, + NODES_DELETED, RELATIONSHIPS_DELETED, INDICES_CREATED, INDICES_DELETED, + CACHED_EXECUTION, INTERNAL_EXECUTION_TIME] class ResultSetColumnTypes(object): COLUMN_UNKNOWN = 0 @@ -11,7 +26,6 @@ class ResultSetColumnTypes(object): COLUMN_NODE = 2 # Unused as of RedisGraph v2.1.0, retained for backwards compatibility. COLUMN_RELATION = 3 # Unused as of RedisGraph v2.1.0, retained for backwards compatibility. - class ResultSetScalarTypes(object): VALUE_UNKNOWN = 0 VALUE_NULL = 1 @@ -25,31 +39,33 @@ class ResultSetScalarTypes(object): VALUE_PATH = 9 class QueryResult(object): - LABELS_ADDED = 'Labels added' - NODES_CREATED = 'Nodes created' - NODES_DELETED = 'Nodes deleted' - RELATIONSHIPS_DELETED = 'Relationships deleted' - PROPERTIES_SET = 'Properties set' - RELATIONSHIPS_CREATED = 'Relationships created' - INDICES_CREATED = "Indices created" - INDICES_DELETED = "Indices deleted" - CACHED_EXECUTION = "Cached execution" - INTERNAL_EXECUTION_TIME = 'internal execution time' def __init__(self, graph, response): self.graph = graph self.header = [] self.result_set = [] - # If we encountered a run-time error, the last response element will be an exception. 
-        if isinstance(response[-1], ResponseError):
-            raise response[-1]
+        # in case of an error, an exception will be raised
+        self._check_for_errors(response)
 
         if len(response) == 1:
             self.parse_statistics(response[0])
         else:
-            self.parse_results(response)
+            # start by parsing statistics; they arrive as the last response element
             self.parse_statistics(response[-1])  # Last element.
+            self.parse_results(response)
+
+    def _check_for_errors(self, response):
+        if isinstance(response[0], ResponseError):
+            error = response[0]
+            if str(error) == "version mismatch":
+                version = response[1]
+                error = VersionMismatchException(version)
+            raise error
+
+        # If we encountered a run-time error, the last response element will be an exception.
+        if isinstance(response[-1], ResponseError):
+            raise response[-1]
 
     def parse_results(self, raw_result_set):
         self.header = self.parse_header(raw_result_set)
@@ -63,10 +79,12 @@ def parse_results(self, raw_result_set):
 
     def parse_statistics(self, raw_statistics):
         self.statistics = {}
-        stats = [self.LABELS_ADDED, self.NODES_CREATED, self.PROPERTIES_SET, self.RELATIONSHIPS_CREATED,
-                 self.NODES_DELETED, self.RELATIONSHIPS_DELETED, self.INDICES_CREATED, self.INDICES_DELETED,
-                 self.CACHED_EXECUTION, self.INTERNAL_EXECUTION_TIME]
-        for s in stats:
+        # decode statistics
+        for idx, stat in enumerate(raw_statistics):
+            if isinstance(stat, bytes):
+                raw_statistics[idx] = stat.decode()
+
+        for s in STATS:
             v = self._get_value(s, raw_statistics)
             if v is not None:
                 self.statistics[s] = v
@@ -223,12 +241,9 @@ def is_empty(self):
 
     @staticmethod
     def _get_value(prop, statistics):
        for stat in statistics:
-            if isinstance(stat, bytes):
-                stat = stat.decode()
            if prop in stat:
                return float(stat.split(': ')[1].split(' ')[0])
-
        return None
 
     def _get_stat(self, stat):
@@ -236,40 +251,41 @@ def _get_stat(self, stat):
 
     @property
     def labels_added(self):
-        return self._get_stat(self.LABELS_ADDED)
+        return self._get_stat(LABELS_ADDED)
 
     @property
     def nodes_created(self):
-        return self._get_stat(self.NODES_CREATED)
+        return self._get_stat(NODES_CREATED)
 
     @property
     def nodes_deleted(self):
-        return self._get_stat(self.NODES_DELETED)
+        return self._get_stat(NODES_DELETED)
 
     @property
     def properties_set(self):
-        return self._get_stat(self.PROPERTIES_SET)
+        return self._get_stat(PROPERTIES_SET)
 
     @property
     def relationships_created(self):
-        return self._get_stat(self.RELATIONSHIPS_CREATED)
+        return self._get_stat(RELATIONSHIPS_CREATED)
 
     @property
     def relationships_deleted(self):
-        return self._get_stat(self.RELATIONSHIPS_DELETED)
+        return self._get_stat(RELATIONSHIPS_DELETED)
 
     @property
     def indices_created(self):
-        return self._get_stat(self.INDICES_CREATED)
+        return self._get_stat(INDICES_CREATED)
 
     @property
     def indices_deleted(self):
-        return self._get_stat(self.INDICES_DELETED)
+        return self._get_stat(INDICES_DELETED)
 
     @property
     def cached_execution(self):
-        return self._get_stat(self.CACHED_EXECUTION) == 1
+        return self._get_stat(CACHED_EXECUTION) == 1
 
     @property
     def run_time_ms(self):
-        return self._get_stat(self.INTERNAL_EXECUTION_TIME)
+        return self._get_stat(INTERNAL_EXECUTION_TIME)
+
diff --git a/test.py b/test.py
index 6bd42fc..74f1362 100644
--- a/test.py
+++ b/test.py
@@ -228,5 +228,75 @@ def test_query_timeout(self):
             # Expecting an error.
             pass
 
+    def test_cache_sync(self):
+        # This test verifies that the client's internal graph schema cache
+        # stays in sync with the graph schema.
+        #
+        # Client B will try to get Client A out of sync by:
+        # 1. deleting the graph
+        # 2. reconstructing the graph in a different order, this will cause
+        #    a difference between the current mapping of string IDs and the
+        #    mapping Client A is aware of.
+        #
+        # Client A should pick up on the changes by comparing graph versions
+        # and resyncing its cache.
+
+        A = Graph('cache-sync', self.r)
+        B = Graph('cache-sync', self.r)
+
+        # Build order:
+        # 1. introduce labels 'L' and 'K'
+        # 2. introduce attributes 'x' and 'q'
+        # 3. introduce relationship-types 'R' and 'S'
+
+        A.query("CREATE (:L)")
+        B.query("CREATE (:K)")
+        A.query("MATCH (n) SET n.x = 1")
+        B.query("MATCH (n) SET n.q = 1")
+        A.query("MATCH (n) CREATE (n)-[:R]->()")
+        B.query("MATCH (n) CREATE (n)-[:S]->()")
+
+        # Cause client A to populate its cache
+        A.query("MATCH (n)-[e]->() RETURN n, e")
+
+        assert(len(A._labels) == 2)
+        assert(len(A._properties) == 2)
+        assert(len(A._relationshipTypes) == 2)
+        assert(A._labels[0] == 'L')
+        assert(A._labels[1] == 'K')
+        assert(A._properties[0] == 'x')
+        assert(A._properties[1] == 'q')
+        assert(A._relationshipTypes[0] == 'R')
+        assert(A._relationshipTypes[1] == 'S')
+
+        # Have client B reconstruct the graph in a different order.
+        B.delete()
+
+        # Build order:
+        # 1. introduce relationship-types 'S' and 'R'
+        # 2. introduce labels 'K' and 'L'
+        # 3. introduce attributes 'q' and 'x'
+        B.query("CREATE ()-[:S]->()")
+        B.query("CREATE ()-[:R]->()")
+        B.query("CREATE (:K)")
+        B.query("CREATE (:L)")
+        B.query("MATCH (n) SET n.q = 1")
+        B.query("MATCH (n) SET n.x = 1")
+
+        # A's internal cached mapping is now out of sync;
+        # issue a query and make sure A's cache is synced.
+        A.query("MATCH (n)-[e]->() RETURN n, e")
+
+        assert(len(A._labels) == 2)
+        assert(len(A._properties) == 2)
+        assert(len(A._relationshipTypes) == 2)
+        assert(A._labels[0] == 'K')
+        assert(A._labels[1] == 'L')
+        assert(A._properties[0] == 'q')
+        assert(A._properties[1] == 'x')
+        assert(A._relationshipTypes[0] == 'S')
+        assert(A._relationshipTypes[1] == 'R')
+
 if __name__ == '__main__':
     unittest.main()
+
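A minimal usage sketch (not part of the patch) of the flow this change enables: the first query populates the client-side schema cache and records the graph version; if the server later reports a version mismatch, Graph.query() refreshes the cache and transparently re-issues the query. The connection details and the 'cache-example' graph key are assumptions for illustration only.

import redis
from redisgraph import Graph

# Assumes a local Redis instance with the RedisGraph module (>= 2.2.8) loaded.
r = redis.Redis(host='localhost', port=6379)
g = Graph('cache-example', r)  # hypothetical graph key

# First query: populates g's cached labels, property keys and relationship
# types, and stores the graph version returned by the server.
g.query("CREATE (:Person {name: 'Ann'})-[:KNOWS]->(:Person {name: 'Ben'})")

# If another client rebuilds the graph in the meantime, the server replies
# with a "version mismatch" error; Graph.query() turns it into a
# VersionMismatchException, refreshes the local schema cache and retries.
result = g.query("MATCH (a:Person)-[:KNOWS]->(b) RETURN a.name, b.name")
for record in result.result_set:
    print(record)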