
Replace device_27_unique_idx bg update with a fg one #7562

Merged: 2 commits, May 26, 2020
12 changes: 9 additions & 3 deletions UPGRADE.rst
@@ -75,9 +75,15 @@ for example:
    wget https://packages.matrix.org/debian/pool/main/m/matrix-synapse-py3/matrix-synapse-py3_1.3.0+stretch1_amd64.deb
    dpkg -i matrix-synapse-py3_1.3.0+stretch1_amd64.deb

-Upgrading to v1.13.0
+Upgrading to v1.14.0
====================

+This version includes a database update which is run as part of the upgrade,
+and which may take a couple of minutes in the case of a large server. Synapse
+will not respond to HTTP requests while this update is taking place.
+
+Upgrading to v1.13.0
+====================

Incorrect database migration in old synapse versions
----------------------------------------------------
@@ -136,12 +142,12 @@ back to v1.12.4 you need to:
2. Decrease the schema version in the database:

.. code:: sql

    UPDATE schema_version SET version = 57;

3. Downgrade Synapse by following the instructions for your installation method
in the "Rolling back to older versions" section above.


Upgrading to v1.12.0
====================
1 change: 1 addition & 0 deletions changelog.d/7562.misc
@@ -0,0 +1 @@
Improve performance of `mark_as_sent_devices_by_remote`.
34 changes: 5 additions & 29 deletions synapse/storage/data_stores/main/devices.py
@@ -55,10 +55,6 @@

BG_UPDATE_REMOVE_DUP_OUTBOUND_POKES = "remove_dup_outbound_pokes"

-BG_UPDATE_DROP_DEVICE_LISTS_OUTBOUND_LAST_SUCCESS_NON_UNIQUE_IDX = (
-    "drop_device_lists_outbound_last_success_non_unique_idx"
-)


class DeviceWorkerStore(SQLBaseStore):
    def get_device(self, user_id, device_id):
@@ -749,19 +745,13 @@ def __init__(self, database: Database, db_conn, hs):
            BG_UPDATE_REMOVE_DUP_OUTBOUND_POKES, self._remove_duplicate_outbound_pokes,
        )

-        # create a unique index on device_lists_outbound_last_success
-        self.db.updates.register_background_index_update(
+        # a pair of background updates that were added during the 1.14 release cycle,
+        # but replaced with 58/06dlols_unique_idx.py
+        self.db.updates.register_noop_background_update(
            "device_lists_outbound_last_success_unique_idx",
-            index_name="device_lists_outbound_last_success_unique_idx",
-            table="device_lists_outbound_last_success",
-            columns=["destination", "user_id"],
-            unique=True,
        )

-        # once that completes, we can remove the old non-unique index.
-        self.db.updates.register_background_update_handler(
-            BG_UPDATE_DROP_DEVICE_LISTS_OUTBOUND_LAST_SUCCESS_NON_UNIQUE_IDX,
-            self._drop_device_lists_outbound_last_success_non_unique_idx,
+        self.db.updates.register_noop_background_update(
+            "drop_device_lists_outbound_last_success_non_unique_idx",
        )

    @defer.inlineCallbacks
@@ -838,20 +828,6 @@ def _txn(txn):

        return rows

-    async def _drop_device_lists_outbound_last_success_non_unique_idx(
-        self, progress, batch_size
-    ):
-        def f(txn):
-            txn.execute("DROP INDEX IF EXISTS device_lists_outbound_last_success_idx")
-
-        await self.db.runInteraction(
-            "drop_device_lists_outbound_last_success_non_unique_idx", f,
-        )
-        await self.db.updates._end_background_update(
-            BG_UPDATE_DROP_DEVICE_LISTS_OUTBOUND_LAST_SUCCESS_NON_UNIQUE_IDX
-        )
-        return 1
-

class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
    def __init__(self, database: Database, db_conn, hs):
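For context on the replacement: a no-op background update registration only needs to mark the update as done the first time it runs, clearing any rows that earlier 1.14 release candidates left in the background_updates table. A minimal sketch of what such a helper has to do, assuming the shape of Synapse's BackgroundUpdater at the time (the handler signature mirrors the removed handler above):

    # Sketch of a no-op background update registration: the handler completes
    # immediately by deleting the update's row (via _end_background_update)
    # and reporting one "item" of work done.
    def register_noop_background_update(self, update_name: str) -> None:
        async def noop_update(progress: dict, batch_size: int) -> int:
            await self._end_background_update(update_name)
            return 1

        self.register_background_update_handler(update_name, noop_update)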

This file was deleted.

80 changes: 80 additions & 0 deletions synapse/storage/data_stores/main/schema/delta/58/06dlols_unique_idx.py
@@ -0,0 +1,80 @@
# Copyright 2020 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
This migration rebuilds the device_lists_outbound_last_success table without duplicate
entries, and with a UNIQUE index.
"""

import logging
from io import StringIO

from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine
from synapse.storage.prepare_database import execute_statements_from_stream
from synapse.storage.types import Cursor

logger = logging.getLogger(__name__)


def run_upgrade(*args, **kwargs):
    pass


def run_create(cur: Cursor, database_engine: BaseDatabaseEngine, *args, **kwargs):
    # some instances might already have this index, in which case we can skip this
    if isinstance(database_engine, PostgresEngine):
        cur.execute(
            """
            SELECT 1 FROM pg_class WHERE relkind = 'i'
            AND relname = 'device_lists_outbound_last_success_unique_idx'
            """
        )

        if cur.rowcount:
            logger.info(
                "Unique index exists on device_lists_outbound_last_success: "
                "skipping rebuild"
            )
            return

    logger.info("Rebuilding device_lists_outbound_last_success with unique index")
    execute_statements_from_stream(cur, StringIO(_rebuild_commands))


# there might be duplicates, so the easiest way to achieve this is to create a new
# table with the right data, and rename it into place

_rebuild_commands = """
DROP TABLE IF EXISTS device_lists_outbound_last_success_new;

CREATE TABLE device_lists_outbound_last_success_new (
destination TEXT NOT NULL,
user_id TEXT NOT NULL,
stream_id BIGINT NOT NULL
);

-- this took about 30 seconds on matrix.org's 16 million rows.
INSERT INTO device_lists_outbound_last_success_new
SELECT destination, user_id, MAX(stream_id) FROM device_lists_outbound_last_success
GROUP BY destination, user_id;

-- and this another 30 seconds.
CREATE UNIQUE INDEX device_lists_outbound_last_success_unique_idx
ON device_lists_outbound_last_success_new (destination, user_id);

DROP TABLE device_lists_outbound_last_success;

ALTER TABLE device_lists_outbound_last_success_new
RENAME TO device_lists_outbound_last_success;
"""
1 change: 0 additions & 1 deletion synapse/storage/database.py
@@ -78,7 +78,6 @@
"device_lists_remote_extremeties": "device_lists_remote_extremeties_unique_idx",
"device_lists_remote_cache": "device_lists_remote_cache_unique_idx",
"event_search": "event_search_event_id_idx",
"device_lists_outbound_last_success": "device_lists_outbound_last_success_unique_idx",
}


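The line removed here comes out of the map of tables whose unique index might still be under construction by a background update; while a table is listed, Synapse's upsert helpers emulate upserts with a lock rather than using native database support. With the index now guaranteed by the foreground migration, upserts on this table can take the native path. Roughly, the resulting SQL looks like this (a sketch: in practice Synapse generates it through its simple_upsert helpers, and the txn and parameter names here are illustrative):

    # Native upsert, valid on PostgreSQL 9.5+ and SQLite 3.24+: insert the row,
    # or overwrite stream_id if (destination, user_id) already exists.
    txn.execute(
        """
        INSERT INTO device_lists_outbound_last_success
            (destination, user_id, stream_id)
        VALUES (?, ?, ?)
        ON CONFLICT (destination, user_id)
        DO UPDATE SET stream_id = EXCLUDED.stream_id
        """,
        (destination, user_id, stream_id),
    )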
13 changes: 9 additions & 4 deletions synapse/storage/prepare_database.py
@@ -19,10 +19,12 @@
import os
import re
from collections import Counter
+from typing import TextIO

import attr

from synapse.storage.engines.postgres import PostgresEngine
+from synapse.storage.types import Cursor

logger = logging.getLogger(__name__)

@@ -479,8 +481,7 @@ def _apply_module_schema_files(cur, database_engine, modname, names_and_streams)
        )

        logger.info("applying schema %s for %s", name, modname)
-        for statement in get_statements(stream):
-            cur.execute(statement)
+        execute_statements_from_stream(cur, stream)

        # Mark as done.
        cur.execute(
@@ -538,8 +539,12 @@ def get_statements(f):

def executescript(txn, schema_path):
    with open(schema_path, "r") as f:
-        for statement in get_statements(f):
-            txn.execute(statement)
+        execute_statements_from_stream(txn, f)


+def execute_statements_from_stream(cur: Cursor, f: TextIO):
+    for statement in get_statements(f):
+        cur.execute(statement)


def _get_or_create_schema_state(txn, database_engine):
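The new helper simply centralises the existing get_statements loop so that callers can feed it either an open file (as executescript does) or an in-memory string (as the new migration does). A usage sketch, in which cur, the file name, and the SQL statements are illustrative:

    from io import StringIO

    from synapse.storage.prepare_database import execute_statements_from_stream

    # from an in-memory string, as 58/06dlols_unique_idx.py does:
    execute_statements_from_stream(
        cur,
        StringIO(
            """
            DROP INDEX IF EXISTS some_old_idx;
            CREATE UNIQUE INDEX some_new_idx ON some_table (some_column);
            """
        ),
    )

    # or from a file on disk, as executescript does:
    with open("some_schema.sql", "r") as f:
        execute_statements_from_stream(cur, f)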