Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Catch-up after Federation Outage #8096

Closed
wants to merge 38 commits into from
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
18d900e
Fix wrong type annotation
reivilibre Jul 23, 2020
07a415c
Little type hint
reivilibre Aug 14, 2020
5bc321a
Add data store functions and delta
reivilibre Aug 14, 2020
c60e259
Track destination_rooms entries
reivilibre Aug 14, 2020
20c896a
Add catch-up logic
reivilibre Aug 14, 2020
d232200
Add tests!
reivilibre Aug 14, 2020
967d8c1
Merge branch 'develop' into rei/2528_catchup_fed_outage
reivilibre Aug 14, 2020
d910798
Track async/await and db_pool transitions
reivilibre Aug 14, 2020
74a6f4f
Newsfile
reivilibre Aug 14, 2020
c1b32ae
Antilint
reivilibre Aug 14, 2020
5de9313
Fix up test
reivilibre Aug 17, 2020
759e027
Remove unused method
reivilibre Aug 17, 2020
6c52666
Use Python 3.5-friendly Collection type
reivilibre Aug 17, 2020
9ba56cb
Fix logic bug in prior code
reivilibre Aug 17, 2020
af13948
Update changelog.d/8096.bugfix
reivilibre Aug 20, 2020
558af38
Handle suggestions from review
reivilibre Aug 20, 2020
44765e9
How could we ever forget you, ./scripts-dev/lint.sh?
reivilibre Aug 20, 2020
56aaa17
Apply suggestions from Rich's code review
reivilibre Aug 26, 2020
84dbc43
Apply suggestions from Rich's code review
reivilibre Aug 26, 2020
2c740a7
Foreign key on rooms, SQL comment
reivilibre Aug 26, 2020
d77e444
NOT NULL, foreign key (events)
reivilibre Aug 26, 2020
33874d4
SQL column doc
reivilibre Aug 26, 2020
c1a2b68
Behaviour confirmed reasonable-seeming
reivilibre Aug 26, 2020
16eec5c
The early bird gets the early return
reivilibre Aug 26, 2020
92517e9
Assertion on bug
reivilibre Aug 26, 2020
ef4680d
Last successful stream ordering is about destinations
reivilibre Aug 26, 2020
de5caf0
Catch-up on all cases except federation denial
reivilibre Aug 27, 2020
3e308f9
Don't explicitly store the event_id
reivilibre Aug 27, 2020
b0bdadd
Antilint
reivilibre Aug 27, 2020
843403f
Remove review question
reivilibre Aug 27, 2020
ad7124d
Merge branch 'develop' into rei/2528_catchup_fed_outage
reivilibre Aug 27, 2020
b1fd67b
Fix wrong type signatures (even if str is Iterable[str]…)
reivilibre Aug 27, 2020
e6890c7
Fix the tests after removing event_id column
reivilibre Aug 27, 2020
7cfecf3
Antilint
reivilibre Aug 27, 2020
bf51d2f
Also fix `simple_select_onecol_txn`
reivilibre Aug 27, 2020
7589a03
Antilint again :(
reivilibre Aug 27, 2020
8d9f4ba
Merge remote-tracking branch 'origin/develop' into rei/2528_catchup_f…
reivilibre Sep 1, 2020
b60ad35
Merge remote-tracking branch 'origin/develop' into rei/2528_catchup_f…
reivilibre Sep 1, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions synapse/federation/sender/transaction_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from typing import TYPE_CHECKING, List
from typing import TYPE_CHECKING, List, Tuple

from canonicaljson import json

Expand Down Expand Up @@ -54,7 +54,10 @@ def __init__(self, hs: "synapse.server.HomeServer"):

@measure_func("_send_new_transaction")
async def send_new_transaction(
self, destination: str, pending_pdus: List[EventBase], pending_edus: List[Edu]
self,
destination: str,
pending_pdus: List[Tuple[EventBase, int]],
pending_edus: List[Edu],
):

# Make a transaction-sending opentracing span. This span follows on from
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/* Copyright 2020 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-- This schema delta alters the schema to enable 'catching up' remote homeservers
-- after there has been a connectivity problem for any reason.

-- This stores, for each (destination, room) pair, the event_id and stream_ordering
-- of the latest event to be enqueued for transmission to that destination.
CREATE TABLE IF NOT EXISTS destination_rooms (
-- the destination in question
destination TEXT NOT NULL,
reivilibre marked this conversation as resolved.
Show resolved Hide resolved
-- the ID of the room in question
room_id TEXT NOT NULL,
-- the stream_ordering of the event
stream_ordering INTEGER,
-- the event_id of the event
event_id TEXT NOT NULL,
PRIMARY KEY (destination, room_id)
);

-- this column tracks the stream_ordering of the event that was most recently
-- successfully transmitted to the destination.
ALTER TABLE destinations
ADD COLUMN last_successful_stream_ordering INTEGER;
136 changes: 135 additions & 1 deletion synapse/storage/data_stores/main/transactions.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
from collections import namedtuple
from typing import List, Optional

from canonicaljson import encode_canonical_json

Expand Down Expand Up @@ -267,3 +267,137 @@ def _cleanup_transactions_txn(txn):
return self.db.runInteraction(
"_cleanup_transactions", _cleanup_transactions_txn
)

def get_last_successful_stream_ordering(self, destination: str):
reivilibre marked this conversation as resolved.
Show resolved Hide resolved
"""
Gets the stream ordering of the PDU most-recently successfully sent
to the specified destination.

Args:
destination: the destination we have successfully sent to
"""
return self.db.simple_select_one_onecol(
"destinations",
{"destination": destination},
"last_successful_stream_ordering",
allow_none=True,
desc="get_last_successful_stream_ordering",
)

def set_last_successful_stream_ordering(
self, destination: str, last_successful_stream_ordering: int
):
"""
Marks that we have successfully sent the PDUs up to and including the
one specified.

Args:
destination: the destination we have successfully sent to
last_successful_stream_ordering: the stream_ordering of the most
recent successfully-sent PDU
"""
return self.db.simple_upsert(
"destinations",
keyvalues={"destination": destination},
values={"last_successful_stream_ordering": last_successful_stream_ordering},
desc="set_last_successful_stream_ordering",
)

def get_catch_up_room_event_ids(
self,
destination: str,
last_successful_stream_ordering: int,
max_stream_order: int,
):
"""
Returns 50 event IDs and their corresponding stream_orderings that
reivilibre marked this conversation as resolved.
Show resolved Hide resolved
correspond to the least recent events that have not yet been sent to
reivilibre marked this conversation as resolved.
Show resolved Hide resolved
a destination.
reivilibre marked this conversation as resolved.
Show resolved Hide resolved

Args:
destination: the destination in question
last_successful_stream_ordering: the stream_ordering of the
most-recently successfully-transmitted event to the destination
max_stream_order: an upper bound, inclusive, of the stream ordering
to return events for.

Returns:
event_ids
"""
return self.db.runInteraction(
"get_catch_up_room_event_ids",
self._get_catch_up_room_event_ids_txn,
destination,
last_successful_stream_ordering,
max_stream_order,
)

@staticmethod
def _get_catch_up_room_event_ids_txn(
txn,
destination: str,
last_successful_stream_ordering: int,
max_stream_order: int,
) -> List[str]:
q = """
SELECT event_id FROM destination_rooms
WHERE destination = ?
AND stream_ordering > ? AND stream_ordering <= ?
ORDER BY stream_ordering
LIMIT 50
"""
txn.execute(
q, (destination, last_successful_stream_ordering, max_stream_order),
)
event_ids = [row[0] for row in txn]
return event_ids

def get_largest_destination_rooms_stream_order(self, destination: str):
"""
Returns the largest stream_ordering from the destination_rooms table
that corresponds to this destination.
"""
return self.db.runInteraction(
"get_largest_destination_rooms_stream_order",
self._get_largest_destination_rooms_stream_order_txn,
destination,
)

@staticmethod
def _get_largest_destination_rooms_stream_order_txn(txn, destination: str) -> Optional[int]:
txn.execute(
"""
SELECT stream_ordering
FROM destination_rooms
WHERE destination = ?
ORDER BY stream_ordering DESC
LIMIT 1
""",
(destination,),
)
rows = [r[0] for r in txn]
if rows:
return rows[0]
else:
return None
reivilibre marked this conversation as resolved.
Show resolved Hide resolved

def store_destination_rooms_entries(
self, destinations: List[str], room_id: str, event_id: str, stream_ordering: int
):
"""
Updates or creates `destination_rooms` entries in batch for a single event.
Args:
reivilibre marked this conversation as resolved.
Show resolved Hide resolved
destinations: list of destinations
room_id: the room_id of the event
event_id: the ID of the event
stream_ordering: the stream_ordering of the event
"""
return self.db.runInteraction(
"store_destination_rooms_entries",
self.db.simple_upsert_many_txn,
"destination_rooms",
["destination", "room_id"],
[(d, room_id) for d in destinations],
["event_id", "stream_ordering"],
[(event_id, stream_ordering)] * len(destinations),
reivilibre marked this conversation as resolved.
Show resolved Hide resolved
)
4 changes: 3 additions & 1 deletion tests/rest/client/v1/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ class RestHelper(object):
resource = attr.ib()
auth_user_id = attr.ib()

def create_room_as(self, room_creator=None, is_public=True, tok=None):
def create_room_as(
self, room_creator: str = None, is_public: bool = True, tok: str = None
) -> str:
temp_id = self.auth_user_id
self.auth_user_id = room_creator
path = "/_matrix/client/r0/createRoom"
Expand Down