Skip to content

Commit

Permalink
Retry if start() raises a retryable error (edgedb#228)
Browse files Browse the repository at this point in the history
This issue was found in HA failover when the master Postgres is down, it
raises a BackendUnavailableError to start a transaction, but the
retrying_transaction() couldn't capture this retryable error from
start(), because start() is called in __enter__().

Now _start() is called in tx.execute() or query*() lazily.

Also removed start(), commit() and rollback() from
retrying_transaction().
  • Loading branch information
fantix authored Aug 24, 2021
1 parent 0ea54d6 commit ffaae01
Show file tree
Hide file tree
Showing 4 changed files with 284 additions and 76 deletions.
50 changes: 42 additions & 8 deletions edgedb/retry.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,40 @@
from . import transaction as _transaction


class AsyncIOIteration(_transaction.AsyncIOTransaction):
class AsyncIOIteration(_transaction.BaseAsyncIOTransaction):
def __init__(self, retry, owner, iteration):
super().__init__(owner, retry._options.transaction_options)
self.__retry = retry
self.__iteration = iteration
self.__started = False

async def start(self):
async def _ensure_transaction(self):
if not self._managed:
raise errors.InterfaceError(
"Only managed retriable transactions are supported. "
"Use `async with transaction:`"
)
await self._start(single_connect=self.__iteration != 0)
if not self.__started:
self.__started = True
await self._start(single_connect=self.__iteration != 0)

async def __aenter__(self):
if self._managed:
raise errors.InterfaceError(
'cannot enter context: already in an `async with` block')
self._managed = True
return self

async def __aexit__(self, extype, ex, tb):
self._managed = False
if not self.__started:
return False

try:
await super().__aexit__(extype, ex, tb)
if extype is not None:
await self._rollback()
else:
await self._commit()
except errors.EdgeDBError as err:
if ex is None:
# On commit we don't know if commit is succeeded before the
Expand Down Expand Up @@ -101,23 +118,40 @@ def __next__(self):
return iteration


class Iteration(_transaction.Transaction):
class Iteration(_transaction.BaseBlockingIOTransaction):
def __init__(self, retry, owner, iteration):
super().__init__(owner, retry._options.transaction_options)
self.__retry = retry
self.__iteration = iteration
self.__started = False

def start(self):
def _ensure_transaction(self):
if not self._managed:
raise errors.InterfaceError(
"Only managed retriable transactions are supported. "
"Use `with transaction:`"
)
self._start(single_connect=self.__iteration != 0)
if not self.__started:
self.__started = True
self._start(single_connect=self.__iteration != 0)

def __enter__(self):
if self._managed:
raise errors.InterfaceError(
'cannot enter context: already in a `with` block')
self._managed = True
return self

def __exit__(self, extype, ex, tb):
self._managed = False
if not self.__started:
return False

try:
super().__exit__(extype, ex, tb)
if extype is not None:
self._rollback()
else:
self._commit()
except errors.EdgeDBError as err:
if ex is None:
# On commit we don't know if commit is succeeded before the
Expand Down
144 changes: 83 additions & 61 deletions edgedb/transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,30 +105,9 @@ def __repr__(self):
mod, self.__class__.__name__, ' '.join(attrs), id(self))


class AsyncIOTransaction(BaseTransaction, abstract.AsyncIOExecutor):
class BaseAsyncIOTransaction(BaseTransaction, abstract.AsyncIOExecutor):
__slots__ = ()

async def __aenter__(self):
if self._managed:
raise errors.InterfaceError(
'cannot enter context: already in an `async with` block')
self._managed = True
await self.start()
return self

async def __aexit__(self, extype, ex, tb):
try:
if extype is not None:
await self.__rollback()
else:
await self.__commit()
finally:
self._managed = False

async def start(self) -> None:
"""Enter the transaction or savepoint block."""
await self._start()

async def _start(self, single_connect=False) -> None:
query = self._make_start_query()
if isinstance(self._owner, base_con.BaseConnection):
Expand All @@ -152,7 +131,7 @@ async def _start(self, single_connect=False) -> None:
else:
self._state = TransactionState.STARTED

async def __commit(self):
async def _commit(self):
query = self._make_commit_query()
try:
await self._connection_impl.privileged_execute(query)
Expand All @@ -166,7 +145,7 @@ async def __commit(self):
if self._connection is not self._owner:
await self._owner._release(self._connection)

async def __rollback(self):
async def _rollback(self):
query = self._make_rollback_query()
try:
await self._connection_impl.privileged_execute(query)
Expand All @@ -180,21 +159,11 @@ async def __rollback(self):
if self._connection is not self._owner:
await self._owner._release(self._connection)

async def commit(self) -> None:
"""Exit the transaction or savepoint block and commit changes."""
if self._managed:
raise errors.InterfaceError(
'cannot manually commit from within an `async with` block')
await self.__commit()

async def rollback(self) -> None:
"""Exit the transaction or savepoint block and rollback changes."""
if self._managed:
raise errors.InterfaceError(
'cannot manually rollback from within an `async with` block')
await self.__rollback()
async def _ensure_transaction(self):
pass

async def query(self, query: str, *args, **kwargs) -> datatypes.Set:
await self._ensure_transaction()
con = self._connection_inner
result, _ = await self._connection_impl._protocol.execute_anonymous(
query=query,
Expand All @@ -207,6 +176,7 @@ async def query(self, query: str, *args, **kwargs) -> datatypes.Set:
return result

async def query_single(self, query: str, *args, **kwargs) -> typing.Any:
await self._ensure_transaction()
con = self._connection_inner
result, _ = await self._connection_impl._protocol.execute_anonymous(
query=query,
Expand All @@ -220,6 +190,7 @@ async def query_single(self, query: str, *args, **kwargs) -> typing.Any:
return result

async def query_json(self, query: str, *args, **kwargs) -> str:
await self._ensure_transaction()
con = self._connection_inner
result, _ = await self._connection_impl._protocol.execute_anonymous(
query=query,
Expand All @@ -232,6 +203,7 @@ async def query_json(self, query: str, *args, **kwargs) -> str:
return result

async def query_single_json(self, query: str, *args, **kwargs) -> str:
await self._ensure_transaction()
con = self._connection_inner
result, _ = await self._connection_impl._protocol.execute_anonymous(
query=query,
Expand All @@ -256,33 +228,52 @@ async def execute(self, query: str) -> None:
... FOR x IN {100, 200, 300} UNION INSERT MyType { a := x };
... ''')
"""
await self._ensure_transaction()
await self._connection_impl._protocol.simple_query(
query, enums.Capability.EXECUTE)


class Transaction(BaseTransaction, abstract.Executor):
class AsyncIOTransaction(BaseAsyncIOTransaction):
__slots__ = ()

def __enter__(self):
async def __aenter__(self):
if self._managed:
raise errors.InterfaceError(
'cannot enter context: already in a `with` block')
'cannot enter context: already in an `async with` block')
self._managed = True
self.start()
await self.start()
return self

def __exit__(self, extype, ex, tb):
async def __aexit__(self, extype, ex, tb):
try:
if extype is not None:
self.__rollback()
await self._rollback()
else:
self.__commit()
await self._commit()
finally:
self._managed = False

def start(self) -> None:
async def start(self) -> None:
"""Enter the transaction or savepoint block."""
self._start()
await self._start()

async def commit(self) -> None:
"""Exit the transaction or savepoint block and commit changes."""
if self._managed:
raise errors.InterfaceError(
'cannot manually commit from within an `async with` block')
await self._commit()

async def rollback(self) -> None:
"""Exit the transaction or savepoint block and rollback changes."""
if self._managed:
raise errors.InterfaceError(
'cannot manually rollback from within an `async with` block')
await self._rollback()


class BaseBlockingIOTransaction(BaseTransaction, abstract.Executor):
__slots__ = ()

def _start(self, single_connect=False) -> None:
query = self._make_start_query()
Expand All @@ -304,7 +295,7 @@ def _start(self, single_connect=False) -> None:
else:
self._state = TransactionState.STARTED

def __commit(self):
def _commit(self):
query = self._make_commit_query()
try:
self._connection_impl.privileged_execute(query)
Expand All @@ -316,7 +307,7 @@ def __commit(self):
finally:
self._connection_inner._borrowed_for = None

def __rollback(self):
def _rollback(self):
query = self._make_rollback_query()
try:
self._connection_impl.privileged_execute(query)
Expand All @@ -328,21 +319,11 @@ def __rollback(self):
finally:
self._connection_inner._borrowed_for = None

def commit(self) -> None:
"""Exit the transaction or savepoint block and commit changes."""
if self._managed:
raise errors.InterfaceError(
'cannot manually commit from within a `with` block')
self.__commit()

def rollback(self) -> None:
"""Exit the transaction or savepoint block and rollback changes."""
if self._managed:
raise errors.InterfaceError(
'cannot manually rollback from within a `with` block')
self.__rollback()
def _ensure_transaction(self):
pass

def query(self, query: str, *args, **kwargs) -> datatypes.Set:
self._ensure_transaction()
con = self._connection_inner
return self._connection_impl._protocol.sync_execute_anonymous(
query=query,
Expand All @@ -354,6 +335,7 @@ def query(self, query: str, *args, **kwargs) -> datatypes.Set:
)

def query_single(self, query: str, *args, **kwargs) -> typing.Any:
self._ensure_transaction()
con = self._connection_inner
return self._connection_impl._protocol.sync_execute_anonymous(
query=query,
Expand All @@ -366,6 +348,7 @@ def query_single(self, query: str, *args, **kwargs) -> typing.Any:
)

def query_json(self, query: str, *args, **kwargs) -> str:
self._ensure_transaction()
con = self._connection_inner
return self._connection_impl._protocol.sync_execute_anonymous(
query=query,
Expand All @@ -377,6 +360,7 @@ def query_json(self, query: str, *args, **kwargs) -> str:
)

def query_single_json(self, query: str, *args, **kwargs) -> str:
self._ensure_transaction()
con = self._connection_inner
return self._connection_impl._protocol.sync_execute_anonymous(
query=query,
Expand All @@ -389,5 +373,43 @@ def query_single_json(self, query: str, *args, **kwargs) -> str:
)

def execute(self, query: str) -> None:
self._ensure_transaction()
self._connection_impl._protocol.sync_simple_query(
query, enums.Capability.EXECUTE)


class Transaction(BaseBlockingIOTransaction):
def __enter__(self):
if self._managed:
raise errors.InterfaceError(
'cannot enter context: already in a `with` block')
self._managed = True
self.start()
return self

def __exit__(self, extype, ex, tb):
try:
if extype is not None:
self._rollback()
else:
self._commit()
finally:
self._managed = False

def start(self) -> None:
"""Enter the transaction or savepoint block."""
self._start()

def commit(self) -> None:
"""Exit the transaction or savepoint block and commit changes."""
if self._managed:
raise errors.InterfaceError(
'cannot manually commit from within a `with` block')
self._commit()

def rollback(self) -> None:
"""Exit the transaction or savepoint block and rollback changes."""
if self._managed:
raise errors.InterfaceError(
'cannot manually rollback from within a `with` block')
self._rollback()
Loading

0 comments on commit ffaae01

Please sign in to comment.