SqliteDosStorage: Implement the storage backup #6370

Merged: 2 commits, May 7, 2024
9 changes: 9 additions & 0 deletions src/aiida/orm/implementation/storage_backend.py
@@ -405,9 +405,18 @@ def backup(
:raises StorageBackupError: If an error occurred during the backup procedure.
:raises NotImplementedError: If the storage backend doesn't implement a backup procedure.
"""
from aiida.common.exceptions import LockedProfileError, StorageBackupError
from aiida.manage.configuration.settings import DEFAULT_CONFIG_FILE_NAME
from aiida.manage.profile_access import ProfileAccessManager
from aiida.storage.log import STORAGE_LOGGER

# check that the AiiDA profile is not locked and request access for the duration of this backup process
# (locked means that possibly a maintenance operation is running that could interfere with the backup)
try:
ProfileAccessManager(self._profile).request_access()
except LockedProfileError as exc:
raise StorageBackupError(f'{self._profile} is locked!') from exc

backup_manager = self._validate_or_init_backup_folder(dest, keep)

try:
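For context, a minimal sketch of the guard pattern that this hunk centralises in the generic `backup()` method (the standalone helper and its `profile` argument are illustrative, not part of the diff): profile access is requested once up front, and a locked profile is reported as a `StorageBackupError` before any backend-specific work starts.

# Illustrative sketch only: the base-class backup() above now performs this check itself,
# so individual storage backends (such as PsqlDosBackend below) no longer need their own copy.
from aiida.common.exceptions import LockedProfileError, StorageBackupError
from aiida.manage.profile_access import ProfileAccessManager


def request_backup_access(profile) -> None:
    """Refuse to start a backup while another process holds a lock on the profile."""
    try:
        ProfileAccessManager(profile).request_access()
    except LockedProfileError as exc:
        raise StorageBackupError(f'{profile} is locked!') from exc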
9 changes: 0 additions & 9 deletions src/aiida/storage/psql_dos/backend.py
@@ -506,8 +506,6 @@ def _backup_storage(
import subprocess
import tempfile

from aiida.manage.profile_access import ProfileAccessManager

STORAGE_LOGGER.report('Starting backup...')

# This command calls `rsync` and `pg_dump` executables. check that they are in PATH
@@ -518,13 +516,6 @@
cfg = self._profile.storage_config
container = Container(get_filepath_container(self.profile))

# check that the AiiDA profile is not locked and request access for the duration of this backup process
# (locked means that possibly a maintenance operation is running that could interfere with the backup)
try:
ProfileAccessManager(self._profile).request_access()
except exceptions.LockedProfileError as exc:
raise exceptions.StorageBackupError('The profile is locked!') from exc

# step 1: first run the storage maintenance version that can safely be performed while aiida is running
STORAGE_LOGGER.report('Running basic maintenance...')
self.maintain(full=False, compress=False)
85 changes: 67 additions & 18 deletions src/aiida/storage/sqlite_dos/backend.py
@@ -16,11 +16,12 @@
from typing import TYPE_CHECKING, Optional
from uuid import uuid4

from disk_objectstore import Container
from disk_objectstore import Container, backup_utils
from pydantic import BaseModel, Field, field_validator
from sqlalchemy import insert
from sqlalchemy.orm import scoped_session, sessionmaker

from aiida.common import exceptions
from aiida.common.log import AIIDA_LOGGER
from aiida.manage import Profile
from aiida.manage.configuration.settings import AIIDA_CONFIG_FOLDER
@@ -40,6 +41,8 @@
__all__ = ('SqliteDosStorage',)

LOGGER = AIIDA_LOGGER.getChild(__file__)
FILENAME_DATABASE = 'database.sqlite'
FILENAME_CONTAINER = 'container'


class SqliteDosMigrator(PsqlDosMigrator):
@@ -52,7 +55,7 @@ class SqliteDosMigrator(PsqlDosMigrator):
"""

def __init__(self, profile: Profile) -> None:
filepath_database = Path(profile.storage_config['filepath']) / 'database.sqlite'
filepath_database = Path(profile.storage_config['filepath']) / FILENAME_DATABASE
filepath_database.touch()

self.profile = profile
@@ -64,7 +67,7 @@ def get_container(self) -> Container:

:returns: The disk-object store container configured for the repository path of the current profile.
"""
filepath_container = Path(self.profile.storage_config['filepath']) / 'container'
filepath_container = Path(self.profile.storage_config['filepath']) / FILENAME_CONTAINER
return Container(str(filepath_container))

def initialise_database(self) -> None:
@@ -112,6 +115,18 @@ def filepath_is_absolute(cls, value: str) -> str:
"""Return the resolved and absolute filepath."""
return str(Path(value).resolve().absolute())

@property
def filepath_root(self) -> Path:
return Path(self.profile.storage_config['filepath'])

@property
def filepath_container(self) -> Path:
return self.filepath_root / FILENAME_CONTAINER

@property
def filepath_database(self) -> Path:
return self.filepath_root / FILENAME_DATABASE

@classmethod
def initialise(cls, profile: Profile, reset: bool = False) -> bool:
filepath = Path(profile.storage_config['filepath'])
@@ -132,7 +147,7 @@ def initialise(cls, profile: Profile, reset: bool = False) -> bool:

def __str__(self) -> str:
state = 'closed' if self.is_closed else 'open'
return f'SqliteDosStorage[{self._profile.storage_config["filepath"]}]: {state},'
return f'SqliteDosStorage[{self.filepath_root}]: {state},'

def _initialise_session(self):
"""Initialise the SQLAlchemy session factory.
@@ -144,28 +159,22 @@ def _initialise_session(self):
Multi-thread support is currently required by the REST API.
Although, in the future, we may want to move the multi-thread handling to higher in the AiiDA stack.
"""
engine = create_sqla_engine(Path(self._profile.storage_config['filepath']) / 'database.sqlite')
engine = create_sqla_engine(self.filepath_database)
self._session_factory = scoped_session(sessionmaker(bind=engine, future=True, expire_on_commit=True))

def _backup(
self,
dest: str,
keep: Optional[int] = None,
):
raise NotImplementedError

def delete(self) -> None: # type: ignore[override]
"""Delete the storage and all the data."""
filepath = Path(self.profile.storage_config['filepath'])
if filepath.exists():
rmtree(filepath)
LOGGER.report(f'Deleted storage directory at `{filepath}`.')
if self.filepath_root.exists():
rmtree(self.filepath_root)
LOGGER.report(f'Deleted storage directory at `{self.filepath_root}`.')

def get_container(self) -> 'Container':
return Container(str(self.filepath_container))

def get_repository(self) -> 'DiskObjectStoreRepositoryBackend':
from aiida.repository.backend import DiskObjectStoreRepositoryBackend

container = Container(str(Path(self.profile.storage_config['filepath']) / 'container'))
return DiskObjectStoreRepositoryBackend(container=container)
return DiskObjectStoreRepositoryBackend(container=self.get_container())

@classmethod
def version_head(cls) -> str:
@@ -225,3 +234,43 @@ def _get_mapper_from_entity(entity_type: 'EntityTypes', with_pk: bool):
mapper = inspect(model).mapper # type: ignore[union-attr]
keys = {key for key, col in mapper.c.items() if with_pk or col not in mapper.primary_key}
return mapper, keys

def _backup(
self,
dest: str,
keep: Optional[int] = None,
):
"""Create a backup of the storage.

:param dest: Path to where the backup will be created. Can be a path on the local file system, or a path on a
remote that can be accessed over SSH in the form ``<user>@<host>:<path>``.
:param keep: The maximum number of backups to keep. If the number of copies exceeds this number, the oldest
backups are removed.
"""
try:
backup_manager = backup_utils.BackupManager(dest, keep=keep)
backup_manager.backup_auto_folders(lambda path, prev: self._backup_storage(backup_manager, path, prev))
except backup_utils.BackupError as exc:
raise exceptions.StorageBackupError(*exc.args) from exc

def _backup_storage(
self,
manager: backup_utils.BackupManager,
path: Path,
prev_backup: Path | None = None,
) -> None:
"""Create a backup of the sqlite database and disk-objectstore to the provided path.

:param manager: BackupManager from backup_utils containing utilities such as for calling the rsync.
:param path: Path to where the backup will be created.
:param prev_backup: Path to the previous backup. Rsync calls will be hard-linked to this path, making the backup
incremental and efficient.
"""
LOGGER.report('Running storage maintenance')
self.maintain(full=False, compress=False)

LOGGER.report('Backing up disk-objectstore container')
manager.call_rsync(self.filepath_container, path, link_dest=prev_backup, dest_trailing_slash=True)

LOGGER.report('Backing up sqlite database')
manager.call_rsync(self.filepath_database, path, link_dest=prev_backup, dest_trailing_slash=True)
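A rough usage sketch of the new backup path (the destination and the `keep` value are made up for illustration): the public `backup()` method validates or initialises the destination folder and then drives the `_backup()`/`_backup_storage()` pair above through disk-objectstore's `BackupManager`.

# Illustrative usage, assuming a loaded core.sqlite_dos profile and a writable destination.
from aiida.manage.manager import get_manager

storage = get_manager().get_profile_storage()
storage.backup('/path/to/backups', keep=2)  # keep at most the two most recent backups

# After a successful run the destination contains a config.json and a `last-backup` symlink
# pointing at the newest backup folder, which holds `database.sqlite` and `container`
# (this is what the new test below asserts).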
5 changes: 3 additions & 2 deletions src/aiida/tools/pytest_fixtures/configuration.py
@@ -115,7 +115,8 @@ def factory(
from aiida.manage.manager import get_manager

manager = get_manager()
storage_config = storage_config or {'filepath': str(pathlib.Path(config.dirpath) / 'storage')}
name = name or secrets.token_hex(16)
storage_config = storage_config or {'filepath': str(pathlib.Path(config.dirpath) / name / 'storage')}

if broker_backend and broker_config is None:
broker_config = {
@@ -133,7 +134,7 @@
storage_config=storage_config,
broker_backend=broker_backend,
broker_config=broker_config,
name=name or secrets.token_hex(16),
name=name,
email=email,
is_test_profile=True,
)
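The effect of the fixture change above, reconstructed for illustration (the config directory path is a hypothetical stand-in for `config.dirpath`): the random profile name is now generated before the storage configuration and reused as a subdirectory, so test profiles created from the same config no longer share a single `storage` directory.

import pathlib
import secrets

# Illustrative reconstruction of the new default storage path for a test profile.
config_dirpath = pathlib.Path('/tmp/aiida-test-config')  # hypothetical stand-in for config.dirpath
name = secrets.token_hex(16)
storage_config = {'filepath': str(config_dirpath / name / 'storage')}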
15 changes: 14 additions & 1 deletion tests/storage/sqlite_dos/test_backend.py
@@ -3,7 +3,7 @@
import pathlib

import pytest
from aiida.storage.sqlite_dos.backend import SqliteDosStorage
from aiida.storage.sqlite_dos.backend import FILENAME_CONTAINER, FILENAME_DATABASE, SqliteDosStorage


@pytest.mark.usefixtures('chdir_tmp_path')
@@ -25,3 +25,16 @@ def test_archive_import(aiida_config, aiida_profile_factory):
assert QueryBuilder().append(Node).count() == 0
import_archive(get_archive_file('calcjob/arithmetic.add.aiida'))
assert QueryBuilder().append(Node).count() > 0


def test_backup(aiida_config, aiida_profile_factory, tmp_path, manager):
"""Test the backup implementation."""
with aiida_profile_factory(aiida_config, storage_backend='core.sqlite_dos'):
storage = manager.get_profile_storage()
storage.backup(str(tmp_path))
filepath_last = tmp_path / 'last-backup'
assert (tmp_path / 'config.json').exists()
assert filepath_last.is_symlink()
dirpath_backup = filepath_last.resolve()
assert (dirpath_backup / FILENAME_DATABASE).exists()
assert (dirpath_backup / FILENAME_CONTAINER).exists()