Skip to content

Commit

Permalink
Add binding result streaming to client (#430)
Browse files Browse the repository at this point in the history
* Add binding result streaming to client

* Remove spaces around assignment

* Satisfy the linting gods

* Fix streaming field

* Rename? Or copy...

* Put back original test
  • Loading branch information
GavinMendelGleason authored Feb 16, 2024
1 parent f738c46 commit 7731f2e
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 17 deletions.
39 changes: 38 additions & 1 deletion terminusdb_client/client/Client.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,36 @@
# summary Python module for accessing the Terminus DB API


class WoqlResult:
"""Iterator for streaming WOQL results."""
def __init__(self, lines):
preface = json.loads(next(lines))
if not ('@type' in preface and preface['@type'] == 'PrefaceRecord'):
raise DatabaseError(response=preface)
self.preface = preface
self.postscript = {}
self.lines = lines

def _check_error(self, document):
if ('@type' in document):
if document['@type'] == 'Binding':
return document
if document['@type'] == 'PostscriptRecord':
self.postscript = document
raise StopIteration()

raise DatabaseError(response=document)

def variable_names(self):
return self.preface['names']

def __iter__(self):
return self

def __next__(self):
return self._check_error(json.loads(next(self.lines)))


class JWTAuth(requests.auth.AuthBase):
"""Class for JWT Authentication in requests"""

Expand Down Expand Up @@ -1500,8 +1530,9 @@ def query(
commit_msg: Optional[str] = None,
get_data_version: bool = False,
last_data_version: Optional[str] = None,
streaming: bool = False,
# file_dict: Optional[dict] = None,
) -> Union[dict, str]:
) -> Union[dict, str, WoqlResult]:
"""Updates the contents of the specified graph with the triples encoded in turtle format Replaces the entire graph contents
Parameters
Expand Down Expand Up @@ -1537,6 +1568,7 @@ def query(
else:
request_woql_query = woql_query
query_obj["query"] = request_woql_query
query_obj["streaming"] = streaming

headers = self._default_headers.copy()
if last_data_version is not None:
Expand All @@ -1547,7 +1579,12 @@ def query(
headers=headers,
json=query_obj,
auth=self._auth(),
stream=streaming
)

if streaming:
return WoqlResult(lines=_finish_response(result, streaming=True))

if get_data_version:
result, version = _finish_response(result, get_data_version)
result = json.loads(result)
Expand Down
1 change: 1 addition & 0 deletions terminusdb_client/tests/test_Client.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ def test_query(mocked_requests, mocked_requests2, mocked_requests3):
"query": WoqlStar,
},
headers={"user-agent": f"terminusdb-client-python/{__version__}"},
stream=False
)


Expand Down
9 changes: 6 additions & 3 deletions terminusdb_client/woql_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def _args_as_payload(args: dict) -> dict:
return {k: v for k, v in args.items() if v}


def _finish_response(request_response, get_version=False):
def _finish_response(request_response, get_version=False, streaming=False):
"""Get the response text
Parameters
Expand All @@ -43,11 +43,14 @@ def _finish_response(request_response, get_version=False):
"""
if request_response.status_code == 200:
if get_version:
if get_version and not streaming:
return request_response.text, request_response.headers.get(
"Terminusdb-Data-Version"
)
return request_response.text # if not a json it raises an error
if streaming:
return request_response.iter_lines()
else:
return request_response.text # if not a json it raises an error
elif request_response.status_code > 399 and request_response.status_code < 599:
raise DatabaseError(request_response)

Expand Down
27 changes: 14 additions & 13 deletions terminusdb_client/woqlquery/woql_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -367,21 +367,22 @@ def _clean_subject(self, obj):
return self._expand_node_variable(obj)
raise ValueError("Subject must be a URI string")

def _clean_predicate(self, predicate):
def _clean_predicate(self, obj):
"""Transforms whatever is passed in as the predicate (id or variable) into the appropriate json-ld form"""
pred = False
if isinstance(predicate, dict):
return predicate
if not isinstance(predicate, str):
raise ValueError("Predicate must be a URI string")
return str(predicate)
if ":" in predicate:
pred = predicate
elif self._vocab and (predicate in self._vocab):
pred = self._vocab[predicate]
else:
pred = predicate
return self._expand_node_variable(pred)
if type(obj) is dict:
return obj
elif type(obj) is str:
if ":" in obj:
pred = obj
elif self._vocab and (obj in self._vocab):
pred = self._vocab[obj]
else:
pred = obj
return self._expand_node_variable(pred)
elif isinstance(obj, Var):
return self._expand_node_variable(obj)
raise ValueError("Predicate must be a URI string")

def _clean_path_predicate(self, predicate=None):
pred = False
Expand Down

0 comments on commit 7731f2e

Please sign in to comment.