diff --git a/packaging/package-indexer/Dockerfile b/packaging/package-indexer/Dockerfile index dcf04e7b27..87d21361c6 100644 --- a/packaging/package-indexer/Dockerfile +++ b/packaging/package-indexer/Dockerfile @@ -10,4 +10,6 @@ RUN pip install \ azure-mgmt-cdn \ azure-identity \ pyyaml \ - types-PyYAML + types-PyYAML \ + boto3 \ + types-requests diff --git a/packaging/package-indexer/cdn/__init__.py b/packaging/package-indexer/cdn/__init__.py index ea1a36d0fe..2a18e05cec 100644 --- a/packaging/package-indexer/cdn/__init__.py +++ b/packaging/package-indexer/cdn/__init__.py @@ -25,10 +25,12 @@ from .cdn import CDN from .azure_cdn import AzureCDN +from .cloudflare_cdn import CloudflareCDN __all__ = [ "CDN", "AzureCDN", + "CloudflareCDN", "get_implementations", ] diff --git a/packaging/package-indexer/cdn/azure_cdn.py b/packaging/package-indexer/cdn/azure_cdn.py index 9a6d73ff5a..b43e23a034 100644 --- a/packaging/package-indexer/cdn/azure_cdn.py +++ b/packaging/package-indexer/cdn/azure_cdn.py @@ -91,11 +91,8 @@ def from_config(cfg: dict) -> CDN: client_secret=cfg["client-secret"], ) - def refresh_cache(self, path: Path) -> None: + def _refresh_cache(self, path: Path) -> None: path_str = str(Path("/", path)) - - self._log_info("Refreshing CDN cache.", path=path_str) - poller: LROPoller = self.__cdn.endpoints.begin_purge_content( resource_group_name=self.__resource_group_name, profile_name=self.__profile_name, @@ -107,5 +104,3 @@ def refresh_cache(self, path: Path) -> None: status = poller.status() if not status == "Succeeded": raise Exception("Failed to refresh CDN cache. status: {}".format(status)) - - self._log_info("Successfully refreshed CDN cache.", path=path_str) diff --git a/packaging/package-indexer/cdn/cdn.py b/packaging/package-indexer/cdn/cdn.py index cc003cc45a..12a51a7171 100644 --- a/packaging/package-indexer/cdn/cdn.py +++ b/packaging/package-indexer/cdn/cdn.py @@ -42,9 +42,14 @@ def from_config(config: dict) -> CDN: pass @abstractmethod - def refresh_cache(self, path: Path) -> None: + def _refresh_cache(self, path: Path) -> None: pass + def refresh_cache(self, path: Path) -> None: + self._log_info("Refreshing CDN cache.", path=str(path)) + self._refresh_cache(path) + self._log_info("Successfully refreshed CDN cache.", path=str(path)) + @staticmethod def __create_logger() -> logging.Logger: logger = logging.getLogger("CDN") diff --git a/packaging/package-indexer/cdn/cloudflare_cdn.py b/packaging/package-indexer/cdn/cloudflare_cdn.py new file mode 100644 index 0000000000..f2489402cd --- /dev/null +++ b/packaging/package-indexer/cdn/cloudflare_cdn.py @@ -0,0 +1,67 @@ +############################################################################# +# Copyright (c) 2024 Attila Szakacs +# +# This program is free software; you can redistribute it and/or modify it +# under the terms of the GNU General Public License version 2 as published +# by the Free Software Foundation, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA +# +# As an additional exemption you are allowed to compile & link against the +# OpenSSL libraries as published by the OpenSSL project. See the file +# COPYING for details. +# +############################################################################# + +import requests +from pathlib import Path + +from .cdn import CDN + + +class CloudflareCDN(CDN): + """ + A `CDN` implementation that can connect to a Cloudflare CDN instance. + + Example config: + + ```yaml + vendor: "cloudflare" + all: + zone-id: "secret1" + api-token: "secret2" + ``` + """ + + def __init__(self, zone_id: str, api_token: str) -> None: + self.__zone_id = zone_id + self.__api_token = api_token + + super().__init__() + + @staticmethod + def get_config_keyword() -> str: + return "cloudflare" + + @staticmethod + def from_config(cfg: dict) -> CDN: + return CloudflareCDN( + zone_id=cfg["zone-id"], + api_token=cfg["api-token"], + ) + + def _refresh_cache(self, path: Path) -> None: + url = f"https://api.cloudflare.com/client/v4/zones/{self.__zone_id}/purge_cache" + headers = {"Authorization": f"Bearer {self.__api_token}"} + data = {"purge_everything": True} + + response = requests.post(url, headers=headers, json=data).json() + if not response.get("success", False): + raise Exception("Failed to refresh CDN cache. response: {}".format(response)) diff --git a/packaging/package-indexer/indexer/deb_indexer.py b/packaging/package-indexer/indexer/deb_indexer.py index 757bdabc7b..b0c46c08bd 100644 --- a/packaging/package-indexer/indexer/deb_indexer.py +++ b/packaging/package-indexer/indexer/deb_indexer.py @@ -88,6 +88,7 @@ def __create_packages_files(self, indexed_dir: Path) -> None: command = base_command + [str(relative_pkg_dir)] packages_file_path = Path(pkg_dir, "Packages") + packages_file_path.parent.mkdir(parents=True, exist_ok=True) with packages_file_path.open("w") as packages_file: self._log_info("Creating `Packages` file.", packages_file_path=str(packages_file_path)) utils.execute_command(command, dir=dir, stdout=packages_file) @@ -111,6 +112,7 @@ def __create_release_file(self, indexed_dir: Path) -> None: command = ["apt-ftparchive", "release", "."] release_file_path = Path(indexed_dir, "Release") + release_file_path.parent.mkdir(parents=True, exist_ok=True) with release_file_path.open("w") as release_file: self._log_info("Creating `Release` file.", release_file_path=str(release_file_path)) utils.execute_command( diff --git a/packaging/package-indexer/pyproject.toml b/packaging/package-indexer/pyproject.toml index 45e10c74a0..fbfc06f443 100644 --- a/packaging/package-indexer/pyproject.toml +++ b/packaging/package-indexer/pyproject.toml @@ -8,3 +8,13 @@ disallow_untyped_defs = true module = "azure.*" follow_imports = "skip" ignore_missing_imports = true + +[[tool.mypy.overrides]] +module = "boto3.*" +follow_imports = "skip" +ignore_missing_imports = true + +[[tool.mypy.overrides]] +module = "botocore.*" +follow_imports = "skip" +ignore_missing_imports = true diff --git a/packaging/package-indexer/remote_storage_synchronizer/__init__.py b/packaging/package-indexer/remote_storage_synchronizer/__init__.py index b510ea11d6..306f688b9f 100644 --- a/packaging/package-indexer/remote_storage_synchronizer/__init__.py +++ b/packaging/package-indexer/remote_storage_synchronizer/__init__.py @@ -25,10 +25,12 @@ from .remote_storage_synchronizer import RemoteStorageSynchronizer from .azure_container_synchronizer import AzureContainerSynchronizer +from .s3_bucket_synchronizer import S3BucketSynchronizer __all__ = [ "RemoteStorageSynchronizer", "AzureContainerSynchronizer", + "S3BucketSynchronizer", "get_implementations", ] diff --git a/packaging/package-indexer/remote_storage_synchronizer/azure_container_synchronizer.py b/packaging/package-indexer/remote_storage_synchronizer/azure_container_synchronizer.py index 13303b0a42..0211f730b7 100644 --- a/packaging/package-indexer/remote_storage_synchronizer/azure_container_synchronizer.py +++ b/packaging/package-indexer/remote_storage_synchronizer/azure_container_synchronizer.py @@ -20,13 +20,12 @@ # ############################################################################# -from hashlib import md5 from pathlib import Path -from typing import List, Optional +from typing import Any, Dict, List from azure.storage.blob import BlobClient, ContainerClient -from .remote_storage_synchronizer import FileSyncState, RemoteStorageSynchronizer +from .remote_storage_synchronizer import RemoteStorageSynchronizer DEFAULT_ROOT_DIR = Path("/tmp/azure_container_synchronizer") @@ -55,7 +54,6 @@ class AzureContainerSynchronizer(RemoteStorageSynchronizer): def __init__(self, connection_string: str, storage_name: str) -> None: self.__client = ContainerClient.from_connection_string(conn_str=connection_string, container_name=storage_name) - self.__remote_files_cache: Optional[List[dict]] = None super().__init__( remote_root_dir=Path(""), local_root_dir=Path(DEFAULT_ROOT_DIR, storage_name), @@ -72,181 +70,38 @@ def from_config(cfg: dict) -> RemoteStorageSynchronizer: storage_name=cfg["storage-name"], ) - @property - def local_files(self) -> List[Path]: - dirs_and_files = list(self.local_dir.working_dir.rglob("*")) - return list(filter(lambda path: path.is_file(), dirs_and_files)) - - @property - def remote_files(self) -> List[dict]: - if self.__remote_files_cache is not None: - return self.__remote_files_cache - + def _list_remote_files(self) -> List[Dict[str, Any]]: file_name_prefix = "{}/".format(self.remote_dir.working_dir) - self.__remote_files_cache = [dict(blob) for blob in self.__client.list_blobs(name_starts_with=file_name_prefix)] + return [dict(blob) for blob in self.__client.list_blobs(name_starts_with=file_name_prefix)] - return self.__remote_files_cache - - def __download_file(self, relative_file_path: str) -> None: + def _download_file(self, relative_file_path: str) -> None: download_path = Path(self.local_dir.root_dir, relative_file_path).resolve() - - self._log_info( - "Downloading file.", - remote_path=relative_file_path, - local_path=str(download_path), - ) - download_path.parent.mkdir(parents=True, exist_ok=True) with download_path.open("wb") as downloaded_blob: blob_data = self.__client.download_blob(relative_file_path) blob_data.readinto(downloaded_blob) - def __upload_file(self, relative_file_path: str) -> None: + def _upload_file(self, relative_file_path: str) -> None: local_path = Path(self.local_dir.root_dir, relative_file_path) - - self._log_info( - "Uploading file.", - local_path=str(local_path), - remote_path=relative_file_path, - ) - with local_path.open("rb") as local_file_data: self.__client.upload_blob(relative_file_path, local_file_data, overwrite=True) - def __delete_local_file(self, relative_file_path: str) -> None: - local_file_path = Path(self.local_dir.root_dir, relative_file_path).resolve() - self._log_info("Deleting local file.", local_path=str(local_file_path)) - local_file_path.unlink() - - def __delete_remote_file(self, relative_file_path: str) -> None: - self._log_info("Deleting remote file.", remote_path=relative_file_path) + def _delete_remote_file(self, relative_file_path: str) -> None: self.__client.delete_blob(relative_file_path, delete_snapshots="include") - def sync_from_remote(self) -> None: - self._log_info( - "Syncing content from remote.", - remote_workdir=str(self.remote_dir.working_dir), - local_workdir=str(self.local_dir.working_dir), - ) - for file in self.__all_files: - sync_state = self.__get_file_sync_state(file) - if sync_state == FileSyncState.IN_SYNC: - continue - if sync_state == FileSyncState.DIFFERENT or sync_state == FileSyncState.NOT_IN_LOCAL: - self.__download_file(file) - continue - if sync_state == FileSyncState.NOT_IN_REMOTE: - self.__delete_local_file(file) - continue - raise NotImplementedError("Unexpected FileSyncState: {}".format(sync_state)) - self._log_info( - "Successfully synced remote content.", - remote_workdir=str(self.remote_dir.working_dir), - local_workdir=str(self.local_dir.working_dir), - ) - - def sync_to_remote(self) -> None: - self._log_info( - "Syncing content to remote.", - local_workdir=str(self.local_dir.working_dir), - remote_workdir=str(self.remote_dir.working_dir), - ) - for file in self.__all_files: - sync_state = self.__get_file_sync_state(file) - if sync_state == FileSyncState.IN_SYNC: - continue - if sync_state == FileSyncState.DIFFERENT or sync_state == FileSyncState.NOT_IN_REMOTE: - self.__upload_file(file) - continue - if sync_state == FileSyncState.NOT_IN_LOCAL: - self.__delete_remote_file(file) - continue - raise NotImplementedError("Unexpected FileSyncState: {}".format(sync_state)) - self._log_info( - "Successfully synced local content.", - remote_workdir=str(self.remote_dir.working_dir), - local_workdir=str(self.local_dir.working_dir), - ) - self.__invalidate_remote_files_cache() - - def create_snapshot_of_remote(self) -> None: - self._log_info("Creating snapshot of the remote container.") - for file in self.remote_files: + def _create_snapshot_of_remote(self) -> None: + for file in self._remote_files: blob_client: BlobClient = self.__client.get_blob_client(file["name"]) snapshot_properties = blob_client.create_snapshot() self._log_debug( "Successfully created snapshot of remote file.", - remote_path=self.__get_relative_file_path_for_remote_file(file), - snapshot_properties=snapshot_properties, + remote_path=self._get_relative_file_path_for_remote_file(file), + snapshot_properties=str(snapshot_properties), ) - def __get_md5_of_remote_file(self, relative_file_path: str) -> bytearray: - for file in self.remote_files: - if file["name"] == relative_file_path: - return file["content_settings"]["content_md5"] - raise FileNotFoundError - - def __get_md5_of_local_file(self, relative_file_path: str) -> bytes: - file = Path(self.local_dir.root_dir, relative_file_path) - return md5(file.read_bytes()).digest() - - def __get_file_sync_state(self, relative_file_path: str) -> FileSyncState: - try: - remote_md5 = self.__get_md5_of_remote_file(relative_file_path) - except FileNotFoundError: - self._log_debug( - "Local file is not available remotely.", - local_path=str(Path(self.local_dir.root_dir, relative_file_path)), - unavailable_remote_path=str(Path(self.remote_dir.root_dir, relative_file_path)), - ) - return FileSyncState.NOT_IN_REMOTE - - try: - local_md5 = self.__get_md5_of_local_file(relative_file_path) - except FileNotFoundError: - self._log_debug( - "Remote file is not available locally.", - remote_path=str(Path(self.remote_dir.root_dir, relative_file_path)), - unavailable_local_path=str(Path(self.local_dir.root_dir, relative_file_path)), - ) - return FileSyncState.NOT_IN_LOCAL - - if remote_md5 != local_md5: - self._log_debug( - "File differs locally and remotely.", - remote_path=str(Path(self.remote_dir.root_dir, relative_file_path)), - local_path=str(Path(self.local_dir.root_dir, relative_file_path)), - remote_md5sum=remote_md5.hex(), - local_md5sum=local_md5.hex(), - ) - return FileSyncState.DIFFERENT - - self._log_debug( - "File is in sync.", - remote_path=str(Path(self.remote_dir.root_dir, relative_file_path)), - local_path=str(Path(self.local_dir.root_dir, relative_file_path)), - md5sum=remote_md5.hex(), - ) - return FileSyncState.IN_SYNC - - def __get_relative_file_path_for_local_file(self, file: Path) -> str: - return str(file.relative_to(self.local_dir.root_dir)) - - def __get_relative_file_path_for_remote_file(self, file: dict) -> str: + def _get_relative_file_path_for_remote_file(self, file: Dict[str, Any]) -> str: return file["name"] - @property - def __all_files(self) -> List[str]: - files = set() - for local_file in self.local_files: - files.add(self.__get_relative_file_path_for_local_file(local_file)) - for remote_file in self.remote_files: - files.add(self.__get_relative_file_path_for_remote_file(remote_file)) - return sorted(files) - - def __invalidate_remote_files_cache(self) -> None: - self.__remote_files_cache = None - def _prepare_log(self, message: str, **kwargs: str) -> str: log = "[{} :: {}]\t{}".format(self.__client.container_name, str(self.remote_dir.working_dir), message) return super()._prepare_log(log, **kwargs) diff --git a/packaging/package-indexer/remote_storage_synchronizer/remote_storage_synchronizer.py b/packaging/package-indexer/remote_storage_synchronizer/remote_storage_synchronizer.py index 7f12420642..1b5eb33cea 100644 --- a/packaging/package-indexer/remote_storage_synchronizer/remote_storage_synchronizer.py +++ b/packaging/package-indexer/remote_storage_synchronizer/remote_storage_synchronizer.py @@ -1,5 +1,6 @@ ############################################################################# # Copyright (c) 2022 One Identity +# Copyright (c) 2024 Attila Szakacs # # This program is free software; you can redistribute it and/or modify it # under the terms of the GNU General Public License version 2 as published @@ -22,10 +23,13 @@ from __future__ import annotations + import logging from abc import ABC, abstractmethod from enum import Enum, auto +from hashlib import sha512 from pathlib import Path +from typing import Any, Dict, List, Optional class WorkingDir: @@ -57,8 +61,13 @@ def __init__(self, remote_root_dir: Path, local_root_dir: Path) -> None: self.remote_dir = WorkingDir(remote_root_dir) self.local_dir = WorkingDir(local_root_dir) + self.__remote_files_cache: Optional[List[dict]] = None self.__logger = RemoteStorageSynchronizer.__create_logger() + def set_sub_dir(self, sub_dir: Path) -> None: + self.remote_dir.set_sub_dir(sub_dir) + self.local_dir.set_sub_dir(sub_dir) + @staticmethod @abstractmethod def get_config_keyword() -> str: @@ -69,21 +78,191 @@ def get_config_keyword() -> str: def from_config(cfg: dict) -> RemoteStorageSynchronizer: pass - @abstractmethod def sync_from_remote(self) -> None: + self._log_info( + "Syncing content from remote.", + remote_workdir=str(self.remote_dir.working_dir), + local_workdir=str(self.local_dir.working_dir), + ) + for file in self._all_files: + sync_state = self.__get_file_sync_state(file) + if sync_state == FileSyncState.IN_SYNC: + continue + if sync_state == FileSyncState.DIFFERENT or sync_state == FileSyncState.NOT_IN_LOCAL: + self.__download_file(file) + continue + if sync_state == FileSyncState.NOT_IN_REMOTE: + self.__delete_local_file(file) + continue + raise NotImplementedError("Unexpected FileSyncState: {}".format(sync_state)) + self._log_info( + "Successfully synced remote content.", + remote_workdir=str(self.remote_dir.working_dir), + local_workdir=str(self.local_dir.working_dir), + ) + + def sync_to_remote(self) -> None: + self._log_info( + "Syncing content to remote.", + local_workdir=str(self.local_dir.working_dir), + remote_workdir=str(self.remote_dir.working_dir), + ) + for file in self._all_files: + sync_state = self.__get_file_sync_state(file) + if sync_state == FileSyncState.IN_SYNC: + continue + if sync_state == FileSyncState.DIFFERENT or sync_state == FileSyncState.NOT_IN_REMOTE: + self.__upload_file(file) + continue + if sync_state == FileSyncState.NOT_IN_LOCAL: + self.__delete_remote_file(file) + continue + raise NotImplementedError("Unexpected FileSyncState: {}".format(sync_state)) + self._log_info( + "Successfully synced local content.", + remote_workdir=str(self.remote_dir.working_dir), + local_workdir=str(self.local_dir.working_dir), + ) + self.__invalidate_remote_files_cache() + + @abstractmethod + def _create_snapshot_of_remote(self) -> None: pass + def create_snapshot_of_remote(self) -> None: + self._log_info("Creating snapshot of remote") + self._create_snapshot_of_remote() + + @property + def _local_files(self) -> List[Path]: + dirs_and_files = list(self.local_dir.working_dir.rglob("*")) + return list(filter(lambda path: path.is_file(), dirs_and_files)) + @abstractmethod - def sync_to_remote(self) -> None: + def _list_remote_files(self) -> List[Dict[str, Any]]: pass + @property + def _remote_files(self) -> List[Dict[str, Any]]: + if self.__remote_files_cache is not None: + return self.__remote_files_cache + + self.__remote_files_cache = self._list_remote_files() + return self.__remote_files_cache + + def __invalidate_remote_files_cache(self) -> None: + self.__remote_files_cache = None + + @property + def _all_files(self) -> List[str]: + files = set() + for local_file in self._local_files: + files.add(self.__get_relative_file_path_for_local_file(local_file)) + for remote_file in self._remote_files: + files.add(self._get_relative_file_path_for_remote_file(remote_file)) + return sorted(files) + @abstractmethod - def create_snapshot_of_remote(self) -> None: + def _download_file(self, relative_file_path: str) -> None: pass - def set_sub_dir(self, sub_dir: Path) -> None: - self.remote_dir.set_sub_dir(sub_dir) - self.local_dir.set_sub_dir(sub_dir) + def __download_file(self, relative_file_path: str) -> None: + download_path = self._get_local_file_path_for_relative_file(relative_file_path) + self._log_info("Downloading file.", remote_path=relative_file_path, local_path=str(download_path)) + self._download_file(relative_file_path) + self._log_info("Successfully downloaded file.", remote_path=relative_file_path, local_path=str(download_path)) + + sha512sum = sha512(download_path.read_bytes()).digest() + sha512sum_file_path = self.__get_remote_sha512sum_file_path(relative_file_path) + sha512sum_file_path.parent.mkdir(exist_ok=True, parents=True) + sha512sum_file_path.write_bytes(sha512sum) + + @abstractmethod + def _upload_file(self, relative_file_path: str) -> None: + pass + + def __upload_file(self, relative_file_path: str) -> None: + local_path = self._get_local_file_path_for_relative_file(relative_file_path) + + self._log_info("Uploading file.", local_path=str(local_path), remote_path=relative_file_path) + self._upload_file(relative_file_path) + self._log_info("Successfully uploaded file.", remote_path=relative_file_path, local_path=str(local_path)) + + def __delete_local_file(self, relative_file_path: str) -> None: + local_file_path = Path(self.local_dir.root_dir, relative_file_path).resolve() + self._log_info("Deleting local file.", local_path=str(local_file_path)) + local_file_path.unlink() + self._log_info("Successfully deleted local file.", local_path=str(local_file_path)) + + @abstractmethod + def _delete_remote_file(self, relative_file_path: str) -> None: + pass + + def __delete_remote_file(self, relative_file_path: str) -> None: + self._log_info("Deleting remote file.", remote_path=relative_file_path) + self._delete_remote_file(relative_file_path) + self._log_info("Successfully deleted remote file.", remote_path=relative_file_path) + + def __get_relative_file_path_for_local_file(self, file: Path) -> str: + return str(file.relative_to(self.local_dir.root_dir)) + + @abstractmethod + def _get_relative_file_path_for_remote_file(self, file: Dict[str, Any]) -> str: + pass + + def _get_local_file_path_for_relative_file(self, relative_file_path: str) -> Path: + return Path(self.local_dir.root_dir, relative_file_path).resolve() + + def __get_sha512_of_local_file(self, relative_file_path: str) -> bytes: + file = Path(self.local_dir.root_dir, relative_file_path) + return sha512(file.read_bytes()).digest() + + def __get_remote_sha512sum_file_path(self, relative_file_path: str) -> Path: + path = Path(self.local_dir.root_dir, "package-indexer-sha512sums", relative_file_path).resolve() + return Path(path.parent, path.name + ".package-indexer-sha512sum") + + def __get_sha512_of_remote_file(self, relative_file_path: str) -> bytes: + sha512sum_file_path = self.__get_remote_sha512sum_file_path(relative_file_path) + return sha512sum_file_path.read_bytes() + + def __get_file_sync_state(self, relative_file_path: str) -> FileSyncState: + try: + local_sha512 = self.__get_sha512_of_local_file(relative_file_path) + except FileNotFoundError: + self._log_debug( + "Remote file is not available locally.", + remote_path=str(Path(self.remote_dir.root_dir, relative_file_path)), + unavailable_local_path=str(Path(self.local_dir.root_dir, relative_file_path)), + ) + return FileSyncState.NOT_IN_LOCAL + + try: + remote_sha512 = self.__get_sha512_of_remote_file(relative_file_path) + except FileNotFoundError: + self._log_debug( + "Local file is not available remotely.", + local_path=str(Path(self.local_dir.root_dir, relative_file_path)), + unavailable_remote_path=str(Path(self.remote_dir.root_dir, relative_file_path)), + ) + return FileSyncState.NOT_IN_REMOTE + + if remote_sha512 != local_sha512: + self._log_debug( + "File differs locally and remotely.", + remote_path=str(Path(self.remote_dir.root_dir, relative_file_path)), + local_path=str(Path(self.local_dir.root_dir, relative_file_path)), + remote_sha512sum=remote_sha512.hex(), + local_sha512sum=local_sha512.hex(), + ) + return FileSyncState.DIFFERENT + + self._log_debug( + "File is in sync.", + remote_path=str(Path(self.remote_dir.root_dir, relative_file_path)), + local_path=str(Path(self.local_dir.root_dir, relative_file_path)), + sha512sum=remote_sha512.hex(), + ) + return FileSyncState.IN_SYNC @staticmethod def __create_logger() -> logging.Logger: @@ -96,6 +275,10 @@ def _prepare_log(self, message: str, **kwargs: str) -> str: message += "\t{}".format(kwargs) return message + def _log_error(self, message: str, **kwargs: str) -> None: + log = self._prepare_log(message, **kwargs) + self.__logger.error(log) + def _log_info(self, message: str, **kwargs: str) -> None: log = self._prepare_log(message, **kwargs) self.__logger.info(log) diff --git a/packaging/package-indexer/remote_storage_synchronizer/s3_bucket_synchronizer.py b/packaging/package-indexer/remote_storage_synchronizer/s3_bucket_synchronizer.py new file mode 100644 index 0000000000..6cdddb63d8 --- /dev/null +++ b/packaging/package-indexer/remote_storage_synchronizer/s3_bucket_synchronizer.py @@ -0,0 +1,140 @@ +############################################################################# +# Copyright (c) 2024 Attila Szakacs +# +# This program is free software; you can redistribute it and/or modify it +# under the terms of the GNU General Public License version 2 as published +# by the Free Software Foundation, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA +# +# As an additional exemption you are allowed to compile & link against the +# OpenSSL libraries as published by the OpenSSL project. See the file +# COPYING for details. +# +############################################################################# + +from pathlib import Path +from typing import Any, Dict, List + +from boto3 import Session +from botocore.exceptions import ClientError, EndpointConnectionError + +from .remote_storage_synchronizer import RemoteStorageSynchronizer + +DEFAULT_ROOT_DIR = Path("/tmp/s3_bucket_synchronizer") + + +class S3BucketSynchronizer(RemoteStorageSynchronizer): + """ + A `RemoteStorageSynchronizer` implementation that can connect to an S3 Bucket instance. + + Example config: + + ```yaml + vendor: "s3" + incoming: + all: + endpoint: "url1" + bucket: "bucket1" + access-key: "accesskey1" + secret-key: "secretkey1" + indexed: + stable: + endpoint: "url2" + bucket: "bucket2" + access-key: "accesskey2" + secret-key: "secretkey2" + nightly: + endpoint: "url3" + bucket: "bucket3" + access-key: "accesskey3" + secret-key: "secretkey3" + ``` + """ + + def __init__(self, access_key: str, secret_key: str, endpoint: str, bucket: str) -> None: + self.__session = Session( + aws_access_key_id=access_key, + aws_secret_access_key=secret_key, + ) + self.__client = self.__session.client( + service_name="s3", + endpoint_url=endpoint, + ) + self.__bucket = bucket + super().__init__( + remote_root_dir=Path(""), + local_root_dir=Path(DEFAULT_ROOT_DIR, bucket), + ) + + @staticmethod + def get_config_keyword() -> str: + return "s3" + + @staticmethod + def from_config(cfg: dict) -> RemoteStorageSynchronizer: + return S3BucketSynchronizer( + access_key=cfg["access-key"], + secret_key=cfg["secret-key"], + endpoint=cfg["endpoint"], + bucket=cfg["bucket"], + ) + + def _list_remote_files(self) -> List[Dict[str, Any]]: + objects: List[Dict[str, Any]] = [] + pagination_options: Dict[str, str] = {} + + while True: + try: + response: Dict[str, Any] = self.__client.list_objects( + Bucket=self.__bucket, + **pagination_options, + ) + except (ClientError, EndpointConnectionError) as e: + self._log_error(f"Failed to list objects of bucket: {self.__bucket} => {e}") + return [] + + try: + for obj in response.get("Contents", []): + objects.append(obj) + if not response["IsTruncated"]: + break + pagination_options = {"Marker": response["Contents"][-1]["Key"]} + except KeyError: + self._log_error( + f"Failed to list objects of bucket: {self.__bucket}/ => Unexpected response: {response}" + ) + return [] + + return objects + + def _download_file(self, relative_file_path: str) -> None: + download_path = self._get_local_file_path_for_relative_file(relative_file_path) + download_path.parent.mkdir(parents=True, exist_ok=True) + with download_path.open("wb") as downloaded_object: + self.__client.download_fileobj(self.__bucket, relative_file_path, downloaded_object) + + def _upload_file(self, relative_file_path: str) -> None: + local_path = self._get_local_file_path_for_relative_file(relative_file_path) + with local_path.open("rb") as local_file_data: + self.__client.upload_fileobj(local_file_data, self.__bucket, relative_file_path) + + def _delete_remote_file(self, relative_file_path: str) -> None: + self.__client.delete_object(Bucket=self.__bucket, Key=relative_file_path) + + def _create_snapshot_of_remote(self) -> None: + self._log_info("Cannot create snapshot, not implemented, skipping...") + + def _get_relative_file_path_for_remote_file(self, file: Dict[str, Any]) -> str: + return file["Key"] + + def _prepare_log(self, message: str, **kwargs: str) -> str: + log = "[{} :: {}]\t{}".format(self.__bucket, str(self.remote_dir.working_dir), message) + return super()._prepare_log(log, **kwargs) diff --git a/tests/copyright/policy b/tests/copyright/policy index f97b53aef8..218a1e95b5 100644 --- a/tests/copyright/policy +++ b/tests/copyright/policy @@ -273,6 +273,8 @@ modules/kvformat/tests/test_filterx_func_parse_kv.c modules/kvformat/tests/test_filterx_func_format_kv.c docker/python-modules/webhook/scl/webhook.conf docker/python-modules/webhook/source.py +packaging/package-indexer/remote_storage_synchronizer/s3_bucket_synchronizer.py +packaging/package-indexer/cdn/cloudflare_cdn.py ########################################################################### # These files are GPLd with Balabit origin.