Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow to specify which connection, variable or config are being looked up in the backend using *_lookup_pattern parameters #29580

Merged
merged 6 commits into from
Mar 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 39 additions & 12 deletions airflow/providers/amazon/aws/secrets/secrets_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from __future__ import annotations

import json
import re
import warnings
from typing import Any
from urllib.parse import unquote
Expand All @@ -41,13 +42,16 @@ class SecretsManagerBackend(BaseSecretsBackend, LoggingMixin):
backend = airflow.providers.amazon.aws.secrets.secrets_manager.SecretsManagerBackend
backend_kwargs = {"connections_prefix": "airflow/connections"}

For example, if secrets prefix is ``airflow/connections/smtp_default``, this would be accessible
if you provide ``{"connections_prefix": "airflow/connections"}`` and request conn_id ``smtp_default``.
If variables prefix is ``airflow/variables/hello``, this would be accessible
if you provide ``{"variables_prefix": "airflow/variables"}`` and request variable key ``hello``.
And if config_prefix is ``airflow/config/sql_alchemy_conn``, this would be accessible
if you provide ``{"config_prefix": "airflow/config"}`` and request config
key ``sql_alchemy_conn``.
For example, when ``{"connections_prefix": "airflow/connections"}`` is set, if a secret is defined with
the path ``airflow/connections/smtp_default``, the connection with conn_id ``smtp_default`` would be
accessible.

When ``{"variables_prefix": "airflow/variables"}`` is set, if a secret is defined with
the path ``airflow/variables/hello``, the variable with the name ``hello`` would be accessible.

When ``{"config_prefix": "airflow/config"}`` set, if a secret is defined with
the path ``airflow/config/sql_alchemy_conn``, the config with they ``sql_alchemy_conn`` would be
accessible.

You can also pass additional keyword arguments listed in AWS Connection Extra config
to this class, and they would be used for establishing a connection and passed on to Boto3 client.
Expand Down Expand Up @@ -84,12 +88,24 @@ class SecretsManagerBackend(BaseSecretsBackend, LoggingMixin):
:param connections_prefix: Specifies the prefix of the secret to read to get Connections.
If set to None (null value in the configuration), requests for connections will not be
sent to AWS Secrets Manager. If you don't want a connections_prefix, set it as an empty string
:param connections_lookup_pattern: Specifies a pattern the connection ID needs to match to be looked up in
AWS Secrets Manager. Applies only if `connections_prefix` is not None.
If set to None (null value in the configuration), all connections will be looked up first in
AWS Secrets Manager.
Comment on lines +91 to +94
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I really like this idea. Cool feature add on!

:param variables_prefix: Specifies the prefix of the secret to read to get Variables.
If set to None (null value in the configuration), requests for variables will not be sent to
AWS Secrets Manager. If you don't want a variables_prefix, set it as an empty string
:param variables_lookup_pattern: Specifies a pattern the variable key needs to match to be looked up in
AWS Secrets Manager. Applies only if `variables_prefix` is not None.
If set to None (null value in the configuration), all variables will be looked up first in
AWS Secrets Manager.
:param config_prefix: Specifies the prefix of the secret to read to get Configurations.
If set to None (null value in the configuration), requests for configurations will not be sent to
AWS Secrets Manager. If you don't want a config_prefix, set it as an empty string
:param config_lookup_pattern: Specifies a pattern the config key needs to match to be looked up in
AWS Secrets Manager. Applies only if `config_prefix` is not None.
If set to None (null value in the configuration), all config keys will be looked up first in
AWS Secrets Manager.
:param sep: separator used to concatenate secret_prefix and secret_id. Default: "/"
:param extra_conn_words: for using just when you set full_url_mode as false and store
the secrets in different fields of secrets manager. You can add more words for each connection
Expand All @@ -101,8 +117,11 @@ class SecretsManagerBackend(BaseSecretsBackend, LoggingMixin):
def __init__(
self,
connections_prefix: str = "airflow/connections",
connections_lookup_pattern: str | None = None,
variables_prefix: str = "airflow/variables",
variables_lookup_pattern: str | None = None,
config_prefix: str = "airflow/config",
config_lookup_pattern: str | None = None,
sep: str = "/",
extra_conn_words: dict[str, list[str]] | None = None,
**kwargs,
Expand All @@ -120,6 +139,9 @@ def __init__(
self.config_prefix = config_prefix.rstrip(sep)
else:
self.config_prefix = config_prefix
self.connections_lookup_pattern = connections_lookup_pattern
self.variables_lookup_pattern = variables_lookup_pattern
self.config_lookup_pattern = config_lookup_pattern
self.sep = sep

if kwargs.pop("full_url_mode", None) is not None:
Expand Down Expand Up @@ -223,7 +245,7 @@ def get_conn_value(self, conn_id: str) -> str | None:
if self.connections_prefix is None:
return None

secret = self._get_secret(self.connections_prefix, conn_id)
secret = self._get_secret(self.connections_prefix, conn_id, self.connections_lookup_pattern)

if secret is not None and secret.strip().startswith("{"):
# Before Airflow 2.3, the AWS SecretsManagerBackend added support for JSON secrets.
Expand Down Expand Up @@ -264,14 +286,14 @@ def get_conn_uri(self, conn_id: str) -> str | None:

def get_variable(self, key: str) -> str | None:
"""
Get Airflow Variable from Environment Variable
Get Airflow Variable
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add some more description here. If we're getting this variable from multiple places maybe we should be clear about the options?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the context of this class, we are actually getting it from only one location: AWS Secrets Manager. If this function returns None, then it is fetched from other location: Environment variable then metastore but this is done outside of this class. See documentation here. Let me know if you still think I should update the documentation

:param key: Variable Key
:return: Variable Value
"""
if self.variables_prefix is None:
return None

return self._get_secret(self.variables_prefix, key)
return self._get_secret(self.variables_prefix, key, self.variables_lookup_pattern)

def get_config(self, key: str) -> str | None:
"""
Expand All @@ -282,14 +304,19 @@ def get_config(self, key: str) -> str | None:
if self.config_prefix is None:
return None

return self._get_secret(self.config_prefix, key)
return self._get_secret(self.config_prefix, key, self.config_lookup_pattern)

def _get_secret(self, path_prefix, secret_id: str) -> str | None:
def _get_secret(self, path_prefix, secret_id: str, lookup_pattern: str | None) -> str | None:
"""
Get secret value from Secrets Manager
:param path_prefix: Prefix for the Path to get Secret
:param secret_id: Secret Key
:param lookup_pattern: If provided, `secret_id` must match this pattern to look up the secret in
Secrets Manager
"""
if lookup_pattern and not re.match(lookup_pattern, secret_id, re.IGNORECASE):
return None

error_msg = "An error occurred when calling the get_secret_value operation"
if path_prefix:
secrets_path = self.build_path(path_prefix, secret_id, self.sep)
Expand Down
49 changes: 43 additions & 6 deletions airflow/providers/amazon/aws/secrets/systems_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
"""Objects relating to sourcing connections from AWS SSM Parameter Store"""
from __future__ import annotations

import re
import warnings

from airflow.compat.functools import cached_property
Expand All @@ -41,14 +42,26 @@ class SystemsManagerParameterStoreBackend(BaseSecretsBackend, LoggingMixin):
For example, if ssm path is ``/airflow/connections/smtp_default``, this would be accessible
if you provide ``{"connections_prefix": "/airflow/connections"}`` and request conn_id ``smtp_default``.
And if ssm path is ``/airflow/variables/hello``, this would be accessible
if you provide ``{"variables_prefix": "/airflow/variables"}`` and request conn_id ``hello``.
if you provide ``{"variables_prefix": "/airflow/variables"}`` and variable key ``hello``.

:param connections_prefix: Specifies the prefix of the secret to read to get Connections.
If set to None (null), requests for connections will not be sent to AWS SSM Parameter Store.
:param connections_lookup_pattern: Specifies a pattern the connection ID needs to match to be looked up in
AWS Parameter Store. Applies only if `connections_prefix` is not None.
If set to None (null value in the configuration), all connections will be looked up first in
AWS Parameter Store.
:param variables_prefix: Specifies the prefix of the secret to read to get Variables.
If set to None (null), requests for variables will not be sent to AWS SSM Parameter Store.
:param variables_lookup_pattern: Specifies a pattern the variable key needs to match to be looked up in
AWS Parameter Store. Applies only if `variables_prefix` is not None.
If set to None (null value in the configuration), all variables will be looked up first in
AWS Parameter Store.
:param config_prefix: Specifies the prefix of the secret to read to get Variables.
If set to None (null), requests for configurations will not be sent to AWS SSM Parameter Store.
:param config_lookup_pattern: Specifies a pattern the config key needs to match to be looked up in
AWS Parameter Store. Applies only if `config_prefix` is not None.
If set to None (null value in the configuration), all config keys will be looked up first in
AWS Parameter Store.

You can also pass additional keyword arguments listed in AWS Connection Extra config
to this class, and they would be used for establish connection and passed on to Boto3 client.
Expand All @@ -67,8 +80,11 @@ class SystemsManagerParameterStoreBackend(BaseSecretsBackend, LoggingMixin):
def __init__(
self,
connections_prefix: str = "/airflow/connections",
connections_lookup_pattern: str | None = None,
variables_prefix: str = "/airflow/variables",
variables_lookup_pattern: str | None = None,
config_prefix: str = "/airflow/config",
config_lookup_pattern: str | None = None,
**kwargs,
):
super().__init__()
Expand All @@ -85,6 +101,9 @@ def __init__(
else:
self.config_prefix = config_prefix

self.connections_lookup_pattern = connections_lookup_pattern
self.variables_lookup_pattern = variables_lookup_pattern
self.config_lookup_pattern = config_lookup_pattern
self.profile_name = kwargs.get("profile_name", None)
# Remove client specific arguments from kwargs
self.api_version = kwargs.pop("api_version", None)
Expand Down Expand Up @@ -122,7 +141,7 @@ def get_conn_value(self, conn_id: str) -> str | None:
if self.connections_prefix is None:
return None

return self._get_secret(self.connections_prefix, conn_id)
return self._get_secret(self.connections_prefix, conn_id, self.connections_lookup_pattern)

def get_conn_uri(self, conn_id: str) -> str | None:
"""
Expand All @@ -146,15 +165,15 @@ def get_conn_uri(self, conn_id: str) -> str | None:

def get_variable(self, key: str) -> str | None:
"""
Get Airflow Variable from Environment Variable
Get Airflow Variable

:param key: Variable Key
:return: Variable Value
"""
if self.variables_prefix is None:
return None

return self._get_secret(self.variables_prefix, key)
return self._get_secret(self.variables_prefix, key, self.variables_lookup_pattern)

def get_config(self, key: str) -> str | None:
"""
Expand All @@ -166,19 +185,37 @@ def get_config(self, key: str) -> str | None:
if self.config_prefix is None:
return None

return self._get_secret(self.config_prefix, key)
return self._get_secret(self.config_prefix, key, self.config_lookup_pattern)

def _get_secret(self, path_prefix: str, secret_id: str) -> str | None:
def _get_secret(self, path_prefix: str, secret_id: str, lookup_pattern: str | None) -> str | None:
"""
Get secret value from Parameter Store.

:param path_prefix: Prefix for the Path to get Secret
:param secret_id: Secret Key
:param lookup_pattern: If provided, `secret_id` must match this pattern to look up the secret in
Systems Manager
"""
if lookup_pattern and not re.match(lookup_pattern, secret_id, re.IGNORECASE):
return None

ssm_path = self.build_path(path_prefix, secret_id)
ssm_path = self._ensure_leading_slash(ssm_path)

try:
response = self.client.get_parameter(Name=ssm_path, WithDecryption=True)
return response["Parameter"]["Value"]
except self.client.exceptions.ParameterNotFound:
self.log.debug("Parameter %s not found.", ssm_path)
return None

def _ensure_leading_slash(self, ssm_path: str):
"""
AWS Systems Manager mandate to have a leading "/". Adding it dynamically if not there to the SSM path

:param ssm_path: SSM parameter path
"""
if not ssm_path.startswith("/"):
ssm_path = f"/{ssm_path}"

return ssm_path
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,15 @@ Here is a sample configuration:

[secrets]
backend = airflow.providers.amazon.aws.secrets.secrets_manager.SecretsManagerBackend
backend_kwargs = {"connections_prefix": "airflow/connections", "variables_prefix": "airflow/variables", "profile_name": "default"}
backend_kwargs = {
"connections_prefix": "airflow/connections",
"connections_lookup_pattern": null,
"variables_prefix": "airflow/variables",
"variables_lookup_pattern": null,
"config_prefix": "airflow/config",
"config_lookup_pattern": null,
"profile_name": "default"
}

To authenticate you can either supply arguments listed in
:ref:`Amazon Webservices Connection Extra config <howto/connection:aws:configuring-the-connection>` or set
Expand All @@ -38,7 +46,12 @@ To authenticate you can either supply arguments listed in

[secrets]
backend = airflow.providers.amazon.aws.secrets.secrets_manager.SecretsManagerBackend
backend_kwargs = {"connections_prefix": "airflow/connections", "variables_prefix": "airflow/variables", "role_arn": "arn:aws:iam::123456789098:role/role-name"}
backend_kwargs = {
"connections_prefix": "airflow/connections",
"variables_prefix": "airflow/variables",
"config_prefix": "airflow/config",
"role_arn": "arn:aws:iam::123456789098:role/role-name"
}


Storing and Retrieving Connections
Expand Down Expand Up @@ -107,7 +120,7 @@ If you don't want to use any ``connections_prefix`` for retrieving connections,
Storing and Retrieving Variables
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

If you have set ``variables_prefix`` as ``airflow/variables``, then for an Variable key of ``hello``,
If you have set ``variables_prefix`` as ``airflow/variables``, then for a Variable key of ``hello``,
you would want to store your Variable at ``airflow/variables/hello``.

Optional lookup
Expand All @@ -118,13 +131,33 @@ This will prevent requests being sent to AWS Secrets Manager for the excluded ty

If you want to look up some and not others in AWS Secrets Manager you may do so by setting the relevant ``*_prefix`` parameter of the ones to be excluded as ``null``.

For example, if you want to set parameter ``connections_prefix`` to ``"airflow/connections"`` and not look up variables, your configuration file should look like this:
For example, if you want to set parameter ``connections_prefix`` to ``"airflow/connections"`` and not look up variables and config, your configuration file should look like this:

.. code-block:: ini

[secrets]
backend = airflow.providers.amazon.aws.secrets.secrets_manager.SecretsManagerBackend
backend_kwargs = {"connections_prefix": "airflow/connections", "variables_prefix": null, "profile_name": "default"}
backend_kwargs = {
"connections_prefix": "airflow/connections",
"variables_prefix": null,
"config_prefix": null,
"profile_name": "default"
}

If you want to only lookup a specific subset of connections, variables or config in AWS Secrets Manager, you may do so by setting the relevant ``*_lookup_pattern`` parameter.
This parameter takes a Regex as a string as value.

For example, if you want to only lookup connections starting by "m" in AWS Secrets Manager, your configuration file should look like this:

.. code-block:: ini

[secrets]
backend = airflow.providers.amazon.aws.secrets.secrets_manager.SecretsManagerBackend
backend_kwargs = {
"connections_prefix": "airflow/connections",
"connections_lookup_pattern": "^m",
"profile_name": "default"
}

Example of storing Google Secrets in AWS Secrets Manager
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,15 @@ Here is a sample configuration:

[secrets]
backend = airflow.providers.amazon.aws.secrets.systems_manager.SystemsManagerParameterStoreBackend
backend_kwargs = {"connections_prefix": "/airflow/connections", "variables_prefix": "/airflow/variables", "profile_name": "default"}
backend_kwargs = {
"connections_prefix": "airflow/connections",
"connections_lookup_pattern": null,
"variables_prefix": "airflow/variables",
"variables_lookup_pattern": null,
"config_prefix": "airflow/config",
"config_lookup_pattern": null,
"profile_name": "default"
}

To authenticate you can either supply arguments listed in
:ref:`Amazon Webservices Connection Extra config <howto/connection:aws:configuring-the-connection>` or set
Expand All @@ -39,7 +47,12 @@ To authenticate you can either supply arguments listed in

[secrets]
backend = airflow.providers.amazon.aws.secrets.systems_manager.SystemsManagerParameterStoreBackend
backend_kwargs = {"connections_prefix": "airflow/connections", "variables_prefix": "airflow/variables", "role_arn": "arn:aws:iam::123456789098:role/role-name"}
backend_kwargs = {
"connections_prefix": "airflow/connections",
"variables_prefix": "airflow/variables",
"config_prefix": "airflow/config",
"role_arn": "arn:aws:iam::123456789098:role/role-name"
}


Optional lookup
Expand All @@ -50,13 +63,33 @@ This will prevent requests being sent to AWS SSM Parameter Store for the exclude

If you want to look up some and not others in AWS SSM Parameter Store you may do so by setting the relevant ``*_prefix`` parameter of the ones to be excluded as ``null``.

For example, if you want to set parameter ``connections_prefix`` to ``"/airflow/connections"`` and not look up variables, your configuration file should look like this:
For example, if you want to set parameter ``connections_prefix`` to ``"airflow/connections"`` and not look up variables and config, your configuration file should look like this:

.. code-block:: ini

[secrets]
backend = airflow.providers.amazon.aws.secrets.systems_manager.SystemsManagerParameterStoreBackend
backend_kwargs = {"connections_prefix": "/airflow/connections", "variables_prefix": null, "profile_name": "default"}
backend_kwargs = {
"connections_prefix": "airflow/connections",
"variables_prefix": null,
"config_prefix": null,
"profile_name": "default"
}

If you want to only lookup a specific subset of connections, variables or config in AWS Secrets Manager, you may do so by setting the relevant ``*_lookup_pattern`` parameter.
This parameter takes a Regex as a string as value.

For example, if you want to only lookup connections starting by "m" in AWS Secrets Manager, your configuration file should look like this:

.. code-block:: ini

[secrets]
backend = airflow.providers.amazon.aws.secrets.secrets_manager.SecretsManagerBackend
backend_kwargs = {
"connections_prefix": "airflow/connections",
"connections_lookup_pattern": "^m",
"profile_name": "default"
}

Storing and Retrieving Connections
""""""""""""""""""""""""""""""""""
Expand Down
Loading