From 8079cc4536f1e5d59743db8e609ca4c3b701985b Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Mon, 25 Nov 2019 12:36:02 +0000 Subject: [PATCH 1/2] Make sure that we close cursors before returning from a query There are lots of words in the comment as to why this is a good idea. --- changelog.d/6408.bugfix | 1 + synapse/storage/_base.py | 45 ++++++++++++++++---- synapse/storage/data_stores/main/receipts.py | 2 +- 3 files changed, 38 insertions(+), 10 deletions(-) create mode 100644 changelog.d/6408.bugfix diff --git a/changelog.d/6408.bugfix b/changelog.d/6408.bugfix new file mode 100644 index 000000000000..c9babe599b7e --- /dev/null +++ b/changelog.d/6408.bugfix @@ -0,0 +1 @@ +Fix an intermittent exception when handling read-receipts. diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 6b8a9cd89abe..d2af3786f4b4 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -409,16 +409,16 @@ def _new_transaction( i = 0 N = 5 while True: + cursor = conn.cursor() + cursor = LoggingTransaction( + cursor, + name, + self.database_engine, + after_callbacks, + exception_callbacks, + ) try: - txn = conn.cursor() - txn = LoggingTransaction( - txn, - name, - self.database_engine, - after_callbacks, - exception_callbacks, - ) - r = func(txn, *args, **kwargs) + r = func(cursor, *args, **kwargs) conn.commit() return r except self.database_engine.module.OperationalError as e: @@ -456,6 +456,33 @@ def _new_transaction( ) continue raise + finally: + # we're either about to retry with a new cursor, or we're about to + # release the connection. Once we release the connection, it could + # get used for another query, which might do a conn.rollback(). + # + # In the latter case, even though that probably wouldn't affect the + # results of this transaction, python's sqlite will reset all + # statements on the connection [1], which will make our cursor + # invalid [2]. + # + # While the above probably doesn't apply to postgres, we still need + # to make sure that we have done with the cursor before we release + # the connection, for compatibility with sqlite. + # + # In any case, continuing to read rows after commit()ing seems + # dubious from the PoV of ACID transactional semantics + # (sqlite explicitly says that once you commit, you may see rows + # from subsequent updates.) + # + # In short, if we haven't finished with the cursor yet, that's a + # problem waiting to bite us. + # + # TL;DR: we're done with the cursor, so we can close it. + # + # [1]: https://github.com/python/cpython/blob/v3.8.0/Modules/_sqlite/connection.c#L465 + # [2]: https://github.com/python/cpython/blob/v3.8.0/Modules/_sqlite/cursor.c#L236 + cursor.close() except Exception as e: logger.debug("[TXN FAIL] {%s} %s", name, e) raise diff --git a/synapse/storage/data_stores/main/receipts.py b/synapse/storage/data_stores/main/receipts.py index 0c24430f2825..9dc29aa01402 100644 --- a/synapse/storage/data_stores/main/receipts.py +++ b/synapse/storage/data_stores/main/receipts.py @@ -280,7 +280,7 @@ def get_all_updated_receipts_txn(txn): args.append(limit) txn.execute(sql, args) - return (r[0:5] + (json.loads(r[5]),) for r in txn) + return [r[0:5] + (json.loads(r[5]),) for r in txn] return self.runInteraction( "get_all_updated_receipts", get_all_updated_receipts_txn From f29f821fbf43fa22fa6798b7f4b45028fbe221d2 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Mon, 25 Nov 2019 18:05:56 +0000 Subject: [PATCH 2/2] address review comments --- synapse/storage/_base.py | 18 ++++++++++++------ synapse/storage/data_stores/main/receipts.py | 2 +- 2 files changed, 13 insertions(+), 7 deletions(-) diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index d2af3786f4b4..459901ac60a4 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -409,9 +409,8 @@ def _new_transaction( i = 0 N = 5 while True: - cursor = conn.cursor() cursor = LoggingTransaction( - cursor, + conn.cursor(), name, self.database_engine, after_callbacks, @@ -466,15 +465,22 @@ def _new_transaction( # statements on the connection [1], which will make our cursor # invalid [2]. # - # While the above probably doesn't apply to postgres, we still need - # to make sure that we have done with the cursor before we release - # the connection, for compatibility with sqlite. - # # In any case, continuing to read rows after commit()ing seems # dubious from the PoV of ACID transactional semantics # (sqlite explicitly says that once you commit, you may see rows # from subsequent updates.) # + # In psycopg2, cursors are essentially a client-side fabrication - + # all the data is transferred to the client side when the statement + # finishes executing - so in theory we could go on streaming results + # from the cursor, but attempting to do so would make us + # incompatible with sqlite, so let's make sure we're not doing that + # by closing the cursor. + # + # (*named* cursors in psycopg2 are different and are proper server- + # side things, but (a) we don't use them and (b) they are implicitly + # closed by ending the transaction anyway.) + # # In short, if we haven't finished with the cursor yet, that's a # problem waiting to bite us. # diff --git a/synapse/storage/data_stores/main/receipts.py b/synapse/storage/data_stores/main/receipts.py index 9dc29aa01402..8b17334ff434 100644 --- a/synapse/storage/data_stores/main/receipts.py +++ b/synapse/storage/data_stores/main/receipts.py @@ -280,7 +280,7 @@ def get_all_updated_receipts_txn(txn): args.append(limit) txn.execute(sql, args) - return [r[0:5] + (json.loads(r[5]),) for r in txn] + return list(r[0:5] + (json.loads(r[5]),) for r in txn) return self.runInteraction( "get_all_updated_receipts", get_all_updated_receipts_txn