Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Commit

Permalink
Query iterator only throws exeption once (#252)
Browse files Browse the repository at this point in the history
## What is the goal of this PR?

Revert previous changes from #247 and #248, which made query queues and iterators throw the same error idempotently. However, this goes counter to standard usage of iterators and queues, which are not meant to behave idempotently (each item is only returned once, and if they have an error they should no longer be used). 


## What are the changes implemented in this PR?

* remove idempotent error state of collectors and queues, which back query iterators
  * note that we still store the error on the transaction bidirectional stream, in case the server throws an exception when there are no query iterators active
  

Note: mirrors change from typedb/typedb-driver#372
  • Loading branch information
flyingsilverfin authored Mar 22, 2022
1 parent fca5194 commit 6282d1e
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 18 deletions.
5 changes: 2 additions & 3 deletions typedb/stream/bidirectional_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
# under the License.
#
from queue import Empty, Queue
from typing import TypeVar, Iterator, Union, Generic, List
from typing import TypeVar, Iterator, Union
from uuid import uuid4, UUID

import typedb_protocol.common.transaction_pb2 as transaction_proto
Expand Down Expand Up @@ -133,8 +133,7 @@ def __init__(self, request_id: UUID, stream: "BidirectionalStream"):
self._stream = stream

def get(self) -> T:
value = self._stream.fetch(self._request_id)
return value
return self._stream.fetch(self._request_id)


class RequestIterator(Iterator[Union[transaction_proto.Transaction.Req, StopIteration]]):
Expand Down
21 changes: 9 additions & 12 deletions typedb/stream/response_collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
from typing import Generic, TypeVar, Dict, Optional
from uuid import UUID

from typedb.common.exception import TypeDBClientException, TRANSACTION_CLOSED, ILLEGAL_STATE, \
TRANSACTION_CLOSED_WITH_ERRORS
from grpc import RpcError
from typedb.common.exception import TypeDBClientException, TRANSACTION_CLOSED, ILLEGAL_STATE

R = TypeVar('R')

Expand Down Expand Up @@ -54,26 +54,23 @@ class Queue(Generic[R]):

def __init__(self):
self._response_queue: queue.Queue[Response] = queue.Queue()
self._error: TypeDBClientException = None

def get(self, block: bool) -> R:
response = self._response_queue.get(block=block)
if response.is_value():
return response.value
elif response.is_done():
self._raise_transaction_closed_error()
elif response.is_done() and response.error is None:
raise TypeDBClientException.of(TRANSACTION_CLOSED)
elif response.is_done() and response.error is not None:
raise TypeDBClientException.of_rpc(response.error)
else:
raise TypeDBClientException.of(ILLEGAL_STATE)

def _raise_transaction_closed_error(self):
raise TypeDBClientException.of(TRANSACTION_CLOSED_WITH_ERRORS, self._error) if self._error else TypeDBClientException.of(TRANSACTION_CLOSED)

def put(self, response: R):
self._response_queue.put(ValueResponse(response))

def close(self, error: Optional[TypeDBClientException]):
self._error = error
self._response_queue.put(DoneResponse())
self._response_queue.put(DoneResponse(error))


class Response:
Expand All @@ -96,8 +93,8 @@ def is_value(self):

class DoneResponse(Response):

def __init__(self):
pass
def __init__(self, error: Optional[RpcError]):
self.error = error

def is_done(self):
return True
4 changes: 1 addition & 3 deletions typedb/stream/response_part_iterator.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,7 @@ def _has_next(self) -> bool:
raise TypeDBClientException.of(ILLEGAL_STATE)

def __next__(self) -> transaction_proto.Transaction.ResPart:
if self._bidirectional_stream.get_error() is not None:
raise self._bidirectional_stream.get_error()
elif not self._has_next():
if not self._has_next():
raise StopIteration
else:
self._state = ResponsePartIterator.State.EMPTY
Expand Down

0 comments on commit 6282d1e

Please sign in to comment.