Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,27 @@ class GoogleCloudSecretManagerHook(GoogleBaseHook):
See https://cloud.google.com/secret-manager
"""

def __init__(self, location: str | None = None, **kwargs) -> None:
super().__init__(**kwargs)
self.location = location

@cached_property
def client(self):
"""
Create a Secret Manager Client.

:return: Secret Manager client.
"""
return SecretManagerServiceClient(credentials=self.get_credentials(), client_info=CLIENT_INFO)
if self.location is not None:
return SecretManagerServiceClient(
credentials=self.get_credentials(),
client_info=CLIENT_INFO,
client_options={"api_endpoint": f"secretmanager.{self.location}.rep.googleapis.com"},
)
return SecretManagerServiceClient(
credentials=self.get_credentials(),
client_info=CLIENT_INFO,
)

def get_conn(self) -> SecretManagerServiceClient:
"""
Expand All @@ -64,6 +77,60 @@ def get_conn(self) -> SecretManagerServiceClient:
"""
return self.client

def _get_parent(self, project_id: str, location: str | None = None) -> str:
"""
Return parent path.

:param project_id: Required. ID of the GCP project that owns the job.
:param location: Optional. Target location. If set to ``None`` or missing, the location provided for GoogleCloudSecretHook instantiation is used
For more details : https://cloud.google.com/secret-manager/docs/locations

:return: Parent path.
"""
_location = location or self.location
if _location is not None:
return self.client.common_location_path(project_id, _location)
return self.client.common_project_path(project_id)

def _get_secret_path(self, project_id: str, secret_id: str, location: str | None = None) -> str:
"""
Return secret path.

:param project_id: Required. ID of the GCP project that owns the job.
:param secret_id: Required. Secret ID for which path is required.
:param location: Optional. Target location. If set to ``None`` or missing, the location provided for GoogleCloudSecretHook instantiation is used
For more details : https://cloud.google.com/secret-manager/docs/locations

:return: Parent path.
"""
_location = location or self.location
if _location is not None:
# Google's client library does not provide a method to construct regional secret paths, so constructing manually.
return f"projects/{project_id}/locations/{_location}/secrets/{secret_id}"
return self.client.secret_path(project_id, secret_id)

def _get_secret_version_path(
self, project_id: str, secret_id: str, secret_version: str, location: str | None = None
) -> str:
"""
Return secret version path.

:param project_id: Required. ID of the GCP project that owns the job.
:param secret_id: Required. Secret ID for which path is required.
:param secret_version: Required. Secret version for which path is required.
:param location: Optional. Target location. If set to ``None`` or missing, the location provided for GoogleCloudSecretHook instantiation is used
For more details : https://cloud.google.com/secret-manager/docs/locations

:return: Parent path.
"""
_location = location or self.location
if _location is not None:
# Google's client library does not provide a method to construct regional secret version paths, so constructing manually.
return (
f"projects/{project_id}/locations/{_location}/secrets/{secret_id}/versions/{secret_version}"
)
return self.client.secret_version_path(project_id, secret_id, secret_version)

@GoogleBaseHook.fallback_to_default_project_id
def create_secret(
self,
Expand All @@ -73,6 +140,7 @@ def create_secret(
retry: Retry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
location: str | None = None,
) -> Secret:
"""
Create a secret.
Expand All @@ -88,12 +156,20 @@ def create_secret(
:param retry: Optional. Designation of what errors, if any, should be retried.
:param timeout: Optional. The timeout for this request.
:param metadata: Optional. Strings which should be sent along with the request as metadata.
:param location: Optional. Location where secret should be created. Used for creating regional secret. If set to ``None`` or missing, the location provided for GoogleCloudSecretHook instantiation is used
For more details : https://cloud.google.com/secret-manager/docs/locations
:return: Secret object.
"""
_secret = secret or {"replication": {"automatic": {}}}
if not secret:
_secret: dict | Secret = {}
if (location or self.location) is None:
_secret["replication"] = {"automatic": {}}
else:
_secret = secret

response = self.client.create_secret(
request={
"parent": f"projects/{project_id}",
"parent": self._get_parent(project_id, location),
"secret_id": secret_id,
"secret": _secret,
},
Expand All @@ -113,6 +189,7 @@ def add_secret_version(
retry: Retry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
location: str | None = None,
) -> SecretVersion:
"""
Add a version to the secret.
Expand All @@ -128,11 +205,13 @@ def add_secret_version(
:param retry: Optional. Designation of what errors, if any, should be retried.
:param timeout: Optional. The timeout for this request.
:param metadata: Optional. Strings which should be sent along with the request as metadata.
:param location: Optional. Location where secret is located. Used for adding version to regional secret. If set to ``None`` or missing, the location provided for GoogleCloudSecretHook instantiation is used
For more details : https://cloud.google.com/secret-manager/docs/locations
:return: Secret version object.
"""
response = self.client.add_secret_version(
request={
"parent": f"projects/{project_id}/secrets/{secret_id}",
"parent": self._get_secret_path(project_id, secret_id, location),
"payload": secret_payload,
},
retry=retry,
Expand All @@ -152,6 +231,7 @@ def list_secrets(
retry: Retry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
location: str | None = None,
) -> ListSecretsPager:
"""
List secrets.
Expand All @@ -168,11 +248,13 @@ def list_secrets(
:param retry: Optional. Designation of what errors, if any, should be retried.
:param timeout: Optional. The timeout for this request.
:param metadata: Optional. Strings which should be sent along with the request as metadata.
:param location: Optional. The regional secrets stored in the provided location will be listed. If set to ``None`` or missing, the location provided for GoogleCloudSecretHook instantiation is used
For more details : https://cloud.google.com/secret-manager/docs/locations
:return: Secret List object.
"""
response = self.client.list_secrets(
request={
"parent": f"projects/{project_id}",
"parent": self._get_parent(project_id, location),
"page_size": page_size,
"page_token": page_token,
"filter": secret_filter,
Expand All @@ -185,18 +267,22 @@ def list_secrets(
return response

@GoogleBaseHook.fallback_to_default_project_id
def secret_exists(self, project_id: str, secret_id: str) -> bool:
def secret_exists(self, project_id: str, secret_id: str, location: str | None = None) -> bool:
"""
Check whether secret exists.

:param project_id: Required. ID of the GCP project that owns the job.
If set to ``None`` or missing, the default project_id from the GCP connection is used.
:param secret_id: Required. ID of the secret to find.
:param location: Optional. Location where secret is expected to be stored regionally. If set to ``None`` or missing, the location provided for GoogleCloudSecretHook instantiation is used
For more details : https://cloud.google.com/secret-manager/docs/locations
:return: True if the secret exists, False otherwise.
"""
secret_filter = f"name:{secret_id}"
secret_name = self.client.secret_path(project_id, secret_id)
for secret in self.list_secrets(project_id=project_id, page_size=100, secret_filter=secret_filter):
secret_name = self._get_secret_path(project_id, secret_id, location)
for secret in self.list_secrets(
project_id=project_id, page_size=100, secret_filter=secret_filter, location=location
):
if secret.name.split("/")[-1] == secret_id:
self.log.info("Secret %s exists.", secret_name)
return True
Expand All @@ -212,6 +298,7 @@ def access_secret(
retry: Retry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
location: str | None = None,
) -> AccessSecretVersionResponse:
"""
Access a secret version.
Expand All @@ -227,11 +314,13 @@ def access_secret(
:param retry: Optional. Designation of what errors, if any, should be retried.
:param timeout: Optional. The timeout for this request.
:param metadata: Optional. Strings which should be sent along with the request as metadata.
:param location: Optional. Location where secret is stored regionally. If set to ``None`` or missing, the location provided for GoogleCloudSecretHook instantiation is used
For more details : https://cloud.google.com/secret-manager/docs/locations
:return: Access secret version response object.
"""
response = self.client.access_secret_version(
request={
"name": self.client.secret_version_path(project_id, secret_id, secret_version),
"name": self._get_secret_version_path(project_id, secret_id, secret_version, location),
},
retry=retry,
timeout=timeout,
Expand All @@ -248,6 +337,7 @@ def delete_secret(
retry: Retry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
location: str | None = None,
) -> None:
"""
Delete a secret.
Expand All @@ -262,9 +352,11 @@ def delete_secret(
:param retry: Optional. Designation of what errors, if any, should be retried.
:param timeout: Optional. The timeout for this request.
:param metadata: Optional. Strings which should be sent along with the request as metadata.
:param location: Optional. Location where secret is stored regionally. If set to ``None`` or missing, the location provided for GoogleCloudSecretHook instantiation is used.
For more details : https://cloud.google.com/secret-manager/docs/locations
:return: Access secret version response object.
"""
name = self.client.secret_path(project_id, secret_id)
name = self._get_secret_path(project_id, secret_id, location)
self.client.delete_secret(
request={"name": name},
retry=retry,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,61 @@
BASE_PACKAGE = "airflow.providers.google.common.hooks.base_google."
SECRETS_HOOK_PACKAGE = "airflow.providers.google.cloud.hooks.secret_manager."
SECRET_ID = "test-secret-id"
REGIONAL_SECRET_LOCATION = "test-location"
SECRET_VERSION = "test-secret-version"


class TestGoogleCloudSecretManagerHook:
def setup_method(self, method):
with patch(f"{BASE_PACKAGE}GoogleBaseHook.get_connection", return_value=MagicMock()):
self.hook = GoogleCloudSecretManagerHook()

@patch(f"{SECRETS_HOOK_PACKAGE}GoogleCloudSecretManagerHook.client", new_callable=PropertyMock)
def test__get_parent(self, mock_client):
expected_value = mock_client.return_value.common_project_path.return_value
parent = self.hook._get_parent(GCP_PROJECT_ID_HOOK_UNIT_TEST)
assert expected_value == parent
mock_client.assert_called_once()
mock_client.return_value.common_project_path.assert_called_once_with(GCP_PROJECT_ID_HOOK_UNIT_TEST)

expected_value = mock_client.return_value.common_location_path.return_value
parent = self.hook._get_parent(GCP_PROJECT_ID_HOOK_UNIT_TEST, REGIONAL_SECRET_LOCATION)
assert expected_value == parent
# mock_client.assert_called_once() # will fail as already client has been triggered due to previous case.
mock_client.return_value.common_location_path.assert_called_once_with(
GCP_PROJECT_ID_HOOK_UNIT_TEST, REGIONAL_SECRET_LOCATION
)

@patch(f"{SECRETS_HOOK_PACKAGE}GoogleCloudSecretManagerHook.client", new_callable=PropertyMock)
def test__get_secret_path(self, mock_client):
expected_value = mock_client.return_value.secret_path.return_value
secret_path = self.hook._get_secret_path(GCP_PROJECT_ID_HOOK_UNIT_TEST, SECRET_ID)
assert expected_value == secret_path
mock_client.assert_called_once()
mock_client.return_value.secret_path.assert_called_once_with(GCP_PROJECT_ID_HOOK_UNIT_TEST, SECRET_ID)

expected_value = f"projects/{GCP_PROJECT_ID_HOOK_UNIT_TEST}/locations/{REGIONAL_SECRET_LOCATION}/secrets/{SECRET_ID}"
parent = self.hook._get_secret_path(
GCP_PROJECT_ID_HOOK_UNIT_TEST, SECRET_ID, REGIONAL_SECRET_LOCATION
)
assert expected_value == parent

@patch(f"{SECRETS_HOOK_PACKAGE}GoogleCloudSecretManagerHook.client", new_callable=PropertyMock)
def test__get_secret_version_path(self, mock_client):
expected_value = mock_client.return_value.secret_version_path.return_value
parent = self.hook._get_secret_version_path(GCP_PROJECT_ID_HOOK_UNIT_TEST, SECRET_ID, SECRET_VERSION)
assert expected_value == parent
mock_client.assert_called_once()
mock_client.return_value.secret_version_path.assert_called_once_with(
GCP_PROJECT_ID_HOOK_UNIT_TEST, SECRET_ID, SECRET_VERSION
)

expected_value = f"projects/{GCP_PROJECT_ID_HOOK_UNIT_TEST}/locations/{REGIONAL_SECRET_LOCATION}/secrets/{SECRET_ID}/versions/{SECRET_VERSION}"
parent = self.hook._get_secret_version_path(
GCP_PROJECT_ID_HOOK_UNIT_TEST, SECRET_ID, SECRET_VERSION, REGIONAL_SECRET_LOCATION
)
assert expected_value == parent

@patch(f"{SECRETS_HOOK_PACKAGE}GoogleCloudSecretManagerHook.get_credentials")
@patch(f"{SECRETS_HOOK_PACKAGE}SecretManagerServiceClient")
def test_client(self, mock_client, mock_get_credentials):
Expand Down Expand Up @@ -66,9 +114,10 @@ def test_get_conn(self, mock_client):
(mock_secret := MagicMock(), mock_secret), # type: ignore[name-defined]
],
)
@patch(f"{SECRETS_HOOK_PACKAGE}GoogleCloudSecretManagerHook._get_parent")
@patch(f"{SECRETS_HOOK_PACKAGE}GoogleCloudSecretManagerHook.client", new_callable=PropertyMock)
def test_create_secret(self, mock_client, input_secret, expected_secret):
expected_parent = f"projects/{GCP_PROJECT_ID_HOOK_UNIT_TEST}"
def test_create_secret(self, mock_client, mock_get_parent, input_secret, expected_secret):
expected_parent = mock_get_parent.return_value # f"projects/{GCP_PROJECT_ID_HOOK_UNIT_TEST}"
expected_response = mock_client.return_value.create_secret.return_value
mock_retry, mock_timeout, mock_metadata = MagicMock(), MagicMock(), MagicMock()

Expand All @@ -83,6 +132,7 @@ def test_create_secret(self, mock_client, input_secret, expected_secret):

assert actual_response == expected_response
mock_client.assert_called_once()
mock_get_parent.assert_called_once_with(GCP_PROJECT_ID_HOOK_UNIT_TEST, None)
mock_client.return_value.create_secret.assert_called_once_with(
request={
"parent": expected_parent,
Expand All @@ -94,9 +144,10 @@ def test_create_secret(self, mock_client, input_secret, expected_secret):
metadata=mock_metadata,
)

@patch(f"{SECRETS_HOOK_PACKAGE}GoogleCloudSecretManagerHook._get_secret_path")
@patch(f"{SECRETS_HOOK_PACKAGE}GoogleCloudSecretManagerHook.client", new_callable=PropertyMock)
def test_add_secret_version(self, mock_client):
expected_parent = f"projects/{GCP_PROJECT_ID_HOOK_UNIT_TEST}/secrets/{SECRET_ID}"
def test_add_secret_version(self, mock_client, mock_get_secret_path):
expected_parent = mock_get_secret_path.return_value
expected_response = mock_client.return_value.add_secret_version.return_value
mock_payload, mock_retry, mock_timeout, mock_metadata = (MagicMock() for _ in range(4))

Expand All @@ -111,6 +162,7 @@ def test_add_secret_version(self, mock_client):

assert actual_response == expected_response
mock_client.assert_called_once()
mock_get_secret_path.assert_called_once_with(GCP_PROJECT_ID_HOOK_UNIT_TEST, SECRET_ID, None)
mock_client.return_value.add_secret_version.assert_called_once_with(
request={
"parent": expected_parent,
Expand All @@ -121,9 +173,10 @@ def test_add_secret_version(self, mock_client):
metadata=mock_metadata,
)

@patch(f"{SECRETS_HOOK_PACKAGE}GoogleCloudSecretManagerHook._get_parent")
@patch(f"{SECRETS_HOOK_PACKAGE}GoogleCloudSecretManagerHook.client", new_callable=PropertyMock)
def test_list_secrets(self, mock_client):
expected_parent = f"projects/{GCP_PROJECT_ID_HOOK_UNIT_TEST}"
def test_list_secrets(self, mock_client, mock_get_parent):
expected_parent = mock_get_parent.return_value
expected_response = mock_client.return_value.list_secrets.return_value
mock_filter, mock_retry, mock_timeout, mock_metadata = (MagicMock() for _ in range(4))
page_size, page_token = 20, "test-page-token"
Expand All @@ -140,6 +193,7 @@ def test_list_secrets(self, mock_client):

assert actual_response == expected_response
mock_client.assert_called_once()
mock_get_parent.assert_called_once_with(GCP_PROJECT_ID_HOOK_UNIT_TEST, None)
mock_client.return_value.list_secrets.assert_called_once_with(
request={
"parent": expected_parent,
Expand Down Expand Up @@ -184,7 +238,10 @@ def test_secret_exists(
assert secret_exists_actual == secret_exists_expected
mock_client.return_value.secret_path.assert_called_once_with(GCP_PROJECT_ID_HOOK_UNIT_TEST, secret_id)
mock_list_secrets.assert_called_once_with(
project_id=GCP_PROJECT_ID_HOOK_UNIT_TEST, page_size=100, secret_filter=secret_filter
project_id=GCP_PROJECT_ID_HOOK_UNIT_TEST,
page_size=100,
secret_filter=secret_filter,
location=None,
)

@patch(f"{SECRETS_HOOK_PACKAGE}GoogleCloudSecretManagerHook.client", new_callable=PropertyMock)
Expand Down