From 03ce665c28afa2f147380066f05f4c4e368f1b17 Mon Sep 17 00:00:00 2001 From: qstokkink Date: Mon, 2 Dec 2024 13:49:43 +0100 Subject: [PATCH] Fixed various upgrade issues --- src/tribler/core/knowledge/community.py | 2 +- src/tribler/core/versioning/manager.py | 5 +- .../ui/src/pages/Settings/Versions.tsx | 6 + src/tribler/upgrade_script.py | 348 ++++++++++-------- 4 files changed, 201 insertions(+), 160 deletions(-) diff --git a/src/tribler/core/knowledge/community.py b/src/tribler/core/knowledge/community.py index 9a636a9eb7..48da51fb1c 100644 --- a/src/tribler/core/knowledge/community.py +++ b/src/tribler/core/knowledge/community.py @@ -85,7 +85,7 @@ def on_message(self, peer: Peer, raw: RawStatementOperationMessage) -> None: operation=operation) self.validate_operation(operation) - with db_session(): + with db_session(serializable=True): is_added = self.db.knowledge.add_operation(operation, signature.signature) if is_added: s = f"+ operation added ({operation.object!r} \"{operation.predicate}\" {operation.subject!r})" diff --git a/src/tribler/core/versioning/manager.py b/src/tribler/core/versioning/manager.py index 8791a9e032..0767b68b6d 100644 --- a/src/tribler/core/versioning/manager.py +++ b/src/tribler/core/versioning/manager.py @@ -47,7 +47,7 @@ def get_versions(self) -> list[str]: Get all versions in our state directory. """ return [p for p in os.listdir(self.config.get("state_dir")) - if os.path.isdir(os.path.join(self.config.get("state_dir"), p))] + if os.path.isdir(os.path.join(self.config.get("state_dir"), p)) and p != "dlcheckpoints"] async def check_version(self) -> str | None: """ @@ -103,6 +103,9 @@ def perform_upgrade(self) -> None: """ Upgrade old database/download files to our current version. """ + if self.task_manager.get_task("Upgrade") is not None: + logger.warning("Ignoring upgrade request: already upgrading.") + return src_dir = Path(self.config.get("state_dir")) / FROM dst_dir = Path(self.config.get_version_state_dir()) self.task_manager.register_executor_task("Upgrade", upgrade, self.config, diff --git a/src/tribler/ui/src/pages/Settings/Versions.tsx b/src/tribler/ui/src/pages/Settings/Versions.tsx index f58a2bd411..dde6792d22 100644 --- a/src/tribler/ui/src/pages/Settings/Versions.tsx +++ b/src/tribler/ui/src/pages/Settings/Versions.tsx @@ -66,6 +66,12 @@ export default function Versions() { } } case 2: { + const isUpgrading = await triblerService.isUpgrading(); + if (!(isUpgrading === undefined) && !isErrorDict(isUpgrading)) { + setIsUpgrading(isUpgrading); + } else { + break; // Don't bother the user on error, just initialize later. + } const canUpgrade = await triblerService.canUpgrade(); if (!(canUpgrade === undefined) && !isErrorDict(canUpgrade)) { setCanUpgrade(canUpgrade); diff --git a/src/tribler/upgrade_script.py b/src/tribler/upgrade_script.py index 8ba3008a76..ccef821d38 100644 --- a/src/tribler/upgrade_script.py +++ b/src/tribler/upgrade_script.py @@ -13,19 +13,30 @@ import os import shutil import sqlite3 +from itertools import islice from pathlib import Path from typing import TYPE_CHECKING from configobj import ConfigObj -from pony.orm import db_session +from pony.orm import Database, db_session if TYPE_CHECKING: + from collections.abc import Generator + from tribler.tribler_config import TriblerConfigManager FROM: str = "7.14" TO: str = "8.0" -# ruff: noqa: N802,RUF015,W291 +# ruff: noqa: B007,F841,N802,RUF015,W291 + + +def batched(results: list, n: int = 1) -> Generator[list]: + """ + Backport for ``itertools.batched()``. + """ + for start in islice(range(len(results)), None, None, 2): + yield results[start : (start+n)] def _copy_if_not_exist(src: str, dst: str) -> None: @@ -108,79 +119,88 @@ def _inject_StatementOp(abs_src_db: str, abs_dst_db: str) -> None: ;""")) src_con.close() - dst_con = sqlite3.connect(abs_dst_db) - with db_session: - for (subject_name, subject_type, - object_name, object_type, - stmt_added_count, stmt_removed_count, stmt_local_operation, - peer_public_key, peer_added_at, - stmtop_operation, stmtop_clock, stmtop_signature, stmtop_updated_at, stmtop_auto_generated) in output: - dst_con.execute("BEGIN") - try: - # Insert subject - results = list(dst_con.execute("SELECT id FROM Resource WHERE name=? AND type=?", - (subject_name, subject_type))) - if not results: - cursor = dst_con.execute("INSERT INTO Resource " - "VALUES ((SELECT COALESCE(MAX(id),0)+1 FROM Resource), ?, ?)", - (subject_name, subject_type)) - results = [(cursor.lastrowid, )] - subject_id, = results[0] - - # Insert object - results = list( - dst_con.execute("SELECT id FROM Resource WHERE name=? AND type=?", (object_name, object_type))) - if not results: - cursor = dst_con.execute( - "INSERT INTO Resource VALUES ((SELECT COALESCE(MAX(id),0)+1 FROM Resource), ?, ?)", - (object_name, object_type) - ) - results = [(cursor.lastrowid, )] - object_id, = results[0] - - # Insert statement - results = list(dst_con.execute("SELECT id FROM Statement WHERE object=? AND subject=?", - (object_id, subject_id))) - if not results: - cursor = dst_con.execute( - "INSERT INTO Statement VALUES ((SELECT COALESCE(MAX(id),0)+1 FROM Statement), ?, ?, ?, ?, ?)", - (subject_id, object_id, stmt_added_count, stmt_removed_count, stmt_local_operation) - ) - results = [(cursor.lastrowid, )] - statement_id, = results[0] - - # Insert peer - results = list( - dst_con.execute("SELECT id, added_at FROM Peer WHERE public_key=?", (peer_public_key, ))) - if results and results[0][1] >= peer_added_at: - dst_con.execute("UPDATE Peer SET added_at=? WHERE public_key=?", (peer_added_at, peer_public_key)) - results = [(results[0][0],)] - elif not results: - cursor = dst_con.execute( - "INSERT INTO Peer VALUES ((SELECT COALESCE(MAX(id),0)+1 FROM Peer), ?, ?)", - (peer_public_key, peer_added_at) - ) - results = [(cursor.lastrowid, )] - else: - results = [(results[0][0], )] - peer_id, = results[0] - - # Insert statement op - results = list(dst_con.execute("SELECT id FROM StatementOp WHERE statement=? AND peer=?", - (statement_id, peer_id))) - if not results: - dst_con.execute( - "INSERT INTO StatementOp VALUES ((SELECT COALESCE(MAX(id),0)+1 FROM StatementOp), " - "?, ?, ?, ?, ?, ?, ?)", - (statement_id, peer_id, stmtop_operation, stmtop_clock, stmtop_signature, stmtop_updated_at, - stmtop_auto_generated)) - - dst_con.execute("COMMIT") - except sqlite3.DatabaseError as e: - dst_con.execute("ROLLBACK") - logging.exception(e) - dst_con.commit() - dst_con.close() + db = Database() + db.bind(provider="sqlite", filename=abs_dst_db) + for batch in batched(output, n=20): + with db_session: + for (subject_name, subject_type, + object_name, object_type, + stmt_added_count, stmt_removed_count, stmt_local_operation, + peer_public_key, peer_added_at, + stmtop_operation, stmtop_clock, stmtop_signature, stmtop_updated_at, stmtop_auto_generated) in batch: + try: + # Insert subject + results = list(db.execute("SELECT id FROM Resource WHERE name=$subject_name AND type=$subject_type", + globals(), locals())) + if not results: + cursor = db.execute("INSERT INTO Resource " + "VALUES ((SELECT COALESCE(MAX(id),0)+1 FROM Resource), $subject_name, " + "$subject_type)", + globals(), locals()) + results = [(cursor.lastrowid, )] + subject_id, = results[0] + + # Insert object + results = list( + db.execute("SELECT id FROM Resource WHERE name=$object_name AND type=$object_type", + globals(), locals())) + if not results: + cursor = db.execute( + "INSERT INTO Resource VALUES ((SELECT COALESCE(MAX(id),0)+1 FROM Resource), $object_name, " + "$object_type)", + globals(), locals() + ) + results = [(cursor.lastrowid, )] + object_id, = results[0] + + # Insert statement + results = list(db.execute( + "SELECT id FROM Statement WHERE object=$object_id AND subject=$subject_id", + globals(), locals())) + if not results: + cursor = db.execute( + "INSERT INTO Statement VALUES ((SELECT COALESCE(MAX(id),0)+1 FROM Statement), $subject_id, " + "$object_id, $stmt_added_count, $stmt_removed_count, $stmt_local_operation)", + globals(), locals() + ) + results = [(cursor.lastrowid, )] + statement_id, = results[0] + + # Insert peer + results = list( + db.execute("SELECT id, added_at FROM Peer WHERE public_key=$peer_public_key", + globals(), locals())) + if results and results[0][1] >= peer_added_at: + db.execute("UPDATE Peer SET added_at=$peer_added_at WHERE public_key=$peer_public_key", + globals(), locals()) + results = [(results[0][0],)] + elif not results: + cursor = db.execute( + "INSERT INTO Peer VALUES ((SELECT COALESCE(MAX(id),0)+1 FROM Peer), $peer_public_key, " + "$peer_added_at)", + globals(), locals() + ) + results = [(cursor.lastrowid, )] + else: + results = [(results[0][0], )] + peer_id, = results[0] + + # Insert statement op + results = list(db.execute("SELECT id FROM StatementOp WHERE statement=$statement_id AND " + "peer=$peer_id", + globals(), locals())) + if not results: + db.execute( + "INSERT INTO StatementOp VALUES ((SELECT COALESCE(MAX(id),0)+1 FROM StatementOp), " + "$statement_id, $peer_id, $stmtop_operation, $stmtop_clock, $stmtop_signature, " + "$stmtop_updated_at, $stmtop_auto_generated)", + globals(), locals()) + + db.commit() + except sqlite3.DatabaseError as e: + db.rollback() + logging.exception(e) + db.disconnect() def _inject_ChannelNode(abs_src_db: str, abs_dst_db: str) -> None: @@ -198,40 +218,41 @@ def _inject_ChannelNode(abs_src_db: str, abs_dst_db: str) -> None: ;""")) src_con.close() - dst_con = sqlite3.connect(abs_dst_db) - with db_session: - for (infohash, size, torrent_date, tracker_info, title, tags, metadata_type, reserved_flags, - origin_id, public_key, id_, timestamp, signature, added_on, status, xxx, tag_processor_version, - seeders, leechers, last_check, self_checked, has_data) in output: - dst_con.execute("BEGIN") - try: - # Insert subject - results = list(dst_con.execute("SELECT rowid FROM TorrentState WHERE infohash=?", (infohash, ))) - if not results: - cursor = dst_con.execute("INSERT INTO TorrentState " - "VALUES ((SELECT COALESCE(MAX(rowid),0)+1 FROM TorrentState), " - "?, ?, ?, ?, ?, ?)", - (infohash, seeders, leechers, last_check, self_checked, has_data)) - results = [(cursor.lastrowid, )] - health_id, = results[0] - - # Insert channel ChannelNode - results = list(dst_con.execute("SELECT rowid FROM ChannelNode WHERE public_key=? AND id_=?", - (public_key, id_))) - if not results: - dst_con.execute( - "INSERT INTO ChannelNode VALUES ((SELECT COALESCE(MAX(rowid),0)+1 FROM ChannelNode), " - "?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", - (infohash, size, torrent_date, tracker_info, title, tags, metadata_type, reserved_flags, - origin_id, public_key, id_, timestamp, signature, added_on, status, xxx, health_id, - tag_processor_version)) - - dst_con.execute("COMMIT") - except sqlite3.DatabaseError as e: - dst_con.execute("ROLLBACK") - logging.exception(e) - dst_con.commit() - dst_con.close() + db = Database() + db.bind(provider="sqlite", filename=abs_dst_db) + for batch in batched(output, n=20): + with db_session: + for (infohash, size, torrent_date, tracker_info, title, tags, metadata_type, reserved_flags, + origin_id, public_key, id_, timestamp, signature, added_on, status, xxx, tag_processor_version, + seeders, leechers, last_check, self_checked, has_data) in batch: + try: + # Insert subject + results = list(db.execute("SELECT rowid FROM TorrentState WHERE infohash=$infohash", + globals(), locals())) + if not results: + cursor = db.execute("INSERT INTO TorrentState " + "VALUES ((SELECT COALESCE(MAX(rowid),0)+1 FROM TorrentState), " + "$infohash, $seeders, $leechers, $last_check, $self_checked, $has_data)", + globals(), locals()) + results = [(cursor.lastrowid, )] + health_id, = results[0] + + # Insert channel ChannelNode + results = list(db.execute("SELECT rowid FROM ChannelNode WHERE public_key=$public_key AND id_=$id_", + globals(), locals())) + if not results: + db.execute( + "INSERT INTO ChannelNode VALUES ((SELECT COALESCE(MAX(rowid),0)+1 FROM ChannelNode), " + "$infohash, $size, $torrent_date, $tracker_info, $title, $tags, $metadata_type, " + "$reserved_flags, $origin_id, $public_key, $id_, $timestamp, $signature, $added_on, " + "$status, $xxx, $health_id, $tag_processor_version)", + globals(), locals()) + + db.commit() + except sqlite3.DatabaseError as e: + db.rollback() + logging.exception(e) + db.disconnect() def _inject_TrackerState(abs_src_db: str, abs_dst_db: str) -> None: @@ -242,32 +263,35 @@ def _inject_TrackerState(abs_src_db: str, abs_dst_db: str) -> None: output = list(src_con.execute("SELECT url, last_check, alive, failures FROM TrackerState;")) src_con.close() - dst_con = sqlite3.connect(abs_dst_db) - with db_session: - for (url, last_check, alive, failures) in output: - dst_con.execute("BEGIN") - try: - results = list(dst_con.execute("SELECT rowid, last_check, alive, failures FROM TrackerState WHERE url=?", (url, ))) - if results: - tracker_id, n_last_check, n_alive, n_failures = results[0] - s_last_check = max(n_last_check, last_check) - s_alive = alive if last_check > n_last_check else n_alive - s_failures = int(failures) + int(n_failures) - dst_con.execute( - "UPDATE TrackerState SET last_check=?, alive=?, failures=? WHERE rowid=?", - (s_last_check, s_alive, s_failures, tracker_id)) - else: - dst_con.execute( - "INSERT INTO TrackerState VALUES ((SELECT COALESCE(MAX(rowid),0)+1 FROM TrackerState), " - "?, ?, ?, ?)", - (url, last_check, alive, failures)) - - dst_con.execute("COMMIT") - except sqlite3.DatabaseError as e: - dst_con.execute("ROLLBACK") - logging.exception(e) - dst_con.commit() - dst_con.close() + db = Database() + db.bind(provider="sqlite", filename=abs_dst_db) + for batch in batched(output, n=20): + with db_session: + for (url, last_check, alive, failures) in batch: + try: + results = list(db.execute("SELECT rowid, last_check, alive, failures FROM TrackerState WHERE " + "url=$url", + globals(), locals())) + if results: + tracker_id, n_last_check, n_alive, n_failures = results[0] + s_last_check = max(n_last_check, last_check) + s_alive = alive if last_check > n_last_check else n_alive + s_failures = int(failures) + int(n_failures) + db.execute( + "UPDATE TrackerState SET last_check=$s_last_check, alive=$s_alive, failures=$s_failures " + "WHERE rowid=$tracker_id", + globals(), locals()) + else: + db.execute( + "INSERT INTO TrackerState VALUES ((SELECT COALESCE(MAX(rowid),0)+1 FROM TrackerState), " + "$url, $last_check, $alive, $failures)", + globals(), locals()) + + db.commit() + except sqlite3.DatabaseError as e: + db.rollback() + logging.exception(e) + db.disconnect() def _inject_TorrentState_TrackerState(abs_src_db: str, abs_dst_db: str) -> None: @@ -282,32 +306,40 @@ def _inject_TorrentState_TrackerState(abs_src_db: str, abs_dst_db: str) -> None: ;""")) src_con.close() - dst_con = sqlite3.connect(abs_dst_db) - with db_session: - for (infohash, url) in output: - dst_con.execute("BEGIN") - try: - results = list(dst_con.execute("""SELECT TorrentState.infohash, TrackerState.url + db = Database() + db.bind(provider="sqlite", filename=abs_dst_db) + for batch in batched(output, n=20): + with db_session: + for (infohash, url) in batch: + try: + results = list(db.execute("""SELECT TorrentState.infohash, TrackerState.url FROM TorrentState_TrackerState INNER JOIN TorrentState ON TorrentState_TrackerState.torrentstate=TorrentState.rowid INNER JOIN TrackerState ON TorrentState_TrackerState.trackerstate=TrackerState.rowid -WHERE TorrentState.infohash=? AND TrackerState.url=? -;""", (infohash, url))) - if not results: - # Note: both the tracker and torrent state should've been imported already - torrent_state, = list(dst_con.execute("SELECT rowid FROM TorrentState WHERE infohash=?", - (infohash,)))[0] - tracker_state, = list(dst_con.execute("SELECT rowid FROM TrackerState WHERE url=?", - (url,)))[0] - dst_con.execute("INSERT INTO TorrentState_TrackerState VALUES (?, ?)", - (torrent_state, tracker_state)) - - dst_con.execute("COMMIT") - except sqlite3.DatabaseError as e: - dst_con.execute("ROLLBACK") - logging.exception(e) - dst_con.commit() - dst_con.close() +WHERE TorrentState.infohash=$infohash AND TrackerState.url=$url +;""", globals(), locals())) + if not results: + # Note: both the tracker and torrent state should've been imported already + torrent_states = list(db.execute("SELECT rowid FROM TorrentState WHERE infohash=$infohash", + globals(), locals())) + tracker_states = list(db.execute("SELECT rowid FROM TrackerState WHERE url=$url", + globals(), locals())) + if not torrent_states: + logging.warning("Torrent state for %s disappeared mid-upgrade!", infohash) + continue + if not tracker_states: + logging.warning("Tracker state for %s disappeared mid-upgrade!", url) + continue + torrent_state, = torrent_states[0] + tracker_state, = tracker_states[0] + db.execute("INSERT INTO TorrentState_TrackerState VALUES ($torrent_state, $tracker_state)", + globals(), locals()) + + db.commit() + except sqlite3.DatabaseError as e: + db.rollback() + logging.exception(e) + db.disconnect() def _inject_7_14_tables(src_db: str, dst_db: str, db_format: str) -> None: