This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Pass in Database object to data stores. #6487

Merged 8 commits on Dec 9, 2019
2 changes: 1 addition & 1 deletion .buildkite/postgres-config.yaml
@@ -1,7 +1,7 @@
# Configuration file used for testing the 'synapse_port_db' script.
# Tells the script to connect to the postgresql database that will be available in the
# CI's Docker setup at the point where this file is considered.
-server_name: "test"
+server_name: "localhost:8800"
Member: Is this a thing that should be here?

Member Author: Yup. The users in the test db are from localhost:8800, and we now run the DB checks during the port DB run, as they've been moved from synapse/app to the data stores.


signing_key_path: "/src/.buildkite/test.signing.key"

2 changes: 1 addition & 1 deletion .buildkite/sqlite-config.yaml
@@ -1,7 +1,7 @@
# Configuration file used for testing the 'synapse_port_db' script.
# Tells the 'update_database' script to connect to the test SQLite database to upgrade its
# schema and run background updates on it.
-server_name: "test"
+server_name: "localhost:8800"

signing_key_path: "/src/.buildkite/test.signing.key"

1 change: 1 addition & 0 deletions changelog.d/6487.misc
@@ -0,0 +1 @@
+Pass in `Database` object to data stores.
Member: This isn't a great changelog entry, but we probably want to change all the datastore refactoring PRs to group them together, which is most easily done at release time.

Member Author: Agreed, though I find it difficult to write sensible changelogs for internal refactors, tbh.

36 changes: 2 additions & 34 deletions scripts/synapse_port_db
@@ -55,6 +55,7 @@ from synapse.storage.data_stores.main.stats import StatsStore
from synapse.storage.data_stores.main.user_directory import (
    UserDirectoryBackgroundUpdateStore,
)
+from synapse.storage.database import Database
from synapse.storage.engines import create_engine
from synapse.storage.prepare_database import prepare_database
from synapse.util import Clock
@@ -139,39 +140,6 @@ class Store(
    UserDirectoryBackgroundUpdateStore,
    StatsStore,
):
-    def __init__(self, db_conn, hs):
-        super().__init__(db_conn, hs)
-        self.db_pool = hs.get_db_pool()
-
-    @defer.inlineCallbacks
-    def runInteraction(self, desc, func, *args, **kwargs):
-        def r(conn):
-            try:
-                i = 0
-                N = 5
-                while True:
-                    try:
-                        txn = conn.cursor()
-                        return func(
-                            LoggingTransaction(txn, desc, self.database_engine, [], []),
-                            *args,
-                            **kwargs
-                        )
-                    except self.database_engine.module.DatabaseError as e:
-                        if self.database_engine.is_deadlock(e):
-                            logger.warning("[TXN DEADLOCK] {%s} %d/%d", desc, i, N)
-                            if i < N:
-                                i += 1
-                                conn.rollback()
-                                continue
-                        raise
-            except Exception as e:
-                logger.debug("[TXN FAIL] {%s} %s", desc, e)
-                raise
-
-        with PreserveLoggingContext():
-            return (yield self.db_pool.runWithConnection(r))

Member (on the removed runInteraction): there must have been a reason this was here: any idea what it was, and why it no longer applies?

Member Author: We didn't inherit from SQLBaseStore previously; instead we just cherry-picked stuff out, by the looks of things.

Member: ClientIpBackgroundUpdateStore inherits from SQLBaseStore?

Member Author: Yup, but we didn't inherit from that when the script was first written and runInteraction was first added. We later started inheriting from those stores without removing the runInteraction copy.

Member: hmm right.

    def execute(self, f, *args, **kwargs):
        return self.db.runInteraction(f.__name__, f, *args, **kwargs)
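
The deadlock-retry logic that the removed runInteraction carried is the same behaviour the store now inherits through SQLBaseStore and its Database wrapper, which is why the copy can go. As a reading aid, here is a minimal, self-contained sketch of the retry pattern the removed block implemented; the is_deadlock helper and MAX_RETRIES constant are illustrative stand-ins, not Synapse's actual API:

```python
import logging

logger = logging.getLogger(__name__)

MAX_RETRIES = 5  # the removed code used N = 5


def run_interaction(conn, desc, func, is_deadlock, *args, **kwargs):
    """Run func(txn, ...) against a connection, retrying on deadlock."""
    attempt = 0
    while True:
        try:
            txn = conn.cursor()
            return func(txn, *args, **kwargs)
        except Exception as e:
            if is_deadlock(e) and attempt < MAX_RETRIES:
                # Deadlocks are transient: roll back and replay the interaction.
                logger.warning("[TXN DEADLOCK] {%s} %d/%d", desc, attempt, MAX_RETRIES)
                attempt += 1
                conn.rollback()
                continue
            logger.debug("[TXN FAIL] {%s} %s", desc, e)
            raise
```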

@@ -512,7 +480,7 @@ class Porter(object):

        hs = MockHomeserver(self.hs_config, engine, conn, db_pool)

-        store = Store(conn, hs)
+        store = Store(Database(hs), conn, hs)

Member: wonder if we could use a DataStores here for consistency with the app code

Member Author: Yeah, though we only want to include the background update stores rather than building out full data stores. (Currently you could use main_store_class to do this, but when we have future data stores that won't necessarily be true.)

        yield store.db.runInteraction(
            "%s_engine.check_database" % config["name"], engine.check_database,
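To make the new calling convention concrete: the caller builds one Database wrapper and threads it through the store constructors, instead of each store pulling a pool off the homeserver as the deleted __init__ above did. A self-contained toy of the pattern follows; every class in it is an illustrative stand-in, not Synapse's real implementation:

```python
class Database:
    """Stand-in for synapse.storage.database.Database."""

    def __init__(self, hs):
        self.hs = hs

    def run_interaction(self, desc, func, *args, **kwargs):
        # The real wrapper runs func inside a transaction with retries.
        return func(*args, **kwargs)


class SQLBaseStore:
    def __init__(self, database, db_conn, hs):
        self.db = database  # shared wrapper; subclasses go through self.db


class Store(SQLBaseStore):
    pass


hs, conn = object(), object()
store = Store(Database(hs), conn, hs)                   # mirrors Store(Database(hs), conn, hs)
print(store.db.run_interaction("check", lambda: "ok"))  # mirrors store.db.runInteraction(...)
```

The design choice is plain dependency injection: whatever owns the connection machinery hands it to the stores, which is what lets a script like this reuse store classes without a full homeserver.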
5 changes: 3 additions & 2 deletions synapse/app/federation_sender.py
@@ -40,6 +40,7 @@
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.replication.tcp.streams._base import ReceiptsStream
from synapse.server import HomeServer
+from synapse.storage.database import Database
from synapse.storage.engines import create_engine
from synapse.types import ReadReceipt
from synapse.util.async_helpers import Linearizer
@@ -59,8 +60,8 @@ class FederationSenderSlaveStore(
    SlavedDeviceStore,
    SlavedPresenceStore,
):
-    def __init__(self, db_conn, hs):
-        super(FederationSenderSlaveStore, self).__init__(db_conn, hs)
+    def __init__(self, database: Database, db_conn, hs):
+        super(FederationSenderSlaveStore, self).__init__(database, db_conn, hs)

        # We pull out the current federation stream position now so that we
        # always have a known value for the federation position in memory so
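
Worth noting why the constructor keeps taking db_conn alongside the new database argument: stores like this one perform synchronous reads at construction time (here, the current federation stream position) before the async machinery is serving queries. A self-contained toy of that startup-read pattern, using sqlite3 and an invented table purely for illustration:

```python
import sqlite3

# Invented schema; the real store reads Synapse's federation stream position.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE stream_position (stream_id INTEGER)")
conn.execute("INSERT INTO stream_position VALUES (42)")


class ExampleSlaveStore:
    def __init__(self, database, db_conn, hs):
        self.db = database  # async wrapper, used once the worker is running
        # Synchronous startup read so a known position is in memory immediately.
        cur = db_conn.cursor()
        cur.execute("SELECT COALESCE(MAX(stream_id), -1) FROM stream_position")
        (self.federation_out_pos_startup,) = cur.fetchone()


store = ExampleSlaveStore(database=None, db_conn=conn, hs=None)
print(store.federation_out_pos_startup)  # 42
```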
35 changes: 6 additions & 29 deletions synapse/app/homeserver.py
@@ -68,9 +68,9 @@
from synapse.rest.media.v0.content_repository import ContentRepoResource
from synapse.rest.well_known import WellKnownResource
from synapse.server import HomeServer
-from synapse.storage import DataStore, are_all_users_on_domain
+from synapse.storage import DataStore
from synapse.storage.engines import IncorrectDatabaseSetup, create_engine
-from synapse.storage.prepare_database import UpgradeDatabaseException, prepare_database
+from synapse.storage.prepare_database import UpgradeDatabaseException
from synapse.util.caches import CACHE_SIZE_FACTOR
from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.manhole import manhole
@@ -294,22 +294,6 @@ def start_listening(self, listeners):
        else:
            logger.warning("Unrecognized listener type: %s", listener["type"])

-    def run_startup_checks(self, db_conn, database_engine):
-        all_users_native = are_all_users_on_domain(
-            db_conn.cursor(), database_engine, self.hostname
-        )
-        if not all_users_native:
-            quit_with_error(
-                "Found users in database not native to %s!\n"
-                "You cannot changed a synapse server_name after it's been configured"
-                % (self.hostname,)
-            )
-
-        try:
-            database_engine.check_database(db_conn.cursor())
-        except IncorrectDatabaseSetup as e:
-            quit_with_error(str(e))


# Gauges to expose monthly active user control metrics
current_mau_gauge = Gauge("synapse_admin_mau:current", "Current MAU")
@@ -357,16 +341,12 @@ def setup(config_options):

    synapse.config.logger.setup_logging(hs, config, use_worker_options=False)

-    logger.info("Preparing database: %s...", config.database_config["name"])
+    logger.info("Setting up server")

    try:
-        with hs.get_db_conn(run_new_connection=False) as db_conn:
-            prepare_database(db_conn, database_engine, config=config)
-            database_engine.on_new_connection(db_conn)
-
-            hs.run_startup_checks(db_conn, database_engine)
-
-            db_conn.commit()
+        hs.setup()
+    except IncorrectDatabaseSetup as e:
+        quit_with_error(str(e))
    except UpgradeDatabaseException:
        sys.stderr.write(
            "\nFailed to upgrade database.\n"
@@ -375,9 +355,6 @@ def setup(config_options):
        )
        sys.exit(1)

-    logger.info("Database prepared in %s.", config.database_config["name"])
-
-    hs.setup()
    hs.setup_master()

@defer.inlineCallbacks
5 changes: 3 additions & 2 deletions synapse/app/user_dir.py
@@ -43,6 +43,7 @@
from synapse.rest.client.v2_alpha import user_directory
from synapse.server import HomeServer
from synapse.storage.data_stores.main.user_directory import UserDirectoryStore
+from synapse.storage.database import Database
from synapse.storage.engines import create_engine
from synapse.util.caches.stream_change_cache import StreamChangeCache
from synapse.util.httpresourcetree import create_resource_tree
@@ -60,8 +61,8 @@ class UserDirectorySlaveStore(
    UserDirectoryStore,
    BaseSlavedStore,
):
-    def __init__(self, db_conn, hs):
-        super(UserDirectorySlaveStore, self).__init__(db_conn, hs)
+    def __init__(self, database: Database, db_conn, hs):
+        super(UserDirectorySlaveStore, self).__init__(database, db_conn, hs)

        events_max = self._stream_id_gen.get_current_token()
        curr_state_delta_prefill, min_curr_state_delta_id = self.db.get_cache_dict(
5 changes: 3 additions & 2 deletions synapse/replication/slave/storage/_base.py
@@ -20,6 +20,7 @@

from synapse.storage._base import SQLBaseStore
from synapse.storage.data_stores.main.cache import CURRENT_STATE_CACHE_NAME
+from synapse.storage.database import Database
from synapse.storage.engines import PostgresEngine

from ._slaved_id_tracker import SlavedIdTracker
@@ -35,8 +36,8 @@ def __func__(inp):


class BaseSlavedStore(SQLBaseStore):
-    def __init__(self, db_conn, hs):
-        super(BaseSlavedStore, self).__init__(db_conn, hs)
+    def __init__(self, database: Database, db_conn, hs):
+        super(BaseSlavedStore, self).__init__(database, db_conn, hs)
        if isinstance(self.database_engine, PostgresEngine):
            self._cache_id_gen = SlavedIdTracker(
                db_conn, "cache_invalidation_stream", "stream_id"
5 changes: 3 additions & 2 deletions synapse/replication/slave/storage/account_data.py
@@ -18,15 +18,16 @@
from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker
from synapse.storage.data_stores.main.account_data import AccountDataWorkerStore
from synapse.storage.data_stores.main.tags import TagsWorkerStore
+from synapse.storage.database import Database


class SlavedAccountDataStore(TagsWorkerStore, AccountDataWorkerStore, BaseSlavedStore):
-    def __init__(self, db_conn, hs):
+    def __init__(self, database: Database, db_conn, hs):
        self._account_data_id_gen = SlavedIdTracker(
            db_conn, "account_data_max_stream_id", "stream_id"
        )

-        super(SlavedAccountDataStore, self).__init__(db_conn, hs)
+        super(SlavedAccountDataStore, self).__init__(database, db_conn, hs)

    def get_max_account_data_stream_id(self):
        return self._account_data_id_gen.get_current_token()
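
Note the ordering here: the SlavedIdTracker is built from db_conn before super().__init__ runs, so anything the base constructors do already sees a valid stream position. A minimal sketch of what such a tracker does, as an illustration of the idea rather than the real SlavedIdTracker:

```python
class IdTrackerSketch:
    """Tracks a replica's current position in a stream-id column."""

    def __init__(self, db_conn, table, column, step=1):
        # Read the current extreme of the column; a negative step means the
        # stream runs backwards, so track the minimum instead of the maximum.
        agg = "MAX" if step > 0 else "MIN"
        cur = db_conn.cursor()
        cur.execute("SELECT COALESCE(%s(%s), 1) FROM %s" % (agg, column, table))
        (self._current,) = cur.fetchone()

    def get_current_token(self):
        return self._current

    def advance(self, new_id):
        # Called as replication rows arrive from the master process.
        self._current = new_id
```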
5 changes: 3 additions & 2 deletions synapse/replication/slave/storage/client_ips.py
@@ -14,15 +14,16 @@
# limitations under the License.

from synapse.storage.data_stores.main.client_ips import LAST_SEEN_GRANULARITY
+from synapse.storage.database import Database
from synapse.util.caches import CACHE_SIZE_FACTOR
from synapse.util.caches.descriptors import Cache

from ._base import BaseSlavedStore


class SlavedClientIpStore(BaseSlavedStore):
-    def __init__(self, db_conn, hs):
-        super(SlavedClientIpStore, self).__init__(db_conn, hs)
+    def __init__(self, database: Database, db_conn, hs):
+        super(SlavedClientIpStore, self).__init__(database, db_conn, hs)

        self.client_ip_last_seen = Cache(
            name="client_ip_last_seen", keylen=4, max_entries=50000 * CACHE_SIZE_FACTOR
5 changes: 3 additions & 2 deletions synapse/replication/slave/storage/deviceinbox.py
@@ -16,13 +16,14 @@
from synapse.replication.slave.storage._base import BaseSlavedStore
from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker
from synapse.storage.data_stores.main.deviceinbox import DeviceInboxWorkerStore
+from synapse.storage.database import Database
from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.caches.stream_change_cache import StreamChangeCache


class SlavedDeviceInboxStore(DeviceInboxWorkerStore, BaseSlavedStore):
-    def __init__(self, db_conn, hs):
-        super(SlavedDeviceInboxStore, self).__init__(db_conn, hs)
+    def __init__(self, database: Database, db_conn, hs):
+        super(SlavedDeviceInboxStore, self).__init__(database, db_conn, hs)
        self._device_inbox_id_gen = SlavedIdTracker(
            db_conn, "device_max_stream_id", "stream_id"
        )
5 changes: 3 additions & 2 deletions synapse/replication/slave/storage/devices.py
@@ -18,12 +18,13 @@
from synapse.replication.tcp.streams._base import DeviceListsStream, UserSignatureStream
from synapse.storage.data_stores.main.devices import DeviceWorkerStore
from synapse.storage.data_stores.main.end_to_end_keys import EndToEndKeyWorkerStore
+from synapse.storage.database import Database
from synapse.util.caches.stream_change_cache import StreamChangeCache


class SlavedDeviceStore(EndToEndKeyWorkerStore, DeviceWorkerStore, BaseSlavedStore):
-    def __init__(self, db_conn, hs):
-        super(SlavedDeviceStore, self).__init__(db_conn, hs)
+    def __init__(self, database: Database, db_conn, hs):
+        super(SlavedDeviceStore, self).__init__(database, db_conn, hs)

        self.hs = hs

5 changes: 3 additions & 2 deletions synapse/replication/slave/storage/events.py
@@ -31,6 +31,7 @@
from synapse.storage.data_stores.main.state import StateGroupWorkerStore
from synapse.storage.data_stores.main.stream import StreamWorkerStore
from synapse.storage.data_stores.main.user_erasure_store import UserErasureWorkerStore
+from synapse.storage.database import Database

from ._base import BaseSlavedStore
from ._slaved_id_tracker import SlavedIdTracker
@@ -59,13 +60,13 @@ class SlavedEventStore(
    RelationsWorkerStore,
    BaseSlavedStore,
):
-    def __init__(self, db_conn, hs):
+    def __init__(self, database: Database, db_conn, hs):
        self._stream_id_gen = SlavedIdTracker(db_conn, "events", "stream_ordering")
        self._backfill_id_gen = SlavedIdTracker(
            db_conn, "events", "stream_ordering", step=-1
        )

-        super(SlavedEventStore, self).__init__(db_conn, hs)
+        super(SlavedEventStore, self).__init__(database, db_conn, hs)

        # Cached functions can't be accessed through a class instance so we need
        # to reach inside the __dict__ to extract them.
5 changes: 3 additions & 2 deletions synapse/replication/slave/storage/filtering.py
@@ -14,13 +14,14 @@
# limitations under the License.

from synapse.storage.data_stores.main.filtering import FilteringStore
+from synapse.storage.database import Database

from ._base import BaseSlavedStore


class SlavedFilteringStore(BaseSlavedStore):
-    def __init__(self, db_conn, hs):
-        super(SlavedFilteringStore, self).__init__(db_conn, hs)
+    def __init__(self, database: Database, db_conn, hs):
+        super(SlavedFilteringStore, self).__init__(database, db_conn, hs)

    # Filters are immutable so this cache doesn't need to be expired
    get_user_filter = FilteringStore.__dict__["get_user_filter"]
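
The __dict__ access deserves a gloss: reading the attribute out of the class's __dict__ returns the raw cached-function descriptor without triggering the descriptor protocol, so a class that does not inherit from FilteringStore can still adopt and re-bind it, which is what the comment in the events store above alludes to. A self-contained toy of the trick; the cached decorator here is illustrative, not Synapse's real cache:

```python
class cached:
    """Toy descriptor standing in for Synapse's @cached decorator."""

    def __init__(self, func):
        self.func = func

    def __get__(self, obj, objtype=None):
        # The real descriptor memoizes; this toy just binds the instance.
        return lambda *args, **kwargs: self.func(obj, *args, **kwargs)


class FilteringStoreToy:
    @cached
    def get_user_filter(self, user_id):
        return {"user": user_id}


class SlavedFilteringStoreToy:
    # Pull the raw descriptor, not a bound method, so it re-binds here.
    get_user_filter = FilteringStoreToy.__dict__["get_user_filter"]


print(SlavedFilteringStoreToy().get_user_filter("@alice:example.com"))
# -> {'user': '@alice:example.com'}
```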
5 changes: 3 additions & 2 deletions synapse/replication/slave/storage/groups.py
@@ -14,15 +14,16 @@
# limitations under the License.

from synapse.storage import DataStore
+from synapse.storage.database import Database
from synapse.util.caches.stream_change_cache import StreamChangeCache

from ._base import BaseSlavedStore, __func__
from ._slaved_id_tracker import SlavedIdTracker


class SlavedGroupServerStore(BaseSlavedStore):
-    def __init__(self, db_conn, hs):
-        super(SlavedGroupServerStore, self).__init__(db_conn, hs)
+    def __init__(self, database: Database, db_conn, hs):
+        super(SlavedGroupServerStore, self).__init__(database, db_conn, hs)

        self.hs = hs

5 changes: 3 additions & 2 deletions synapse/replication/slave/storage/presence.py
@@ -15,15 +15,16 @@

from synapse.storage import DataStore
from synapse.storage.data_stores.main.presence import PresenceStore
+from synapse.storage.database import Database
from synapse.util.caches.stream_change_cache import StreamChangeCache

from ._base import BaseSlavedStore, __func__
from ._slaved_id_tracker import SlavedIdTracker


class SlavedPresenceStore(BaseSlavedStore):
-    def __init__(self, db_conn, hs):
-        super(SlavedPresenceStore, self).__init__(db_conn, hs)
+    def __init__(self, database: Database, db_conn, hs):
+        super(SlavedPresenceStore, self).__init__(database, db_conn, hs)
        self._presence_id_gen = SlavedIdTracker(db_conn, "presence_stream", "stream_id")

        self._presence_on_startup = self._get_active_presence(db_conn)
5 changes: 3 additions & 2 deletions synapse/replication/slave/storage/push_rule.py
@@ -15,17 +15,18 @@
# limitations under the License.

from synapse.storage.data_stores.main.push_rule import PushRulesWorkerStore
+from synapse.storage.database import Database

from ._slaved_id_tracker import SlavedIdTracker
from .events import SlavedEventStore


class SlavedPushRuleStore(SlavedEventStore, PushRulesWorkerStore):
-    def __init__(self, db_conn, hs):
+    def __init__(self, database: Database, db_conn, hs):
        self._push_rules_stream_id_gen = SlavedIdTracker(
            db_conn, "push_rules_stream", "stream_id"
        )
-        super(SlavedPushRuleStore, self).__init__(db_conn, hs)
+        super(SlavedPushRuleStore, self).__init__(database, db_conn, hs)

    def get_push_rules_stream_token(self):
        return (
5 changes: 3 additions & 2 deletions synapse/replication/slave/storage/pushers.py
@@ -15,14 +15,15 @@
# limitations under the License.

from synapse.storage.data_stores.main.pusher import PusherWorkerStore
+from synapse.storage.database import Database

from ._base import BaseSlavedStore
from ._slaved_id_tracker import SlavedIdTracker


class SlavedPusherStore(PusherWorkerStore, BaseSlavedStore):
-    def __init__(self, db_conn, hs):
-        super(SlavedPusherStore, self).__init__(db_conn, hs)
+    def __init__(self, database: Database, db_conn, hs):
+        super(SlavedPusherStore, self).__init__(database, db_conn, hs)
        self._pushers_id_gen = SlavedIdTracker(
            db_conn, "pushers", "id", extra_tables=[("deleted_pushers", "stream_id")]
        )