-
Notifications
You must be signed in to change notification settings - Fork 273
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Signed-off-by: Zelin Hao <zelinhao@amazon.com>
- Loading branch information
Showing
5 changed files
with
117 additions
and
69 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,68 @@ | ||
#!/usr/bin/env python | ||
|
||
# SPDX-License-Identifier: Apache-2.0 | ||
# | ||
# The OpenSearch Contributors require contributions made to | ||
# this file be licensed under the Apache-2.0 license or a | ||
# compatible open source license. | ||
|
||
import logging | ||
import os | ||
from pathlib import Path | ||
from typing import List | ||
from abc import ABC, abstractmethod | ||
|
||
from git.git_repository import GitRepository | ||
|
||
""" | ||
This class is responsible for signing an artifact using the OpenSearch-signer-client and verifying its signature. | ||
The signed artifacts will be found in the same location as the original artifacts. | ||
""" | ||
|
||
|
||
class Signer(ABC): | ||
git_repo: GitRepository | ||
|
||
def __init__(self) -> None: | ||
self.git_repo = GitRepository(self.get_repo_url(), "HEAD", working_subdirectory="src") | ||
self.git_repo.execute("./bootstrap") | ||
self.git_repo.execute("rm config.cfg") | ||
|
||
def sign_artifact(self, artifact: str, basepath: Path, signature_type: str) -> None: | ||
if not self.is_valid_file_type(artifact): | ||
logging.info(f"Skipping signing of file {artifact}") | ||
return | ||
self.generate_signature_and_verify(artifact, basepath, signature_type) | ||
|
||
def sign_artifacts(self, artifacts: List[str], basepath: Path, signature_type: str) -> None: | ||
for artifact in artifacts: | ||
if not self.is_valid_file_type(artifact): | ||
logging.info(f"Skipping signing of file {artifact}") | ||
continue | ||
self.generate_signature_and_verify(artifact, basepath, signature_type) | ||
|
||
@abstractmethod | ||
def generate_signature_and_verify(self, artifact: str, basepath: Path, signature_type: str) -> None: | ||
pass | ||
|
||
@abstractmethod | ||
def is_valid_file_type(self, file_name: str) -> bool: | ||
pass | ||
|
||
def get_repo_url(self) -> str: | ||
if "GITHUB_TOKEN" in os.environ: | ||
return "https://${GITHUB_TOKEN}@github.com/opensearch-project/opensearch-signer-client.git" | ||
return "https://github.com/opensearch-project/opensearch-signer-client.git" | ||
|
||
def __remove_existing_signature__(self, signature_file: str) -> None: | ||
if os.path.exists(signature_file): | ||
logging.warning(f"Removing existing signature file {signature_file}") | ||
os.remove(signature_file) | ||
|
||
@abstractmethod | ||
def sign(self, filename: str, signature_type: str) -> None: | ||
pass | ||
|
||
@abstractmethod | ||
def verify(self, filename: str) -> None: | ||
pass |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,37 @@ | ||
#!/usr/bin/env python | ||
|
||
# SPDX-License-Identifier: Apache-2.0 | ||
# | ||
# The OpenSearch Contributors require contributions made to | ||
# this file be licensed under the Apache-2.0 license or a | ||
# compatible open source license. | ||
|
||
from typing import Any, List, Type, Union | ||
|
||
from sign_workflow.signer_pgp import SignerPGP | ||
from sign_workflow.signer_windows import SignerWindows | ||
from sign_workflow.signer import Signer | ||
|
||
""" | ||
This class is responsible for signing an artifact using the OpenSearch-signer-client and verifying its signature. | ||
The signed artifacts will be found in the same location as the original artifacts. | ||
""" | ||
|
||
|
||
class Signers: | ||
TYPES = { | ||
"windows": SignerWindows, | ||
"linux": SignerPGP, | ||
} | ||
|
||
@classmethod | ||
def from_platform(cls, platform: str) -> Signer: | ||
klass = cls.TYPES.get(platform, None) | ||
if not klass: | ||
raise ValueError(f"Unsupported type of platform for signing: {platform}") | ||
return klass # type: ignore[return-value] | ||
|
||
@classmethod | ||
def create(cls, platform: str) -> Signer: | ||
klass = cls.from_platform(platform) | ||
return klass() # type: ignore[no-any-return, operator] |